Skip to content
Snippets Groups Projects
Commit 684973b7 authored by Marta Rybczynska's avatar Marta Rybczynska Committed by Pavel Zhukov
Browse files

scripts/cve-diff.py: add the script


cve-diff analyses JSON CVE outputs of two builds and shows
the difference between the two. The initial version handles:
added/removed packages and CVEs.

Signed-off-by: default avatarMarta Rybczynska <marta.rybczynska@huawei.com>
parent 6035dc2f
No related branches found
No related tags found
1 merge request!163scripts/cve-diff.py: add the script
# SPDX-FileCopyrightText: Huawei Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
# Create a diff of two JSON CVE statuses
import sys
import getopt
import re
from deepdiff import DeepDiff
from pprint import pprint
verbose_level = 0
def show_syntax_and_exit(code):
"""
Show the program syntax and exit with an errror
Arguments:
code: the error code to return
"""
print("Syntax: %s [-h] [-v] file1 file2" % __name__)
sys.exit(code)
def exit_error(code, message):
"""
Show the error message and exit with an errror
Arguments:
code: the error code to return
message: the message to show
"""
print("Error: %s" % message)
sys.exit(code)
def vprint(*args, **kwargs):
"""
Printing with verbosity levels
Arguments:
args: as for print
kwargs: as for print
"""
global verbose_level
if verbose_level:
print(*args, **kwargs)
def parse_args(argv):
"""
Parse the program arguments, put options in global variables
Arguments:
argv: program arguments
Returns:
Two mandatory file arguments
"""
global verbose_level
try:
opts, args = getopt.getopt(argv, "hv", ["help", "verbose"])
except getopt.GetoptError:
show_syntax_and_exit(1)
for opt, arg in opts:
if opt in ("-h", "--help"):
show_syntax_and_exit(0)
elif opt in ("-v", "--verbose"):
verbose_level = 1
else:
show_syntax_and_exit(1)
if len(args) < 2:
exit_error(1, "Need at least two files to compare")
return (args[0], args[1])
def validate_cve_json(data):
"""
Check correctness of the loaded JSON data
Arguments:
data: loaded data
Returns:
Bool: True if file is correct, False otherwise
Error message: if the file has errors
"""
if not "version" in data or data["version"] != "1":
return False, "Unrecognized format version number"
if not "package" in data:
return False, "Mandatory 'package' key not found"
for package in data["package"]:
keys_in_package = {"name", "layer", "version", "issue"}
if keys_in_package - package.keys():
return False, "Missing a mandatory key in package: %s" % (
keys_in_package - package.keys()
)
for issue in package["issue"]:
keys_in_issue = {"id", "scorev2", "scorev3", "vector", "status"}
if keys_in_issue - issue.keys():
return (
False,
"Missing mandatory keys %s in 'issue' for the package '%s'"
% (keys_in_issue - issue.keys(), package_name),
)
return True, ""
def get_name(value):
"""
Function used for sorting, return the sorting key to use
Argument:
value: raw value
Return:
Package name to use for sorting
"""
return value["name"]
def load_cve_json(filename):
"""
Load the JSON file, return the resulting dictionary
Arguments:
filename: the file to open
Returns:
Parsed file as a dictionary
"""
import json
out = {}
try:
with open(filename, "r") as f:
out = json.load(f)
except FileNotFoundError:
exit_error(1, "Input file (%s) not found" % (filename))
except json.decoder.JSONDecodeError as error:
exit_error(1, "Malformed JSON file: %s" % str(error))
# Validate file
validated, error = validate_cve_json(out)
if not validated:
exit_error(1, error)
out_sorted = sorted(out["package"], key=get_name)
return out_sorted
def calculate_diff(data1, data2):
"""
Calculate a deep diff between two JSON data sets
Arguments:
data1: source data to compare (current)
data2: destination data to compare (upstream)
Returns:
Diff in the DeepDiff tree format
"""
ddiff = DeepDiff(data1, data2, ignore_order=True, view="tree")
return ddiff
def calculate_package_diff(diff):
"""
Parse a difference in packages from the deep diff
Arguments:
diff: the deep diff, tree format
Returns:
removed set: added packages list
added_set: removed packages list
"""
removed_set = []
added_set = []
vprint("Package status:")
if "iterable_item_added" in diff.keys():
for p in diff["iterable_item_added"]:
# Only p.t2 exists
# Assure only "root[XXXX]" items
if not re.search("^root\[[0-9]+\]$", p.path()):
# If we have a new issue
# if re.search("^root\[[0-9]+\]\['issue'\]\[[0-9]+\]$", p):
# print("New issue")
continue
vprint("Added package: %s %s" % (p.t2["name"], p.t2["version"]))
added_set.append(p.t2)
if "iterable_item_removed" in diff.keys():
for p in diff["iterable_item_removed"]:
# Only p.t1 exists
# Assure only "root[XXXX]" items
if not re.search("^root\[[0-9]+\]$", p.path()):
# print("removed: Not found in %s" % (p.t1))
continue
vprint("Removed package: %s %s" % (p.t1["name"], p.t1["version"]))
removed_set.append(p.t1)
if "values_changed" in diff.keys():
# print("Some values changed")
for p in diff["values_changed"]:
# Filter out name changes, this is addition/removal - removal of t1
if re.search("^root\[[0-9]+\]\['name'\]$", p.path()):
vprint("Removed package: %s %s" % (p.up.t1["name"], p.up.t1["version"]))
vprint("Added package: %s %s" % (p.up.t2["name"], p.up.t2["version"]))
removed_set.append(p.up.t1)
added_set.append(p.up.t2)
# TODO: Handle the special case of linux
else:
# If a high level property root[XXX]['somename'] of the item that was changed
if re.search("^root\[[0-9]+\]\['[a-z,A-Z,0-9]+'\]$", p.path()):
# If the parent has been removed, this item is to be ignored
# We take it into account only if paren't hasn't been removed
if not p.up.t1 in removed_set:
vprint("Found changed items for: %s" % (p.up.t1["name"]))
# product table varies when the high level has changed
# Format: [root][XX]['products'][XX]['product']
elif re.search(
"^root\[[0-9]+\]\['products'\]\[[0-9]+\]\['product'\]$", p.path()
):
if p.up.up.up.t1 not in removed_set:
vprint(
"Change of product names for package: %s %s"
% (p.up.up.up.t1["name"], p.up.up.up.t2["name"])
)
# TODO: product table varies when the high level has changed
# TODO: issue table version when the high level has changed
# else:
# print(p)
vprint("")
return (removed_set, added_set)
def calculate_cve_diff(diff):
"""
Parse a difference in CVEs from the deep diff
Arguments:
diff: the deep diff, tree format
Returns:
removed_cves: removed CVEs
added_cves: added CVEs
"""
added_cves = []
removed_cves = []
vprint("CVE status:")
# Look for added/removed CVEs
if "values_changed" in diff.keys():
# Differences like <root[42]['issue'][2562]['status'] t1:'Patched', t2:'Unpatched'>
for p in diff["values_changed"]:
if re.search("^root\[[0-9]+\]\['issue'\]\[[0-9]+\]\['status'\]$", p.path()):
if (p.t1 == "Patched" or p.t1 == "Ignored") and p.t2 == "Unpatched":
# New CVE
vprint(
"Unpatched CVE: %s (%s %s)"
% (
p.up.t2["id"],
p.up.up.up.t2["name"],
p.up.up.up.t2["version"],
)
)
added_cves.append(p.up.t2["id"])
elif p.t1 == "Unpatched" and (p.t2 == "Patched" or p.t2 == "Ignored"):
# Fixed CVE
vprint(
"Fixed CVE: %s (%s %s)"
% (
p.up.t2["id"],
p.up.up.up.t2["name"],
p.up.up.up.t2["version"],
)
)
removed_cves.append(p.up.t2["id"])
else:
vprint(
"Unknown status of CVE: %s (%s %s)"
% (
p.up.t2["id"],
p.up.up.up.t2["name"],
p.up.up.up.t2["version"],
)
)
# Other changes in the issue table
elif re.search("^root\[[0-9]+\]\['issue'\].*$", p.path()):
# The whole issue element has changed
if re.search("^root\[[0-9]+\]\['issue'\]\[[0-9]+\]$", p.path()):
if p.up.up.t1["name"] == p.up.up.t2["name"]:
vprint(
"Changed CVE for package %s %s"
% (p.up.up.t2["name"], p.up.up.t2["version"])
)
else:
# Some fields of the issue changed
if re.search(
"^root\[[0-9]+\]\['issue'\]\[[0-9]+\]\['[a-z,A-Z]+'\]$",
p.path(),
):
# If package names are different, this is a spurious diff
if p.up.up.up.t1["name"] == p.up.up.up.t2["name"]:
vprint(
"Changed CVE for package %s %s"
% (p.up.up.up.t2["name"], p.up.up.up.t2["version"])
)
vprint("")
return (removed_cves, added_cves)
def main(argv):
file1, file2 = parse_args(argv)
data1 = load_cve_json(file1)
data2 = load_cve_json(file2)
diff = calculate_diff(data1, data2)
removed_set, added_set = calculate_package_diff(diff)
removed_cves, added_cves = calculate_cve_diff(diff)
print("Summary:")
print(
"Package report: the new version adds %s packages and removes %d packages"
% (len(added_set), len(removed_set))
)
print(
"CVE report: the new version removes %s CVEs and adds %s CVEs"
% (len(removed_cves), len(added_cves))
)
if __name__ == "__main__":
main(sys.argv[1:])
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment