Blob Blame History Raw
#!/usr/bin/python3
import os, sys, re, argparse, textwrap
import bugzilla
from bugzilla._cli import DEFAULT_BZ
# --- Patterns used to parse check-kernel-fix (c-k-f) output files -----------
# The dot after "suse" is escaped on purpose: an unescaped '.' matches any
# character, which would accept strings such as "user@suseXcom" as addresses.

# "bsc#<number>" surrounded by whitespace; group 1 is the bug number.
BSC_PATTERN = re.compile(r'\sbsc#([0-9]+)\s')
# "<email> (<count>)" entries, as found on the "Experts candidates:" line.
MAINTAINERS_PATTERN = re.compile(r'\s(\S+@suse\.\S+\s\([0-9]+\))')
# A single @suse.* address preceded by whitespace, ',' or ':'.
# The match may carry a trailing ',' which callers strip themselves.
EMAIL_PATTERN = re.compile(r'[\s,:](\S+@suse\.\S+)')
# Whole-line directives: "CC ...", "NEEDINFO ..." (one or more addresses)
# and "ASSIGNEE ..." (exactly one whitespace-free token).
CC_PATTERN = re.compile(r'^\s*CC[\s:]\s*(\S+@suse\.\S+[,\s]*)+$')
NEEDINFO_PATTERN = re.compile(r'^\s*NEEDINFO[\s:]\s*(\S+@suse\.\S+[,\s]*)+$')
ASSIGNEE_PATTERN = re.compile(r'^\s*ASSIGNEE[\s:]\s*\S+@suse\.\S+\s*$')

# NOTE(review): CLOSING_COMMENT is not referenced by the code visible in this file.
CLOSING_COMMENT = 'Switching back to the security team.'
# Assignee used when the file says no codestream is affected / no action is needed.
SECURITY_EMAIL = 'security-team@suse.de'
# Assignee used for TRIVIAL_BACKPORT and offered as an extra interactive candidate.
MONKEY_EMAIL = 'cve-kpm@suse.de'
# A bug assigned to anyone other than this address is treated as already dispatched.
QUEUE_EMAIL = 'kernel-bugs@suse.de'
# Comment authors that are ignored when collecting "human" comments.
COMMENT_BANLIST = [
    'swamp@suse.de',
    'bwiedemann+obsbugzillabot@suse.com',
    'maint-coord+maintenance-robot@suse.de',
    'smash_bz@suse.de',
]
# More human comments than this triggers a "might not be a new bug" warning.
MIN_COMMENTS = 2
def check_being_logged_in(bzapi):
    """Terminate the script unless ``bzapi`` reports a logged-in session.

    Args:
        bzapi: object exposing a truthy ``logged_in`` attribute when an
            authenticated session is available.

    Raises:
        SystemExit: with status 1 when ``bzapi.logged_in`` is falsy, after
            printing setup instructions.
    """
    if bzapi.logged_in:
        return
    # Diagnostics go to stderr, like every other error message in this script.
    print(
        "You are not logged in the bugzilla!\n\n"
        "Go to https://bugzilla.suse.com/, log in via web interface with your credentials.\n"
        "Then go to Preferences, click on the tab API KEYS and generate a new api key\n"
        "if you don't have one already. Then store the api_key in a file ~/.bugzillarc\n"
        "in the following format...\n\n"
        "# ~/.bugzillarc\n[apibugzilla.suse.com]\napi_key = YOUR_API_KEY",
        file=sys.stderr,
    )
    sys.exit(1)
def make_url(id):
    """Return the bugzilla.suse.com ``show_bug.cgi`` URL for bug number ``id``."""
    return 'https://bugzilla.suse.com/show_bug.cgi?id={}'.format(id)
class BugUpdate:
    """A single pending Bugzilla update: a private comment plus a reassignment.

    The constructor only records what should be sent.  The remaining
    attributes (``original_email``, ``cve``, ``cc_add``, ``any_flags``,
    ``bz_comments``, ``human_comments``, ``already_dispatched``,
    ``unknown_state``) start with neutral defaults and are expected to be
    filled in from the live bug before dispatching.
    """

    def __init__(self, path_to_remove, bug, comment_lines, to_append, email, action, cc_list=None, needinfo_list=None):
        """Record the update to perform.

        Args:
            path_to_remove: file to delete after a successful update, or None.
            bug: bug number.
            comment_lines: iterable of strings joined verbatim into the comment.
            to_append: text appended after the joined comment lines.
            email: new assignee.
            action: short label shown in the summary line.
            cc_list: addresses requested for CC (defaults to an empty list).
            needinfo_list: addresses to request needinfo from (defaults to an
                empty list).
        """
        self.path_to_remove = path_to_remove
        self.comment = "".join(comment_lines) + to_append
        self.email = email
        self.original_email = '<unknown>'
        self.bug = bug
        self.action = action
        self.already_dispatched = False
        self.unknown_state = False
        self.cc_list = cc_list if cc_list else []
        self.needinfo_list = needinfo_list if needinfo_list else []
        self.cc_add = []
        self.cve = ''
        self.any_flags = False
        self.bz_comments = []
        self.human_comments = []

    def __str__(self):
        """Return the one-line summary printed before asking for confirmation."""
        extras = ''
        if self.cc_add:
            extras += ', CC: ' + ', '.join(self.cc_add)
        if self.needinfo_list:
            extras += ', NEEDINFO: ' + ', '.join(self.needinfo_list)
        return (f"{make_url(self.bug)} {self.cve:<14} {self.action:<9} "
                f"({self.original_email} -> {self.email}{extras})")

    def dispatch_to_bugzilla(self, bzapi, force):
        """Send the comment, reassignment, CCs and needinfo flags to Bugzilla.

        Nothing is sent when the bug is already dispatched or in an unknown
        state, unless ``force`` is true.  Failures are reported on stderr and
        never raised to the caller.

        Args:
            bzapi: object providing ``build_update(**kwargs)`` and
                ``update_bugs(ids, values)``.
            force: bypass the already-dispatched / unknown-state checks.
        """
        if not force and (self.already_dispatched or self.unknown_state):
            return

        bargs = {'comment': self.comment, 'comment_private': True, 'assigned_to': self.email}
        if self.cc_add:
            bargs['cc_add'] = self.cc_add
        if self.needinfo_list:
            if self.any_flags:
                # Only warn when a needinfo request is actually being dropped.
                print(f'Warning: bsc#{self.bug} has already flags set, skipping needinfo update!', file=sys.stderr)
            else:
                bargs['flags'] = [
                    {'name': 'needinfo', 'requestee': rmail, 'status': '?', 'type_id': 4}
                    for rmail in self.needinfo_list
                ]
        vals = bzapi.build_update(**bargs)

        try:
            bzapi.update_bugs([self.bug], vals)
        except Exception as e:
            print(f"Failed to update bsc#{self.bug}: {e}", file=sys.stderr)
            return

        # The bug is updated at this point.  A failure to delete the input
        # file must not be reported as a failed update, otherwise the user
        # may re-dispatch and post a duplicate comment.
        if self.path_to_remove:
            try:
                os.remove(self.path_to_remove)
            except OSError as e:
                print(f"Warning: bsc#{self.bug} was updated, but '{self.path_to_remove}' could not be removed: {e}",
                      file=sys.stderr)
        print(f'OK: {make_url(self.bug)}#c{len(self.bz_comments)}')
def ask_user(bzapi, todo, yes, force):
    """List the planned updates, optionally ask for confirmation, then dispatch.

    Entries in an unknown state or already dispatched are reported on stderr
    and left out of the listing unless ``force`` is set.  When ``yes`` is
    false and at least one entry was listed, the user must answer ``y`` to
    continue or ``n`` to abort; any other answer repeats the question.

    Args:
        bzapi: handle passed through to each entry's ``dispatch_to_bugzilla``.
        todo: iterable of BugUpdate-like objects.
        yes: skip the confirmation prompt.
        force: ignore the unknown-state / already-dispatched checks.
    """
    print("\n*** ACTIONS ***")
    pending = False
    for update in todo:
        if not force:
            if update.unknown_state:
                print(f"{make_url(update.bug)} {update.cve:<14} is in an uknown state, better do nothing!", file=sys.stderr)
                continue
            if update.already_dispatched:
                print(f"{make_url(update.bug)} {update.cve:<14} is already dispatched to {update.original_email}, better do nothing!", file=sys.stderr)
                if update.original_email != update.email:
                    print(f"WARNING: you want to dispatch to {update.email}, but the bug is dispatched to {update.original_email} already!", file=sys.stderr)
                continue

        humans = update.human_comments
        if len(humans) > MIN_COMMENTS:
            # Many human comments suggest the bug has history worth reading first.
            print(f"WARNING: {make_url(update.bug)} might not be a new bug. Have a look at the history. "
                  f"The last human comment (#{len(humans)}) is in {make_url(update.bug)}#c{humans[-1]['count']}!", file=sys.stderr)
        pending = True
        print(update)

    if pending and not yes:
        while True:
            answer = input("Do you want to submit the following updates to the bugzilla? (y/n) ")
            if answer == 'n':
                print("...aborting...", file=sys.stderr)
                return
            if answer == 'y':
                break

    print()
    for update in todo:
        update.dispatch_to_bugzilla(bzapi, force)
def make_unique(alist):
    """Return the first alias in ``alist`` that starts with ``CVE-``.

    The result is deterministic: the first matching entry in input order wins.

    Args:
        alist: iterable of alias strings, a single alias string, or None.
            Non-string entries are ignored.

    Returns:
        The matching alias, or ``''`` when there is none (including when
        ``alist`` is empty, None or not iterable).
    """
    if not alist:
        return ''
    if isinstance(alist, str):
        # A lone alias string must not be iterated character by character.
        alist = [alist]
    try:
        aliases = iter(alist)
    except TypeError:
        return ''
    for alias in aliases:
        if isinstance(alias, str) and alias.startswith('CVE-'):
            return alias
    return ''
def update_bug_metadata(bzapi, todo):
    """Fill every entry of ``todo`` with the current state of its bug.

    For each entry this sets ``bz_comments``, ``human_comments``, ``cve``,
    ``original_email``, ``any_flags``, ``cc_add`` and the
    ``unknown_state`` / ``already_dispatched`` markers.

    Args:
        bzapi: object providing ``getbugs(ids, include_fields=...)`` and
            ``get_comments(ids)``.
        todo: list of BugUpdate-like objects; updated in place.  An empty
            list is a no-op, so no query is sent.

    Raises:
        SystemExit: status 4 when the queries fail, status 5 when none of the
            requested bugs is returned.
    """
    if not todo:
        # Nothing to look up; querying with no ids would only end in a
        # confusing "couldn't find any of the following bugs: []" error.
        return

    bug_ids = [b.bug for b in todo]
    try:
        bugs = bzapi.getbugs(bug_ids, include_fields=["id", "assigned_to", "alias", "cc", "flags"])
        comments = bzapi.get_comments(bug_ids)
    except Exception as e:
        print(f"Couldn't query bugzilla: {e}", file=sys.stderr)
        sys.exit(4)
    if not bugs:
        print(f"Couldn't find any of the following bugs: {bug_ids}", file=sys.stderr)
        sys.exit(5)

    found_by_id = {b.id: b for b in bugs}
    for b in todo:
        b.bz_comments = comments['bugs'][str(b.bug)]['comments']
        b.human_comments = [c for c in b.bz_comments if c['creator'] not in COMMENT_BANLIST]

        found = found_by_id.get(b.bug)
        if found is None:
            b.cve = ''
            b.original_email = '<unknown>'
            b.any_flags = False
        else:
            b.cve = make_unique(found.alias)
            b.original_email = found.assigned_to
            b.any_flags = bool(found.flags)
            # Only add addresses not on CC yet; keep the requested order and
            # drop duplicates so the summary and the request are stable.
            present = set(found.cc)
            b.cc_add = list(dict.fromkeys(cc for cc in b.cc_list if cc not in present))

        if b.original_email == '<unknown>':
            b.unknown_state = True
        elif b.original_email != QUEUE_EMAIL:
            b.already_dispatched = True
def handle_file(bzapi, path, to_dispatch, remove_file, is_interactive=True):
# Parse one check-kernel-fix output file and, when an assignee can be
# determined, append a BugUpdate for it to `to_dispatch`.
#
# NOTE(review): this copy of the file has lost its indentation, so the exact
# nesting of the statements below cannot be confirmed from here. The comments
# describe the apparent intent only; verify against the properly indented
# original before relying on them.
#
# path:           file to read.
# to_dispatch:    list that receives the new BugUpdate (mutated in place).
# remove_file:    when true, `path` is stored in the BugUpdate as path_to_remove.
# is_interactive: when true the user may be asked to pick the assignee;
#                 when false, files without a single decided assignee are
#                 skipped with a message on stderr.
# bzapi is not referenced anywhere in this function.
with open(path, 'r') as f:
# State collected while scanning the file line by line.
decided = False
bug = 0
comment_lines = []
candidates = []
candidate_emails = []
cc_list = []
needinfo_list = []
for l in f:
# should_go_out: whether this line is copied into the Bugzilla comment.
should_go_out = True
# The bug number is taken from the "Security fix for CVE-..." line (bsc#NNN).
if l.startswith('Security fix for CVE-'):
m = re.search(BSC_PATTERN, l)
if m:
bug = int(m.group(1))
# Classify the line; directive lines set should_go_out = False.
if l.startswith('NO CODESTREAM AFFECTED') or l.startswith('NO ACTION NEEDED'):
candidate_emails = [ SECURITY_EMAIL ]
decided = True
elif 'TRIVIAL_BACKPORT' in l:
candidate_emails = [ MONKEY_EMAIL ]
decided = True
should_go_out = False
elif re.search(ASSIGNEE_PATTERN, l):
mm = re.findall(EMAIL_PATTERN, l)
# An ASSIGNEE line decides the assignee only when exactly one address is found.
if mm and len(mm) == 1:
candidate_emails = [ mm[0].strip(", ") ]
decided = True
should_go_out = False
elif re.search(CC_PATTERN, l):
mm = re.findall(EMAIL_PATTERN, l)
if mm:
cc_list.extend([ cc_entry.strip(", ") for cc_entry in mm ])
should_go_out = False
elif re.search(NEEDINFO_PATTERN, l):
mm = re.findall(EMAIL_PATTERN, l)
if mm:
needinfo_list.extend([ needinfo_entry.strip(", ") for needinfo_entry in mm ])
should_go_out = False
elif l.startswith('Experts candidates:'):
# Each match has the form "<email> (<count>)".
mm = re.findall(MAINTAINERS_PATTERN, l)
if mm:
candidates = mm
should_go_out = False
# NOTE(review): it is unclear from this view whether the echo below belongs to
# the 'Experts candidates:' branch or runs for every line - confirm.
if is_interactive:
print(l)
if should_go_out:
comment_lines.append(l)
# Without a bug number there is nothing to update.
if not bug:
print(f"'{path}' doesn't seem to contain any bug number, skipping. Be sure to regenerate c-k-f output with all the repos up-to-date.", file=sys.stderr)
return
# No explicit decision: offer the listed experts plus MONKEY_EMAIL as choices.
if not decided and candidates:
candidates.append(MONKEY_EMAIL)
# Candidate entries look like "<email> (<count>)"; keep only the address part.
candidate_emails = [ e.split(" ")[0] for e in candidates ]
if not candidate_emails:
print(f"{path} doesn't have any viable assignees.", file=sys.stderr)
if is_interactive:
sys.exit(1)
else:
return
# Show the comment that would be posted.
if is_interactive:
for cl in comment_lines:
print(cl, end='')
# Exactly one candidate means the assignee is settled; otherwise ask (or skip).
email = None if len(candidate_emails) != 1 else candidate_emails[0]
if not email:
if not is_interactive:
print(f'Skipping {path} (bsc#{bug}) due to missing ASSIGNEE!', file=sys.stderr)
return
# Numbered menu of candidates, starting at 1.
for n, c in enumerate(candidates, 1):
print("\t{:>3}: {}".format(n, c))
while not email:
answer = input('(select a number, type q for abort or enter a custom email)> ')
if answer == 'q':
print("...aborting...", file=sys.stderr)
sys.exit(0)
# Anything containing "@suse." and no space is taken as a custom address.
if "@suse." in answer and ' ' not in answer:
email = answer
else:
try:
answer = int(answer)
if answer < 1 or answer > len(candidates):
raise Exception()
except:
print("{} is not a number between 1 and {}.".format(answer, len(candidates)))
continue
# The menu is 1-based, the list is 0-based.
email = candidate_emails[answer - 1]
break
# Nothing is appended to the comment here; the action label is always 'developer'.
to_add = ''
to_dispatch.append(BugUpdate(path if remove_file else None, bug, comment_lines, to_add, email, 'developer', cc_list, needinfo_list))
def single_dispatch(bzapi, path, remove_file, yes, force):
    """File mode: parse one file, refresh its bug's state, then confirm and dispatch.

    Args:
        bzapi: Bugzilla handle passed to the helpers.
        path: file with check-kernel-fix output.
        remove_file: delete the file after a successful dispatch.
        yes: skip prompts; parsing is interactive exactly when this is false.
        force: bypass the already-dispatched check.
    """
    pending = []
    interactive = not yes
    handle_file(bzapi, path, pending, remove_file, is_interactive=interactive)
    update_bug_metadata(bzapi, pending)
    ask_user(bzapi, pending, yes, force)
def multiple_dispatch(bzapi, path, remove_file, yes, force):
    """Directory mode: parse every file under ``path`` non-interactively, then dispatch.

    Exits with status 0 right away when the directory tree contains no files.

    Args:
        bzapi: Bugzilla handle passed to the helpers.
        path: directory walked recursively with ``os.walk``.
        remove_file: delete each file after its successful dispatch.
        yes: skip the confirmation prompt.
        force: bypass the already-dispatched check.
    """
    pending = []
    seen_files = 0
    for root, _dirs, names in os.walk(path):
        for name in names:
            seen_files += 1
            handle_file(bzapi, f"{root}{os.sep}{name}", pending, remove_file, is_interactive=False)
    if seen_files == 0:
        sys.exit(0)
    update_bug_metadata(bzapi, pending)
    ask_user(bzapi, pending, yes, force)
def parse_args():
    """Parse the command line and return the resulting ``argparse`` namespace.

    Exactly one of ``-f/--file`` or ``-d/--dir`` is required.  The switches
    ``-r/--remove-file``, ``-y/--yes`` and ``--force`` are ``None`` when not
    given and ``True`` when given.
    """
    description = textwrap.dedent('''Updating bugzilla based on ./scripts/check-kernel-fix output. There are 2 modes.
1/ File mode (single dispatch) where the input is a single file containing ./scripts/check-kernel-fix output.
You can append the following to the c-k-f output and will be interpreted by this script.
ASSIGNEE <email1>
CC <email1> <email2> ...
NEEDINFO <email1> <email2> ...
TRIVIAL_BACKPORT
2/ Directory mode (multiple dispatch) is like File mode, but it goes through all the files in a directory
and processes only those that do not need an input, skipping the rest.
The bugzilla comment will always contain copy of the ./scripts/check-kernel-fix output taken from the file.
''')
    parser = argparse.ArgumentParser(
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description=description,
    )

    # The two input modes exclude each other, and one of them must be chosen.
    mode = parser.add_mutually_exclusive_group(required=True)
    mode.add_argument("-f", "--file", type=str, default=None,
                      help="path to a regular file containing ./scripts/check-kernel-fix output")
    mode.add_argument("-d", "--dir", type=str, default=None,
                      help="path to directory containing regular files with ./scripts/check-kernel-fix outputs")

    switches = (
        (("-r", "--remove-file"), "Remove file after dispatching CVE"),
        (("-y", "--yes"), "Dispatch without asking; never use :-)"),
        (("--force",), "Bypass already dispatched check"),
    )
    for names, text in switches:
        parser.add_argument(*names, help=text, default=None, action="store_true")

    return parser.parse_args()
if __name__ == "__main__":
    # Script entry point: log in, pick file or directory mode, dispatch.
    # Exit status is 0 after a dispatch run, 1 for a bad path or Ctrl-C.
    try:
        args = parse_args()
        bzapi = bugzilla.Bugzilla(DEFAULT_BZ)
        check_being_logged_in(bzapi)

        if args.file and os.path.isfile(args.file):
            single_dispatch(bzapi, args.file, args.remove_file, args.yes, args.force)
        elif args.dir and os.path.isdir(args.dir):
            multiple_dispatch(bzapi, args.dir, args.remove_file, args.yes, args.force)
        else:
            print(f"{args.file or args.dir} must be either regular file or a directory", file=sys.stderr)
            sys.exit(1)
        sys.exit(0)
    except KeyboardInterrupt:
        sys.exit(1)