#!/usr/bin/env python3

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0

"""Differential testing of cbmc-viewer."""

from pathlib import Path
import copy
import json
import logging
import os
import re
import shutil
import subprocess
import sys

import arguments

################################################################
# Parse command line arguments and configure logging

def parse_arguments():
    """Parse command line arguments and configure logging.

    The parser is built by the local `arguments` module from the list of
    option descriptions below.  Returns the parsed argument namespace.
    """

    description = """
    Differential testing of cbmc-viewer.  Run two versions of cbmc-viewer
    on the output of cbmc and collect the results in two directories for
    easy comparison.  The script assumes the proofs are run using litani
    (for example, by running the script run-cbmc-proofs.py in the cbmc
    starter kit).
    """

    options = [
        {'flag': '--proofs',
         'type': Path,
         'help': """
         The directory containing the proofs.  This is the directory in
         which litani was run (the directory cbmc/proofs containing the
         script run-cbmc-proofs.py in the cbmc starter kit) and the
         directory in which litani wrote the file .litani_cache_dir.
         """
        },
        {'flag': '--viewer-repository',
         'type': Path,
         'default': '.',
         'help': "The root of the cbmc-viewer repository.  Defaults to '%(default)s'."
        },
        {'flag': '--commits',
         'type': str,
         'default': [],
         'nargs': '*',
         'help': """
         The two versions of cbmc-viewer to compare.  The versions are
         specified by giving names of commits or branches in the
         cbmc-viewer repository.
         """
        },
        {'flag': '--reports',
         'type': Path,
         'default': [],
         'nargs': '*',
         'help': """
         The two report directories into which the two sets of reports
         generated by the cbmc-viewer will be written.  The report for
         any given proof will be stored in a subdirectory having the
         name of the proof.  Defaults to '/tmp/reports/commit1' and
         '/tmp/reports/commit2'.
         """
        },
        {'flag': '--installs',
         'type': Path,
         'default': [],
         'nargs': '*',
         'help': """
         The two installation directories into which the two versions
         of cbmc-viewer will be installed (as python virtual
         environments).  Defaults to '/tmp/viewer/commit1' and
         '/tmp/viewer/commit2'.
         """
        },
        {'flag': '--litani',
         'type': Path,
         'default': 'litani',
         'help': "Command to invoke litani.  Defaults to '%(default)s'."},
        {'flag': '--force',
         'action': 'store_true',
         'help': 'Overwrite existing report or installation directories'},
        {'flag': '--utility-tests',
         'action': 'store_true',
         'help': 'Compare viewer json output with make_* json output'},
    ]
    args = arguments.create_parser(options, description).parse_args()
    arguments.configure_logging(args)
    return args

################################################################
# Run a command

def run(cmd, stdin=None, check=True, env=None, cwd=None, capture_output=True):
    """Run a command as a subprocess.

    cmd is a sequence of words (each converted with str()).  stdin, if
    not None, is converted to a string and written to the command's
    standard input.  env and cwd are passed to subprocess.Popen.

    Returns the list of right-stripped lines of standard output, or
    None when capture_output is false (the command's output then goes
    to this process's own stdout and stderr).

    Raises UserWarning if check is true and the command returns a
    nonzero exit code.
    """

    cmd = [str(word) for word in cmd]
    stdin = str(stdin) if stdin is not None else None
    kwds = {
        "env": env,
        "cwd": cwd,
        "text": True,
        # Test against None so that an empty string is still delivered
        # as (empty) input instead of being silently dropped.
        "stdin": subprocess.PIPE if stdin is not None else None,
        "stdout": subprocess.PIPE if capture_output else None,
        "stderr": subprocess.PIPE if capture_output else None,
    }
    with subprocess.Popen(cmd, **kwds) as pipe:
        stdout, stderr = pipe.communicate(input=stdin)

    if check and pipe.returncode:
        logging.debug("Command %s returned %s", cmd, pipe.returncode)
        # stderr is None when output is not captured
        for line in (stderr or "").splitlines():
            logging.debug("%s", line)
        raise UserWarning(f"Command '{' '.join(cmd)}' returned '{pipe.returncode}'")

    if not capture_output:
        return None
    return [line.rstrip() for line in stdout.splitlines()]

################################################################

def _current_ref(repo):
    """Return the current branch of repo, or the HEAD commit if detached."""

    # 'git branch --show-current' prints nothing on a detached HEAD
    lines = run(["git", "branch", "--show-current"], cwd=repo)
    if lines and lines[0]:
        return lines[0]
    return run(["git", "rev-parse", "HEAD"], cwd=repo)[0]

def _stash_ref(repo):
    """Return the output of resolving refs/stash in repo (empty if no stash)."""

    cmd = ["git", "rev-parse", "--quiet", "--verify", "refs/stash"]
    return run(cmd, cwd=repo, check=False)

def install_viewer(viewer_commit, venv_root, viewer_repo):
    """Install a given version of viewer into a virtual environment.

    Creates a python virtual environment at venv_root, checks out
    viewer_commit in viewer_repo, and runs 'make install' there with the
    virtual environment's bin directory first on PATH.  The repository
    is restored to its original branch (or commit) afterwards.

    Returns the path to cbmc-viewer inside the virtual environment.
    Raises UserWarning if any command fails.
    """

    # save the original commit
    branch = _current_ref(viewer_repo)

    # Remember whether 'git stash' really saved something, so that we
    # never pop an unrelated stash entry that existed beforehand.
    stash_before = _stash_ref(viewer_repo)
    run(["git", "stash"], cwd=viewer_repo)
    stashed = _stash_ref(viewer_repo) != stash_before

    try:
        logging.info("Setting up virtual environment %s for viewer %s",
                     venv_root, viewer_commit)
        run(["python3", "-m", "venv", venv_root])
        venv_bin = Path(venv_root) / "bin"

        env = dict(os.environ.items())
        env['PATH'] = str(venv_bin) + os.pathsep + env['PATH']
        run(["python3", "-m", "pip", "install", "--upgrade",
             "pip", "setuptools", "wheel", "build"], env=env)

        logging.info("Installing viewer %s into virtual environment %s",
                     viewer_commit, venv_root)
        run(["git", "checkout", viewer_commit], cwd=viewer_repo, env=env)
        run(["make", "install"], cwd=viewer_repo, env=env)
        venv_viewer = venv_bin / "cbmc-viewer"
        return venv_viewer
    except UserWarning as error:
        logging.info("Failed to install viewer %s into virtual environment %s",
                     viewer_commit, venv_root)
        logging.info(error)
        raise
    finally:
        logging.info("Restoring viewer repository to %s", branch)
        run(["git", "checkout", branch], cwd=viewer_repo)
        if stashed:
            try:
                run(["git", "stash", "pop"], cwd=viewer_repo)
            except UserWarning:
                # Best effort: the changes are still in the stash
                logging.warning("Failed to restore stashed changes in %s: "
                                "they remain in the git stash", viewer_repo)

################################################################
# Reconstruct the litani add-job commands used to run viewer

def get_jobs(proof_root, litani):
    """Return the cbmc-viewer report jobs listed by 'litani get-jobs'.

    Runs litani in proof_root, parses its output as json, and keeps the
    jobs whose ci_stage is "report" and whose command mentions
    "cbmc-viewer".
    """

    logging.info("Running litani get-jobs...")
    lines = run([litani, 'get-jobs'], cwd=proof_root)
    jobs = json.loads(' '.join(lines))
    return [job for job in jobs
            if job["ci_stage"] == "report" and "cbmc-viewer" in job["command"]]

def set_jobs(proof_root, litani, jobs):
    """Run 'litani set-jobs' in proof_root with jobs (as json) on stdin."""

    logging.info("Running litani set-jobs...")
    run([litani, 'set-jobs'], cwd=proof_root, stdin=json.dumps(jobs))

################################################################
# Update a viewer job to use a different viewer and a different report directory

def viewer_options_without_reportdir(viewer_command):
    """Strip --reportdir from command line options used to invoke viewer.

    Returns the options of viewer_command (everything after the first
    word) as a single-space-separated string with any '--reportdir DIR'
    or '--reportdir=DIR' removed.  Returns '' if there are no options.
    """

    command = re.sub(r'\s+', ' ', viewer_command.strip())
    # strip path used to invoke viewer from front of command
    # (partition yields '' when the command has no options at all)
    _, _, options = command.partition(' ')
    # strip --reportdir option from within command
    options = re.sub(r'--reportdir(?:\s+|=)\S+', '', options)
    return re.sub(r'\s+', ' ', options.strip())

def form_reportdir(report_root, proof_name):
    """Return the report directory for proof_name under report_root as a string."""

    reportdir = Path(report_root) / Path(proof_name)
    return str(reportdir)

def update_command(command, new_viewer, new_reportdir):
    """Return command rewritten to invoke new_viewer with --reportdir new_reportdir."""

    options = viewer_options_without_reportdir(command)
    words = [str(new_viewer), options, '--reportdir', str(new_reportdir)]
    # options may be empty: skip it rather than emit a double space
    return ' '.join(word for word in words if word)

def update_job(job, new_viewer, new_report_root):
    """Return a copy of a viewer job using new_viewer and a new report directory.

    The new report directory is the job's pipeline name under
    new_report_root, and becomes the job's only output.
    """

    job = copy.copy(job) # a shallow copy of job
    command = job["command"]
    outputs = job["outputs"]
    proof_name = job["pipeline_name"]
    assert len(outputs) == 1

    new_reportdir = form_reportdir(new_report_root, proof_name)
    job["command"] = update_command(command, new_viewer, new_reportdir)
    job["outputs"] = [new_reportdir]
    return job

def update_jobs(jobs, new_viewer, new_report_root):
    """Apply update_job to every job in jobs."""

    return [update_job(job, new_viewer, new_report_root) for job in jobs]

################################################################

def parsed_command(cmd):
    """Split a command line into the command and a dict of its options.

    Every word starting with '-' is an option name; the words that
    follow it, up to the next option, are its value (joined with single
    spaces).  An option followed by no value maps to ''.  A word of the
    form '--name=value' is split at the first '='.  If an option is
    repeated, the last occurrence wins.

    Returns the pair (command, options).
    """

    words = cmd.split()
    if not words:
        return '', {}

    command, *tokens = words
    values = {}
    key = None
    for token in tokens:
        if token.startswith('-'):
            key, equals, val = token.partition('=')
            values[key] = [val] if equals else []
        elif key is not None:
            values[key].append(token)
        # words preceding the first option are ignored
    return command, {name: ' '.join(vals) for name, vals in values.items()}

def make_coverage(cmd, opts, old):
    """Return the command producing coverage json from viewer options opts.

    If old is true, use the make-coverage script installed next to cmd;
    otherwise use the 'coverage' subcommand of cmd.
    """

    cov = opts['--coverage']
    src = opts['--srcdir']
    if old:
        cmd = Path(cmd).with_name('make-coverage')
        return f"{cmd} {cov} --srcdir {src}"
    return f"{cmd} coverage --coverage {cov} --srcdir {src}"

def make_loop(cmd, opts, old):
    """Return the command producing loop json from viewer options opts.

    If old is true, use the make-loop script installed next to cmd;
    otherwise use the 'loop' subcommand of cmd.
    """

    src = opts['--srcdir']
    goto = opts['--goto']
    if old:
        cmd = Path(cmd).with_name('make-loop')
        return f"{cmd} --goto {goto} --srcdir {src}"
    return f"{cmd} loop --goto {goto} --srcdir {src}"

def make_property(cmd, opts, old):
    """Return the command producing property json from viewer options opts.

    If old is true, use the make-property script installed next to cmd;
    otherwise use the 'property' subcommand of cmd.
    """

    prop = opts['--property']
    src = opts['--srcdir']
    if old:
        cmd = Path(cmd).with_name('make-property')
        return f"{cmd} {prop} --srcdir {src}"
    return f"{cmd} property --property {prop} --srcdir {src}"

def make_reachable(cmd, opts, old):
    """Return the command producing reachable json from viewer options opts.

    If old is true, use the make-reachable script installed next to cmd;
    otherwise use the 'reachable' subcommand of cmd.
    """

    src = opts['--srcdir']
    goto = opts['--goto']
    if old:
        cmd = Path(cmd).with_name('make-reachable')
        return f"{cmd} --goto {goto} --srcdir {src}"
    return f"{cmd} reachable --goto {goto} --srcdir {src}"

def make_result(cmd, opts, old):
    """Return the command producing result json from viewer options opts.

    If old is true, use the make-result script installed next to cmd;
    otherwise use the 'result' subcommand of cmd.
    """

    res = opts['--result']
    if old:
        cmd = Path(cmd).with_name('make-result')
        return f"{cmd} {res}"
    return f"{cmd} result --result {res}"

def make_source(cmd, opts, old):
    """Return the command producing source json from viewer options opts.

    If old is true, use the make-source script installed next to cmd;
    otherwise use the 'source' subcommand of cmd.
    """

    src = opts['--srcdir']
    goto = opts['--goto']
    if old:
        cmd = Path(cmd).with_name('make-source')
        return f"{cmd} --goto {goto} --srcdir {src}"
    return f"{cmd} source --goto {goto} --srcdir {src}"

def make_symbol(cmd, opts, old):
    """Return the command producing symbol json from viewer options opts.

    If old is true, use the make-symbol script installed next to cmd;
    otherwise use the 'symbol' subcommand of cmd.
    """

    src = opts['--srcdir']
    goto = opts['--goto']
    if old:
        cmd = Path(cmd).with_name('make-symbol')
        return f"{cmd} --goto {goto} --srcdir {src}"
    return f"{cmd} symbol --goto {goto} --srcdir {src}"

def make_trace(cmd, opts, old):
    """Return the command producing trace json from viewer options opts.

    If old is true, use the make-trace script installed next to cmd;
    otherwise use the 'trace' subcommand of cmd.
    """

    res = opts['--result']
    src = opts['--srcdir']
    if old:
        cmd = Path(cmd).with_name('make-trace')
        return f"{cmd} {res} --srcdir {src}"
    return f"{cmd} trace --result {res} --srcdir {src}"

def make_kind(job, kind, viewer, report_root, old):
    """Return a copy of a viewer job that produces one kind of json output.

    The job's command is replaced by the command for kind (built from
    the options of the original viewer command) with stdout redirected
    to json/viewer-{kind}.json under the proof's directory in
    report_root; that file becomes the job's only output.
    """

    job = copy.copy(job) # a shallow copy of job
    command = job["command"]
    outputs = job["outputs"]
    proof_name = job["pipeline_name"]
    assert len(outputs) == 1

    _, old_opts = parsed_command(command)
    make = {
        "coverage": make_coverage,
        "loop": make_loop,
        "property": make_property,
        "reachable": make_reachable,
        "result": make_result,
        "source": make_source,
        "symbol": make_symbol,
        "trace": make_trace,
    }[kind]
    cmd = make(viewer, old_opts, old)
    out = Path(report_root) / proof_name / f'json/viewer-{kind}.json'
    job["command"] = f"{cmd} > {out}"
    job["outputs"] = [str(out)]
    return job

def make_jobs(job, viewer, report_root, old):
    """Return one job per kind of json output for a single viewer job."""

    kinds = ["coverage", "loop", "property", "reachable",
             "result", "source", "symbol", "trace"]
    return [make_kind(job, kind, viewer, report_root, old) for kind in kinds]

def utility_jobs(jobs, viewer, report_root, old):
    """Return the flat list of json-producing jobs for all viewer jobs."""

    return [make_job
            for job in jobs
            for make_job in make_jobs(job, viewer, report_root, old)]

################################################################

def run_viewer(proof_root, viewer1, root1, viewer2, root2, litani, utility_tests):
    """Run both viewers on the proofs under litani.

    Loads the viewer jobs from proof_root, rewrites them to use viewer1
    with report root root1 and viewer2 with report root root2, and runs
    them as a new litani build in the current directory.  If
    utility_tests is true, also adds jobs writing json output into
    sibling directories of root1 and root2 named with a "-utility"
    suffix.
    """

    logging.info("Loading and updating viewer jobs...")
    viewer_jobs = get_jobs(proof_root, litani)

    make1 = root1.with_name(root1.name + "-utility")
    make2 = root2.with_name(root2.name + "-utility")

    jobs = update_jobs(viewer_jobs, viewer1, root1) + update_jobs(viewer_jobs, viewer2, root2)
    if utility_tests:
        jobs += utility_jobs(viewer_jobs, viewer1, make1, old=True)
        jobs += utility_jobs(viewer_jobs, viewer2, make2, old=False)

    logging.info("Running litani init...")
    run([litani, "init", "--project", "Differential testing"])
    set_jobs('.', litani, jobs)
    logging.info("Running litani run-build...")
    run([litani, "run-build"], capture_output=False)

################################################################

def _diff_directories(path1, path2):
    """Run 'diff -rw' on two paths; raise UserWarning if the command fails."""

    cmd = ["diff", "-rw", path1, path2]
    try:
        run(cmd)
    except UserWarning as error:
        cmd = ' '.join([str(word) for word in cmd])
        raise UserWarning(f"Reports differ: {cmd}") from error

def compare_reports(reports1, reports2):
    """Compare two report directories; raise UserWarning if they differ."""

    logging.info("Comparing %s and %s...", reports1, reports2)
    _diff_directories(reports1, reports2)

def compare_utility_reports(reports):
    """Compare the json in a report directory with its "-utility" sibling.

    Every directory named 'json' found under reports is compared with
    the directory at the same relative path under the sibling directory
    whose name is that of reports with "-utility" appended.  Raises
    UserWarning if the two sets of json directories differ or if any
    pair of directories differs.
    """

    reports1 = reports
    reports2 = reports.with_name(reports.name + "-utility")
    logging.info("Comparing %s and %s...", reports1, reports2)

    find = ['find', '.', '-type', 'd', '-name', 'json']
    json1 = sorted(run(find, cwd=reports1))
    json2 = sorted(run(find, cwd=reports2))
    if json1 != json2:
        raise UserWarning(
            f"Reports differ: json directories under {reports1} and {reports2} do not match")

    for path in json1:
        logging.info("Comparing %s and %s", reports1/path, reports2/path)
        _diff_directories(reports1/path, reports2/path)

################################################################
# Validate command line arguments

def check_git_status(repo):
    """Return True if repo is a git repository with no uncommitted changes.

    Raises UserWarning if 'git status' fails or reports any changes.
    """

    try:
        status = run(["git", "status", "--porcelain"], cwd=repo)
    except UserWarning as error:
        raise UserWarning("This is not a git repository: git status failed") from error
    if status:
        raise UserWarning("This git repository has uncomitted changes.")
    return True

def validate_litani(litani):
    """Return the full path to the litani command, or raise UserWarning."""

    logging.debug("--litani = %s", litani)
    path = shutil.which(str(litani))
    if path:
        return path
    raise UserWarning("Use --litani to give the command to invoke litani")

def validate_repository(repo):
    """Return repo if it contains a .git entry, or raise UserWarning."""

    logging.debug("--viewer-repository = %s", repo)
    # .git is a file, not a directory, in worktrees and submodules
    if repo is not None and (Path(repo) / '.git').exists():
        return repo
    raise UserWarning("Use --viewer-repository to give the root of the cbmc-viewer repository")

def validate_proofs(proofs):
    """Return proofs if it contains the file .litani_cache_dir, or raise UserWarning."""

    logging.debug("--proofs = %s", proofs)
    # proofs is None when --proofs is omitted from the command line
    if proofs is not None and (Path(proofs) / '.litani_cache_dir').is_file():
        return proofs
    raise UserWarning("Use --proofs to specify a proofs directory (containing .litani_cache_dir)")

def validate_commits(commits, repo=None):
    """Return commits if the first two name commits that git can resolve.

    The names are resolved with 'git rev-parse' run in repo (or in the
    current directory if repo is None).  Raises UserWarning if fewer
    than two names are given or either name cannot be resolved.
    """

    logging.debug("--commits = %s", commits)
    commits = commits or []
    message = "Use --commits to specify two commits or branches to compare"
    if len(commits) < 2:
        raise UserWarning(message)
    try:
        cmd = ['git', 'rev-parse', '--verify', '--quiet', '--end-of-options']
        run([*cmd, commits[0]], cwd=repo)
        run([*cmd, commits[1]], cwd=repo)
        return commits
    except UserWarning as error:
        raise UserWarning(message) from error

def _validate_directory_pair(paths, commits, force, default_root, message):
    """Return a validated pair of output directories.

    paths must be empty or contain exactly two paths.  If empty, the
    pair defaults to default_root/commits[0] and default_root/commits[1].
    If the two paths are equal, '0' and '1' are appended to their names
    to tell them apart.  Raises UserWarning(message) if the pair cannot
    be formed, and UserWarning if a path exists and force is false.
    """

    paths = [Path(path) for path in (paths or [])]
    commits = commits or []

    if len(paths) not in (0, 2):
        raise UserWarning(message)
    if not paths:
        if len(commits) < 2:
            raise UserWarning(message)
        paths = [default_root / commits[0], default_root / commits[1]]
    if paths[0] == paths[1]:
        name = paths[0].name # paths[0].name == paths[1].name
        paths = [path.with_name(name + str(idx)) for idx, path in enumerate(paths)]
    for path in paths:
        if path.exists() and not force:
            raise UserWarning(f"Path {path} exists: use --force to overwrite")
    return paths

def validate_reports(reports, commits, force):
    """Return the two report directories (default: under /tmp/reports).

    Raises UserWarning if reports does not name zero or two directories,
    or if a directory exists and force is false.
    """

    logging.debug("--reports = %s", reports)
    return _validate_directory_pair(
        reports, commits, force, Path('/tmp/reports'),
        "Use --reports to specify two report directories.")

def validate_installs(installs, commits, force):
    """Return the two install directories (default: under /tmp/viewer).

    Raises UserWarning if installs does not name zero or two directories,
    or if a directory exists and force is false.
    """

    logging.debug("--installs = %s", installs)
    return _validate_directory_pair(
        installs, commits, force, Path('/tmp/viewer'),
        "Use --installs to specify two install directories.")

def validate_arguments(args):
    """Validate and normalize the command line arguments in place.

    Logs the error and exits with status 1 if any argument is invalid.
    """

    try:
        args.viewer_repository = validate_repository(args.viewer_repository)
        check_git_status(args.viewer_repository)
        args.litani = validate_litani(args.litani)
        args.proofs = validate_proofs(args.proofs)
        # resolve the commits in the viewer repository, not the current directory
        args.commits = validate_commits(args.commits, args.viewer_repository)
        args.reports = validate_reports(args.reports, args.commits, args.force)
        args.installs = validate_installs(args.installs, args.commits, args.force)
    except UserWarning as error:
        logging.error(error)
        sys.exit(1)

################################################################

def main():
    """Install two versions of viewer, run them on the proofs, and compare reports."""

    args = parse_arguments()
    validate_arguments(args)
    logging.debug(args)

    # validate_arguments has checked that these either do not exist or
    # may be overwritten (--force)
    shutil.rmtree(args.reports[0], ignore_errors=True)
    shutil.rmtree(args.reports[1], ignore_errors=True)
    shutil.rmtree(args.installs[0], ignore_errors=True)
    shutil.rmtree(args.installs[1], ignore_errors=True)

    viewer1 = install_viewer(args.commits[0], args.installs[0], args.viewer_repository)
    report1 = args.reports[0]
    viewer2 = install_viewer(args.commits[1], args.installs[1], args.viewer_repository)
    report2 = args.reports[1]

    run_viewer(args.proofs, viewer1, report1, viewer2, report2, args.litani, args.utility_tests)
    compare_reports(report1, report2)
    if args.utility_tests:
        compare_utility_reports(report1)
        compare_utility_reports(report2)

if __name__ == "__main__":
    main()

################################################################