Hi Marta,

Thanks for the great work on this topic.
I have left 3 comments below.

Thanks for considering them.
  Peter

> -----Original Message-----
> From: [email protected] <openembedded-
> [email protected]> On Behalf Of Marta Rybczynska via
> lists.openembedded.org
> Sent: Monday, July 15, 2024 12:20
> To: [email protected]
> Cc: Marta Rybczynska <[email protected]>; Samantha Jalabert
> <[email protected]>
> Subject: [OE-core] [PATCH 1/3] cve-check: enrich annotation of CVEs
> 
> Previously the information passed between different function of the
> cve-check class included only tables of patched, unpatched, ignored
> vulnerabilities and the general status of the recipe.
> 
> The VEX work requires more information, and we need to pass them
> between different functions, so that it can be enriched as the
> analysis progresses. Instead of multiple tables, use a single one
> with annotations for each CVE encountered. For example, a patched
> CVE will have:
> 
> {"abbrev-status": "Patched", "status": "version-not-in-range"}
> 
> abbrev-status contains the general status (Patched, Unpatched,
> Ignored and Unknown that will be added in the VEX code)
> status contains more detailed information that can come from
> CVE_STATUS and the analysis.
> 
> Additional fields of the annotation include for example the name
> of the patch file fixing a given CVE.
> 
> Signed-off-by: Marta Rybczynska <[email protected]>
> Signed-off-by: Samantha Jalabert <[email protected]>
> ---
>  meta/classes/cve-check.bbclass | 208 +++++++++++++++++----------------
>  meta/lib/oe/cve_check.py       |  12 +-
>  2 files changed, 113 insertions(+), 107 deletions(-)
> 
> diff --git a/meta/classes/cve-check.bbclass b/meta/classes/cve-check.bbclass
> index 93a2a1413d..f177223568 100644
> --- a/meta/classes/cve-check.bbclass
> +++ b/meta/classes/cve-check.bbclass
> @@ -188,10 +188,10 @@ python do_cve_check () {
>                  patched_cves = get_patched_cves(d)
>              except FileNotFoundError:
>                  bb.fatal("Failure in searching patches")
> -            ignored, patched, unpatched, status = check_cves(d, patched_cves)
> -            if patched or unpatched or (d.getVar("CVE_CHECK_COVERAGE") == "1"
> and status):
> -                cve_data = get_cve_info(d, patched + unpatched + ignored)
> -                cve_write_data(d, patched, unpatched, ignored, cve_data, 
> status)
> +            cve_data, status = check_cves(d, patched_cves)
> +            if len(cve_data) or (d.getVar("CVE_CHECK_COVERAGE") == "1" and
> status):
> +                get_cve_info(d, cve_data)
> +                cve_write_data(d, cve_data, status)
>          else:
>              bb.note("No CVE database found, skipping CVE check")
> 
> @@ -294,7 +294,51 @@ ROOTFS_POSTPROCESS_COMMAND:prepend =
> "${@'cve_check_write_rootfs_manifest ' if d
>  do_rootfs[recrdeptask] += "${@'do_cve_check' if
> d.getVar('CVE_CHECK_CREATE_MANIFEST') == '1' else ''}"
>  do_populate_sdk[recrdeptask] += "${@'do_cve_check' if
> d.getVar('CVE_CHECK_CREATE_MANIFEST') == '1' else ''}"
> 
> -def check_cves(d, patched_cves):
> +def cve_is_ignored(d, cve_data, cve):
> +    if cve not in cve_data:
> +        return False
> +    if cve_data[cve]['abbrev-status'] == "Ignored":
> +        return True
> +    return False
> +
> +def cve_is_patched(d, cve_data, cve):
> +    if cve not in cve_data:
> +        return False
> +    if cve_data[cve]['abbrev-status'] == "Patched":
> +        return True
> +    return False
> +
> +def cve_update(d, cve_data, cve, entry):

I think that there is a fundamental change in behavior here.
Previously we were taking (NVD) DB as base and only vulnerable CVEs were 
compared annotated with CVE_STATUS or our presence of CVE patches.
Now we take the CVE_STATUS and CVE patches as base and add entries from DB only 
if they were not annotated yet.

I am not arguing against it, I actually like it much more as we will be able to 
insert also CVEs not in DB into our reports.
But I have two comments on this:
* this should be explicitly described in commit message
* this makes global cve includes spill into all recipe reports, so this commit 
series should also get rid of cve-extra-exclusions.inc file finally (or at 
least add a comment into it with this sideeffect)

> +    # If no entry, just add it
> +    if cve not in cve_data:
> +        cve_data[cve] = entry
> +        return
> +    # If we are updating, there might be change in the status
> +    bb.debug("Trying CVE entry update for %s from %s to %s" % (cve,
> cve_data[cve]['abbrev-status'], entry['abbrev-status']))
> +    if cve_data[cve]['abbrev-status'] == "Unknown":
> +        cve_data[cve] = entry
> +        return
> +    if cve_data[cve]['abbrev-status'] == entry['abbrev-status']:
> +        return
> +    # Update like in {'abbrev-status': 'Patched', 'status': 
> 'version-not-in-range'}
> to {'abbrev-status': 'Unpatched', 'status': 'version-in-range'}
> +    if entry['abbrev-status'] == "Unpatched" and cve_data[cve]['abbrev-
> status'] == "Patched":
> +        if entry['status'] == "version-in-range" and cve_data[cve]['status'] 
> ==
> "version-not-in-range":
> +            # New result from the scan, vulnerable
> +            cve_data[cve] = entry
> +            bb.debug("CVE entry %s update from Patched to Unpatched from the
> scan result" % cve)
> +            return
> +    if entry['abbrev-status'] == "Patched" and cve_data[cve]['abbrev-status']
> == "Unpatched":
> +        if entry['status'] == "version-not-in-range" and 
> cve_data[cve]['status'] ==
> "version-in-range":
> +            # Range does not match the scan, but we already have a vulnerable
> match, ignore
> +            bb.debug("CVE entry %s update from Patched to Unpatched from the
> scan result - not applying" % cve)
> +            return
> +    # If we have an "Ignored", it has a priority
> +    if cve_data[cve]['abbrev-status'] == "Ignored":
> +        bb.debug("CVE %s not updating because Ignored" % cve)
> +        return
> +    bb.warn("Unhandled CVE entry update for %s from %s to %s" % (cve,
> cve_data[cve], entry))
> +
> +def check_cves(d, cve_data):
>      """
>      Connect to the NVD database and find unpatched cves.
>      """
> @@ -304,28 +348,19 @@ def check_cves(d, patched_cves):
>      real_pv = d.getVar("PV")
>      suffix = d.getVar("CVE_VERSION_SUFFIX")
> 
> -    cves_unpatched = []
> -    cves_ignored = []
>      cves_status = []
>      cves_in_recipe = False
>      # CVE_PRODUCT can contain more than one product (eg. curl/libcurl)
>      products = d.getVar("CVE_PRODUCT").split()
>      # If this has been unset then we're not scanning for CVEs here (for 
> example,
> image recipes)
>      if not products:
> -        return ([], [], [], [])
> +        return ([], [])
>      pv = d.getVar("CVE_VERSION").split("+git")[0]
> 
>      # If the recipe has been skipped/ignored we return empty lists
>      if pn in d.getVar("CVE_CHECK_SKIP_RECIPE").split():
>          bb.note("Recipe has been skipped by cve-check")
> -        return ([], [], [], [])
> -
> -    # Convert CVE_STATUS into ignored CVEs and check validity
> -    cve_ignore = []
> -    for cve in (d.getVarFlags("CVE_STATUS") or {}):
> -        decoded_status, _, _ = decode_cve_status(d, cve)
> -        if decoded_status == "Ignored":
> -            cve_ignore.append(cve)
> +        return ([], [])
> 
>      import sqlite3
>      db_file = d.expand("file:${CVE_CHECK_DB_FILE}?mode=ro")
> @@ -344,11 +379,10 @@ def check_cves(d, patched_cves):
>          for cverow in cve_cursor:
>              cve = cverow[0]
> 
> -            if cve in cve_ignore:
> +            if cve_is_ignored(d, cve_data, cve):
>                  bb.note("%s-%s ignores %s" % (product, pv, cve))
> -                cves_ignored.append(cve)
>                  continue
> -            elif cve in patched_cves:
> +            elif cve_is_patched(d, cve_data, cve):
>                  bb.note("%s has been patched" % (cve))
>                  continue
>              # Write status once only for each product
> @@ -364,7 +398,7 @@ def check_cves(d, patched_cves):
>              for row in product_cursor:
>                  (_, _, _, version_start, operator_start, version_end, 
> operator_end) =
> row
>                  #bb.debug(2, "Evaluating row " + str(row))
> -                if cve in cve_ignore:
> +                if cve_is_ignored(d, cve_data, cve):
>                      ignored = True
> 
>                  version_start = convert_cve_version(version_start)
> @@ -403,16 +437,16 @@ def check_cves(d, patched_cves):
>                  if vulnerable:
>                      if ignored:
>                          bb.note("%s is ignored in %s-%s" % (cve, pn, 
> real_pv))
> -                        cves_ignored.append(cve)
> +                        cve_update(d, cve_data, cve, {"abbrev-status": 
> "Ignored"})

I think that this case happens only when we ignore the CVE via CVE_STATUS.
So calling this is not necessary as it is already prefilled from for it, 
correct?
In case it would not be prefilled, then it would drop our enrichment...

>                      else:
>                          bb.note("%s-%s is vulnerable to %s" % (pn, real_pv, 
> cve))
> -                        cves_unpatched.append(cve)
> +                        cve_update(d, cve_data, cve, {"abbrev-status": 
> "Unpatched",
> "status": "version-in-range"})

Could you please include the new statuses in cve-check-map.conf
(e.g. version-in-range, version-not-in-range, fix-file-included)
I think it's important to have all possible statuses there to be able to 
correct the DB manually.

>                      break
>              product_cursor.close()
> 
>              if not vulnerable:
>                  bb.note("%s-%s is not vulnerable to %s" % (pn, real_pv, cve))
> -                patched_cves.add(cve)
> +                cve_update(d, cve_data, cve, {"abbrev-status": "Patched", 
> "status":
> "version-not-in-range"})
>          cve_cursor.close()
> 
>          if not cves_in_product:
> @@ -420,48 +454,45 @@ def check_cves(d, patched_cves):
>              cves_status.append([product, False])
> 
>      conn.close()
> -    diff_ignore = list(set(cve_ignore) - set(cves_ignored))
> -    if diff_ignore:
> -        oe.qa.handle_error("cve_status_not_in_db", "Found CVE (%s) with
> CVE_STATUS set that are not found in database for this component" % "
> ".join(diff_ignore), d)
> 
>      if not cves_in_recipe:
>          bb.note("No CVE records for products in recipe %s" % (pn))
> 
> -    return (list(cves_ignored), list(patched_cves), cves_unpatched,
> cves_status)
> +    return (cve_data, cves_status)
> 
> -def get_cve_info(d, cves):
> +def get_cve_info(d, cve_data):
>      """
>      Get CVE information from the database.
>      """
> 
>      import sqlite3
> 
> -    cve_data = {}
>      db_file = d.expand("file:${CVE_CHECK_DB_FILE}?mode=ro")
>      conn = sqlite3.connect(db_file, uri=True)
> 
> -    for cve in cves:
> +    for cve in cve_data:
>          cursor = conn.execute("SELECT * FROM NVD WHERE ID IS ?", (cve,))
>          for row in cursor:
> -            cve_data[row[0]] = {}
> -            cve_data[row[0]]["summary"] = row[1]
> -            cve_data[row[0]]["scorev2"] = row[2]
> -            cve_data[row[0]]["scorev3"] = row[3]
> -            cve_data[row[0]]["modified"] = row[4]
> -            cve_data[row[0]]["vector"] = row[5]
> -            cve_data[row[0]]["vectorString"] = row[6]
> +            # The CVE itdelf has been added already
> +            if row[0] not in cve_data:
> +                bb.note("CVE record %s not present" % row[0])
> +                continue
> +            #cve_data[row[0]] = {}
> +            cve_data[row[0]]["NVD-summary"] = row[1]
> +            cve_data[row[0]]["NVD-scorev2"] = row[2]
> +            cve_data[row[0]]["NVD-scorev3"] = row[3]
> +            cve_data[row[0]]["NVD-modified"] = row[4]
> +            cve_data[row[0]]["NVD-vector"] = row[5]
> +            cve_data[row[0]]["NVD-vectorString"] = row[6]
>          cursor.close()
>      conn.close()
> -    return cve_data
> 
> -def cve_write_data_text(d, patched, unpatched, ignored, cve_data):
> +def cve_write_data_text(d, cve_data):
>      """
>      Write CVE information in WORKDIR; and to CVE_CHECK_DIR, and
>      CVE manifest if enabled.
>      """
> 
> -    from oe.cve_check import decode_cve_status
> -
>      cve_file = d.getVar("CVE_CHECK_LOG")
>      fdir_name  = d.getVar("FILE_DIRNAME")
>      layer = fdir_name.split("/")[-3]
> @@ -478,7 +509,7 @@ def cve_write_data_text(d, patched, unpatched,
> ignored, cve_data):
>          return
> 
>      # Early exit, the text format does not report packages without CVEs
> -    if not patched+unpatched+ignored:
> +    if not len(cve_data):
>          return
> 
>      nvd_link = "https://nvd.nist.gov/vuln/detail/";
> @@ -487,36 +518,27 @@ def cve_write_data_text(d, patched, unpatched,
> ignored, cve_data):
>      bb.utils.mkdirhier(os.path.dirname(cve_file))
> 
>      for cve in sorted(cve_data):
> -        is_patched = cve in patched
> -        is_ignored = cve in ignored
> -
> -        status = "Unpatched"
> -        if (is_patched or is_ignored) and not report_all:
> -            continue
> -        if is_ignored:
> -            status = "Ignored"
> -        elif is_patched:
> -            status = "Patched"
> -        else:
> -            # default value of status is Unpatched
> -            unpatched_cves.append(cve)
> -
>          write_string += "LAYER: %s\n" % layer
>          write_string += "PACKAGE NAME: %s\n" % d.getVar("PN")
>          write_string += "PACKAGE VERSION: %s%s\n" % (d.getVar("EXTENDPE"),
> d.getVar("PV"))
>          write_string += "CVE: %s\n" % cve
> -        write_string += "CVE STATUS: %s\n" % status
> -        _, detail, description = decode_cve_status(d, cve)
> -        if detail:
> -            write_string += "CVE DETAIL: %s\n" % detail
> -        if description:
> -            write_string += "CVE DESCRIPTION: %s\n" % description
> -        write_string += "CVE SUMMARY: %s\n" % cve_data[cve]["summary"]
> -        write_string += "CVSS v2 BASE SCORE: %s\n" % cve_data[cve]["scorev2"]
> -        write_string += "CVSS v3 BASE SCORE: %s\n" % cve_data[cve]["scorev3"]
> -        write_string += "VECTOR: %s\n" % cve_data[cve]["vector"]
> -        write_string += "VECTORSTRING: %s\n" % cve_data[cve]["vectorString"]
> +        write_string += "CVE STATUS: %s\n" % cve_data[cve]["abbrev-status"]
> +
> +        if 'status' in cve_data[cve]:
> +            write_string += "CVE DETAIL: %s\n" % cve_data[cve]["status"]
> +        if 'justification' in cve_data[cve]:
> +            write_string += "CVE DESCRIPTION: %s\n" %
> cve_data[cve]["justification"]
> +
> +        if "NVD-summary" in cve_data[cve]:
> +            write_string += "CVE SUMMARY: %s\n" % cve_data[cve]["NVD-
> summary"]
> +            write_string += "CVSS v2 BASE SCORE: %s\n" % cve_data[cve]["NVD-
> scorev2"]
> +            write_string += "CVSS v3 BASE SCORE: %s\n" % cve_data[cve]["NVD-
> scorev3"]
> +            write_string += "VECTOR: %s\n" % cve_data[cve]["NVD-vector"]
> +            write_string += "VECTORSTRING: %s\n" % cve_data[cve]["NVD-
> vectorString"]
> +
>          write_string += "MORE INFORMATION: %s%s\n\n" % (nvd_link, cve)
> +        if cve_data[cve]["abbrev-status"] == "Unpatched":
> +            unpatched_cves.append(cve)
> 
>      if unpatched_cves and d.getVar("CVE_CHECK_SHOW_WARNINGS") == "1":
>          bb.warn("Found unpatched CVE (%s), for more information check %s" %
> (" ".join(unpatched_cves),cve_file))
> @@ -568,13 +590,11 @@ def cve_check_write_json_output(d, output,
> direct_file, deploy_file, manifest_fi
>          with open(index_path, "a+") as f:
>              f.write("%s\n" % fragment_path)
> 
> -def cve_write_data_json(d, patched, unpatched, ignored, cve_data,
> cve_status):
> +def cve_write_data_json(d, cve_data, cve_status):
>      """
>      Prepare CVE data for the JSON format, then write it.
>      """
> 
> -    from oe.cve_check import decode_cve_status
> -
>      output = {"version":"1", "package": []}
>      nvd_link = "https://nvd.nist.gov/vuln/detail/";
> 
> @@ -592,8 +612,6 @@ def cve_write_data_json(d, patched, unpatched,
> ignored, cve_data, cve_status):
>      if include_layers and layer not in include_layers:
>          return
> 
> -    unpatched_cves = []
> -
>      product_data = []
>      for s in cve_status:
>          p = {"product": s[0], "cvesInRecord": "Yes"}
> @@ -608,39 +626,29 @@ def cve_write_data_json(d, patched, unpatched,
> ignored, cve_data, cve_status):
>          "version" : package_version,
>          "products": product_data
>      }
> +
>      cve_list = []
> 
>      for cve in sorted(cve_data):
> -        is_patched = cve in patched
> -        is_ignored = cve in ignored
> -        status = "Unpatched"
> -        if (is_patched or is_ignored) and not report_all:
> -            continue
> -        if is_ignored:
> -            status = "Ignored"
> -        elif is_patched:
> -            status = "Patched"
> -        else:
> -            # default value of status is Unpatched
> -            unpatched_cves.append(cve)
> -
>          issue_link = "%s%s" % (nvd_link, cve)
> 
>          cve_item = {
>              "id" : cve,
> -            "summary" : cve_data[cve]["summary"],
> -            "scorev2" : cve_data[cve]["scorev2"],
> -            "scorev3" : cve_data[cve]["scorev3"],
> -            "vector" : cve_data[cve]["vector"],
> -            "vectorString" : cve_data[cve]["vectorString"],
> -            "status" : status,
> -            "link": issue_link
> +            "status" : cve_data[cve]["abbrev-status"],
> +            "link": issue_link,
>          }
> -        _, detail, description = decode_cve_status(d, cve)
> -        if detail:
> -            cve_item["detail"] = detail
> -        if description:
> -            cve_item["description"] = description
> +        if 'NVD-summary' in cve_data[cve]:
> +            cve_item["summary"] = cve_data[cve]["NVD-summary"]
> +            cve_item["scorev2"] = cve_data[cve]["NVD-scorev2"]
> +            cve_item["scorev3"] = cve_data[cve]["NVD-scorev3"]
> +            cve_item["vector"] = cve_data[cve]["NVD-vector"]
> +            cve_item["vectorString"] = cve_data[cve]["NVD-vectorString"]
> +        if 'status' in cve_data[cve]:
> +            cve_item["detail"] = cve_data[cve]["status"]
> +        if 'justification' in cve_data[cve]:
> +            cve_item["description"] = cve_data[cve]["justification"]
> +        if 'resource' in cve_data[cve]:
> +            cve_item["patch-file"] = cve_data[cve]["resource"]
>          cve_list.append(cve_item)
> 
>      package_data["issue"] = cve_list
> @@ -652,12 +660,12 @@ def cve_write_data_json(d, patched, unpatched,
> ignored, cve_data, cve_status):
> 
>      cve_check_write_json_output(d, output, direct_file, deploy_file,
> manifest_file)
> 
> -def cve_write_data(d, patched, unpatched, ignored, cve_data, status):
> +def cve_write_data(d, cve_data, status):
>      """
>      Write CVE data in each enabled format.
>      """
> 
>      if d.getVar("CVE_CHECK_FORMAT_TEXT") == "1":
> -        cve_write_data_text(d, patched, unpatched, ignored, cve_data)
> +        cve_write_data_text(d, cve_data)
>      if d.getVar("CVE_CHECK_FORMAT_JSON") == "1":
> -        cve_write_data_json(d, patched, unpatched, ignored, cve_data, status)
> +        cve_write_data_json(d, cve_data, status)
> diff --git a/meta/lib/oe/cve_check.py b/meta/lib/oe/cve_check.py
> index ed5c714cb8..6aba90183a 100644
> --- a/meta/lib/oe/cve_check.py
> +++ b/meta/lib/oe/cve_check.py
> @@ -88,7 +88,7 @@ def get_patched_cves(d):
>      # (cve_match regular expression)
>      cve_file_name_match = re.compile(r".*(CVE-\d{4}-\d+)", re.IGNORECASE)
> 
> -    patched_cves = set()
> +    patched_cves = {}
>      patches = oe.patch.src_patches(d)
>      bb.debug(2, "Scanning %d patches for CVEs" % len(patches))
>      for url in patches:
> @@ -98,7 +98,7 @@ def get_patched_cves(d):
>          fname_match = cve_file_name_match.search(patch_file)
>          if fname_match:
>              cve = fname_match.group(1).upper()
> -            patched_cves.add(cve)
> +            patched_cves[cve] = {"abbrev-status": "Patched", "status": 
> "fix-file-
> included", "resource": patch_file}
>              bb.debug(2, "Found %s from patch file name %s" % (cve, 
> patch_file))
> 
>          # Remote patches won't be present and compressed patches won't be
> @@ -124,7 +124,7 @@ def get_patched_cves(d):
>              cves = patch_text[match.start()+5:match.end()]
>              for cve in cves.split():
>                  bb.debug(2, "Patch %s solves %s" % (patch_file, cve))
> -                patched_cves.add(cve)
> +                patched_cves[cve] = {"abbrev-status": "Patched", "status": 
> "fix-file-
> included", "resource": patch_file}
>                  text_match = True
> 
>          if not fname_match and not text_match:
> @@ -132,10 +132,8 @@ def get_patched_cves(d):
> 
>      # Search for additional patched CVEs
>      for cve in (d.getVarFlags("CVE_STATUS") or {}):
> -        decoded_status, _, _ = decode_cve_status(d, cve)
> -        if decoded_status == "Patched":
> -            bb.debug(2, "CVE %s is additionally patched" % cve)
> -            patched_cves.add(cve)
> +        decoded_status, detail, description = decode_cve_status(d, cve)
> +        patched_cves[cve] = {"abbrev-status": decoded_status, "status": 
> detail,
> "justification": description}
> 
>      return patched_cves
> 
> --
> 2.43.0

-=-=-=-=-=-=-=-=-=-=-=-
Links: You receive all messages sent to this group.
View/Reply Online (#202141): 
https://lists.openembedded.org/g/openembedded-core/message/202141
Mute This Topic: https://lists.openembedded.org/mt/107228576/21656
Group Owner: [email protected]
Unsubscribe: https://lists.openembedded.org/g/openembedded-core/unsub 
[[email protected]]
-=-=-=-=-=-=-=-=-=-=-=-

Reply via email to