Skip to content
Snippets Groups Projects
Forked from Eclipse Projects / Oniro Core / Oniro
459 commits behind the upstream repository.
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
cve-diff.py 10.64 KiB
# SPDX-FileCopyrightText: Huawei Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
# Create a diff of two JSON CVE statuses

import sys
import getopt
import re
from deepdiff import DeepDiff
from pprint import pprint

verbose_level = 0


def show_syntax_and_exit(code):
    """
    Show the program syntax and exit with an errror
    Arguments:
        code: the error code to return
    """
    print("Syntax: %s [-h] [-v] file1 file2" % __name__)
    sys.exit(code)


def exit_error(code, message):
    """
    Show the error message and exit with an errror
    Arguments:
        code: the error code to return
        message: the message to show
    """
    print("Error: %s" % message)
    sys.exit(code)


def vprint(*args, **kwargs):
    """
    Printing with verbosity levels
    Arguments:
        args: as for print
        kwargs: as for print
    """
    global verbose_level

    if verbose_level:
        print(*args, **kwargs)


def parse_args(argv):
    """
    Parse the program arguments, put options in global variables
    Arguments:
        argv: program arguments
    Returns:
        Two mandatory file arguments
    """
    global verbose_level
    try:
        opts, args = getopt.getopt(argv, "hv", ["help", "verbose"])
    except getopt.GetoptError:
        show_syntax_and_exit(1)
    for opt, arg in opts:
        if opt in ("-h", "--help"):
            show_syntax_and_exit(0)
        elif opt in ("-v", "--verbose"):
            verbose_level = 1
        else:
            show_syntax_and_exit(1)

    if len(args) < 2:
        exit_error(1, "Need at least two files to compare")
    return (args[0], args[1])


def validate_cve_json(data):
    """
    Check correctness of the loaded JSON data
    Arguments:
        data: loaded data
    Returns:
        Bool: True if file is correct, False otherwise
        Error message: if the file has errors
    """
    if not "version" in data or data["version"] != "1":
        return False, "Unrecognized format version number"
    if not "package" in data:
        return False, "Mandatory 'package' key not found"
    for package in data["package"]:
        keys_in_package = {"name", "layer", "version", "issue"}
        if keys_in_package - package.keys():
            return False, "Missing a mandatory key in package: %s" % (
                keys_in_package - package.keys()
            )

        for issue in package["issue"]:
            keys_in_issue = {"id", "scorev2", "scorev3", "vector", "status"}
            if keys_in_issue - issue.keys():
                return (
                    False,
                    "Missing mandatory keys %s in 'issue' for the package '%s'"
                    % (keys_in_issue - issue.keys(), package_name),
                )
    return True, ""


def get_name(value):
    """
    Function used for sorting, return the sorting key to use
    Argument:
        value: raw value
    Return:
        Package name to use for sorting
    """
    return value["name"]


def load_cve_json(filename):
    """
    Load the JSON file, return the resulting dictionary
    Arguments:
        filename: the file to open
    Returns:
        Parsed file as a dictionary
    """
    import json

    out = {}
    try:
        with open(filename, "r") as f:
            out = json.load(f)
    except FileNotFoundError:
        exit_error(1, "Input file (%s) not found" % (filename))
    except json.decoder.JSONDecodeError as error:
        exit_error(1, "Malformed JSON file: %s" % str(error))

    # Validate file
    validated, error = validate_cve_json(out)
    if not validated:
        exit_error(1, error)

    out_sorted = sorted(out["package"], key=get_name)
    return out_sorted


def calculate_diff(data1, data2):
    """
    Calculate a deep diff between two JSON data sets
    Arguments:
        data1: source data to compare (current)
        data2: destination data to compare (upstream)
    Returns:
        Diff in the DeepDiff tree format
    """
    ddiff = DeepDiff(data1, data2, ignore_order=True, view="tree")
    return ddiff


def calculate_package_diff(diff):
    """
    Parse a difference in packages from the deep diff
    Arguments:
        diff: the deep diff, tree format
    Returns:
        removed set: added packages list
        added_set: removed packages list
    """
    removed_set = []
    added_set = []
    vprint("Package status:")
    if "iterable_item_added" in diff.keys():
        for p in diff["iterable_item_added"]:
            # Only p.t2 exists
            # Assure only "root[XXXX]" items
            if not re.search("^root\[[0-9]+\]$", p.path()):
                # If we have a new issue
                # if re.search("^root\[[0-9]+\]\['issue'\]\[[0-9]+\]$", p):
                #    print("New issue")
                continue
            vprint("Added package: %s %s" % (p.t2["name"], p.t2["version"]))
            added_set.append(p.t2)
    if "iterable_item_removed" in diff.keys():
        for p in diff["iterable_item_removed"]:
            # Only p.t1 exists
            # Assure only "root[XXXX]" items
            if not re.search("^root\[[0-9]+\]$", p.path()):
                # print("removed: Not found in %s" % (p.t1))
                continue
            vprint("Removed package: %s %s" % (p.t1["name"], p.t1["version"]))
            removed_set.append(p.t1)
    if "values_changed" in diff.keys():
        # print("Some values changed")
        for p in diff["values_changed"]:
            # Filter out name changes, this is addition/removal - removal of t1
            if re.search("^root\[[0-9]+\]\['name'\]$", p.path()):
                vprint("Removed package: %s %s" % (p.up.t1["name"], p.up.t1["version"]))
                vprint("Added package: %s %s" % (p.up.t2["name"], p.up.t2["version"]))
                removed_set.append(p.up.t1)
                added_set.append(p.up.t2)
                # TODO: Handle the special case of linux
            else:
                # If a high level property root[XXX]['somename'] of the item that was changed
                if re.search("^root\[[0-9]+\]\['[a-z,A-Z,0-9]+'\]$", p.path()):
                    # If the parent has been removed, this item is to be ignored
                    # We take it into account only if paren't hasn't been removed
                    if not p.up.t1 in removed_set:
                        vprint("Found changed items for: %s" % (p.up.t1["name"]))
                # product table varies when the high level has changed
                # Format: [root][XX]['products'][XX]['product']
                elif re.search(
                    "^root\[[0-9]+\]\['products'\]\[[0-9]+\]\['product'\]$", p.path()
                ):
                    if p.up.up.up.t1 not in removed_set:
                        vprint(
                            "Change of product names for package: %s %s"
                            % (p.up.up.up.t1["name"], p.up.up.up.t2["name"])
                        )
                # TODO: product table varies when the high level has changed
                # TODO: issue table version when the high level has changed
                # else:
                #    print(p)
    vprint("")
    return (removed_set, added_set)


def calculate_cve_diff(diff):
    """
    Parse a difference in CVEs from the deep diff
    Arguments:
        diff: the deep diff, tree format
    Returns:
        removed_cves: removed CVEs
        added_cves: added CVEs
    """
    added_cves = []
    removed_cves = []
    vprint("CVE status:")
    # Look for added/removed CVEs
    if "values_changed" in diff.keys():
        # Differences like <root[42]['issue'][2562]['status'] t1:'Patched', t2:'Unpatched'>
        for p in diff["values_changed"]:
            if re.search("^root\[[0-9]+\]\['issue'\]\[[0-9]+\]\['status'\]$", p.path()):
                if (p.t1 == "Patched" or p.t1 == "Ignored") and p.t2 == "Unpatched":
                    # New CVE
                    vprint(
                        "Unpatched CVE: %s (%s %s)"
                        % (
                            p.up.t2["id"],
                            p.up.up.up.t2["name"],
                            p.up.up.up.t2["version"],
                        )
                    )
                    added_cves.append(p.up.t2["id"])
                elif p.t1 == "Unpatched" and (p.t2 == "Patched" or p.t2 == "Ignored"):
                    # Fixed CVE
                    vprint(
                        "Fixed CVE: %s (%s %s)"
                        % (
                            p.up.t2["id"],
                            p.up.up.up.t2["name"],
                            p.up.up.up.t2["version"],
                        )
                    )
                    removed_cves.append(p.up.t2["id"])
                else:
                    vprint(
                        "Unknown status of CVE: %s (%s %s)"
                        % (
                            p.up.t2["id"],
                            p.up.up.up.t2["name"],
                            p.up.up.up.t2["version"],
                        )
                    )
            # Other changes in the issue table
            elif re.search("^root\[[0-9]+\]\['issue'\].*$", p.path()):
                # The whole issue element has changed
                if re.search("^root\[[0-9]+\]\['issue'\]\[[0-9]+\]$", p.path()):
                    if p.up.up.t1["name"] == p.up.up.t2["name"]:
                        vprint(
                            "Changed CVE for package %s %s"
                            % (p.up.up.t2["name"], p.up.up.t2["version"])
                        )
                else:
                    # Some fields of the issue changed
                    if re.search(
                        "^root\[[0-9]+\]\['issue'\]\[[0-9]+\]\['[a-z,A-Z]+'\]$",
                        p.path(),
                    ):
                        # If package names are different, this is a spurious diff
                        if p.up.up.up.t1["name"] == p.up.up.up.t2["name"]:
                            vprint(
                                "Changed CVE for package %s %s"
                                % (p.up.up.up.t2["name"], p.up.up.up.t2["version"])
                            )
    vprint("")
    return (removed_cves, added_cves)


def main(argv):
    file1, file2 = parse_args(argv)

    data1 = load_cve_json(file1)
    data2 = load_cve_json(file2)
    diff = calculate_diff(data1, data2)

    removed_set, added_set = calculate_package_diff(diff)

    removed_cves, added_cves = calculate_cve_diff(diff)

    print("Summary:")
    print(
        "Package report: the new version adds %s packages and removes %d packages"
        % (len(added_set), len(removed_set))
    )
    print(
        "CVE report: the new version removes %s CVEs and adds %s CVEs"
        % (len(removed_cves), len(added_cves))
    )


if __name__ == "__main__":
    main(sys.argv[1:])