#!/usr/bin/env python3
"""Run compliance checks over a git commit range.

Each check is a ``ComplianceTest`` subclass that records its outcome in a
JUnit test case.  The collected cases are written to ``compliance.xml`` and,
when requested on the command line, reported to GitHub as commit statuses and
a pull-request comment.
"""
import sys
import subprocess
import re
import os
from email.utils import parseaddr
import sh
import logging
import argparse
from junitparser import TestCase, TestSuite, JUnitXml, Skipped, Error, Failure, Attr
from github import Github
from shutil import copyfile, copytree
import json
from pprint import pprint

if "ZEPHYR_BASE" not in os.environ:
    logging.warning("$ZEPHYR_BASE environment variable undefined.\n")
    repository_path = os.getcwd()
else:
    repository_path = os.environ['ZEPHYR_BASE']

logger = None

DOCS_WARNING_FILE = "doc.warnings"

# Keyword arguments passed to every ``sh.git`` invocation in this file.
sh_special_args = {
    '_tty_out': False,
    '_cwd': repository_path
}

# Put the Kconfiglib path first to make sure no local Kconfiglib version is
# used
sys.path.insert(0, os.path.join(repository_path, "scripts/kconfig"))
import kconfiglib


def get_shas(refspec):
    """Return the commit SHAs that ``git rev-list`` prints for *refspec*.

    ``--max-count`` is -1 when *refspec* contains a "." and 1 otherwise.
    """
    max_count = -1 if "." in refspec else 1
    return sh.git("rev-list", '--max-count={0}'.format(max_count), refspec,
                  **sh_special_args).split()


class MyCase(TestCase):
    """JUnit test case with additional ``classname`` and ``doc`` attributes."""
    classname = Attr()
    doc = Attr()


class ComplianceTest:
    """Base class for all compliance checks.

    Subclasses set ``_name`` and ``_doc`` and override ``run()``, which is
    expected to call ``prepare()`` first and then store a result on
    ``self.case`` when the check fails or is skipped.
    """

    _name = ""
    _title = ""
    _doc = "https://docs.zephyrproject.org/latest/contribute/contribute_guidelines.html"

    def __init__(self, suite, range):
        """Store the owning test suite and the commit range to check."""
        self.case = None
        self.suite = suite
        self.commit_range = range

    def prepare(self):
        """Create the JUnit case for this check and announce that it runs."""
        self.case = MyCase(self._name)
        self.case.classname = "Guidelines"
        print("Running {} tests...".format(self._name))

    def run(self):
        """Execute the check.  The base implementation does nothing."""
        pass


class CheckPatch(ComplianceTest):
    """Pipe the diff of the commit range through scripts/checkpatch.pl."""

    _name = "checkpatch"
    _doc = "https://docs.zephyrproject.org/latest/contribute/contribute_guidelines.html#coding-style"

    def run(self):
        """Mark the case as failed when checkpatch.pl reports errors."""
        self.prepare()
        diff = subprocess.Popen(('git', 'diff', '%s' % (self.commit_range)),
                                stdout=subprocess.PIPE)
        checkpatch = '%s/scripts/checkpatch.pl' % repository_path
        try:
            # No shell=True here: with a sequence and shell=True only the
            # first element is executed as the command, so the options and
            # the "-" (read patch from stdin) would never reach checkpatch.
            subprocess.check_output((checkpatch, '--mailback', '--no-tree', '-'),
                                    stdin=diff.stdout,
                                    stderr=subprocess.STDOUT)
        except subprocess.CalledProcessError as ex:
            output = ex.output.decode('utf8')
            if re.search("([1-9][0-9]*) errors,", output):
                self.case.result = Failure("Checkpatch issues", "failure")
                self.case.result._elem.text = output
        finally:
            # Release the pipe and reap the git process.
            diff.stdout.close()
            diff.wait()


class KconfigCheck(ComplianceTest):
    """Check the Kconfig tree for references to undefined symbols."""

    _name = "Kconfig"
    _doc = "https://docs.zephyrproject.org/latest/application/kconfig-tips.html"

    def run(self):
        """Parse the Kconfig tree and fail on "undefined symbol" warnings."""
        self.prepare()

        # Look up Kconfig files relative to ZEPHYR_BASE
        os.environ["srctree"] = repository_path

        # Parse the entire Kconfig tree, to make sure we see all symbols
        os.environ["SOC_DIR"] = "soc/"
        os.environ["BOARD_DIR"] = "boards/*/*"
        os.environ["ARCH"] = "*"

        # Enable strict Kconfig mode in Kconfiglib, which assumes there's just a
        # single Kconfig tree and warns for all references to undefined symbols
        os.environ["KCONFIG_STRICT"] = "y"

        undef_ref_warnings = [
            warning for warning in kconfiglib.Kconfig().warnings
            if "undefined symbol" in warning
        ]

        # Generating multiple JUnit <testcase>s would be neater, but Shippable
        # only seems to display the first one
        if undef_ref_warnings:
            self.case.result = Failure("undefined Kconfig symbols", "failure")
            self.case.result._elem.text = "\n\n\n".join(undef_ref_warnings)


class Documentation(ComplianceTest):
    """Report the contents of the documentation warnings file, if any."""

    _name = "Documentation"
    _doc = "https://docs.zephyrproject.org/latest/contribute/doc-guidelines.html"

    def run(self):
        """Record an error when DOCS_WARNING_FILE exists and is non-empty."""
        self.prepare()

        if os.path.exists(DOCS_WARNING_FILE) and os.path.getsize(DOCS_WARNING_FILE) > 0:
            with open(DOCS_WARNING_FILE, "rb") as f:
                log = f.read()

            self.case.result = Error("Documentation Issues", "failure")
            self.case.result._elem.text = log.decode('utf8')


class GitLint(ComplianceTest):
    """Run gitlint on the commits in the commit range."""

    _name = "Gitlint"
    _doc = "https://docs.zephyrproject.org/latest/contribute/contribute_guidelines.html#commit-guidelines"

    def run(self):
        """Mark the case as failed when gitlint exits with a non-zero status."""
        self.prepare()

        proc = subprocess.Popen('gitlint --commits %s' % (self.commit_range),
                                shell=True, stdout=subprocess.PIPE,
                                stderr=subprocess.STDOUT)

        # communicate() drains the pipe while waiting; wait() followed by
        # read() can deadlock once the child fills the pipe buffer.
        output, _ = proc.communicate()
        if proc.returncode != 0:
            self.case.result = Failure("commit message syntax issues", "failure")
            self.case.result._elem.text = output.decode('utf8')


class License(ComplianceTest):
    """Scan files added in the commit range for license and copyright info."""

    _name = "License"
    _doc = "https://docs.zephyrproject.org/latest/contribute/contribute_guidelines.html#licensing"

    def run(self):
        """Copy newly added files aside, run scancode on them, report issues.

        The case is marked skipped when scancode is not installed or fails to
        run, and failed when the scancode report shows a source or script
        file with a missing license, a license other than apache-2.0, a
        non-permissive license, or no copyright.
        """
        self.prepare()

        scancode = "/opt/scancode-toolkit/scancode"
        if not os.path.exists(scancode):
            self.case.result = Skipped("scancode-toolkit not installed", "skipped")
            return

        os.makedirs("scancode-files", exist_ok=True)
        new_files = sh.git("diff", "--name-only", "--diff-filter=A",
                           self.commit_range, **sh_special_args)

        if len(new_files) == 0:
            return

        for newf in new_files:
            f = str(newf).rstrip()

            os.makedirs(os.path.join('scancode-files', os.path.dirname(f)),
                        exist_ok=True)
            copy = os.path.join("scancode-files", f)
            copyfile(f, copy)

        try:
            cmd = [scancode, '--verbose', '--copyright', '--license',
                   '--license-diag', '--info', '--classify', '--summary',
                   '--json', 'scancode.json', 'scancode-files/']

            logging.info(" ".join(cmd))

            # Run the argument list directly; no shell is needed.
            subprocess.check_output(cmd, stderr=subprocess.STDOUT)

        except subprocess.CalledProcessError as e:
            logging.error(e.output)
            self.case.result = Skipped("Exception when running scancode", "skipped")
            return

        with open('scancode.json', 'r') as json_fp:
            scancode_results = json.load(json_fp)

        issues = []
        for entry in scancode_results['files']:
            if entry['type'] == 'directory':
                continue

            original_fp = str(entry['path']).replace('scancode-files/', '')
            licenses = entry['licenses']

            is_code = entry['is_script'] or entry['is_source']
            if not is_code:
                continue
            if entry['programming_language'] in ['CMake']:
                continue
            if entry['extension'] in ['.yaml']:
                continue

            if len(licenses) == 0:
                issues.append("* {} missing license.\n".format(original_fp))
            else:
                for lic in licenses:
                    if lic['key'] != "apache-2.0":
                        issues.append("* {} is not apache-2.0 licensed: {}\n".format(
                            original_fp, lic['key']))
                    if lic['category'] != 'Permissive':
                        issues.append("* {} has non-permissive license: {}\n".format(
                            original_fp, lic['key']))

            if len(entry['copyrights']) == 0:
                issues.append("* {} missing copyright.\n".format(original_fp))

        report = "".join(issues)
        if report != "":
            self.case.result = Failure("License/Copyright issues", "failure")
            self.case.result._elem.text = report


class Identity(ComplianceTest):
    """Check the author identity and Signed-off-by lines of each commit."""

    _name = "Identity/Emails"
    _doc = "https://docs.zephyrproject.org/latest/contribute/contribute_guidelines.html#commit-guidelines"

    def run(self):
        """Fail when a commit's author is not signed off or lacks a full name.

        For every commit in the range, the ``Author:`` value must equal one
        of the ``Signed-off-by:`` values, and the name part of the author
        must consist of at least two space-separated words.  Problems from
        all commits are collected into a single failure.
        """
        self.prepare()

        sha_re = re.compile(r"^commit\s([^\s]*)")
        author_re = re.compile(r"^Author:\s(.*)")
        signoff_re = re.compile(r"signed-off-by:\s(.*)", re.IGNORECASE)

        failures = []
        for f in get_shas(self.commit_range):
            commit = sh.git("log", "--decorate=short", "-n 1", f,
                            **sh_special_args)
            signed = []
            author = ""
            sha = ""
            parsed_addr = None
            for line in commit.split("\n"):
                match = sha_re.search(line)
                if match:
                    sha = match.group(1)
                match = author_re.search(line)
                if match:
                    author = match.group(1)
                    parsed_addr = parseaddr(author)
                match = signoff_re.search(line)
                if match:
                    signed.append(match.group(1))

            if author not in signed:
                failures.append(
                    "%s: author email (%s) needs to match one of the "
                    "signed-off-by entries." % (sha, author))

            if not parsed_addr or len(parsed_addr[0].split(" ")) < 2:
                failures.append(
                    "%s: author email (%s) does not follow the syntax: "
                    "First Last <email>." % (sha, author))

        if failures:
            self.case.result = Failure("identity/email issues", "failure")
            self.case.result._elem.text = "\n".join(failures)


def init_logs():
    """Attach a console handler to the root logger.

    The level is taken from the LOG_LEVEL environment variable ("DEBUG" or
    "ERROR"); any other value, or none, selects INFO.
    """
    global logger
    log_lev = os.environ.get('LOG_LEVEL', None)
    level = logging.INFO
    if log_lev == "DEBUG":
        level = logging.DEBUG
    elif log_lev == "ERROR":
        level = logging.ERROR

    console = logging.StreamHandler()
    formatter = logging.Formatter('%(levelname)-8s: %(message)s')
    console.setFormatter(formatter)
    logger = logging.getLogger('')
    logger.addHandler(console)
    logger.setLevel(level)

    logging.debug("Log init completed")


def parse_args():
    """Parse the command line and return the resulting namespace."""
    parser = argparse.ArgumentParser(
        description="Check for coding style and documentation warnings.")
    parser.add_argument('-c', '--commits', default=None,
                        help="Commit range in the form: a..b")
    parser.add_argument('-g', '--github', action="store_true",
                        help="Send results to github as a comment.")
    parser.add_argument('-r', '--repo', default=None,
                        help="Github repository")
    parser.add_argument('-p', '--pull-request', default=0, type=int,
                        help="Pull request number")
    parser.add_argument('-s', '--status', action="store_true",
                        help="Set status to pending")
    parser.add_argument('-S', '--sha', default=None, help="Commit SHA")
    return parser.parse_args()


def set_status(gh, repo, sha):
    """Create a 'pending' commit status on *sha* for every compliance test.

    *gh* is the Github client, *repo* the repository name passed to
    ``gh.get_repo`` and *sha* the commit to annotate.
    """
    repo = gh.get_repo(repo)
    commit = repo.get_commit(sha)
    for Test in ComplianceTest.__subclasses__():
        t = Test(None, "")
        print("Creating status for %s" % (t._name))
        commit.create_status('pending',
                             '%s' % t._doc,
                             'Verification in progress',
                             '{}'.format(t._name))


def main():
    """Run every compliance test and publish the results.

    With --status (plus --github, --sha and --repo) only the pending statuses
    are created and the script exits with 0.  Without a commit range the
    script exits with 1.  Otherwise all tests run, ``compliance.xml`` is
    written and, with --github, commit statuses and a pull-request comment
    are created or updated.
    """
    args = parse_args()

    github_token = ''
    gh = None
    if args.github:
        github_token = os.environ['GH_TOKEN']
        gh = Github(github_token)

    if args.status and args.sha is not None and args.repo and gh:
        set_status(gh, args.repo, args.sha)
        sys.exit(0)

    if not args.commits:
        sys.exit(1)

    suite = TestSuite("Compliance")
    docs = {}
    for Test in ComplianceTest.__subclasses__():
        t = Test(suite, args.commits)
        t.run()
        suite.add_testcase(t.case)
        docs[t.case.name] = t._doc

    xml = JUnitXml()
    xml.add_testsuite(suite)
    xml.update_statistics()
    xml.write('compliance.xml')

    if args.github:
        repo = gh.get_repo(args.repo)
        pr = repo.get_pull(int(args.pull_request))
        commit = repo.get_commit(args.sha)

        # Results of these checks are posted without a surrounding code fence.
        unfenced = ['Gitlint', 'Identity/Emails', 'License']

        comment = "Found the following issues, please fix and resubmit:\n\n"
        comment_count = 0
        print("Processing results...")
        for case in suite:
            if case.result and case.result.type != 'skipped':
                comment_count += 1
                comment += ("## {}\n".format(case.result.message))
                comment += "\n"
                if case.name not in unfenced:
                    comment += "```\n"
                comment += ("{}\n".format(case.result._elem.text))
                if case.name not in unfenced:
                    comment += "```\n"

                commit.create_status('failure',
                                     docs[case.name],
                                     'Verification failed',
                                     '{}'.format(case.name))
            else:
                commit.create_status('success',
                                     docs[case.name],
                                     'Verifications passed',
                                     '{}'.format(case.name))

        if args.repo and args.pull_request and comment_count > 0:
            # Update an earlier report in place instead of posting a new one.
            commented = False
            for c in pr.get_issue_comments():
                if 'Found the following issues, please fix and resubmit' in c.body:
                    c.edit(comment)
                    commented = True
                    break

            if not commented:
                pr.create_issue_comment(comment)


if __name__ == "__main__":
    main()