[Do not run this on the autobuilder: it changes the CVE database.] This patch adds LLM calls to the cve-update-db-native recipe in order to derive missing CPE identifiers. It also extends the PRODUCTS table of the CVE database with an LLM_GUESS column that indicates whether a given CVE-product association was derived by the LLM, and includes this information in the final CVE JSON output as well.
Signed-off-by: Gyorgy Sarvari <[email protected]> --- meta/classes/cve-check.bbclass | 6 +- .../recipes-core/meta/cve-update-db-native.bb | 138 ++++++++++++++++-- 2 files changed, 131 insertions(+), 13 deletions(-) diff --git a/meta/classes/cve-check.bbclass b/meta/classes/cve-check.bbclass index c63ebd56e1..d9a3636cb4 100644 --- a/meta/classes/cve-check.bbclass +++ b/meta/classes/cve-check.bbclass @@ -360,7 +360,7 @@ def check_cves(d, cve_data): product_cursor = conn.execute("SELECT * FROM PRODUCTS WHERE ID IS ? AND PRODUCT IS ? AND VENDOR LIKE ?", (cve, product, vendor)) for row in product_cursor: - (_, _, _, version_start, operator_start, version_end, operator_end) = row + (_, _, _, version_start, operator_start, version_end, operator_end, llm_guess) = row #bb.debug(2, "Evaluating row " + str(row)) if cve_is_ignored(d, cve_data, cve): ignored = True @@ -440,7 +440,7 @@ def get_cve_info(d, cve_data): conn = sqlite3.connect(db_file, uri=True) for cve in cve_data: - cursor = conn.execute("SELECT * FROM NVD WHERE ID IS ?", (cve,)) + cursor = conn.execute("SELECT NVD.*, PRODUCTS.llm_guess FROM NVD, products WHERE NVD.ID IS ? 
and NVD.id=PRODUCTS.id", (cve,)) for row in cursor: # The CVE itdelf has been added already if row[0] not in cve_data: @@ -454,6 +454,7 @@ def get_cve_info(d, cve_data): cve_data[row[0]]["NVD-modified"] = row[5] cve_data[row[0]]["NVD-vector"] = row[6] cve_data[row[0]]["NVD-vectorString"] = row[7] + cve_data[row[0]]['PRODUCTS-llmGuess'] = row[8] cursor.close() conn.close() @@ -544,6 +545,7 @@ def cve_write_data_json(d, cve_data, cve_status): cve_item["modified"] = cve_data[cve]["NVD-modified"] cve_item["vector"] = cve_data[cve]["NVD-vector"] cve_item["vectorString"] = cve_data[cve]["NVD-vectorString"] + cve_item["llmGuess"] = cve_data[cve]["PRODUCTS-llmGuess"] if 'status' in cve_data[cve]: cve_item["detail"] = cve_data[cve]["status"] if 'justification' in cve_data[cve]: diff --git a/meta/recipes-core/meta/cve-update-db-native.bb b/meta/recipes-core/meta/cve-update-db-native.bb index 3a6dc95580..3e186ad960 100644 --- a/meta/recipes-core/meta/cve-update-db-native.bb +++ b/meta/recipes-core/meta/cve-update-db-native.bb @@ -26,6 +26,42 @@ CVE_CHECK_DB_DLDIR_FILE ?= "${DL_DIR}/CVE_CHECK2/${CVE_CHECK_DB_FILENAME}" CVE_CHECK_DB_DLDIR_LOCK ?= "${CVE_CHECK_DB_DLDIR_FILE}.lock" CVE_CHECK_DB_TEMP_FILE ?= "${CVE_CHECK_DB_FILE}.tmp" +CVE_CHECK_USE_LLM ?= "0" +CVE_CHECK_LLM_ENDPOINT ?= "http://localhost:11434/api/generate" +CVE_CHECK_LLM_MODEL ?= "llama3.1:8b" + +CVE_CHECK_LLM_PROMPT = "Based on a given CVE (Common Vulnerability and Exposures) \ +description and related reference URLs try to determine the correct CPE (Common Platform Enumeration) \ +value that is associated with the CVE, and also the first and last version \ +numbers that is affected by the vulnerability. The CPE id should have the \ +following format: "cpe:2.3:a:VENDOR:PRODUCT:*:*:*:*:*:*:*:*". You must substitute only the VENDOR and \ +PRODUCT placeholders in the mentioned CPE template. Pay attention to the number of asterisks at the end. 
\ +In case a CPE ID can be derived, the template MUST NOT be changed otherwise beside the placeholders. \ +Primarily use the CVE description to derive the VENDOR and PRODUCT details, but in case it is insuffient, \ +use the given reference URLs. The domain name frequently contains the product name and the vendor. In case one or \ +more details cannot be derived from the description, return "N/A" for these details. \ +You must respond with ONLY valid JSON. Do not include any explanation, formatting nor text outside \ +the JSON structure. It is imperative not to change the next mentioned JSON schema in your response, \ +the keys must remain intact. In case you are not able to determine any details, just set it to "N/A" \ +in the response. Your response MUST be a single JSON object with the following format: \ +{'CPE_ID': 'string - the CPE identifier', 'FIRST_VERSION': 'string - first vulnerable version', 'LAST_VERSION': 'string - last vulnerable version'} \ +\n\ +CRITICAL: Do NOT change any of the keys in the above JSON schema, all keys MUST remain as \ +specified above. Especially CPE_ID key must be kept. \ +The CVE description is enclosed below the 'CVE descripton START' and \ +'CVE description END' lines. Everything between these lines are part of \ +the CVE description to process. The reference urls can be found between the 'CVE URLS START' \ +and 'CVE URLS END' lines. Each line contains one reference URL. 
\ +\n\n\ +CVE description START \ +\n%s\n\ +CVE description END \ +\n\ +CVE URLS START \ +\n%s\ +CVE_URLS END \ +" + python () { if not bb.data.inherits_class("cve-check", d): raise bb.parse.SkipRecipe("Skip recipe when cve-check class is not loaded.") @@ -219,12 +255,12 @@ def initialize_db(conn): c.execute("CREATE TABLE IF NOT EXISTS PRODUCTS (ID TEXT, \ VENDOR TEXT, PRODUCT TEXT, VERSION_START TEXT, OPERATOR_START TEXT, \ - VERSION_END TEXT, OPERATOR_END TEXT)") + VERSION_END TEXT, OPERATOR_END TEXT, LLM_GUESS INTEGER)") c.execute("CREATE INDEX IF NOT EXISTS PRODUCT_ID_IDX on PRODUCTS(ID);") c.close() -def parse_node_and_insert(conn, node, cveId, is_nvd): +def parse_node_and_insert(conn, node, cveId, is_nvd, llm_guess=0): # Parse children node if needed for child in node.get('children', ()): parse_node_and_insert(conn, child, cveId, is_nvd) @@ -256,10 +292,10 @@ def parse_node_and_insert(conn, node, cveId, is_nvd): if version != '*' and version != '-': # Version is defined, this is a '=' match - yield [cveId, vendor, product, version + version_suffix, '=', '', ''] + yield [cveId, vendor, product, version + version_suffix, '=', '', '', llm_guess] elif version == '-': # no version information is available - yield [cveId, vendor, product, version, '', '', ''] + yield [cveId, vendor, product, version, '', '', '', llm_guess] else: # Parse start version, end version and operators op_start = '' @@ -284,13 +320,13 @@ def parse_node_and_insert(conn, node, cveId, is_nvd): v_end = cpe['versionEndExcluding'] if op_start or op_end or v_start or v_end: - yield [cveId, vendor, product, v_start, op_start, v_end, op_end] + yield [cveId, vendor, product, v_start, op_start, v_end, op_end, llm_guess] else: # This is no version information, expressed differently. # Save processing by representing as -. 
- yield [cveId, vendor, product, '-', '', '', ''] + yield [cveId, vendor, product, '-', '', '', '', llm_guess] - conn.executemany("insert into PRODUCTS values (?, ?, ?, ?, ?, ?, ?)", cpe_generator(is_nvd)).close() + conn.executemany("insert into PRODUCTS values (?, ?, ?, ?, ?, ?, ?, ?)", cpe_generator(is_nvd)).close() def update_db_nvdjson(conn, jsondata): import json @@ -338,17 +374,97 @@ def get_metric_entry(metric): return secondaries[0] return None -def update_db_fkie(conn, jsondata): +def guess_missing_configuration(elt, d): + import requests + import json + cve_description = elt['descriptions'][0]['value'] + cve_id = elt['id'] + bb.note('Using LLM to guess cpe for %s' % cve_id) + + # Skip CPE-less kernel CVEs + if cve_description.startswith("In the Linux kernel, the following vulnerability has been resolved"): + bb.note("Kernel CVE, skipping") + return False + + cve_ref_urls = "" + for ref in elt['references']: + cve_ref_urls += "\n" + ref['url'] + + llm_prompt = d.getVar('CVE_CHECK_LLM_PROMPT') % (cve_description, cve_ref_urls) + payload = {'model': d.getVar('CVE_CHECK_LLM_MODEL'), 'stream': False, 'prompt': llm_prompt, 'format': 'json'} + + try: + response = requests.post(d.getVar('CVE_CHECK_LLM_ENDPOINT'), json=payload) + response = response.json()['response'] + response = json.loads(response) + except Exception as e: + bb.warn('Could not connect to LLM. 
Check logs for more details') + bb.note('Exception during LLM request: %s' % e) + return False + + bb.note('Parsed LLM response: %s' % response) + + if 'CPE_ID' not in response or response['CPE_ID'].upper() == 'N/A': + bb.note('Unknown CPE or hallucinated JSON schema') + return False + + cpe = response['CPE_ID'].replace(' ', '').lower() + + # tolerate one missing asterisk + if not cpe.endswith(':*:*:*:*:*:*:*:*') and cpe.endswith(':*:*:*:*:*:*:*'): + cpe += ":*" + + if not cpe.startswith('cpe:2.3:a:') or not cpe.endswith(':*:*:*:*:*:*:*:*') or '\\' in cpe: + bb.note('Malformed CPE id') + return False + + if any('\ud800' <= c <= '\udfff' for c in cpe): + bb.note('Invalid character in CPE id') + return False + + elt['configurations'] = [{'nodes': []}] + elt['configurations'][0]['nodes'].append({'operator': 'OR', 'negate': False, 'cpeMatch': []}) + elt['configurations'][0]['nodes'][0]['cpeMatch'].append({'vulnerable': True, 'criteria': cpe}) + + if 'FIRST_VERSION' in response and response['FIRST_VERSION'].upper() != 'N/A': + first_version = response['FIRST_VERSION'] + elt['configurations'][0]['nodes'][0]['cpeMatch'][0]['versionStartIncluding'] = first_version + elif 'FIRST_VERSION' not in response: + bb.note('FIRST_VERSION key is missing from LLM response: %s' % cve_id) + + if 'LAST_VERSION' in response and response['LAST_VERSION'].upper() != 'N/A': + last_version = response['LAST_VERSION'] + elt['configurations'][0]['nodes'][0]['cpeMatch'][0]['versionEndIncluding'] = last_version + elif 'LAST_VERSION' not in response: + bb.note('LAST_VERSION key is missing from LLM response: %s' % cve_id) + + return True + +def update_db_fkie(conn, jsondata, d): import json root = json.loads(jsondata) + use_llm = d.getVar('CVE_CHECK_USE_LLM') + for elt in root['cve_items']: + llm_guess = 0; if 'vulnStatus' not in elt or elt['vulnStatus'] == 'Rejected': continue - if 'configurations' not in elt: + if 'configurations' not in elt and use_llm == '0': + continue + + # ignore old CVEs 
without CPE. This is not a scientific decision, simply + # llm calls are heavy (at the time of writing this comment) + if 'configurations' not in elt and not elt['id'].startswith('CVE-2025-'): continue + if 'configurations' not in elt: + llm_guess = 1 + guess_result = guess_missing_configuration(elt, d) + if not guess_result: + continue + accessVector = None vectorString = None cvssv2 = 0.0 @@ -403,11 +519,11 @@ def update_db_fkie(conn, jsondata): for config in elt['configurations']: # This is suboptimal as it doesn't handle AND/OR and negate, but is better than nothing for node in config.get("nodes") or []: - parse_node_and_insert(conn, node, cveId, False) + parse_node_and_insert(conn, node, cveId, False, llm_guess) def update_db(d, conn, jsondata): if (d.getVar("NVD_DB_VERSION") == "FKIE"): - return update_db_fkie(conn, jsondata) + return update_db_fkie(conn, jsondata, d) else: return update_db_nvdjson(conn, jsondata)
-=-=-=-=-=-=-=-=-=-=-=- Links: You receive all messages sent to this group. View/Reply Online (#227344): https://lists.openembedded.org/g/openembedded-core/message/227344 Mute This Topic: https://lists.openembedded.org/mt/116627448/21656 Group Owner: [email protected] Unsubscribe: https://lists.openembedded.org/g/openembedded-core/unsub [[email protected]] -=-=-=-=-=-=-=-=-=-=-=-
