# SPDX-FileCopyrightText: Huawei Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
# Create a diff of two JSON CVE statuses
"""
Compare the JSON CVE reports of two builds.

cve-diff analyses JSON CVE outputs of two builds and shows the difference
between the two. The initial version handles: added/removed packages and
CVEs.

Syntax: cve-diff.py [-h] [-v] file1 file2
"""

import getopt
import json
import os
import re
import sys
from pprint import pprint  # noqa: F401 - unused, retained from the original import list

try:
    from deepdiff import DeepDiff
except ImportError:
    # Reported by calculate_diff() so that "-h" and input validation errors
    # still work when the third-party module is not installed.
    DeepDiff = None

verbose_level = 0

# Statuses that mean "this CVE does not affect the build".
_RESOLVED_STATUSES = ("Patched", "Ignored")

# Paths as rendered by DeepDiff's tree view, e.g. root[42]['issue'][7]['status'].
# Raw strings are required: "\[" in a normal string is an invalid escape.
_PACKAGE_PATH = re.compile(r"^root\[[0-9]+\]$")
_PACKAGE_NAME_PATH = re.compile(r"^root\[[0-9]+\]\['name'\]$")
_PACKAGE_FIELD_PATH = re.compile(r"^root\[[0-9]+\]\['[A-Za-z0-9]+'\]$")
_PRODUCT_PATH = re.compile(
    r"^root\[[0-9]+\]\['products'\]\[[0-9]+\]\['product'\]$"
)
_ISSUE_PATH = re.compile(r"^root\[[0-9]+\]\['issue'\]\[[0-9]+\]$")
_ISSUE_STATUS_PATH = re.compile(
    r"^root\[[0-9]+\]\['issue'\]\[[0-9]+\]\['status'\]$"
)
# Digits are allowed so that 'scorev2' and 'scorev3' changes are recognised.
_ISSUE_FIELD_PATH = re.compile(
    r"^root\[[0-9]+\]\['issue'\]\[[0-9]+\]\['[A-Za-z0-9]+'\]$"
)


def show_syntax_and_exit(code):
    """
    Show the program syntax and exit
    Arguments:
        code: the exit code to return (0 prints to stdout, anything else
              to stderr)
    """
    stream = sys.stdout if code == 0 else sys.stderr
    program = os.path.basename(sys.argv[0])
    print("Syntax: %s [-h] [-v] file1 file2" % program, file=stream)
    sys.exit(code)


def exit_error(code, message):
    """
    Show the error message on stderr and exit with an error
    Arguments:
        code: the error code to return
        message: the message to show
    """
    print("Error: %s" % message, file=sys.stderr)
    sys.exit(code)


def vprint(*args, **kwargs):
    """
    Printing with verbosity levels: prints only when verbose_level is set
    Arguments:
        args: as for print
        kwargs: as for print
    """
    if verbose_level:
        print(*args, **kwargs)


def parse_args(argv):
    """
    Parse the program arguments, put options in global variables
    Arguments:
        argv: program arguments (without the program name)
    Returns:
        Two mandatory file arguments; any further positional arguments
        are ignored
    """
    global verbose_level
    try:
        opts, args = getopt.getopt(argv, "hv", ["help", "verbose"])
    except getopt.GetoptError:
        show_syntax_and_exit(1)
    for opt, _ in opts:
        if opt in ("-h", "--help"):
            show_syntax_and_exit(0)
        elif opt in ("-v", "--verbose"):
            verbose_level = 1
        else:
            show_syntax_and_exit(1)

    if len(args) < 2:
        exit_error(1, "Need at least two files to compare")
    return (args[0], args[1])


def validate_cve_json(data):
    """
    Check correctness of the loaded JSON data
    Arguments:
        data: loaded data
    Returns:
        Bool: True if file is correct, False otherwise
        Error message: empty if the file is correct
    """
    keys_in_package = {"name", "layer", "version", "issue"}
    keys_in_issue = {"id", "scorev2", "scorev3", "vector", "status"}

    if not isinstance(data, dict) or data.get("version") != "1":
        return False, "Unrecognized format version number"
    if "package" not in data:
        return False, "Mandatory 'package' key not found"

    for package in data["package"]:
        if not isinstance(package, dict):
            return False, "Malformed entry in 'package': %r" % (package,)
        # Sorted so the message does not depend on set iteration order.
        missing = sorted(keys_in_package - set(package))
        if missing:
            return False, "Missing a mandatory key in package: %s" % missing

        for issue in package["issue"]:
            if not isinstance(issue, dict):
                return (
                    False,
                    "Malformed entry in 'issue' for the package '%s'"
                    % package["name"],
                )
            missing = sorted(keys_in_issue - set(issue))
            if missing:
                return (
                    False,
                    "Missing mandatory keys %s in 'issue' for the package '%s'"
                    % (missing, package["name"]),
                )
    return True, ""


def get_name(value):
    """
    Function used for sorting, return the sorting key to use
    Argument:
        value: raw value (a package dictionary)
    Return:
        Package name to use for sorting
    """
    return value["name"]


def load_cve_json(filename):
    """
    Load and validate the JSON file; exits with an error if the file cannot
    be read, is not valid JSON or fails validate_cve_json()
    Arguments:
        filename: the file to open
    Returns:
        The list of packages from the file, sorted by package name
    """
    data = None
    try:
        with open(filename, "r", encoding="utf-8") as f:
            data = json.load(f)
    except FileNotFoundError:
        exit_error(1, "Input file (%s) not found" % filename)
    except OSError as error:
        exit_error(1, "Cannot read input file (%s): %s" % (filename, error))
    except (json.JSONDecodeError, UnicodeDecodeError) as error:
        exit_error(1, "Malformed JSON file (%s): %s" % (filename, error))

    validated, error = validate_cve_json(data)
    if not validated:
        exit_error(1, error)

    return sorted(data["package"], key=get_name)


def calculate_diff(data1, data2):
    """
    Calculate a deep diff between two JSON data sets
    Arguments:
        data1: source data to compare (current)
        data2: destination data to compare (upstream)
    Returns:
        Diff in the DeepDiff tree format
    """
    if DeepDiff is None:
        exit_error(1, "The 'deepdiff' Python module is required")
    return DeepDiff(data1, data2, ignore_order=True, view="tree")


def _changes(diff, kind):
    """Return the diff entries of the given kind, or nothing if absent."""
    return diff[kind] if kind in diff else ()


def calculate_package_diff(diff):
    """
    Parse a difference in packages from the deep diff
    Arguments:
        diff: the deep diff, tree format
    Returns:
        removed_set: removed packages list
        added_set: added packages list
    """
    removed_set = []
    added_set = []
    vprint("Package status:")

    # Only top-level "root[XXXX]" items are whole packages; deeper items
    # (e.g. a new issue inside an existing package) are skipped here.
    for p in _changes(diff, "iterable_item_added"):
        # Only p.t2 exists
        if not _PACKAGE_PATH.search(p.path()):
            continue
        vprint("Added package: %s %s" % (p.t2["name"], p.t2["version"]))
        added_set.append(p.t2)

    for p in _changes(diff, "iterable_item_removed"):
        # Only p.t1 exists
        if not _PACKAGE_PATH.search(p.path()):
            continue
        vprint("Removed package: %s %s" % (p.t1["name"], p.t1["version"]))
        removed_set.append(p.t1)

    changed = list(_changes(diff, "values_changed"))

    # First pass: a changed name means one package was removed and another
    # one added. Collect these first so that the second pass can filter on
    # removed_set regardless of the order of the diff entries.
    for p in changed:
        if _PACKAGE_NAME_PATH.search(p.path()):
            vprint("Removed package: %s %s" % (p.up.t1["name"], p.up.t1["version"]))
            vprint("Added package: %s %s" % (p.up.t2["name"], p.up.t2["version"]))
            removed_set.append(p.up.t1)
            added_set.append(p.up.t2)
            # TODO: Handle the special case of linux

    # Second pass: other changes, ignored when the parent package is removed.
    for p in changed:
        path = p.path()
        if _PACKAGE_NAME_PATH.search(path):
            continue
        if _PACKAGE_FIELD_PATH.search(path):
            # A high level property root[XXX]['somename'] has changed
            if p.up.t1 not in removed_set:
                vprint("Found changed items for: %s" % (p.up.t1["name"]))
        elif _PRODUCT_PATH.search(path):
            # Format: root[XX]['products'][XX]['product']
            package_before = p.up.up.up.t1
            package_after = p.up.up.up.t2
            if package_before not in removed_set:
                vprint(
                    "Change of product names for package: %s %s"
                    % (package_before["name"], package_after["name"])
                )
        # TODO: product table varies when the high level has changed
        # TODO: issue table version when the high level has changed
    vprint("")
    return (removed_set, added_set)


def calculate_cve_diff(diff):
    """
    Parse a difference in CVEs from the deep diff
    Arguments:
        diff: the deep diff, tree format
    Returns:
        removed_cves: ids of CVEs that went from Unpatched to
                      Patched/Ignored
        added_cves: ids of CVEs that went from Patched/Ignored to Unpatched
    """
    added_cves = []
    removed_cves = []
    vprint("CVE status:")

    for p in _changes(diff, "values_changed"):
        path = p.path()
        if _ISSUE_STATUS_PATH.search(path):
            # Differences like
            # <root[42]['issue'][2562]['status'] t1:'Patched', t2:'Unpatched'>
            # p.up is the issue, p.up.up the issue list, p.up.up.up the package
            issue = p.up.t2
            package = p.up.up.up.t2
            if p.t1 in _RESOLVED_STATUSES and p.t2 == "Unpatched":
                label = "Unpatched CVE"
                added_cves.append(issue["id"])
            elif p.t1 == "Unpatched" and p.t2 in _RESOLVED_STATUSES:
                label = "Fixed CVE"
                removed_cves.append(issue["id"])
            else:
                label = "Unknown status of CVE"
            vprint(
                "%s: %s (%s %s)"
                % (label, issue["id"], package["name"], package["version"])
            )
            continue

        # Other changes in the issue table
        if _ISSUE_PATH.search(path):
            # The whole issue element has changed
            package_before, package_after = p.up.up.t1, p.up.up.t2
        elif _ISSUE_FIELD_PATH.search(path):
            # Some fields of the issue changed
            package_before, package_after = p.up.up.up.t1, p.up.up.up.t2
        else:
            continue

        # If package names are different, this is a spurious diff
        if package_before["name"] == package_after["name"]:
            vprint(
                "Changed CVE for package %s %s"
                % (package_after["name"], package_after["version"])
            )
    vprint("")
    return (removed_cves, added_cves)


def main(argv):
    """
    Compare two CVE JSON files and print a summary of the differences
    Arguments:
        argv: program arguments (without the program name)
    """
    file1, file2 = parse_args(argv)

    data1 = load_cve_json(file1)
    data2 = load_cve_json(file2)
    diff = calculate_diff(data1, data2)

    removed_set, added_set = calculate_package_diff(diff)

    removed_cves, added_cves = calculate_cve_diff(diff)

    print("Summary:")
    print(
        "Package report: the new version adds %d packages and removes %d packages"
        % (len(added_set), len(removed_set))
    )
    print(
        "CVE report: the new version removes %d CVEs and adds %d CVEs"
        % (len(removed_cves), len(added_cves))
    )


if __name__ == "__main__":
    main(sys.argv[1:])