| #!/usr/bin/python3 |
| import os, sys, re, argparse, textwrap |
| import bugzilla |
| from bugzilla._cli import DEFAULT_BZ |
| |
| BSC_PATTERN = re.compile(r'\sbsc#([0-9][0-9]*)\s') |
| MAINTAINERS_PATTERN = re.compile(r'\s(\S+\@suse.\S+\s\([0-9]+\))') |
| EMAIL_PATTERN = re.compile(r'[\s,:](\S+\@suse.\S+)') |
| CC_PATTERN = re.compile(r'^\s*CC[\s:]\s*(\S+\@suse.\S+[,\s]*)+$') |
| NEEDINFO_PATTERN = re.compile(r'^\s*NEEDINFO[\s:]\s*(\S+\@suse.\S+[,\s]*)+$') |
| ASSIGNEE_PATTERN = re.compile(r'^\s*ASSIGNEE[\s:]\s*\S+\@suse.\S+\s*$') |
| CLOSING_COMMENT = 'Switching back to the security team.' |
| SECURITY_EMAIL = 'security-team@suse.de' |
| MONKEY_EMAIL = 'cve-kpm@suse.de' |
| QUEUE_EMAIL = 'kernel-bugs@suse.de' |
| COMMENT_BANLIST = [ 'swamp@suse.de', 'bwiedemann+obsbugzillabot@suse.com', 'maint-coord+maintenance-robot@suse.de', 'smash_bz@suse.de' ] |
| MIN_COMMENTS = 2 |
| |
| def check_being_logged_in(bzapi): |
| if not bzapi.logged_in: |
| print("You are not logged in the bugzilla!\n\nGo to https://bugzilla.suse.com/, log in via web interace with your credentials.\n"\ |
| "Then go to Preferences, click on the tab API KEYS and generate a new api key\nif you don't have one already. Then store "\ |
| "the api_key in a file ~/.bugzillarc\nin the following format...\n\n# ~/.bugzillarc\n[apibugzilla.suse.com]\napi_key = YOUR_API_KEY") |
| sys.exit(1) |
| |
| def make_url(id): |
| return f'https://bugzilla.suse.com/show_bug.cgi?id={id}' |
| |
| class BugUpdate: |
| def __init__(self, path_to_remove, bug, comment_lines, to_append, email, action, cc_list=None, needinfo_list=None): |
| self.path_to_remove = path_to_remove |
| self.comment = "".join(comment_lines) + to_append |
| self.email = email |
| self.original_email = '<unknown>' |
| self.bug = bug |
| self.action = action |
| self.already_dispatched = False |
| self.unknown_state = False |
| self.cc_list = cc_list if cc_list else [] |
| self.needinfo_list = needinfo_list if needinfo_list else [] |
| self.cc_add = [] |
| self.cve = '' |
| self.any_flags = False |
| self.bz_comments = [] |
| self.human_comments = [] |
| |
| def __str__(self): |
| return f"{make_url(self.bug)} {self.cve:<14} {self.action:<9} ({self.original_email} -> {self.email}"\ |
| f"{', CC: ' + ', '.join(self.cc_add) if self.cc_add else ''}{', NEEDINFO: ' + ', '.join(self.needinfo_list) if self.needinfo_list else ''})" |
| |
| def dispatch_to_bugzilla(self, bzapi, force): |
| if not force and (self.already_dispatched or self.unknown_state): |
| return |
| bargs = { 'comment': self.comment, 'comment_private': True, 'assigned_to': self.email } |
| if self.cc_add: |
| bargs['cc_add'] = self.cc_add |
| if self.needinfo_list and not self.any_flags: |
| bargs['flags'] = [ { 'name': 'needinfo', 'requestee': rmail, 'status': '?', 'type_id': 4 } for rmail in self.needinfo_list ] |
| vals = bzapi.build_update(**bargs) |
| if self.any_flags: |
| print(f'Warning: bsc#{self.bug} has already flags set, skipping needinfo update!', file=sys.stderr) |
| try: |
| bzapi.update_bugs([self.bug], vals) |
| if self.path_to_remove: |
| os.remove(self.path_to_remove) |
| except Exception as e: |
| print(f"Failed to update bsc#{self.bug}: {e}", file=sys.stderr) |
| else: |
| print(f'OK: {make_url(self.bug)}#c{len(self.bz_comments)}') |
| |
| def ask_user(bzapi, todo, yes, force): |
| print("\n*** ACTIONS ***") |
| something_to_do = False |
| for b in todo: |
| if not force and b.unknown_state: |
| print(f"{make_url(b.bug)} {b.cve:<14} is in an uknown state, better do nothing!", file=sys.stderr) |
| continue |
| if not force and b.already_dispatched: |
| print(f"{make_url(b.bug)} {b.cve:<14} is already dispatched to {b.original_email}, better do nothing!", file=sys.stderr) |
| if b.original_email != b.email: |
| print(f"WARNING: you want to dispatch to {b.email}, but the bug is dispatched to {b.original_email} already!", file=sys.stderr) |
| continue |
| if len(b.human_comments) > MIN_COMMENTS: |
| print(f"WARNING: {make_url(b.bug)} might not be a new bug. Have a look at the history. "\ |
| f"The last human comment (#{len(b.human_comments)}) is in {make_url(b.bug)}#c{b.human_comments[len(b.human_comments) - 1]['count']}!", file=sys.stderr) |
| something_to_do = True |
| print(b) |
| if not yes: |
| while something_to_do: |
| answer = input("Do you want to submit the following updates to the bugzilla? (y/n) ") |
| if answer == 'n': |
| print("...aborting...", file=sys.stderr) |
| return |
| if answer == 'y': |
| break |
| print() |
| for b in todo: |
| b.dispatch_to_bugzilla(bzapi, force) |
| |
| def make_unique(alist): |
| try: |
| return { c for c in alist if c.startswith('CVE-') }.pop() |
| except: |
| return '' |
| |
| def update_bug_metadata(bzapi, todo): |
| bugs, comments = None, None |
| try: |
| bugs = bzapi.getbugs([ b.bug for b in todo ], include_fields=["id", "assigned_to", "alias", "cc", "flags"]) |
| comments = bzapi.get_comments([ b.bug for b in todo ]) |
| except Exception as e: |
| print(f"Couldn't query bugzilla: {e}", file=sys.stderr) |
| sys.exit(4) |
| if not bugs: |
| print(f"Couldn't find any of the following bugs: {[ b.bug for b in todo ]}", file=sys.stderr) |
| sys.exit(5) |
| emails = { b.id: b.assigned_to for b in bugs } |
| cves = { b.id: make_unique(b.alias) for b in bugs } |
| ccs = { b.id: b.cc for b in bugs } |
| any_flags = { b.id: bool(b.flags) for b in bugs } |
| for b in todo: |
| b.bz_comments = comments['bugs'][str(b.bug)]['comments'] |
| b.human_comments = [ c for c in b.bz_comments if c['creator'] not in COMMENT_BANLIST ] |
| b.cve = cves.get(b.bug, '') |
| b.original_email = emails.get(b.bug, '<unknown>') |
| b.any_flags = any_flags.get(b.bug, False) |
| if b.bug in ccs: |
| b.cc_add = list(set(b.cc_list) - set(ccs.get(b.bug))) |
| if b.original_email == '<unknown>': |
| b.unknown_state = True |
| elif b.original_email != QUEUE_EMAIL: |
| b.already_dispatched = True |
| |
| def handle_file(bzapi, path, to_dispatch, remove_file, is_interactive=True): |
| with open(path, 'r') as f: |
| decided = False |
| bug = 0 |
| comment_lines = [] |
| candidates = [] |
| candidate_emails = [] |
| cc_list = [] |
| needinfo_list = [] |
| for l in f: |
| should_go_out = True |
| if l.startswith('Security fix for CVE-'): |
| m = re.search(BSC_PATTERN, l) |
| if m: |
| bug = int(m.group(1)) |
| if l.startswith('NO CODESTREAM AFFECTED') or l.startswith('NO ACTION NEEDED'): |
| candidate_emails = [ SECURITY_EMAIL ] |
| decided = True |
| elif 'TRIVIAL_BACKPORT' in l: |
| candidate_emails = [ MONKEY_EMAIL ] |
| decided = True |
| should_go_out = False |
| elif re.search(ASSIGNEE_PATTERN, l): |
| mm = re.findall(EMAIL_PATTERN, l) |
| if mm and len(mm) == 1: |
| candidate_emails = [ mm[0].strip(", ") ] |
| decided = True |
| should_go_out = False |
| elif re.search(CC_PATTERN, l): |
| mm = re.findall(EMAIL_PATTERN, l) |
| if mm: |
| cc_list.extend([ cc_entry.strip(", ") for cc_entry in mm ]) |
| should_go_out = False |
| elif re.search(NEEDINFO_PATTERN, l): |
| mm = re.findall(EMAIL_PATTERN, l) |
| if mm: |
| needinfo_list.extend([ needinfo_entry.strip(", ") for needinfo_entry in mm ]) |
| should_go_out = False |
| elif l.startswith('Experts candidates:'): |
| mm = re.findall(MAINTAINERS_PATTERN, l) |
| if mm: |
| candidates = mm |
| should_go_out = False |
| if is_interactive: |
| print(l) |
| if should_go_out: |
| comment_lines.append(l) |
| if not bug: |
| print(f"'{path}' doesn't seem to contain any bug number, skipping. Be sure to regenerate c-k-f output with all the repos up-to-date.", file=sys.stderr) |
| return |
| if not decided and candidates: |
| candidates.append(MONKEY_EMAIL) |
| candidate_emails = [ e.split(" ")[0] for e in candidates ] |
| |
| if not candidate_emails: |
| print(f"{path} doesn't have any viable assignees.", file=sys.stderr) |
| if is_interactive: |
| sys.exit(1) |
| else: |
| return |
| |
| if is_interactive: |
| for cl in comment_lines: |
| print(cl, end='') |
| email = None if len(candidate_emails) != 1 else candidate_emails[0] |
| if not email: |
| if not is_interactive: |
| print(f'Skipping {path} (bsc#{bug}) due to missing ASSIGNEE!', file=sys.stderr) |
| return |
| for n, c in enumerate(candidates, 1): |
| print("\t{:>3}: {}".format(n, c)) |
| while not email: |
| answer = input('(select a number, type q for abort or enter a custom email)> ') |
| if answer == 'q': |
| print("...aborting...", file=sys.stderr) |
| sys.exit(0) |
| if "@suse." in answer and ' ' not in answer: |
| email = answer |
| else: |
| try: |
| answer = int(answer) |
| if answer < 1 or answer > len(candidates): |
| raise Exception() |
| except: |
| print("{} is not a number between 1 and {}.".format(answer, len(candidates))) |
| continue |
| email = candidate_emails[answer - 1] |
| break |
| to_add = '' |
| to_dispatch.append(BugUpdate(path if remove_file else None, bug, comment_lines, to_add, email, 'developer', cc_list, needinfo_list)) |
| |
| def single_dispatch(bzapi, path, remove_file, yes, force): |
| to_dispatch = [] |
| handle_file(bzapi, path, to_dispatch, remove_file, is_interactive=not yes) |
| update_bug_metadata(bzapi, to_dispatch) |
| ask_user(bzapi, to_dispatch, yes, force) |
| |
| def multiple_dispatch(bzapi, path, remove_file, yes, force): |
| to_dispatch = [] |
| nfiles = 0 |
| for subdir, dirs, files in os.walk(path): |
| for ckf in files: |
| nfiles += 1 |
| opath = subdir + os.sep + ckf |
| handle_file(bzapi, opath, to_dispatch, remove_file, is_interactive=False) |
| if not nfiles: |
| sys.exit(0) |
| update_bug_metadata(bzapi, to_dispatch) |
| ask_user(bzapi, to_dispatch, yes, force) |
| |
| def parse_args(): |
| parser = argparse.ArgumentParser(formatter_class=argparse.RawDescriptionHelpFormatter, |
| description=textwrap.dedent('''Updating bugzilla based on ./scripts/check-kernel-fix output. There are 2 modes. |
| |
| 1/ File mode (single dispatch) where the input is a single file containing ./scripts/check-kernel-fix output. |
| You can append the following to the c-k-f output and will be interpreted by this script. |
| |
| ASSIGNEE <email1> |
| CC <email1> <email2> ... |
| NEEDINFO <email1> <email2> ... |
| TRIVIAL_BACKPORT |
| |
| 2/ Directory mode (multiple dispatch) is like File mode, but it goes through all the files in a directory |
| and processes only those that do not need an input, skipping the rest. |
| |
| The bugzilla comment will always contain copy of the ./scripts/check-kernel-fix output taken from the file. |
| ''')) |
| group = parser.add_mutually_exclusive_group(required=True) |
| group.add_argument("-f", "--file", help="path to a regular file containing ./scripts/check-kernel-fix output", default=None, type=str) |
| group.add_argument("-d", "--dir", help="path to directory containing regular files with ./scripts/check-kernel-fix outputs", default=None, type=str) |
| |
| parser.add_argument("-r", "--remove-file", help="Remove file after dispatching CVE", default=None, action="store_true") |
| parser.add_argument("-y", "--yes", help="Dispatch without asking; never use :-)", default=None, action="store_true") |
| parser.add_argument("--force", help="Bypass already dispatched check", default=None, action="store_true") |
| return parser.parse_args() |
| |
| if __name__ == "__main__": |
| try: |
| args = parse_args() |
| |
| bzapi = bugzilla.Bugzilla(DEFAULT_BZ) |
| check_being_logged_in(bzapi) |
| |
| if args.file and os.path.isfile(args.file): |
| single_dispatch(bzapi, args.file, args.remove_file, args.yes, args.force) |
| sys.exit(0) |
| |
| if args.dir and os.path.isdir(args.dir): |
| multiple_dispatch(bzapi, args.dir, args.remove_file, args.yes, args.force) |
| sys.exit(0) |
| |
| print(f"{args.file or args.dir} must be either regular file or a directory", file=sys.stderr) |
| sys.exit(1) |
| except KeyboardInterrupt: |
| sys.exit(1) |