Replied!

Diff comments:

> diff --git a/lib/lp/bugs/model/exportvulnerabilityjob.py b/lib/lp/bugs/model/exportvulnerabilityjob.py
> new file mode 100644
> index 0000000..cecb478
> --- /dev/null
> +++ b/lib/lp/bugs/model/exportvulnerabilityjob.py
> @@ -0,0 +1,346 @@
> +# Copyright 2025 Canonical Ltd.  This software is licensed under the
> +# GNU Affero General Public License version 3 (see the file LICENSE).
> +
> +__all__ = [
> +    "ExportVulnerabilityJob",
> +]
> +
> +import io
> +import logging
> +import os
> +import tempfile
> +import zipfile
> +from datetime import timedelta
> +from typing import List, Optional, Tuple
> +
> +from zope.component import getUtility
> +from zope.interface import implementer, provider
> +from zope.security.proxy import removeSecurityProxy
> +
> +from lp.app.enums import InformationType
> +from lp.bugs.enums import VulnerabilityHandlerEnum
> +from lp.bugs.interfaces.cve import ICveSet
> +from lp.bugs.interfaces.vulnerabilityjob import (
> +    IExportVulnerabilityJob,
> +    IExportVulnerabilityJobSource,
> +    VulnerabilityJobException,
> +    VulnerabilityJobInProgress,
> +    VulnerabilityJobType,
> +)
> +from lp.bugs.model.bug import Bug as BugModel
> +from lp.bugs.model.cve import Cve as CveModel
> +from lp.bugs.model.vulnerability import Vulnerability
> +from lp.bugs.model.vulnerabilityjob import (
> +    VulnerabilityJob,
> +    VulnerabilityJobDerived,
> +)
> +from lp.bugs.scripts.soss.models import SOSSRecord
> +from lp.bugs.scripts.soss.sossexport import SOSSExporter
> +from lp.registry.interfaces.distribution import IDistributionSet
> +from lp.registry.model.distribution import Distribution
> +from lp.services.config import config
> +from lp.services.database.interfaces import IPrimaryStore, IStore
> +from lp.services.job.interfaces.job import JobStatus
> +from lp.services.job.model.job import Job
> +from lp.services.librarian.interfaces import ILibraryFileAliasSet
> +from lp.services.utils import utc_now
> +
> +DISTRIBUTION_NAME = "soss"
> +EXPIRATION_WEEK_INTERVAL = 1
> +logger = logging.getLogger(__name__)
> +
> +
> +@implementer(IExportVulnerabilityJob)
> +@provider(IExportVulnerabilityJobSource)
> +class ExportVulnerabilityJob(VulnerabilityJobDerived):
> +    class_job_type = VulnerabilityJobType.EXPORT_DATA
> +
> +    user_error_types = (VulnerabilityJobException,)
> +
> +    config = config.IExportVulnerabilityJobSource
> +
> +    def __init__(self, job):
> +        super().__init__(job)
> +        self.cve_set = getUtility(ICveSet)
> +        self.soss = getUtility(IDistributionSet).getByName(DISTRIBUTION_NAME)
> +
> +    @property
> +    def sources(self):
> +        return self.metadata.get("request").get("sources")
> +
> +    @property
> +    def information_type(self):
> +        return self.metadata.get("request").get("information_type")
> +
> +    @property
> +    def error_description(self):
> +        return self.metadata.get("result").get("error_description")
> +
> +    @classmethod
> +    def create(
> +        cls,
> +        handler,
> +        sources: Optional[List[str]] = None,
> +        information_type=InformationType.PRIVATESECURITY.value,
> +    ):
> +        """Create a new `ExportVulnerabilityJob`.
> +
> +        :param handler: What handler to use for importing the data. Can be one
> +            of a group of predefined classes (SOSS, UCT, ...).
> +        :param sources: A list of sources to export from. Gets used depending
> +            on the handler.
> +        """
> +        store = IPrimaryStore(VulnerabilityJob)
> +
> +        vulnerability_job = store.find(
> +            VulnerabilityJob,
> +            VulnerabilityJob.job_id == Job.id,
> +            VulnerabilityJob.job_type == cls.class_job_type,
> +            VulnerabilityJob.handler == handler,
> +            Job._status.is_in(
> +                (JobStatus.WAITING, JobStatus.RUNNING, JobStatus.SUSPENDED)
> +            ),
> +        ).one()
> +
> +        if vulnerability_job is not None:
> +            raise VulnerabilityJobInProgress(cls(vulnerability_job))
> +
> +        # Schedule the initialization.
> +        metadata = {
> +            "request": {
> +                "sources": sources if sources is not None else [],
> +                "information_type": information_type,
> +            },
> +            "result": {
> +                "error_description": [],
> +                "succeeded": [],
> +                "failed": [],
> +            },
> +            "data": {
> +                "export_link": "",
> +            },
> +        }
> +
> +        vulnerability_job = VulnerabilityJob(
> +            handler, cls.class_job_type, metadata
> +        )
> +        store.add(vulnerability_job)
> +        derived_job = cls(vulnerability_job)
> +        derived_job.celeryRunOnCommit()
> +        IStore(VulnerabilityJob).flush()
> +        return derived_job
> +
> +    @classmethod
> +    def get(cls, handler):
> +        """See `IExportVulnerabilityJob`."""
> +        vulnerability_job = (
> +            IStore(VulnerabilityJob)
> +            .find(
> +                VulnerabilityJob,
> +                VulnerabilityJob.job_id == Job.id,
> +                VulnerabilityJob.job_type == cls.class_job_type,
> +                VulnerabilityJob.handler == handler,
> +            )
> +            .one()
> +        )
> +        return None if vulnerability_job is None else cls(vulnerability_job)
> +
> +    def __repr__(self):
> +        """Returns an informative representation of the job."""
> +        parts = "%s for" % self.__class__.__name__
> +        parts += " handler: %s" % self.handler
> +        parts += ", metadata: %s" % self.metadata
> +        return "<%s>" % parts
> +
> +    def _get_exporter_to_record(
> +        self,
> +        handler: VulnerabilityHandlerEnum,
> +        information_type: InformationType = InformationType.PRIVATESECURITY,
> +    ):
> +        """Decide which parser and importer to use
> +
> +        :return: a tuple of (parser, importer) where parser is the function
> +        that gets a blob and returns a record and importer is the function that
> +        gets a record and imports it.
> +        """
> +
> +        if handler == VulnerabilityHandlerEnum.SOSS:

yes!

> +            exporter = SOSSExporter(
> +                information_type=information_type
> +            ).to_record
> +        else:
> +            exception = VulnerabilityJobException("Handler not found")
> +            self.notifyUserError(exception)
> +            raise exception
> +
> +        return exporter
> +
> +    def _get_bug_and_vulnerability(
> +        self, cve: CveModel, distribution: Distribution
> +    ) -> Tuple[Optional[BugModel], Optional[Vulnerability]]:
> +
> +        vulnerability = cve.getDistributionVulnerability(distribution)
> +
> +        if not vulnerability:
> +            logger.debug(
> +                f"[ExportVulnerabilityJob] CVE-{cve.sequence} "
> +                "does not have a vulnerability attached."
> +            )
> +            return None, None
> +
> +        bugs = vulnerability.bugs
> +
> +        if not bugs:
> +            logger.debug(
> +                f"[ExportVulnerabilityJob] CVE-{cve.sequence} "
> +                "does not have a bug attached."
> +            )
> +            return None, None
> +
> +        if len(bugs) > 1:
> +            logger.debug(
> +                f"[ExportVulnerabilityJob] CVE-{cve.sequence} "
> +                "has more than one bug attached."
> +            )
> +            return None, None
> +
> +        bug = bugs[0]
> +
> +        return removeSecurityProxy(bug), vulnerability

nice thanks!

> +
> +    def run(self):
> +        """See `IRunnableJob`."""
> +
> +        information_type = InformationType.items[self.information_type]
> +        export_to_record = self._get_exporter_to_record(
> +            self.context.handler,
> +            information_type,
> +        )
> +
> +        logger.debug(
> +            "[ExportVulnerabilityJob] Getting CVEs to export and storing "
> +            "them in Records"
> +        )
> +
> +        exported_cves: List[Tuple[SOSSRecord, str]] = []
> +        for cve in self.cve_set:

<3

> +
> +            bug, vulnerability = self._get_bug_and_vulnerability(
> +                cve, self.soss
> +            )
> +
> +            if not bug or not vulnerability:
> +                continue
> +
> +            try:
> +                record = export_to_record(cve, self.soss, bug, vulnerability)
> +            except ValueError as e:
> +                self.notifyUserError(e)
> +                continue
> +
> +            if record is None:

Nice, thanks! Sorry for the iterations.
I was only referring to adding that CVE to the metadata :)

> +                continue
> +
> +            exported_cves.append((record, record.candidate))
> +
> +        if exported_cves == []:
> +            exception = VulnerabilityJobException("No CVEs to export")
> +            self.notifyUserError(exception)
> +            return

I think that only happens in testing; we can check it :)

> +
> +        # Create a temporary folder
> +        with tempfile.TemporaryDirectory() as temp_dir:
> +
> +            logger.debug(
> +                "[ExportVulnerabilityJob] Writing CVEs to temporary "
> +                f"directory {temp_dir}"
> +            )
> +
> +            for record, cve_name in exported_cves:
> +                file_path = os.path.join(temp_dir, cve_name)
> +
> +                with open(file_path, "w") as f:
> +
> +                    logger.debug(
> +                        "[ExportVulnerabilityJob] Writing CVE %s to file %s",
> +                        cve_name,
> +                        file_path,
> +                    )
> +
> +                    f.write(record.to_yaml())
> +
> +            # Create a zip archive of the temp folder
> +            buf = io.BytesIO()
> +            with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as zipf:
> +                logger.debug(
> +                    "[ExportVulnerabilityJob] Compressing CVEs to zip"
> +                )
> +                for dirpath, _, filenames in os.walk(temp_dir):
> +                    for filename in filenames:
> +                        try:
> +                            logger.debug(
> +                                "[ExportVulnerabilityJob] Compressing file %s",
> +                                filename,
> +                            )
> +                            filepath = os.path.join(dirpath, filename)
> +                            arcname = os.path.relpath(
> +                                filepath, temp_dir
> +                            )  # relative path inside zip
> +                            zipf.write(filepath, arcname)
> +                        except Exception as e:
> +                            logger.error(
> +                                f"Error adding file {filename} to zip: {e}"
> +                            )
> +                            self.metadata["result"]["failed"].append(filename)
> +                            self.notifyUserError(e)
> +                        else:
> +                            self.metadata["result"]["succeeded"].append(
> +                                filename
> +                            )
> +
> +        zip_name = f"exported_vulnerabilities_{self.context.id}.zip"
> +        contentType = "application/zip"
> +        file_size = buf.getbuffer().nbytes
> +
> +        expires = utc_now() + timedelta(weeks=EXPIRATION_WEEK_INTERVAL)
> +
> +        # Reset buffer position to the beginning since librarian will read it.
> +        buf.seek(0)
> +
> +        logger.debug("[ExportVulnerabilityJob] Writing zip to librarian")
> +        lfa = getUtility(ILibraryFileAliasSet).create(
> +            name=zip_name,
> +            size=file_size,
> +            file=buf,
> +            contentType=contentType,
> +            expires=expires,
> +        )
> +
> +        self.metadata["data"]["export_link"] = lfa.getURL()
> +
> +    def notifyUserError(self, error):
> +        """Calls up and also saves the error text in this job's metadata.
> +
> +        See `BaseRunnableJob`.
> +        """
> +        # This method is called when error is an instance of
> +        # self.user_error_types.
> +        super().notifyUserError(error)
> +        logger.error(error)
> +        error_description = self.metadata.get("result").get(
> +            "error_description", []
> +        )
> +        error_description.append(str(error))
> +        self.metadata["result"]["error_description"] = error_description
> +
> +    def getOopsVars(self):
> +        """See `IRunnableJob`."""
> +        vars = super().getOopsVars()
> +        vars.extend(
> +            [
> +                ("vulnerabilityjob_job_id", self.context.id),
> +                ("vulnerability_job_type", self.context.job_type.title),
> +                ("handler", self.context.handler),
> +            ]
> +        )
> +        return vars
> diff --git a/lib/lp/bugs/tests/sampledata/CVE-2025-1979 b/lib/lp/bugs/tests/sampledata/CVE-2025-1979
> new file mode 100644
> index 0000000..a562efa
> --- /dev/null
> +++ b/lib/lp/bugs/tests/sampledata/CVE-2025-1979
> @@ -0,0 +1,78 @@
> +References:

Yep, I'm also not sure about this; we can ask others for opinions. But I 
lean towards preferring that rather than duplicating. I'm doing that in the 
`importvulnerabilityjob` tests.

> +- https://github.com/ray-project/ray/commit/64a2e4010522d60b90c389634f24df77b603d85d
> +- https://github.com/ray-project/ray/issues/50266
> +- https://github.com/ray-project/ray/pull/50409
> +- https://security.snyk.io/vuln/SNYK-PYTHON-RAY-8745212
> +- https://ubuntu.com/security/notices/SSN-148-1.json?show_hidden=true
> +Notes:
> +- This is a sample soss cve with all the fields filled for testing
> +- sample note 2
> +Priority: Low
> +Priority-Reason: 'Unrealistic exploitation scenario. Logs are stored locally and not
> +  transferred between agents, so local log access is the only conceivable method to
> +  view the password for the redis instance (i.e., no possibility of MitM to access
> +  the logs). Given the requirement for priviledged system access to access log files
> +  the real "danger" posed by the vulnerability is quite low, and that is reflected
> +  in this priority assignment. '
> +Assigned-To: janitor
> +Packages:
> +  conda:
> +  - Name: ray
> +    Channel: jammy:1.17.0/stable
> +    Repositories:
> +    - nvidia-pb3-python-stable-local
> +    Status: not-affected
> +    Note: 2.22.0+soss.1
> +  maven:
> +  - Name: vllm
> +    Channel: noble:0.7.3/stable
> +    Repositories:
> +    - soss-src-stable-local
> +    Status: needs-triage
> +    Note: ''
> +  python:
> +  - Name: pyyaml
> +    Channel: jammy:2.22.0/stable
> +    Repositories:
> +    - nvidia-pb3-python-stable-local
> +    Status: not-affected
> +    Note: ''
> +  - Name: ray
> +    Channel: jammy:2.22.0/stable
> +    Repositories:
> +    - nvidia-pb3-python-stable-local
> +    Status: released
> +    Note: 2.22.0+soss.1
> +  rust:
> +  - Name: ray
> +    Channel: focal:0.27.0/stable
> +    Repositories:
> +    - nvidia-pb3-python-stable-local
> +    Status: deferred
> +    Note: 2.22.0+soss.1
> +  unpackaged:
> +  - Name: vllm
> +    Channel: noble:0.7.3/stable
> +    Repositories:
> +    - soss-src-stable-local
> +    Status: needed
> +    Note: ''
> +Candidate: CVE-2025-1979
> +Description: "Versions of the package ray before 2.43.0 are vulnerable to Insertion\
> +  \ of Sensitive Information into Log File where the redis password is being logged\
> +  \ in the standard logging. If the redis password is passed as an argument, it will\
> +  \ be logged and could potentially leak the password.\r\rThis is only exploitable\
> +  \ if:\r\r1) Logging is enabled;\r\r2) Redis is using password authentication;\r\r\
> +  3) Those logs are accessible to an attacker, who can reach that redis instance.\r\
> +  \r**Note:**\r\rIt is recommended that anyone who is running in this configuration\
> +  \ should update to the latest version of Ray, then rotate their redis password."
> +CVSS:
> +- source: rep...@snyk.io
> +  vector: CVSS:3.1/AV:L/AC:H/PR:L/UI:N/S:C/C:H/I:L/A:N
> +  baseScore: 6.4
> +  baseSeverity: MEDIUM
> +- source: security-advisor...@github.com
> +  vector: CVSS:3.1/AV:A/AC:L/PR:L/UI:N/S:C/C:H/I:H/A:H
> +  baseScore: 9.0
> +  baseSeverity: CRITICAL
> +PublicDate: '2025-03-06T05:15:16.213'
> diff --git a/lib/lp/bugs/tests/test_exportvulnerabilityjob.py b/lib/lp/bugs/tests/test_exportvulnerabilityjob.py
> new file mode 100644
> index 0000000..11355cc
> --- /dev/null
> +++ b/lib/lp/bugs/tests/test_exportvulnerabilityjob.py
> @@ -0,0 +1,404 @@
> +# Copyright 2025 Canonical Ltd.  This software is licensed under the
> +# GNU Affero General Public License version 3 (see the file LICENSE).
> +
> +from pathlib import Path
> +
> +import transaction
> +from testtools.matchers import MatchesRegex
> +from zope.component import getUtility
> +from zope.security.proxy import removeSecurityProxy
> +
> +from lp.app.enums import InformationType
> +from lp.app.interfaces.launchpad import ILaunchpadCelebrities
> +from lp.bugs.enums import VulnerabilityHandlerEnum
> +from lp.bugs.interfaces.cve import ICveSet
> +from lp.bugs.interfaces.vulnerabilityjob import (
> +    IExportVulnerabilityJobSource,
> +    VulnerabilityJobException,
> +    VulnerabilityJobInProgress,
> +)
> +from lp.bugs.model.exportvulnerabilityjob import ExportVulnerabilityJob
> +from lp.bugs.scripts.soss.sossimport import SOSSImporter
> +from lp.services.features.testing import FeatureFixture
> +from lp.services.job.interfaces.job import JobStatus
> +from lp.services.job.tests import block_on_job
> +from lp.services.librarian.model import LibraryFileAlias
> +from lp.testing import TestCaseWithFactory, person_logged_in
> +from lp.testing.layers import CeleryJobLayer, LaunchpadZopelessLayer
> +
> +
> +class ExportVulnerabilityJobTests(TestCaseWithFactory):
> +    """Test case for ImportVulnerabilityJob."""
> +
> +    layer = LaunchpadZopelessLayer
> +
> +    def setUp(self):
> +        super().setUp()
> +        self.bug_importer = getUtility(ILaunchpadCelebrities).bug_importer
> +
> +        self.cve_set = getUtility(ICveSet)
> +
> +    @property
> +    def job_source(self):
> +        return getUtility(IExportVulnerabilityJobSource)
> +
> +    def test_getOopsVars(self):
> +        """Test getOopsVars method."""
> +        handler = VulnerabilityHandlerEnum.SOSS
> +
> +        job = self.job_source.create(handler)
> +        vars = job.getOopsVars()
> +        naked_job = removeSecurityProxy(job)
> +        self.assertIn(("vulnerabilityjob_job_id", naked_job.id), vars)
> +        self.assertIn(
> +            ("vulnerability_job_type", naked_job.job_type.title), vars
> +        )
> +        self.assertIn(("handler", naked_job.handler), vars)
> +
> +    def test___repr__(self):
> +        """Test __repr__ method."""
> +        handler = VulnerabilityHandlerEnum.SOSS
> +        information_type = InformationType.PRIVATESECURITY.value
> +
> +        metadata = {
> +            "request": {
> +                "sources": [],
> +                "information_type": information_type,
> +            },
> +            "result": {
> +                "error_description": [],
> +                "succeeded": [],
> +                "failed": [],
> +            },
> +            "data": {
> +                "export_link": "",
> +            },
> +        }
> +
> +        job = self.job_source.create(
> +            handler, information_type=information_type
> +        )
> +
> +        expected = (
> +            "<ExportVulnerabilityJob for "
> +            f"handler: {handler}, "
> +            f"metadata: {metadata}"
> +            ">"
> +        )
> +        self.assertEqual(expected, repr(job))
> +
> +    def test_create_with_existing_in_progress_job(self):
> +        """If there's already a waiting/running ExportVulnerabilityJob for the
> +        handler ExportVulnerabilityJob.create() raises an exception.
> +        """
> +        handler = VulnerabilityHandlerEnum.SOSS
> +
> +        # Job waiting status
> +        job = self.job_source.create(handler)
> +        waiting_exception = self.assertRaises(
> +            VulnerabilityJobInProgress,
> +            self.job_source.create,
> +            handler,
> +        )
> +        self.assertEqual(job, waiting_exception.job)
> +
> +        # Job status from WAITING to RUNNING
> +        job.start()
> +        running_exception = self.assertRaises(
> +            VulnerabilityJobInProgress, self.job_source.create, handler
> +        )
> +        self.assertEqual(job, running_exception.job)
> +
> +    def test_create_with_existing_completed_job(self):
> +        """If there's already a completed ExportVulnerabilityJob for the
> +        handler the job can be runned again.
> +        """
> +        handler = VulnerabilityHandlerEnum.SOSS
> +
> +        job = self.job_source.create(handler)
> +        job.start()
> +        job.complete()
> +        self.assertEqual(job.status, JobStatus.COMPLETED)
> +
> +        job_duplicated = self.job_source.create(handler)
> +        job_duplicated.start()
> +        job_duplicated.complete()
> +        self.assertEqual(job_duplicated.status, JobStatus.COMPLETED)
> +
> +    def test_create_with_existing_failed_job(self):
> +        """If there's a failed ExportVulnerabilityJob for the handler the job
> +        can be runned again.
> +        """
> +        handler = VulnerabilityHandlerEnum.SOSS
> +
> +        job = self.job_source.create(handler)
> +        job.start()
> +        job.fail()
> +        self.assertEqual(job.status, JobStatus.FAILED)
> +
> +        job_duplicated = self.job_source.create(handler)
> +        job_duplicated.start()
> +        job_duplicated.complete()
> +        self.assertEqual(job_duplicated.status, JobStatus.COMPLETED)
> +
> +    def test_arguments(self):
> +        """Test that ExportVulnerabilityJob specified with arguments can
> +        be gotten out again."""
> +
> +        handler = VulnerabilityHandlerEnum.SOSS
> +        information_type = InformationType.PRIVATESECURITY.value
> +
> +        metadata = {
> +            "request": {
> +                "sources": ["https://launchpad.net/ubuntu"],
> +                "information_type": information_type,
> +            },
> +            "result": {
> +                "error_description": [],
> +                "succeeded": [],
> +                "failed": [],
> +            },
> +            "data": {
> +                "export_link": "",
> +            },
> +        }
> +
> +        job = self.job_source.create(
> +            handler,
> +            sources=["https://launchpad.net/ubuntu"],
> +            information_type=information_type,
> +        )
> +
> +        naked_job = removeSecurityProxy(job)
> +        self.assertEqual(naked_job.handler, handler)
> +
> +        self.assertEqual(naked_job.metadata, metadata)
> +
> +    def test_run_with_no_CVEs(self):
> +        """
> +        Run ExportVulnerabilityJob but with no CVE that also has its bug and
> +        vulnerability.
> +
> +        Note: Since we use the database to look for CVE sequence names, the
> +        dev environment's sample database must have no CVEs with bugs and
> +        vulnerabilities for this test to pass. If they ever get added, the
> +        test will need to be updated to use a different approach.
> +        """
> +
> +        self.factory.makeDistribution(name="soss")
> +
> +        job = self.job_source.create(handler=VulnerabilityHandlerEnum.SOSS)
> +
> +        # Run the job as bug_importer since normal users don't have permission
> +        # This is to ensure we are actually testing the "no CVEs" case
> +        # instead of hitting a permission hiding the CVEs.
> +        with person_logged_in(self.bug_importer):

nice

> +            job.run()
> +
> +        self.assertEqual(
> +            job.metadata.get("result"),
> +            {
> +                "succeeded": [],
> +                "failed": [],
> +                "error_description": ["No CVEs to export"],
> +            },
> +        )
> +
> +    def _put_cve_in_soss(self):
> +        self.factory.makePerson(name="octagalland")
> +        self.factory.makeDistribution(
> +            name="soss",
> +            displayname="SOSS",
> +            information_type=InformationType.PROPRIETARY,
> +        )
> +
> +        sampledata = Path(__file__).parent / "sampledata"
> +
> +        soss_importer = SOSSImporter()
> +
> +        imported_list = []
> +        for file in sampledata.iterdir():
> +            cve_sequence = file.name.lstrip("CVE-")
> +            if not self.cve_set[cve_sequence]:
> +                self.factory.makeCVE(sequence=cve_sequence)
> +
> +            bug, vulnerability = soss_importer.import_cve_from_file(file)
> +            imported_list.append((cve_sequence, bug, vulnerability))
> +
> +        return imported_list
> +
> +    def test_run_export(self):
> +        """Run ExportVulnerabilityJob."""
> +        imported_list = self._put_cve_in_soss()
> +        information_type = InformationType.PRIVATESECURITY.value
> +        export_link = "http://example.com/fake-url"
> +
> +        self.patch(
> +            LibraryFileAlias,
> +            "getURL",
> +            lambda self: export_link,
> +        )
> +
> +        job = self.job_source.create(
> +            handler=VulnerabilityHandlerEnum.SOSS,
> +            information_type=information_type,
> +        )
> +
> +        with person_logged_in(self.bug_importer):
> +            job.run()
> +
> +        cve_names = [
> +            f"CVE-{cve_sequence}" for cve_sequence, _, _ in imported_list
> +        ]
> +
> +        naked_job_metadata = removeSecurityProxy(job.metadata)
> +        naked_job_metadata["result"]["succeeded"].sort()
> +        cve_names.sort()
> +
> +        self.assertEqual(
> +            naked_job_metadata,
> +            {
> +                "request": {
> +                    "sources": [],
> +                    "information_type": information_type,
> +                },
> +                "result": {
> +                    "error_description": [],
> +                    "succeeded": cve_names,
> +                    "failed": [],
> +                },
> +                "data": {
> +                    "export_link": export_link,
> +                },
> +            },
> +        )
> +
> +    def test_get(self):
> +        """ExportVulnerabilityJob.get() returns the import job for the given
> +        handler.
> +        """
> +        handler = VulnerabilityHandlerEnum.SOSS
> +
> +        # There is no job before creating it
> +        self.assertIs(None, self.job_source.get(handler))
> +
> +        job = self.job_source.create(handler)
> +        job_gotten = self.job_source.get(handler)
> +
> +        self.assertIsInstance(job, ExportVulnerabilityJob)
> +        self.assertEqual(job, job_gotten)
> +
> +    def test_error_description_when_no_error(self):
> +        """The ExportVulnerabilityJob.error_description property returns
> +        None when no error description is recorded."""
> +        handler = VulnerabilityHandlerEnum.SOSS
> +        information_type = InformationType.PRIVATESECURITY.value
> +
> +        job = self.job_source.create(
> +            handler,
> +            information_type=information_type,
> +        )
> +        self.assertEqual([], removeSecurityProxy(job).error_description)
> +
> +    def test_error_description_set_when_notifying_about_user_errors(self):
> +        """Test that error_description is set by notifyUserError()."""
> +        handler = VulnerabilityHandlerEnum.SOSS
> +        information_type = InformationType.PRIVATESECURITY.value
> +
> +        job = self.job_source.create(
> +            handler,
> +            information_type=information_type,
> +        )
> +        message = "This is an example message."
> +        job.notifyUserError(VulnerabilityJobException(message))
> +        self.assertEqual([message], removeSecurityProxy(job).error_description)
> +
> +
> +class ExportVulnerabilityTestViaCelery(TestCaseWithFactory):
> +    layer = CeleryJobLayer
> +
> +    def setUp(self):
> +        super().setUp()
> +
> +        self.bug_importer = getUtility(ILaunchpadCelebrities).bug_importer
> +        self.cve_set = getUtility(ICveSet)
> +
> +    def _put_cve_in_soss(self):
> +        self.factory.makePerson(name="octagalland")
> +        self.factory.makeDistribution(
> +            name="soss",
> +            displayname="SOSS",
> +            information_type=InformationType.PROPRIETARY,
> +        )
> +
> +        sampledata = Path(__file__).parent / "sampledata"
> +
> +        soss_importer = SOSSImporter()
> +
> +        imported_list = []
> +        for file in sampledata.iterdir():
> +            cve_sequence = (
> +                file.name[4:] if file.name.startswith("CVE-") else file.name
> +            )
> +            if not self.cve_set[cve_sequence]:
> +                self.factory.makeCVE(sequence=cve_sequence)
> +
> +            bug, vulnerability = soss_importer.import_cve_from_file(file)
> +            imported_list.append((cve_sequence, bug, vulnerability))
> +
> +        return imported_list
> +
> +    def test_job(self):
> +        """Job runs via Celery."""
> +        fixture = FeatureFixture(
> +            {
> +                "jobs.celery.enabled_classes": "ExportVulnerabilityJob",
> +            }
> +        )
> +        self.useFixture(fixture)
> +
> +        imported_list = self._put_cve_in_soss()
> +        transaction.commit()
> +
> +        job_source = getUtility(IExportVulnerabilityJobSource)
> +
> +        handler = VulnerabilityHandlerEnum.SOSS
> +        information_type = InformationType.PRIVATESECURITY.value
> +        with block_on_job():
> +            job_source.create(
> +                handler,
> +                information_type=information_type,
> +            )
> +            transaction.commit()
> +
> +        cve_names = [
> +            f"CVE-{cve_sequence}" for cve_sequence, _, _ in imported_list
> +        ]
> +
> +        job = job_source.get(handler)
> +
> +        naked_job_metadata = removeSecurityProxy(job.metadata)
> +        naked_job_metadata["result"]["succeeded"].sort()
> +        cve_names.sort()
> +
> +        metadata_request = {
> +            "sources": [],
> +            "information_type": information_type,
> +        }
> +
> +        metadata_result = {
> +            "error_description": [],
> +            "succeeded": cve_names,
> +            "failed": [],
> +        }
> +
> +        self.assertEqual(handler, job.handler)
> +        self.assertEqual(metadata_request, naked_job_metadata["request"])
> +        self.assertEqual(metadata_result, naked_job_metadata["result"])
> +
> +        self.assertThat(
> +            naked_job_metadata["data"]["export_link"],
> +            MatchesRegex(
> +                r".*exported_vulnerabilities_[0-9]+\.zip$",
> +            ),
> +        )


-- 
https://code.launchpad.net/~ilkeremrekoc/launchpad/+git/launchpad/+merge/492056
Your team Launchpad code reviewers is requested to review the proposed merge of 
~ilkeremrekoc/launchpad:add-exportvulnerabilityjob into launchpad:master.


_______________________________________________
Mailing list: https://launchpad.net/~launchpad-reviewers
Post to     : launchpad-reviewers@lists.launchpad.net
Unsubscribe : https://launchpad.net/~launchpad-reviewers
More help   : https://help.launchpad.net/ListHelp

Reply via email to