Colin Watson has proposed merging ~cjwatson/launchpad:extract-job-state-flush 
into launchpad:master.

Commit message:
Flush store before extracting job state

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~cjwatson/launchpad/+git/launchpad/+merge/449126

Since `Job` was converted to Storm in c69d6205ab, the store is no longer
automatically flushed when the `id` column is fetched, so we have to be a
little more careful to ensure that it is flushed at appropriate times.  Some
Celery tasks are currently showing up with their job IDs as None, indicating
that this isn't always being done properly.  This appears to be because the
before-commit hook that extracts the job state runs before the Storm hook that
flushes the store at the start of a commit (in
`storm.zope.zstorm.StoreDataManager.tpc_begin`).  So if the statement that
created the `Job` row hadn't already been flushed for some other reason, the
job ID might not be known yet.

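The test suite didn't catch this problem because Celery-based job tests
typically use `FeatureFixture`.  This causes `BaseRunnableJob.celeryRunOnCommit`
to make another database query (to look up the feature flag) after creating
the job, which causes an implicit flush.  The tests now prime the feature flag
cache before creating jobs.  That way, the flush in `extractJobState` is what
actually gets exercised.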

Flushing the store before extracting the job state should be a complete fix for 
this class of problem.
-- 
Your team Launchpad code reviewers is requested to review the proposed merge of 
~cjwatson/launchpad:extract-job-state-flush into launchpad:master.
diff --git a/lib/lp/services/job/runner.py b/lib/lp/services/job/runner.py
index 9b8f1cf..f66d5eb 100644
--- a/lib/lp/services/job/runner.py
+++ b/lib/lp/services/job/runner.py
@@ -41,6 +41,7 @@ from zope.security.proxy import removeSecurityProxy
 
 from lp.services import scripts
 from lp.services.config import config, dbconfig
+from lp.services.database.interfaces import IStore
 from lp.services.database.policy import DatabaseBlockedPolicy
 from lp.services.features import getFeatureFlag
 from lp.services.job.interfaces.job import IJob, IRunnableJob
@@ -266,6 +267,11 @@ class BaseRunnableJob(BaseRunnableJobSource):
 
     def extractJobState(self):
         """Hook function to call before starting a commit."""
+        # Before-commit hooks are called before the hook in
+        # storm.zope.zstorm.StoreDataManager.tpc_begin that flushes the
+        # store, so we have to flush the store here because we might
+        # otherwise not know the job ID yet.
+        IStore(self.job).flush()
         self.job_state = JobState(self)
 
     def celeryCommitHook(self, succeeded):
diff --git a/lib/lp/services/job/tests/test_celery.py b/lib/lp/services/job/tests/test_celery.py
index 70bf2b6..970f9f5 100644
--- a/lib/lp/services/job/tests/test_celery.py
+++ b/lib/lp/services/job/tests/test_celery.py
@@ -26,7 +26,7 @@ from lp.services.database.interfaces import IStore
 from lp.services.features.testing import FeatureFixture
 from lp.services.job.interfaces.job import IJob, IRunnableJob, JobStatus
 from lp.services.job.model.job import Job
-from lp.services.job.runner import BaseRunnableJob
+from lp.services.job.runner import BaseRunnableJob, celery_enabled
 from lp.services.job.tests import block_on_job, monitor_celery
 from lp.testing import TestCaseWithFactory
 from lp.testing.layers import CeleryJobLayer
@@ -45,7 +45,6 @@ class TestJob(BaseRunnableJob):
             self.job = store.find(Job, id=job_id)[0]
         else:
             self.job = Job(max_retries=2, scheduled_start=scheduled_start)
-            IStore(Job).flush()
 
     def run(self):
         pass
@@ -100,16 +99,25 @@ class TestJobsViaCelery(TestCaseWithFactory):
 
     layer = CeleryJobLayer
 
-    def test_TestJob(self):
-        # TestJob can be run via Celery.
+    def enableCeleryClass(self, job_class_name):
+        """Enable running jobs with a given class name via Celery."""
         self.useFixture(
-            FeatureFixture({"jobs.celery.enabled_classes": "TestJob"})
+            FeatureFixture({"jobs.celery.enabled_classes": job_class_name})
         )
+        # Prime the feature flag cache so that
+        # BaseRunnableJob.celeryRunOnCommit doesn't make a database query.
+        # This lets us more carefully test the flush behaviour in
+        # BaseRunnableJob.extractJobState.
+        self.assertTrue(celery_enabled(job_class_name))
+
+    def test_TestJob(self):
+        # TestJob can be run via Celery.
+        self.enableCeleryClass("TestJob")
         with block_on_job(self):
             job = TestJob()
             job.celeryRunOnCommit()
-            job_id = job.job_id
             transaction.commit()
+            job_id = job.job_id
         store = IStore(Job)
         dbjob = store.find(Job, id=job_id)[0]
         self.assertEqual(JobStatus.COMPLETED, dbjob.status)
@@ -119,9 +127,7 @@ class TestJobsViaCelery(TestCaseWithFactory):
         # in 10 seconds, and one at any time.  Wait up to a minute and
         # ensure that the correct three have completed, and that they
         # completed in the expected order.
-        self.useFixture(
-            FeatureFixture({"jobs.celery.enabled_classes": "TestJob"})
-        )
+        self.enableCeleryClass("TestJob")
         now = datetime.now(timezone.utc)
         job_past = TestJob(scheduled_start=now - timedelta(seconds=60))
         job_past.celeryRunOnCommit()
@@ -158,11 +164,7 @@ class TestJobsViaCelery(TestCaseWithFactory):
     def test_jobs_with_retry_exceptions_are_queued_again(self):
         # A job that raises a retry error is automatically queued
         # and executed again.
-        self.useFixture(
-            FeatureFixture(
-                {"jobs.celery.enabled_classes": "TestJobWithRetryError"}
-            )
-        )
+        self.enableCeleryClass("TestJobWithRetryError")
 
         # Set scheduled_start on the job to ensure that retry delays
         # override it.
@@ -214,14 +216,12 @@ class TestJobsViaCelery(TestCaseWithFactory):
     def test_without_rabbitmq(self):
         # If no RabbitMQ broker is configured, the job is not run via Celery.
         self.pushConfig("rabbitmq", broker_urls="none")
-        self.useFixture(
-            FeatureFixture({"jobs.celery.enabled_classes": "TestJob"})
-        )
+        self.enableCeleryClass("TestJob")
         with monitor_celery() as responses:
             job = TestJob()
             job.celeryRunOnCommit()
-            job_id = job.job_id
             transaction.commit()
+            job_id = job.job_id
         self.assertEqual([], responses)
         store = IStore(Job)
         dbjob = store.find(Job, id=job_id)[0]
_______________________________________________
Mailing list: https://launchpad.net/~launchpad-reviewers
Post to     : launchpad-reviewers@lists.launchpad.net
Unsubscribe : https://launchpad.net/~launchpad-reviewers
More help   : https://help.launchpad.net/ListHelp

Reply via email to