Replied! Diff comments:
> diff --git a/lib/lp/bugs/model/exportvulnerabilityjob.py > b/lib/lp/bugs/model/exportvulnerabilityjob.py > new file mode 100644 > index 0000000..cecb478 > --- /dev/null > +++ b/lib/lp/bugs/model/exportvulnerabilityjob.py > @@ -0,0 +1,346 @@ > +# Copyright 2025 Canonical Ltd. This software is licensed under the > +# GNU Affero General Public License version 3 (see the file LICENSE). > + > +__all__ = [ > + "ExportVulnerabilityJob", > +] > + > +import io > +import logging > +import os > +import tempfile > +import zipfile > +from datetime import timedelta > +from typing import List, Optional, Tuple > + > +from zope.component import getUtility > +from zope.interface import implementer, provider > +from zope.security.proxy import removeSecurityProxy > + > +from lp.app.enums import InformationType > +from lp.bugs.enums import VulnerabilityHandlerEnum > +from lp.bugs.interfaces.cve import ICveSet > +from lp.bugs.interfaces.vulnerabilityjob import ( > + IExportVulnerabilityJob, > + IExportVulnerabilityJobSource, > + VulnerabilityJobException, > + VulnerabilityJobInProgress, > + VulnerabilityJobType, > +) > +from lp.bugs.model.bug import Bug as BugModel > +from lp.bugs.model.cve import Cve as CveModel > +from lp.bugs.model.vulnerability import Vulnerability > +from lp.bugs.model.vulnerabilityjob import ( > + VulnerabilityJob, > + VulnerabilityJobDerived, > +) > +from lp.bugs.scripts.soss.models import SOSSRecord > +from lp.bugs.scripts.soss.sossexport import SOSSExporter > +from lp.registry.interfaces.distribution import IDistributionSet > +from lp.registry.model.distribution import Distribution > +from lp.services.config import config > +from lp.services.database.interfaces import IPrimaryStore, IStore > +from lp.services.job.interfaces.job import JobStatus > +from lp.services.job.model.job import Job > +from lp.services.librarian.interfaces import ILibraryFileAliasSet > +from lp.services.utils import utc_now > + > +DISTRIBUTION_NAME = "soss" > +EXPIRATION_WEEK_INTERVAL = 
1 > +logger = logging.getLogger(__name__) > + > + > +@implementer(IExportVulnerabilityJob) > +@provider(IExportVulnerabilityJobSource) > +class ExportVulnerabilityJob(VulnerabilityJobDerived): > + class_job_type = VulnerabilityJobType.EXPORT_DATA > + > + user_error_types = (VulnerabilityJobException,) > + > + config = config.IExportVulnerabilityJobSource > + > + def __init__(self, job): > + super().__init__(job) > + self.cve_set = getUtility(ICveSet) > + self.soss = getUtility(IDistributionSet).getByName(DISTRIBUTION_NAME) > + > + @property > + def sources(self): > + return self.metadata.get("request").get("sources") > + > + @property > + def information_type(self): > + return self.metadata.get("request").get("information_type") > + > + @property > + def error_description(self): > + return self.metadata.get("result").get("error_description") > + > + @classmethod > + def create( > + cls, > + handler, > + sources: Optional[List[str]] = None, > + information_type=InformationType.PRIVATESECURITY.value, > + ): > + """Create a new `ExportVulnerabilityJob`. > + > + :param handler: What handler to use for importing the data. Can be > one > + of a group of predefined classes (SOSS, UCT, ...). > + :param sources: A list of sources to export from. Gets used depending > + on the handler. > + """ > + store = IPrimaryStore(VulnerabilityJob) > + > + vulnerability_job = store.find( > + VulnerabilityJob, > + VulnerabilityJob.job_id == Job.id, > + VulnerabilityJob.job_type == cls.class_job_type, > + VulnerabilityJob.handler == handler, > + Job._status.is_in( > + (JobStatus.WAITING, JobStatus.RUNNING, JobStatus.SUSPENDED) > + ), > + ).one() > + > + if vulnerability_job is not None: > + raise VulnerabilityJobInProgress(cls(vulnerability_job)) > + > + # Schedule the initialization. 
> + metadata = { > + "request": { > + "sources": sources if sources is not None else [], > + "information_type": information_type, > + }, > + "result": { > + "error_description": [], > + "succeeded": [], > + "failed": [], > + }, > + "data": { > + "export_link": "", > + }, > + } > + > + vulnerability_job = VulnerabilityJob( > + handler, cls.class_job_type, metadata > + ) > + store.add(vulnerability_job) > + derived_job = cls(vulnerability_job) > + derived_job.celeryRunOnCommit() > + IStore(VulnerabilityJob).flush() > + return derived_job > + > + @classmethod > + def get(cls, handler): > + """See `IExportVulnerabilityJob`.""" > + vulnerability_job = ( > + IStore(VulnerabilityJob) > + .find( > + VulnerabilityJob, > + VulnerabilityJob.job_id == Job.id, > + VulnerabilityJob.job_type == cls.class_job_type, > + VulnerabilityJob.handler == handler, > + ) > + .one() > + ) > + return None if vulnerability_job is None else cls(vulnerability_job) > + > + def __repr__(self): > + """Returns an informative representation of the job.""" > + parts = "%s for" % self.__class__.__name__ > + parts += " handler: %s" % self.handler > + parts += ", metadata: %s" % self.metadata > + return "<%s>" % parts > + > + def _get_exporter_to_record( > + self, > + handler: VulnerabilityHandlerEnum, > + information_type: InformationType = InformationType.PRIVATESECURITY, > + ): > + """Decide which parser and importer to use > + > + :return: a tuple of (parser, importer) where parser is the function > + that gets a blob and returns a record and importer is the function > that > + gets a record and imports it. > + """ > + > + if handler == VulnerabilityHandlerEnum.SOSS: yes! 
> + exporter = SOSSExporter( > + information_type=information_type > + ).to_record > + else: > + exception = VulnerabilityJobException("Handler not found") > + self.notifyUserError(exception) > + raise exception > + > + return exporter > + > + def _get_bug_and_vulnerability( > + self, cve: CveModel, distribution: Distribution > + ) -> Tuple[Optional[BugModel], Optional[Vulnerability]]: > + > + vulnerability = cve.getDistributionVulnerability(distribution) > + > + if not vulnerability: > + logger.debug( > + f"[ExportVulnerabilityJob] CVE-{cve.sequence} " > + "does not have a vulnerability attached." > + ) > + return None, None > + > + bugs = vulnerability.bugs > + > + if not bugs: > + logger.debug( > + f"[ExportVulnerabilityJob] CVE-{cve.sequence} " > + "does not have a bug attached." > + ) > + return None, None > + > + if len(bugs) > 1: > + logger.debug( > + f"[ExportVulnerabilityJob] CVE-{cve.sequence} " > + "has more than one bug attached." > + ) > + return None, None > + > + bug = bugs[0] > + > + return removeSecurityProxy(bug), vulnerability nice thanks! > + > + def run(self): > + """See `IRunnableJob`.""" > + > + information_type = InformationType.items[self.information_type] > + export_to_record = self._get_exporter_to_record( > + self.context.handler, > + information_type, > + ) > + > + logger.debug( > + "[ExportVulnerabilityJob] Getting CVEs to export and storing " > + "them in Records" > + ) > + > + exported_cves: List[Tuple[SOSSRecord, str]] = [] > + for cve in self.cve_set: <3 > + > + bug, vulnerability = self._get_bug_and_vulnerability( > + cve, self.soss > + ) > + > + if not bug or not vulnerability: > + continue > + > + try: > + record = export_to_record(cve, self.soss, bug, vulnerability) > + except ValueError as e: > + self.notifyUserError(e) > + continue > + > + if record is None: Nice, thanks! Sorry for iterations. 
I was only referring to adding that CVE to the metadata :) > + continue > + > + exported_cves.append((record, record.candidate)) > + > + if exported_cves == []: > + exception = VulnerabilityJobException("No CVEs to export") > + self.notifyUserError(exception) > + return I think that only happens with testing, we can check it :) > + > + # Create a temporary folder > + with tempfile.TemporaryDirectory() as temp_dir: > + > + logger.debug( > + "[ExportVulnerabilityJob] Writing CVEs to temporary " > + f"directory {temp_dir}" > + ) > + > + for record, cve_name in exported_cves: > + file_path = os.path.join(temp_dir, cve_name) > + > + with open(file_path, "w") as f: > + > + logger.debug( > + "[ExportVulnerabilityJob] Writing CVE %s to file %s", > + cve_name, > + file_path, > + ) > + > + f.write(record.to_yaml()) > + > + # Create a zip archive of the temp folder > + buf = io.BytesIO() > + with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as zipf: > + logger.debug( > + "[ExportVulnerabilityJob] Compressing CVEs to zip" > + ) > + for dirpath, _, filenames in os.walk(temp_dir): > + for filename in filenames: > + try: > + logger.debug( > + "[ExportVulnerabilityJob] Compressing file > %s", > + filename, > + ) > + filepath = os.path.join(dirpath, filename) > + arcname = os.path.relpath( > + filepath, temp_dir > + ) # relative path inside zip > + zipf.write(filepath, arcname) > + except Exception as e: > + logger.error( > + f"Error adding file {filename} to zip: {e}" > + ) > + > self.metadata["result"]["failed"].append(filename) > + self.notifyUserError(e) > + else: > + self.metadata["result"]["succeeded"].append( > + filename > + ) > + > + zip_name = f"exported_vulnerabilities_{self.context.id}.zip" > + contentType = "application/zip" > + file_size = buf.getbuffer().nbytes > + > + expires = utc_now() + timedelta(weeks=EXPIRATION_WEEK_INTERVAL) > + > + # Reset buffer position to the beginning since librarian will read > it.
> + buf.seek(0) > + > + logger.debug("[ExportVulnerabilityJob] Writing zip to librarian") > + lfa = getUtility(ILibraryFileAliasSet).create( > + name=zip_name, > + size=file_size, > + file=buf, > + contentType=contentType, > + expires=expires, > + ) > + > + self.metadata["data"]["export_link"] = lfa.getURL() > + > + def notifyUserError(self, error): > + """Calls up and also saves the error text in this job's metadata. > + > + See `BaseRunnableJob`. > + """ > + # This method is called when error is an instance of > + # self.user_error_types. > + super().notifyUserError(error) > + logger.error(error) > + error_description = self.metadata.get("result").get( > + "error_description", [] > + ) > + error_description.append(str(error)) > + self.metadata["result"]["error_description"] = error_description > + > + def getOopsVars(self): > + """See `IRunnableJob`.""" > + vars = super().getOopsVars() > + vars.extend( > + [ > + ("vulnerabilityjob_job_id", self.context.id), > + ("vulnerability_job_type", self.context.job_type.title), > + ("handler", self.context.handler), > + ] > + ) > + return vars > diff --git a/lib/lp/bugs/tests/sampledata/CVE-2025-1979 > b/lib/lp/bugs/tests/sampledata/CVE-2025-1979 > new file mode 100644 > index 0000000..a562efa > --- /dev/null > +++ b/lib/lp/bugs/tests/sampledata/CVE-2025-1979 > @@ -0,0 +1,78 @@ > +References: Yep, I'm also not sure about this, we can ask others for opinions. But I lean towards preferring that, rather than duplicating. I'm doing that in the `importvulnerabilityjob` tests.
> +- > https://github.com/ray-project/ray/commit/64a2e4010522d60b90c389634f24df77b603d85d > +- https://github.com/ray-project/ray/issues/50266 > +- https://github.com/ray-project/ray/pull/50409 > +- https://security.snyk.io/vuln/SNYK-PYTHON-RAY-8745212 > +- https://ubuntu.com/security/notices/SSN-148-1.json?show_hidden=true > +Notes: > +- This is a sample soss cve with all the fields filled for testing > +- sample note 2 > +Priority: Low > +Priority-Reason: 'Unrealistic exploitation scenario. Logs are stored locally > and not > + transferred between agents, so local log access is the only conceivable > method to > + view the password for the redis instance (i.e., no possibility of MitM to > access > + the logs). Given the requirement for priviledged system access to access > log files > + the real "danger" posed by the vulnerability is quite low, and that is > reflected > + in this priority assignment. ' > +Assigned-To: janitor > +Packages: > + conda: > + - Name: ray > + Channel: jammy:1.17.0/stable > + Repositories: > + - nvidia-pb3-python-stable-local > + Status: not-affected > + Note: 2.22.0+soss.1 > + maven: > + - Name: vllm > + Channel: noble:0.7.3/stable > + Repositories: > + - soss-src-stable-local > + Status: needs-triage > + Note: '' > + python: > + - Name: pyyaml > + Channel: jammy:2.22.0/stable > + Repositories: > + - nvidia-pb3-python-stable-local > + Status: not-affected > + Note: '' > + - Name: ray > + Channel: jammy:2.22.0/stable > + Repositories: > + - nvidia-pb3-python-stable-local > + Status: released > + Note: 2.22.0+soss.1 > + rust: > + - Name: ray > + Channel: focal:0.27.0/stable > + Repositories: > + - nvidia-pb3-python-stable-local > + Status: deferred > + Note: 2.22.0+soss.1 > + unpackaged: > + - Name: vllm > + Channel: noble:0.7.3/stable > + Repositories: > + - soss-src-stable-local > + Status: needed > + Note: '' > +Candidate: CVE-2025-1979 > +Description: "Versions of the package ray before 2.43.0 are vulnerable to > Insertion\ > + \ of 
Sensitive Information into Log File where the redis password is being > logged\ > + \ in the standard logging. If the redis password is passed as an argument, > it will\ > + \ be logged and could potentially leak the password.\r\rThis is only > exploitable\ > + \ if:\r\r1) Logging is enabled;\r\r2) Redis is using password > authentication;\r\r\ > + 3) Those logs are accessible to an attacker, who can reach that redis > instance.\r\ > + \r**Note:**\r\rIt is recommended that anyone who is running in this > configuration\ > + \ should update to the latest version of Ray, then rotate their redis > password." > +CVSS: > +- source: rep...@snyk.io > + vector: CVSS:3.1/AV:L/AC:H/PR:L/UI:N/S:C/C:H/I:L/A:N > + baseScore: 6.4 > + baseSeverity: MEDIUM > +- source: security-advisor...@github.com > + vector: CVSS:3.1/AV:A/AC:L/PR:L/UI:N/S:C/C:H/I:H/A:H > + baseScore: 9.0 > + baseSeverity: CRITICAL > +PublicDate: '2025-03-06T05:15:16.213' > diff --git a/lib/lp/bugs/tests/test_exportvulnerabilityjob.py > b/lib/lp/bugs/tests/test_exportvulnerabilityjob.py > new file mode 100644 > index 0000000..11355cc > --- /dev/null > +++ b/lib/lp/bugs/tests/test_exportvulnerabilityjob.py > @@ -0,0 +1,404 @@ > +# Copyright 2025 Canonical Ltd. This software is licensed under the > +# GNU Affero General Public License version 3 (see the file LICENSE). 
> + > +from pathlib import Path > + > +import transaction > +from testtools.matchers import MatchesRegex > +from zope.component import getUtility > +from zope.security.proxy import removeSecurityProxy > + > +from lp.app.enums import InformationType > +from lp.app.interfaces.launchpad import ILaunchpadCelebrities > +from lp.bugs.enums import VulnerabilityHandlerEnum > +from lp.bugs.interfaces.cve import ICveSet > +from lp.bugs.interfaces.vulnerabilityjob import ( > + IExportVulnerabilityJobSource, > + VulnerabilityJobException, > + VulnerabilityJobInProgress, > +) > +from lp.bugs.model.exportvulnerabilityjob import ExportVulnerabilityJob > +from lp.bugs.scripts.soss.sossimport import SOSSImporter > +from lp.services.features.testing import FeatureFixture > +from lp.services.job.interfaces.job import JobStatus > +from lp.services.job.tests import block_on_job > +from lp.services.librarian.model import LibraryFileAlias > +from lp.testing import TestCaseWithFactory, person_logged_in > +from lp.testing.layers import CeleryJobLayer, LaunchpadZopelessLayer > + > + > +class ExportVulnerabilityJobTests(TestCaseWithFactory): > + """Test case for ImportVulnerabilityJob.""" > + > + layer = LaunchpadZopelessLayer > + > + def setUp(self): > + super().setUp() > + self.bug_importer = getUtility(ILaunchpadCelebrities).bug_importer > + > + self.cve_set = getUtility(ICveSet) > + > + @property > + def job_source(self): > + return getUtility(IExportVulnerabilityJobSource) > + > + def test_getOopsVars(self): > + """Test getOopsVars method.""" > + handler = VulnerabilityHandlerEnum.SOSS > + > + job = self.job_source.create(handler) > + vars = job.getOopsVars() > + naked_job = removeSecurityProxy(job) > + self.assertIn(("vulnerabilityjob_job_id", naked_job.id), vars) > + self.assertIn( > + ("vulnerability_job_type", naked_job.job_type.title), vars > + ) > + self.assertIn(("handler", naked_job.handler), vars) > + > + def test___repr__(self): > + """Test __repr__ method.""" > + handler = 
VulnerabilityHandlerEnum.SOSS > + information_type = InformationType.PRIVATESECURITY.value > + > + metadata = { > + "request": { > + "sources": [], > + "information_type": information_type, > + }, > + "result": { > + "error_description": [], > + "succeeded": [], > + "failed": [], > + }, > + "data": { > + "export_link": "", > + }, > + } > + > + job = self.job_source.create( > + handler, information_type=information_type > + ) > + > + expected = ( > + "<ExportVulnerabilityJob for " > + f"handler: {handler}, " > + f"metadata: {metadata}" > + ">" > + ) > + self.assertEqual(expected, repr(job)) > + > + def test_create_with_existing_in_progress_job(self): > + """If there's already a waiting/running ExportVulnerabilityJob for > the > + handler ExportVulnerabilityJob.create() raises an exception. > + """ > + handler = VulnerabilityHandlerEnum.SOSS > + > + # Job waiting status > + job = self.job_source.create(handler) > + waiting_exception = self.assertRaises( > + VulnerabilityJobInProgress, > + self.job_source.create, > + handler, > + ) > + self.assertEqual(job, waiting_exception.job) > + > + # Job status from WAITING to RUNNING > + job.start() > + running_exception = self.assertRaises( > + VulnerabilityJobInProgress, self.job_source.create, handler > + ) > + self.assertEqual(job, running_exception.job) > + > + def test_create_with_existing_completed_job(self): > + """If there's already a completed ExportVulnerabilityJob for the > + handler the job can be runned again. 
> + """ > + handler = VulnerabilityHandlerEnum.SOSS > + > + job = self.job_source.create(handler) > + job.start() > + job.complete() > + self.assertEqual(job.status, JobStatus.COMPLETED) > + > + job_duplicated = self.job_source.create(handler) > + job_duplicated.start() > + job_duplicated.complete() > + self.assertEqual(job_duplicated.status, JobStatus.COMPLETED) > + > + def test_create_with_existing_failed_job(self): > + """If there's a failed ExportVulnerabilityJob for the handler the job > + can be runned again. > + """ > + handler = VulnerabilityHandlerEnum.SOSS > + > + job = self.job_source.create(handler) > + job.start() > + job.fail() > + self.assertEqual(job.status, JobStatus.FAILED) > + > + job_duplicated = self.job_source.create(handler) > + job_duplicated.start() > + job_duplicated.complete() > + self.assertEqual(job_duplicated.status, JobStatus.COMPLETED) > + > + def test_arguments(self): > + """Test that ExportVulnerabilityJob specified with arguments can > + be gotten out again.""" > + > + handler = VulnerabilityHandlerEnum.SOSS > + information_type = InformationType.PRIVATESECURITY.value > + > + metadata = { > + "request": { > + "sources": ["https://launchpad.net/ubuntu"], > + "information_type": information_type, > + }, > + "result": { > + "error_description": [], > + "succeeded": [], > + "failed": [], > + }, > + "data": { > + "export_link": "", > + }, > + } > + > + job = self.job_source.create( > + handler, > + sources=["https://launchpad.net/ubuntu"], > + information_type=information_type, > + ) > + > + naked_job = removeSecurityProxy(job) > + self.assertEqual(naked_job.handler, handler) > + > + self.assertEqual(naked_job.metadata, metadata) > + > + def test_run_with_no_CVEs(self): > + """ > + Run ExportVulnerabilityJob but with no CVE that also has its bug and > + vulnerability. 
> + > + Note: Since we use the database to look for CVE sequence names, the > + dev environment's sample database must have no CVEs with bugs and > + vulnerabilities for this test to pass. If they ever get added, the > + test will need to be updated to use a different approach. > + """ > + > + self.factory.makeDistribution(name="soss") > + > + job = self.job_source.create(handler=VulnerabilityHandlerEnum.SOSS) > + > + # Run the job as bug_importer since normal users don't have > permission > + # This is to ensure we are actually testing the "no CVEs" case > + # instead of hitting a permission hiding the CVEs. > + with person_logged_in(self.bug_importer): nice > + job.run() > + > + self.assertEqual( > + job.metadata.get("result"), > + { > + "succeeded": [], > + "failed": [], > + "error_description": ["No CVEs to export"], > + }, > + ) > + > + def _put_cve_in_soss(self): > + self.factory.makePerson(name="octagalland") > + self.factory.makeDistribution( > + name="soss", > + displayname="SOSS", > + information_type=InformationType.PROPRIETARY, > + ) > + > + sampledata = Path(__file__).parent / "sampledata" > + > + soss_importer = SOSSImporter() > + > + imported_list = [] > + for file in sampledata.iterdir(): > + cve_sequence = file.name.lstrip("CVE-") > + if not self.cve_set[cve_sequence]: > + self.factory.makeCVE(sequence=cve_sequence) > + > + bug, vulnerability = soss_importer.import_cve_from_file(file) > + imported_list.append((cve_sequence, bug, vulnerability)) > + > + return imported_list > + > + def test_run_export(self): > + """Run ExportVulnerabilityJob.""" > + imported_list = self._put_cve_in_soss() > + information_type = InformationType.PRIVATESECURITY.value > + export_link = "http://example.com/fake-url" > + > + self.patch( > + LibraryFileAlias, > + "getURL", > + lambda self: export_link, > + ) > + > + job = self.job_source.create( > + handler=VulnerabilityHandlerEnum.SOSS, > + information_type=information_type, > + ) > + > + with 
person_logged_in(self.bug_importer): > + job.run() > + > + cve_names = [ > + f"CVE-{cve_sequence}" for cve_sequence, _, _ in imported_list > + ] > + > + naked_job_metadata = removeSecurityProxy(job.metadata) > + naked_job_metadata["result"]["succeeded"].sort() > + cve_names.sort() > + > + self.assertEqual( > + naked_job_metadata, > + { > + "request": { > + "sources": [], > + "information_type": information_type, > + }, > + "result": { > + "error_description": [], > + "succeeded": cve_names, > + "failed": [], > + }, > + "data": { > + "export_link": export_link, > + }, > + }, > + ) > + > + def test_get(self): > + """ExportVulnerabilityJob.get() returns the import job for the given > + handler. > + """ > + handler = VulnerabilityHandlerEnum.SOSS > + > + # There is no job before creating it > + self.assertIs(None, self.job_source.get(handler)) > + > + job = self.job_source.create(handler) > + job_gotten = self.job_source.get(handler) > + > + self.assertIsInstance(job, ExportVulnerabilityJob) > + self.assertEqual(job, job_gotten) > + > + def test_error_description_when_no_error(self): > + """The ExportVulnerabilityJob.error_description property returns > + None when no error description is recorded.""" > + handler = VulnerabilityHandlerEnum.SOSS > + information_type = InformationType.PRIVATESECURITY.value > + > + job = self.job_source.create( > + handler, > + information_type=information_type, > + ) > + self.assertEqual([], removeSecurityProxy(job).error_description) > + > + def test_error_description_set_when_notifying_about_user_errors(self): > + """Test that error_description is set by notifyUserError().""" > + handler = VulnerabilityHandlerEnum.SOSS > + information_type = InformationType.PRIVATESECURITY.value > + > + job = self.job_source.create( > + handler, > + information_type=information_type, > + ) > + message = "This is an example message." 
> + job.notifyUserError(VulnerabilityJobException(message)) > + self.assertEqual([message], > removeSecurityProxy(job).error_description) > + > + > +class ExportVulnerabilityTestViaCelery(TestCaseWithFactory): > + layer = CeleryJobLayer > + > + def setUp(self): > + super().setUp() > + > + self.bug_importer = getUtility(ILaunchpadCelebrities).bug_importer > + self.cve_set = getUtility(ICveSet) > + > + def _put_cve_in_soss(self): > + self.factory.makePerson(name="octagalland") > + self.factory.makeDistribution( > + name="soss", > + displayname="SOSS", > + information_type=InformationType.PROPRIETARY, > + ) > + > + sampledata = Path(__file__).parent / "sampledata" > + > + soss_importer = SOSSImporter() > + > + imported_list = [] > + for file in sampledata.iterdir(): > + cve_sequence = ( > + file.name[4:] if file.name.startswith("CVE-") else file.name > + ) > + if not self.cve_set[cve_sequence]: > + self.factory.makeCVE(sequence=cve_sequence) > + > + bug, vulnerability = soss_importer.import_cve_from_file(file) > + imported_list.append((cve_sequence, bug, vulnerability)) > + > + return imported_list > + > + def test_job(self): > + """Job runs via Celery.""" > + fixture = FeatureFixture( > + { > + "jobs.celery.enabled_classes": "ExportVulnerabilityJob", > + } > + ) > + self.useFixture(fixture) > + > + imported_list = self._put_cve_in_soss() > + transaction.commit() > + > + job_source = getUtility(IExportVulnerabilityJobSource) > + > + handler = VulnerabilityHandlerEnum.SOSS > + information_type = InformationType.PRIVATESECURITY.value > + with block_on_job(): > + job_source.create( > + handler, > + information_type=information_type, > + ) > + transaction.commit() > + > + cve_names = [ > + f"CVE-{cve_sequence}" for cve_sequence, _, _ in imported_list > + ] > + > + job = job_source.get(handler) > + > + naked_job_metadata = removeSecurityProxy(job.metadata) > + naked_job_metadata["result"]["succeeded"].sort() > + cve_names.sort() > + > + metadata_request = { > + "sources": 
[], > + "information_type": information_type, > + } > + > + metadata_result = { > + "error_description": [], > + "succeeded": cve_names, > + "failed": [], > + } > + > + self.assertEqual(handler, job.handler) > + self.assertEqual(metadata_request, naked_job_metadata["request"]) > + self.assertEqual(metadata_result, naked_job_metadata["result"]) > + > + self.assertThat( > + naked_job_metadata["data"]["export_link"], > + MatchesRegex( > + r".*exported_vulnerabilities_[0-9]+\.zip$", > + ), > + ) -- https://code.launchpad.net/~ilkeremrekoc/launchpad/+git/launchpad/+merge/492056 Your team Launchpad code reviewers is requested to review the proposed merge of ~ilkeremrekoc/launchpad:add-exportvulnerabilityjob into launchpad:master. _______________________________________________ Mailing list: https://launchpad.net/~launchpad-reviewers Post to : launchpad-reviewers@lists.launchpad.net Unsubscribe : https://launchpad.net/~launchpad-reviewers More help : https://help.launchpad.net/ListHelp