Skip to content
Snippets Groups Projects
Commit 19931c39 authored by Marta Rybczynska's avatar Marta Rybczynska
Browse files

cve-check: add YAML output

This patch implements the YAML output for CVE-check and re-implements
the coverage pass using this format (also simplifies the logic).

Add an option to output the CVE check in a JSON-based format.
This format is easier to parse in software than the original
text-based one and allows post-processing by other tools.

Output formats are now handed by CVE_CHECK_FORMAT_TEXT and
CVE_CHECK_FORMAT_JSON. Both of them are enabled by default.

The JSON output format gets generated in a similar way to the
text format with the exception of the manifest: appending to
JSON arrays requires parsing the file. Because of that we
first write JSON fragments and then assemble them in one pass
at the end.

Until now the CVE checker was giving information about CVEs found for
a product (or more products) contained in a recipe. However, there was
no easy way to find out which products or recipes have no CVEs. Having
no reported CVEs might mean there are simply none, but can also mean
a product name (CPE) mismatch.

This patch adds CVE_CHECK_COVERAGE option enabling a new type of
statistics. Then we use the new JSON format to report the information.
The legacy text mode report does not contain it.

This option is expected to help with an identification of recipes with
mismatched CPEs, issues in the database and more.

This work is based on [1], but adding the JSON format makes it easier
to implement, without additional result files.

[1] https://lists.openembedded.org/g/openembedded-core/message/159873

This is a backported version of two patches submitted upstream:
https://lists.openembedded.org/g/openembedded-core/message/163745 and
https://lists.openembedded.org/g/openembedded-core/message/163746



The difference is that we bring back the yaml merge function that is
in the lib/ directory in the upstream proposal.

Signed-off-by: default avatarMarta Rybczynska <marta.rybczynska@huawei.com>
parent 35e75415
No related branches found
No related tags found
No related merge requests found
...@@ -34,15 +34,30 @@ CVE_CHECK_TMP_FILE ?= "${TMPDIR}/cve_check" ...@@ -34,15 +34,30 @@ CVE_CHECK_TMP_FILE ?= "${TMPDIR}/cve_check"
CVE_CHECK_SUMMARY_DIR ?= "${LOG_DIR}/cve" CVE_CHECK_SUMMARY_DIR ?= "${LOG_DIR}/cve"
CVE_CHECK_SUMMARY_FILE_NAME ?= "cve-summary" CVE_CHECK_SUMMARY_FILE_NAME ?= "cve-summary"
CVE_CHECK_SUMMARY_FILE ?= "${CVE_CHECK_SUMMARY_DIR}/${CVE_CHECK_SUMMARY_FILE_NAME}" CVE_CHECK_SUMMARY_FILE ?= "${CVE_CHECK_SUMMARY_DIR}/${CVE_CHECK_SUMMARY_FILE_NAME}"
CVE_CHECK_SUMMARY_FILE_NAME_JSON = "cve-summary.json"
CVE_CHECK_SUMMARY_INDEX_PATH = "${CVE_CHECK_SUMMARY_DIR}/cve-summary-index.txt"
CVE_CHECK_LOG_JSON ?= "${T}/cve.json"
CVE_CHECK_DIR ??= "${DEPLOY_DIR}/cve" CVE_CHECK_DIR ??= "${DEPLOY_DIR}/cve"
CVE_CHECK_RECIPE_FILE ?= "${CVE_CHECK_DIR}/${PN}" CVE_CHECK_RECIPE_FILE ?= "${CVE_CHECK_DIR}/${PN}"
CVE_CHECK_RECIPE_FILE_JSON ?= "${CVE_CHECK_DIR}/${PN}_cve.json"
CVE_CHECK_MANIFEST ?= "${DEPLOY_DIR_IMAGE}/${IMAGE_NAME}${IMAGE_NAME_SUFFIX}.cve" CVE_CHECK_MANIFEST ?= "${DEPLOY_DIR_IMAGE}/${IMAGE_NAME}${IMAGE_NAME_SUFFIX}.cve"
CVE_CHECK_MANIFEST_JSON ?= "${DEPLOY_DIR_IMAGE}/${IMAGE_NAME}${IMAGE_NAME_SUFFIX}.json"
CVE_CHECK_COPY_FILES ??= "1" CVE_CHECK_COPY_FILES ??= "1"
CVE_CHECK_CREATE_MANIFEST ??= "1" CVE_CHECK_CREATE_MANIFEST ??= "1"
CVE_CHECK_REPORT_PATCHED ??= "1" CVE_CHECK_REPORT_PATCHED ??= "1"
# Provide text output
CVE_CHECK_FORMAT_TEXT ??= "1"
# Provide JSON output
CVE_CHECK_FORMAT_JSON ??= "1"
# Check for packages without CVEs (no issues or missing product name)
CVE_CHECK_COVERAGE ??= "1"
# Skip CVE Check for packages (PN) # Skip CVE Check for packages (PN)
CVE_CHECK_SKIP_RECIPE ?= "" CVE_CHECK_SKIP_RECIPE ?= ""
...@@ -102,10 +117,10 @@ python do_cve_check () { ...@@ -102,10 +117,10 @@ python do_cve_check () {
patched_cves = get_patched_cves(d) patched_cves = get_patched_cves(d)
except FileNotFoundError: except FileNotFoundError:
bb.fatal("Failure in searching patches") bb.fatal("Failure in searching patches")
ignored, patched, unpatched = check_cves(d, patched_cves) ignored, patched, unpatched, status = check_cves(d, patched_cves)
if patched or unpatched: if patched or unpatched or (d.getVar("CVE_CHECK_COVERAGE") == "1" and status):
cve_data = get_cve_info(d, patched + unpatched) cve_data = get_cve_info(d, patched + unpatched)
cve_write_data(d, patched, unpatched, ignored, cve_data) cve_write_data(d, patched, unpatched, ignored, cve_data, status)
else: else:
bb.note("No CVE database found, skipping CVE check") bb.note("No CVE database found, skipping CVE check")
...@@ -120,11 +135,29 @@ python cve_check_cleanup () { ...@@ -120,11 +135,29 @@ python cve_check_cleanup () {
Delete the file used to gather all the CVE information. Delete the file used to gather all the CVE information.
""" """
bb.utils.remove(e.data.getVar("CVE_CHECK_TMP_FILE")) bb.utils.remove(e.data.getVar("CVE_CHECK_TMP_FILE"))
bb.utils.remove(e.data.getVar("CVE_CHECK_SUMMARY_INDEX_PATH"))
} }
addhandler cve_check_cleanup addhandler cve_check_cleanup
cve_check_cleanup[eventmask] = "bb.cooker.CookerExit" cve_check_cleanup[eventmask] = "bb.cooker.CookerExit"
def cve_check_merge_jsons(output, data):
"""
Merge the data in the "package" property to the main data file
output
"""
if output["version"] != data["version"]:
bb.error("Version mismatch when merging JSON outputs")
return
for product in output["package"]:
if product["name"] == data["package"][0]["name"]:
bb.error("Error adding the same package twice")
return
output["package"].append(data["package"][0])
python cve_check_write_rootfs_manifest () { python cve_check_write_rootfs_manifest () {
""" """
Create CVE manifest when building an image Create CVE manifest when building an image
...@@ -136,6 +169,9 @@ python cve_check_write_rootfs_manifest () { ...@@ -136,6 +169,9 @@ python cve_check_write_rootfs_manifest () {
deploy_file = d.getVar("CVE_CHECK_RECIPE_FILE") deploy_file = d.getVar("CVE_CHECK_RECIPE_FILE")
if os.path.exists(deploy_file): if os.path.exists(deploy_file):
bb.utils.remove(deploy_file) bb.utils.remove(deploy_file)
deploy_file_json = d.getVar("CVE_CHECK_RECIPE_FILE_JSON")
if os.path.exists(deploy_file_json):
bb.utils.remove(deploy_file_json)
if os.path.exists(d.getVar("CVE_CHECK_TMP_FILE")): if os.path.exists(d.getVar("CVE_CHECK_TMP_FILE")):
bb.note("Writing rootfs CVE manifest") bb.note("Writing rootfs CVE manifest")
...@@ -154,6 +190,26 @@ python cve_check_write_rootfs_manifest () { ...@@ -154,6 +190,26 @@ python cve_check_write_rootfs_manifest () {
os.remove(manifest_link) os.remove(manifest_link)
os.symlink(os.path.basename(manifest_name), manifest_link) os.symlink(os.path.basename(manifest_name), manifest_link)
bb.plain("Image CVE report stored in: %s" % manifest_name) bb.plain("Image CVE report stored in: %s" % manifest_name)
if os.path.exists(d.getVar("CVE_CHECK_SUMMARY_INDEX_PATH")):
import json
bb.note("Generating JSON CVE manifest")
deploy_dir = d.getVar("DEPLOY_DIR_IMAGE")
link_name = d.getVar("IMAGE_LINK_NAME")
manifest_name = d.getVar("CVE_CHECK_MANIFEST_JSON")
index_file = d.getVar("CVE_CHECK_SUMMARY_INDEX_PATH")
manifest = {"version":"1", "package": []}
with open(index_file) as f:
filename = f.readline()
while filename:
with open(filename.rstrip()) as j:
data = json.load(j)
cve_check_merge_jsons(manifest, data)
filename = f.readline()
with open(manifest_name, "w") as f:
json.dump(manifest, f, indent=2)
bb.plain("Image CVE report stored in: %s" % manifest_name)
} }
ROOTFS_POSTPROCESS_COMMAND:prepend = "${@'cve_check_write_rootfs_manifest; ' if d.getVar('CVE_CHECK_CREATE_MANIFEST') == '1' else ''}" ROOTFS_POSTPROCESS_COMMAND:prepend = "${@'cve_check_write_rootfs_manifest; ' if d.getVar('CVE_CHECK_CREATE_MANIFEST') == '1' else ''}"
...@@ -170,17 +226,19 @@ def check_cves(d, patched_cves): ...@@ -170,17 +226,19 @@ def check_cves(d, patched_cves):
suffix = d.getVar("CVE_VERSION_SUFFIX") suffix = d.getVar("CVE_VERSION_SUFFIX")
cves_unpatched = [] cves_unpatched = []
cves_status = []
cves_in_recipe = False
# CVE_PRODUCT can contain more than one product (eg. curl/libcurl) # CVE_PRODUCT can contain more than one product (eg. curl/libcurl)
products = d.getVar("CVE_PRODUCT").split() products = d.getVar("CVE_PRODUCT").split()
# If this has been unset then we're not scanning for CVEs here (for example, image recipes) # If this has been unset then we're not scanning for CVEs here (for example, image recipes)
if not products: if not products:
return ([], [], []) return ([], [], [], {})
pv = d.getVar("CVE_VERSION").split("+git")[0] pv = d.getVar("CVE_VERSION").split("+git")[0]
# If the recipe has been skipped/ignored we return empty lists # If the recipe has been skipped/ignored we return empty lists
if pn in d.getVar("CVE_CHECK_SKIP_RECIPE").split(): if pn in d.getVar("CVE_CHECK_SKIP_RECIPE").split():
bb.note("Recipe has been skipped by cve-check") bb.note("Recipe has been skipped by cve-check")
return ([], [], []) return ([], [], [], [])
cve_ignore = d.getVar("CVE_CHECK_IGNORE").split() cve_ignore = d.getVar("CVE_CHECK_IGNORE").split()
...@@ -190,6 +248,7 @@ def check_cves(d, patched_cves): ...@@ -190,6 +248,7 @@ def check_cves(d, patched_cves):
# For each of the known product names (e.g. curl has CPEs using curl and libcurl)... # For each of the known product names (e.g. curl has CPEs using curl and libcurl)...
for product in products: for product in products:
cves_in_product = False
if ":" in product: if ":" in product:
vendor, product = product.split(":", 1) vendor, product = product.split(":", 1)
else: else:
...@@ -207,6 +266,11 @@ def check_cves(d, patched_cves): ...@@ -207,6 +266,11 @@ def check_cves(d, patched_cves):
elif cve in patched_cves: elif cve in patched_cves:
bb.note("%s has been patched" % (cve)) bb.note("%s has been patched" % (cve))
continue continue
# Write status once only for each product
if not cves_in_product:
cves_status.append([product, True])
cves_in_product = True
cves_in_recipe = True
vulnerable = False vulnerable = False
for row in conn.execute("SELECT * FROM PRODUCTS WHERE ID IS ? AND PRODUCT IS ? AND VENDOR LIKE ?", (cve, product, vendor)): for row in conn.execute("SELECT * FROM PRODUCTS WHERE ID IS ? AND PRODUCT IS ? AND VENDOR LIKE ?", (cve, product, vendor)):
...@@ -253,9 +317,16 @@ def check_cves(d, patched_cves): ...@@ -253,9 +317,16 @@ def check_cves(d, patched_cves):
# TODO: not patched but not vulnerable # TODO: not patched but not vulnerable
patched_cves.add(cve) patched_cves.add(cve)
if not cves_in_product:
bb.note("No CVE records found for product %s, pn %s" % (product, pn))
cves_status.append([product, False])
conn.close() conn.close()
return (list(cve_ignore), list(patched_cves), cves_unpatched) if not cves_in_recipe:
bb.note("No CVE records for products in recipe %s" % (pn))
return (list(cve_ignore), list(patched_cves), cves_unpatched, cves_status)
def get_cve_info(d, cves): def get_cve_info(d, cves):
""" """
...@@ -280,13 +351,12 @@ def get_cve_info(d, cves): ...@@ -280,13 +351,12 @@ def get_cve_info(d, cves):
conn.close() conn.close()
return cve_data return cve_data
def cve_write_data(d, patched, unpatched, ignored, cve_data): def cve_write_data_text(d, patched, unpatched, ignored, cve_data):
""" """
Write CVE information in WORKDIR; and to CVE_CHECK_DIR, and Write CVE information in WORKDIR; and to CVE_CHECK_DIR, and
CVE manifest if enabled. CVE manifest if enabled.
""" """
cve_file = d.getVar("CVE_CHECK_LOG") cve_file = d.getVar("CVE_CHECK_LOG")
fdir_name = d.getVar("FILE_DIRNAME") fdir_name = d.getVar("FILE_DIRNAME")
layer = fdir_name.split("/")[-3] layer = fdir_name.split("/")[-3]
...@@ -300,6 +370,10 @@ def cve_write_data(d, patched, unpatched, ignored, cve_data): ...@@ -300,6 +370,10 @@ def cve_write_data(d, patched, unpatched, ignored, cve_data):
if include_layers and layer not in include_layers: if include_layers and layer not in include_layers:
return return
# Early exit, the text format does not report packages without CVEs
if not patched+unpatched:
return
nvd_link = "https://nvd.nist.gov/vuln/detail/" nvd_link = "https://nvd.nist.gov/vuln/detail/"
write_string = "" write_string = ""
unpatched_cves = [] unpatched_cves = []
...@@ -346,3 +420,116 @@ def cve_write_data(d, patched, unpatched, ignored, cve_data): ...@@ -346,3 +420,116 @@ def cve_write_data(d, patched, unpatched, ignored, cve_data):
with open(d.getVar("CVE_CHECK_TMP_FILE"), "a") as f: with open(d.getVar("CVE_CHECK_TMP_FILE"), "a") as f:
f.write("%s" % write_string) f.write("%s" % write_string)
def cve_check_write_json_output(d, output, direct_file, deploy_file, manifest_file):
"""
Write CVE information in the JSON format: to WORKDIR; and to
CVE_CHECK_DIR, if CVE manifest if enabled, write fragment
files that will be assembled at the end in cve_check_write_rootfs_manifest.
"""
import json
write_string = json.dumps(output, indent=2)
with open(direct_file, "w") as f:
bb.note("Writing file %s with CVE information" % direct_file)
f.write(write_string)
if d.getVar("CVE_CHECK_COPY_FILES") == "1":
bb.utils.mkdirhier(os.path.dirname(deploy_file))
with open(deploy_file, "w") as f:
f.write(write_string)
if d.getVar("CVE_CHECK_CREATE_MANIFEST") == "1":
cvelogpath = d.getVar("CVE_CHECK_SUMMARY_DIR")
index_path = d.getVar("CVE_CHECK_SUMMARY_INDEX_PATH")
bb.utils.mkdirhier(cvelogpath)
fragment_file = os.path.basename(deploy_file)
fragment_path = os.path.join(cvelogpath, fragment_file)
with open(fragment_path, "w") as f:
f.write(write_string)
with open(index_path, "a+") as f:
f.write("%s\n" % fragment_path)
def cve_write_data_json(d, patched, unpatched, ignored, cve_data, cve_status):
"""
Prepare CVE data for the JSON format, then write it.
"""
output = {"version":"1", "package": []}
nvd_link = "https://nvd.nist.gov/vuln/detail/"
fdir_name = d.getVar("FILE_DIRNAME")
layer = fdir_name.split("/")[-3]
include_layers = d.getVar("CVE_CHECK_LAYER_INCLUDELIST").split()
exclude_layers = d.getVar("CVE_CHECK_LAYER_EXCLUDELIST").split()
if exclude_layers and layer in exclude_layers:
return
if include_layers and layer not in include_layers:
return
unpatched_cves = []
product_data = []
for s in cve_status:
p = {"product": s[0], "cvesInRecord": "Yes"}
if s[1] == False:
p["cvesInRecord"] = "No"
product_data.append(p)
package_version = "%s%s" % (d.getVar("EXTENDPE"), d.getVar("PV"))
package_data = {
"name" : d.getVar("PN"),
"layer" : layer,
"version" : package_version,
"products": product_data
}
cve_list = []
for cve in sorted(cve_data):
is_patched = cve in patched
status = "Unpatched"
if is_patched and (d.getVar("CVE_CHECK_REPORT_PATCHED") != "1"):
continue
if cve in ignored:
status = "Ignored"
elif is_patched:
status = "Patched"
else:
# default value of status is Unpatched
unpatched_cves.append(cve)
issue_link = "%s%s" % (nvd_link, cve)
cve_item = {
"id" : cve,
"summary" : cve_data[cve]["summary"],
"scorev2" : cve_data[cve]["scorev2"],
"scorev3" : cve_data[cve]["scorev3"],
"vector" : cve_data[cve]["vector"],
"status" : status,
"link": issue_link
}
cve_list.append(cve_item)
package_data["issue"] = cve_list
output["package"].append(package_data)
direct_file = d.getVar("CVE_CHECK_LOG_JSON")
deploy_file = d.getVar("CVE_CHECK_RECIPE_FILE_JSON")
manifest_file = d.getVar("CVE_CHECK_SUMMARY_FILE_NAME_JSON")
cve_check_write_json_output(d, output, direct_file, deploy_file, manifest_file)
def cve_write_data(d, patched, unpatched, ignored, cve_data, status):
"""
Write CVE data in each enabled format.
"""
if d.getVar("CVE_CHECK_FORMAT_TEXT") == "1":
cve_write_data_text(d, patched, unpatched, ignored, cve_data)
if d.getVar("CVE_CHECK_FORMAT_JSON") == "1":
cve_write_data_json(d, patched, unpatched, ignored, cve_data, status)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment