Colin Watson has proposed merging ~cjwatson/launchpad:stormify-job into launchpad:master.
Commit message:
Convert Job to Storm

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~cjwatson/launchpad/+git/launchpad/+merge/447422

I had to be a bit more careful with flushing row creations to the database in order to get their IDs back, which required updates to a couple of query count tests that previously only happened to pass because they didn't notice some unflushed queries at the end of the test.

--
Your team Launchpad code reviewers is requested to review the proposed merge of ~cjwatson/launchpad:stormify-job into launchpad:master.
diff --git a/lib/lp/charms/model/charmrecipebuildjob.py b/lib/lp/charms/model/charmrecipebuildjob.py index d923485..3534797 100644 --- a/lib/lp/charms/model/charmrecipebuildjob.py +++ b/lib/lp/charms/model/charmrecipebuildjob.py @@ -193,6 +193,7 @@ class CharmhubUploadJob(CharmRecipeBuildJobDerived): ) job = cls(charm_recipe_build_job) job.celeryRunOnCommit() + IStore(CharmRecipeBuildJob).flush() del get_property_cache(build).last_store_upload_job upload_status = build.store_upload_status if upload_status != before_modification.store_upload_status: diff --git a/lib/lp/charms/model/charmrecipejob.py b/lib/lp/charms/model/charmrecipejob.py index 6971b4d..56efa07 100644 --- a/lib/lp/charms/model/charmrecipejob.py +++ b/lib/lp/charms/model/charmrecipejob.py @@ -185,6 +185,7 @@ class CharmRecipeRequestBuildsJob(CharmRecipeJobDerived): recipe_job = CharmRecipeJob(recipe, cls.class_job_type, metadata) job = cls(recipe_job) job.celeryRunOnCommit() + IStore(CharmRecipeJob).flush() return job @classmethod diff --git a/lib/lp/code/model/branchjob.py b/lib/lp/code/model/branchjob.py index e593b2b..c08a8c1 100644 --- a/lib/lp/code/model/branchjob.py +++ b/lib/lp/code/model/branchjob.py @@ -867,6 +867,7 @@ class RosettaUploadJob(BranchJobDerived): ) job = cls(branch_job) job.celeryRunOnCommit() + IStore(BranchJob).flush() return job else: return None @@ -1136,6 +1137,7 @@ class BranchModifiedMailJob(BranchJobDerived): branch_job = BranchJob(branch, cls.class_job_type, metadata) job = cls(branch_job) job.celeryRunOnCommit() + IStore(BranchJob).flush() return job @property diff --git a/lib/lp/code/model/branchmergeproposal.py b/lib/lp/code/model/branchmergeproposal.py index 12df826..04b28b3 100644 --- a/lib/lp/code/model/branchmergeproposal.py +++ b/lib/lp/code/model/branchmergeproposal.py @@ -1105,10 +1105,9 @@ class BranchMergeProposal(StormBase, BugLinkTargetMixin): # or not they have completed. 
from lp.code.model.branchmergeproposaljob import BranchMergeProposalJob - for job in BranchMergeProposalJob.selectBy( - branch_merge_proposal=self.id - ): - job.destroySelf() + IStore(BranchMergeProposalJob).find( + BranchMergeProposalJob, branch_merge_proposal=self + ).remove() self._preview_diffs.remove() Store.of(self).remove(self) diff --git a/lib/lp/code/model/branchmergeproposaljob.py b/lib/lp/code/model/branchmergeproposaljob.py index e173f29..c4fb588 100644 --- a/lib/lp/code/model/branchmergeproposaljob.py +++ b/lib/lp/code/model/branchmergeproposaljob.py @@ -32,6 +32,7 @@ from storm.store import Store from zope.component import getUtility from zope.interface import implementer, provider +from lp.app.errors import NotFoundError from lp.code.adapters.branch import BranchMergeProposalDelta from lp.code.enums import BranchType from lp.code.errors import BranchHasPendingWrites, UpdatePreviewDiffNotReady @@ -63,7 +64,6 @@ from lp.registry.interfaces.person import IPersonSet from lp.services.config import config from lp.services.database.enumcol import DBEnum from lp.services.database.interfaces import IPrimaryStore, IStore -from lp.services.database.sqlobject import SQLObjectNotFound from lp.services.database.stormbase import StormBase from lp.services.job.interfaces.job import JobStatus from lp.services.job.model.job import EnumeratedSubclass, Job @@ -182,25 +182,14 @@ class BranchMergeProposalJob(StormBase): Store.of(self).remove(self) @classmethod - def selectBy(klass, **kwargs): - """Return selected instances of this class. - - At least one pair of keyword arguments must be supplied. - foo=bar is interpreted as 'select all instances of - BranchMergeProposalJob whose property "foo" is equal to "bar"'. - """ - assert len(kwargs) > 0 - return IStore(klass).find(klass, **kwargs) - - @classmethod def get(klass, key): """Return the instance of this class whose key is supplied. 
- :raises: SQLObjectNotFound + :raises: NotFoundError """ instance = IStore(klass).get(klass, key) if instance is None: - raise SQLObjectNotFound( + raise NotFoundError( "No occurrence of %s has key %s" % (klass.__name__, key) ) return instance @@ -236,6 +225,7 @@ class BranchMergeProposalJobDerived( base_job = BranchMergeProposalJob(bmp, cls.class_job_type, metadata) job = cls(base_job) job.celeryRunOnCommit() + IStore(BranchMergeProposalJob).flush() return job @classmethod @@ -244,12 +234,12 @@ class BranchMergeProposalJobDerived( :return: the BranchMergeProposalJob with the specified id, as the current BranchMergeProposalJobDereived subclass. - :raises: SQLObjectNotFound if there is no job with the specified id, + :raises: NotFoundError if there is no job with the specified id, or its job_type does not match the desired subclass. """ job = BranchMergeProposalJob.get(job_id) if job.job_type != cls.class_job_type: - raise SQLObjectNotFound( + raise NotFoundError( "No object found with id %d and type %s" % (job_id, cls.class_job_type.title) ) @@ -659,7 +649,7 @@ class BranchMergeProposalJobSource(BaseRunnableJobSource): :return: the BranchMergeProposalJob with the specified id, as the current BranchMergeProposalJobDereived subclass. - :raises: SQLObjectNotFound if there is no job with the specified id, + :raises: NotFoundError if there is no job with the specified id, or its job_type does not match the desired subclass. 
""" job = BranchMergeProposalJob.get(job_id) diff --git a/lib/lp/code/model/gitjob.py b/lib/lp/code/model/gitjob.py index 1c51b94..f1e9fce 100644 --- a/lib/lp/code/model/gitjob.py +++ b/lib/lp/code/model/gitjob.py @@ -207,6 +207,7 @@ class GitRefScanJob(GitJobDerived): git_job = GitJob(repository, cls.class_job_type, {}) job = cls(git_job) job.celeryRunOnCommit() + IStore(GitJob).flush() return job @staticmethod @@ -292,6 +293,7 @@ class ReclaimGitRepositorySpaceJob(GitJobDerived): ) job = cls(git_job) job.celeryRunOnCommit() + IStore(GitJob).flush() return job @property @@ -393,6 +395,7 @@ class GitRepositoryModifiedMailJob(GitJobDerived): git_job = GitJob(repository, cls.class_job_type, metadata) job = cls(git_job) job.celeryRunOnCommit() + IStore(GitJob).flush() return job @property diff --git a/lib/lp/code/model/tests/test_branchjob.py b/lib/lp/code/model/tests/test_branchjob.py index da70a91..a9610b3 100644 --- a/lib/lp/code/model/tests/test_branchjob.py +++ b/lib/lp/code/model/tests/test_branchjob.py @@ -1022,7 +1022,7 @@ class TestRosettaUploadJob(TestCaseWithFactory): # the two out of any accidental sync by advancing the Job.id # sequence. dummy = Job() - dummy.sync() + IStore(Job).flush() dummy.destroySelf() # Now create the RosettaUploadJob. 
diff --git a/lib/lp/code/model/tests/test_branchmergeproposal.py b/lib/lp/code/model/tests/test_branchmergeproposal.py index 4181b6e..26780a7 100644 --- a/lib/lp/code/model/tests/test_branchmergeproposal.py +++ b/lib/lp/code/model/tests/test_branchmergeproposal.py @@ -26,6 +26,7 @@ from zope.security.interfaces import Unauthorized from zope.security.proxy import removeSecurityProxy from lp.app.enums import InformationType +from lp.app.errors import NotFoundError from lp.app.interfaces.launchpad import IPrivacy from lp.code.adapters.branch import BranchMergeProposalNoPreviewDiffDelta from lp.code.enums import ( @@ -77,7 +78,6 @@ from lp.registry.interfaces.person import IPersonSet from lp.registry.interfaces.product import IProductSet from lp.services.config import config from lp.services.database.constants import UTC_NOW -from lp.services.database.sqlobject import SQLObjectNotFound from lp.services.job.interfaces.job import JobStatus from lp.services.webapp import canonical_url from lp.services.webhooks.testing import LogsScheduledWebhooks @@ -1862,14 +1862,14 @@ class TestBranchMergeProposalDeletion(TestCaseWithFactory): def test_deleteProposal_deletes_job(self): """Deleting a branch merge proposal deletes all related jobs.""" proposal = self.factory.makeBranchMergeProposal() + store = Store.of(proposal) job = MergeProposalNeedsReviewEmailJob.create(proposal) - job.context.sync() job_id = job.context.id login_person(proposal.registrant) proposal.deleteProposal() - self.assertRaises( - SQLObjectNotFound, BranchMergeProposalJob.get, job_id - ) + store.flush() + store.invalidate() + self.assertRaises(NotFoundError, BranchMergeProposalJob.get, job_id) class TestBranchMergeProposalBugs(WithVCSScenarios, TestCaseWithFactory): diff --git a/lib/lp/code/model/tests/test_branchmergeproposaljobs.py b/lib/lp/code/model/tests/test_branchmergeproposaljobs.py index f8f676a..f1d7889 100644 --- a/lib/lp/code/model/tests/test_branchmergeproposaljobs.py +++ 
b/lib/lp/code/model/tests/test_branchmergeproposaljobs.py @@ -22,6 +22,7 @@ from testtools.matchers import ( from zope.component import getUtility from zope.security.proxy import removeSecurityProxy +from lp.app.errors import NotFoundError from lp.code.adapters.branch import BranchMergeProposalNoPreviewDiffDelta from lp.code.enums import BranchMergeProposalStatus from lp.code.interfaces.branchmergeproposal import ( @@ -53,7 +54,6 @@ from lp.code.model.branchmergeproposaljob import ( from lp.code.model.tests.test_diff import DiffTestCase from lp.code.subscribers.branchmergeproposal import merge_proposal_modified from lp.services.config import config -from lp.services.database.sqlobject import SQLObjectNotFound from lp.services.features.testing import FeatureFixture from lp.services.job.interfaces.job import JobStatus from lp.services.job.model.job import Job @@ -118,7 +118,7 @@ class TestBranchMergeProposalJobDerived(TestCaseWithFactory): It's an error to call get on BranchMergeProposalJobDerived-- it must be called on a subclass. An object is returned only if the job id and job type match the request. If no suitable object can be found, - SQLObjectNotFound is raised. + NotFoundError is raised. 
""" bmp = self.factory.makeBranchMergeProposal() job = MergeProposalNeedsReviewEmailJob.create(bmp) @@ -126,9 +126,9 @@ class TestBranchMergeProposalJobDerived(TestCaseWithFactory): self.assertRaises( AttributeError, BranchMergeProposalJobDerived.get, job.id ) - self.assertRaises(SQLObjectNotFound, UpdatePreviewDiffJob.get, job.id) + self.assertRaises(NotFoundError, UpdatePreviewDiffJob.get, job.id) self.assertRaises( - SQLObjectNotFound, MergeProposalNeedsReviewEmailJob.get, job.id + 1 + NotFoundError, MergeProposalNeedsReviewEmailJob.get, job.id + 1 ) self.assertEqual(job, MergeProposalNeedsReviewEmailJob.get(job.id)) diff --git a/lib/lp/oci/model/ocirecipebuildjob.py b/lib/lp/oci/model/ocirecipebuildjob.py index f8d4f3c..2d0ccac 100644 --- a/lib/lp/oci/model/ocirecipebuildjob.py +++ b/lib/lp/oci/model/ocirecipebuildjob.py @@ -213,6 +213,7 @@ class OCIRegistryUploadJob(OCIRecipeBuildJobDerived): ) job = cls(oci_build_job) job.celeryRunOnCommit() + IStore(OCIRecipeBuildJob).flush() del get_property_cache(build).last_registry_upload_job upload_status = build.registry_upload_status if upload_status != before_modification.registry_upload_status: diff --git a/lib/lp/oci/model/ocirecipejob.py b/lib/lp/oci/model/ocirecipejob.py index 735e516..30e9d28 100644 --- a/lib/lp/oci/model/ocirecipejob.py +++ b/lib/lp/oci/model/ocirecipejob.py @@ -174,6 +174,7 @@ class OCIRecipeRequestBuildsJob(OCIRecipeJobDerived): recipe_job = OCIRecipeJob(recipe, cls.class_job_type, metadata) job = cls(recipe_job) job.celeryRunOnCommit() + IStore(OCIRecipeJob).flush() return job @classmethod diff --git a/lib/lp/registry/browser/tests/test_teammembership.py b/lib/lp/registry/browser/tests/test_teammembership.py index 9856bdc..f76f72c 100644 --- a/lib/lp/registry/browser/tests/test_teammembership.py +++ b/lib/lp/registry/browser/tests/test_teammembership.py @@ -32,22 +32,21 @@ class TestTeamMenu(TestCaseWithFactory): # Only these queries should be run, no matter what the # membership tree looks 
like, although the number of queries # could change slightly if a different user is logged in. - # 1. Check whether the user is the team owner. - # 2. Deactivate the membership in the TeamMembership table. - # 3. Delete from TeamParticipation table. - # (Queries #4, #5, #6, #7, and #10 are run because the storm - # objects have been invalidated.) - # 4. Get the TeamMembership entry. - # 5. Verify that the member exists in the db, but don't load - # the refresh the rest of its data, since we just need the id. - # 6. Verify that the user exists in the db. - # 7. Verify that the team exists in the db. - # 8. Insert into Job table. - # 9. Insert into PersonTransferJob table to schedule sending - # email. (This requires the data from queries #5, #6, and - # #7.) - # 10.Query the rest of the team data for the invalidated - # object in order to generate the canonical url. + # 1. Check whether the user is the team owner. + # 2. Deactivate the membership in the TeamMembership table. + # 3. Delete from TeamParticipation table. + # (Queries #4, #5, #8, and #9 are run because the storm + # objects have been invalidated.) + # 4. Get the TeamMembership entry. + # 5. Verify that the member exists in the db. + # 6. Insert into Job table. + # 7. Insert into SharingJob table to schedule removal of + # subscriptions to artifacts shared with the team. + # 8. Verify that the user exists in the db. + # 9. Verify that the team exists in the db. + # 10. Insert into Job table. + # 11. Insert into PersonTransferJob table to schedule sending + # email. (This requires the data from queries #5, #8, and #9.) 
self.team.addMember( self.member, self.team.teamowner, force_team_add=True ) @@ -64,4 +63,4 @@ class TestTeamMenu(TestCaseWithFactory): view.processForm() self.assertEqual("", view.errormessage) self.assertEqual(TeamMembershipStatus.DEACTIVATED, membership.status) - self.assertThat(recorder, HasQueryCount(LessThan(11))) + self.assertThat(recorder, HasQueryCount(LessThan(12))) diff --git a/lib/lp/registry/model/persontransferjob.py b/lib/lp/registry/model/persontransferjob.py index 292f256..7c842e7 100644 --- a/lib/lp/registry/model/persontransferjob.py +++ b/lib/lp/registry/model/persontransferjob.py @@ -148,6 +148,7 @@ class PersonTransferJobDerived(BaseRunnableJob, metaclass=EnumeratedSubclass): ) derived = cls(job) derived.celeryRunOnCommit() + IStore(PersonTransferJob).flush() return derived @classmethod diff --git a/lib/lp/registry/model/sharingjob.py b/lib/lp/registry/model/sharingjob.py index 3dc412d..9f61dd1 100644 --- a/lib/lp/registry/model/sharingjob.py +++ b/lib/lp/registry/model/sharingjob.py @@ -184,6 +184,7 @@ class SharingJobDerived(BaseRunnableJob, metaclass=EnumeratedSubclass): base_job = SharingJob(cls.class_job_type, pillar, grantee, metadata) job = cls(base_job) job.celeryRunOnCommit() + IStore(SharingJob).flush() return job @classmethod diff --git a/lib/lp/registry/tests/test_teammembership.py b/lib/lp/registry/tests/test_teammembership.py index a76ae2f..ff2792a 100644 --- a/lib/lp/registry/tests/test_teammembership.py +++ b/lib/lp/registry/tests/test_teammembership.py @@ -620,7 +620,7 @@ class TestParticipationCleanup(TeamParticipationTestCase): The number of db queries should be constant not O(depth). 
""" self.assertStatementCount( - 9, + 11, self.team5.setMembershipData, self.no_priv, TeamMembershipStatus.DEACTIVATED, diff --git a/lib/lp/services/job/model/job.py b/lib/lp/services/job/model/job.py index b5ef078..896defc 100644 --- a/lib/lp/services/job/model/job.py +++ b/lib/lp/services/job/model/job.py @@ -18,16 +18,14 @@ from datetime import datetime, timezone import transaction from lazr.jobrunner.jobrunner import LeaseHeld from storm.expr import And, Or, Select -from storm.locals import JSON, Int, Reference +from storm.locals import JSON, DateTime, Int, Reference, Unicode from zope.interface import implementer from lp.services.database import bulk -from lp.services.database.constants import UTC_NOW -from lp.services.database.datetimecol import UtcDateTimeCol +from lp.services.database.constants import DEFAULT, UTC_NOW from lp.services.database.enumcol import DBEnum from lp.services.database.interfaces import IStore -from lp.services.database.sqlbase import SQLBase -from lp.services.database.sqlobject import StringCol +from lp.services.database.stormbase import StormBase from lp.services.job.interfaces.job import IJob, JobStatus, JobType @@ -43,24 +41,28 @@ class InvalidTransition(Exception): @implementer(IJob) -class Job(SQLBase): +class Job(StormBase): """See `IJob`.""" + __storm_table__ = "Job" + @property def job_id(self): return self.id - scheduled_start = UtcDateTimeCol() + id = Int(primary=True) + + scheduled_start = DateTime(tzinfo=timezone.utc) - date_created = UtcDateTimeCol() + date_created = DateTime(tzinfo=timezone.utc) - date_started = UtcDateTimeCol() + date_started = DateTime(tzinfo=timezone.utc) - date_finished = UtcDateTimeCol() + date_finished = DateTime(tzinfo=timezone.utc) - lease_expires = UtcDateTimeCol() + lease_expires = DateTime(tzinfo=timezone.utc) - log = StringCol() + log = Unicode() _status = DBEnum( enum=JobStatus, @@ -106,6 +108,28 @@ class Job(SQLBase): status = property(lambda x: x._status) + def __init__( + self, + 
scheduled_start=None, + date_finished=None, + lease_expires=None, + max_retries=DEFAULT, + requester=None, + base_json_data=None, + base_job_type=None, + status=JobStatus.WAITING, + ): + super().__init__() + self.scheduled_start = scheduled_start + self.date_finished = date_finished + self.lease_expires = lease_expires + self.max_retries = max_retries + self.requester = requester + self.base_json_data = base_json_data + self.base_job_type = base_job_type + self._status = status + IStore(Job).add(self) + @property def is_pending(self): """See `IJob`.""" @@ -217,6 +241,9 @@ class Job(SQLBase): self._set_status(JobStatus.WAITING) self.lease_expires = None + def destroySelf(self): + IStore(Job).remove(self) + class EnumeratedSubclass(type): """Metaclass for when subclasses are assigned enums.""" diff --git a/lib/lp/services/job/tests/test_celery.py b/lib/lp/services/job/tests/test_celery.py index 2490962..70bf2b6 100644 --- a/lib/lp/services/job/tests/test_celery.py +++ b/lib/lp/services/job/tests/test_celery.py @@ -45,6 +45,7 @@ class TestJob(BaseRunnableJob): self.job = store.find(Job, id=job_id)[0] else: self.job = Job(max_retries=2, scheduled_start=scheduled_start) + IStore(Job).flush() def run(self): pass diff --git a/lib/lp/services/job/tests/test_job.py b/lib/lp/services/job/tests/test_job.py index 799ad98..886e5ad 100644 --- a/lib/lp/services/job/tests/test_job.py +++ b/lib/lp/services/job/tests/test_job.py @@ -100,17 +100,17 @@ class TestJob(TestCaseWithFactory): def test_start_when_completed_is_invalid(self): """When a job is completed, attempting to start is invalid.""" - job = Job(_status=JobStatus.COMPLETED) + job = Job(status=JobStatus.COMPLETED) self.assertRaises(InvalidTransition, job.start) def test_start_when_failed_is_invalid(self): """When a job is failed, attempting to start is invalid.""" - job = Job(_status=JobStatus.FAILED) + job = Job(status=JobStatus.FAILED) self.assertRaises(InvalidTransition, job.start) def 
test_start_when_running_is_invalid(self): """When a job is running, attempting to start is invalid.""" - job = Job(_status=JobStatus.FAILED) + job = Job(status=JobStatus.FAILED) self.assertRaises(InvalidTransition, job.start) def test_complete(self): @@ -118,7 +118,7 @@ class TestJob(TestCaseWithFactory): It should set date_finished and set the job status to COMPLETED. """ - job = Job(_status=JobStatus.RUNNING) + job = Job(status=JobStatus.RUNNING) self.assertEqual(None, job.date_finished) job.complete() self.assertNotEqual(None, job.date_finished) @@ -126,17 +126,17 @@ class TestJob(TestCaseWithFactory): def test_complete_when_waiting_is_invalid(self): """When a job is waiting, attempting to complete is invalid.""" - job = Job(_status=JobStatus.WAITING) + job = Job(status=JobStatus.WAITING) self.assertRaises(InvalidTransition, job.complete) def test_complete_when_completed_is_invalid(self): """When a job is completed, attempting to complete is invalid.""" - job = Job(_status=JobStatus.COMPLETED) + job = Job(status=JobStatus.COMPLETED) self.assertRaises(InvalidTransition, job.complete) def test_complete_when_failed_is_invalid(self): """When a job is failed, attempting to complete is invalid.""" - job = Job(_status=JobStatus.FAILED) + job = Job(status=JobStatus.FAILED) self.assertRaises(InvalidTransition, job.complete) def test_fail(self): @@ -144,7 +144,7 @@ class TestJob(TestCaseWithFactory): It should set date_finished and set the job status to FAILED. 
""" - job = Job(_status=JobStatus.RUNNING) + job = Job(status=JobStatus.RUNNING) self.assertEqual(None, job.date_finished) job.fail() self.assertNotEqual(None, job.date_finished) @@ -152,17 +152,17 @@ class TestJob(TestCaseWithFactory): def test_fail_when_waiting_is_invalid(self): """When a job is waiting, attempting to fail is invalid.""" - job = Job(_status=JobStatus.WAITING) + job = Job(status=JobStatus.WAITING) self.assertRaises(InvalidTransition, job.fail) def test_fail_when_completed_is_invalid(self): """When a job is completed, attempting to fail is invalid.""" - job = Job(_status=JobStatus.COMPLETED) + job = Job(status=JobStatus.COMPLETED) self.assertRaises(InvalidTransition, job.fail) def test_fail_when_failed_is_invalid(self): """When a job is failed, attempting to fail is invalid.""" - job = Job(_status=JobStatus.FAILED) + job = Job(status=JobStatus.FAILED) self.assertRaises(InvalidTransition, job.fail) def test_queue(self): @@ -170,7 +170,7 @@ class TestJob(TestCaseWithFactory): It should set date_finished, and set status to WAITING. 
""" - job = Job(_status=JobStatus.RUNNING) + job = Job(status=JobStatus.RUNNING) self.assertEqual(None, job.date_finished) job.queue() self.assertNotEqual(None, job.date_finished) @@ -178,76 +178,76 @@ class TestJob(TestCaseWithFactory): def test_queue_clears_lease_expires(self): """Queueing a job releases its lease.""" - job = Job(_status=JobStatus.RUNNING) + job = Job(status=JobStatus.RUNNING) job.lease_expires = UTC_NOW job.queue() self.assertIsNone(job.lease_expires) def test_suspend(self): """A job that is in the WAITING state can be suspended.""" - job = Job(_status=JobStatus.WAITING) + job = Job(status=JobStatus.WAITING) job.suspend() self.assertEqual(job.status, JobStatus.SUSPENDED) def test_suspend_when_running(self): """When a job is running, attempting to suspend is valid.""" - job = Job(_status=JobStatus.RUNNING) + job = Job(status=JobStatus.RUNNING) job.suspend() self.assertEqual(JobStatus.SUSPENDED, job.status) def test_suspend_when_completed(self): """When a job is completed, attempting to suspend is invalid.""" - job = Job(_status=JobStatus.COMPLETED) + job = Job(status=JobStatus.COMPLETED) self.assertRaises(InvalidTransition, job.suspend) def test_suspend_when_failed(self): """When a job is failed, attempting to suspend is invalid.""" - job = Job(_status=JobStatus.FAILED) + job = Job(status=JobStatus.FAILED) self.assertRaises(InvalidTransition, job.suspend) def test_resume(self): """A job that is suspended can be resumed.""" - job = Job(_status=JobStatus.SUSPENDED) + job = Job(status=JobStatus.SUSPENDED) job.resume() self.assertEqual(job.status, JobStatus.WAITING) def test_resume_clears_lease_expires(self): """A job that resumes should null out the lease_expires.""" - job = Job(_status=JobStatus.SUSPENDED) + job = Job(status=JobStatus.SUSPENDED) job.lease_expires = UTC_NOW job.resume() self.assertIsNone(job.lease_expires) def test_resume_when_running(self): """When a job is running, attempting to resume is invalid.""" - job = 
Job(_status=JobStatus.RUNNING) + job = Job(status=JobStatus.RUNNING) self.assertRaises(InvalidTransition, job.resume) def test_resume_when_completed(self): """When a job is completed, attempting to resume is invalid.""" - job = Job(_status=JobStatus.COMPLETED) + job = Job(status=JobStatus.COMPLETED) self.assertRaises(InvalidTransition, job.resume) def test_resume_when_failed(self): """When a job is failed, attempting to resume is invalid.""" - job = Job(_status=JobStatus.FAILED) + job = Job(status=JobStatus.FAILED) self.assertRaises(InvalidTransition, job.resume) def test_is_pending(self): """is_pending is True when the job can possibly complete.""" for status in JobStatus.items: - job = Job(_status=status) + job = Job(status=status) self.assertEqual(status in Job.PENDING_STATUSES, job.is_pending) def test_is_runnable_when_failed(self): """is_runnable is false when the job is not WAITING.""" - job = Job(_status=JobStatus.FAILED) + job = Job(status=JobStatus.FAILED) self.assertFalse(job.is_runnable) def test_is_runnable_when_scheduled_in_future(self): """is_runnable is false when the job is scheduled in the future.""" job = Job( - _status=JobStatus.WAITING, + status=JobStatus.WAITING, scheduled_start=datetime.now(timezone.utc) + timedelta(seconds=60), ) self.assertFalse(job.is_runnable) @@ -255,14 +255,14 @@ class TestJob(TestCaseWithFactory): def test_is_runnable_when_scheduled_in_past(self): """is_runnable is true when the job is scheduled in the past.""" job = Job( - _status=JobStatus.WAITING, + status=JobStatus.WAITING, scheduled_start=datetime.now(timezone.utc) - timedelta(seconds=60), ) self.assertTrue(job.is_runnable) def test_is_runnable_when_not_scheduled(self): """is_runnable is true when no explicit schedule has been requested.""" - job = Job(_status=JobStatus.WAITING) + job = Job(status=JobStatus.WAITING) self.assertTrue(job.is_runnable) def test_start_manages_transactions(self): @@ -387,6 +387,7 @@ class TestReadiness(TestCase): """Job.ready_jobs should 
include new jobs.""" preexisting = self._sampleData() job = Job() + Store.of(job).flush() self.assertEqual( preexisting + [(job.id,)], list(Store.of(job).execute(Job.ready_jobs)), @@ -395,7 +396,7 @@ class TestReadiness(TestCase): def test_ready_jobs_started(self): """Job.ready_jobs should not jobs that have been started.""" preexisting = self._sampleData() - job = Job(_status=JobStatus.RUNNING) + job = Job(status=JobStatus.RUNNING) self.assertEqual( preexisting, list(Store.of(job).execute(Job.ready_jobs)) ) @@ -405,6 +406,7 @@ class TestReadiness(TestCase): preexisting = self._sampleData() UNIX_EPOCH = datetime.fromtimestamp(0, timezone.utc) job = Job(lease_expires=UNIX_EPOCH) + Store.of(job).flush() self.assertEqual( preexisting + [(job.id,)], list(Store.of(job).execute(Job.ready_jobs)), diff --git a/lib/lp/services/job/tests/test_runner.py b/lib/lp/services/job/tests/test_runner.py index f443435..889f941 100644 --- a/lib/lp/services/job/tests/test_runner.py +++ b/lib/lp/services/job/tests/test_runner.py @@ -57,6 +57,7 @@ class NullJob(BaseRunnableJob): ): self.message = completion_message self.job = Job() + IStore(Job).flush() self.oops_recipients = oops_recipients if self.oops_recipients is None: self.oops_recipients = [] @@ -512,6 +513,7 @@ class DerivedJob(BaseRunnableJob, StormBase): super().__init__() self.job = Job() self.should_succeed = should_succeed + IStore(Job).flush() def run(self): if not self.should_succeed: @@ -619,6 +621,7 @@ class StuckJob(StaticJobSource): self.lease_length = lease_length self.delay = delay self.job = Job() + IStore(Job).flush() def __repr__(self): return "<%s(%r, lease_length=%s, delay=%s)>" % ( @@ -655,6 +658,7 @@ class InitialFailureJob(StaticJobSource): def __init__(self, id, fail): self.id = id self.job = Job() + IStore(Job).flush() self.fail = fail def run(self): @@ -677,6 +681,7 @@ class ProcessSharingJob(StaticJobSource): def __init__(self, id, first): self.id = id self.job = Job() + IStore(Job).flush() self.first = 
first def run(self): @@ -697,6 +702,7 @@ class MemoryHogJob(StaticJobSource): def __init__(self, id): self.job = Job() + IStore(Job).flush() self.id = id def run(self): @@ -717,6 +723,7 @@ class LeaseHeldJob(StaticJobSource): def __init__(self, id): self.job = Job() + IStore(Job).flush() self.id = id def acquireLease(self): diff --git a/lib/lp/services/webhooks/model.py b/lib/lp/services/webhooks/model.py index 902b46e..5db81e2 100644 --- a/lib/lp/services/webhooks/model.py +++ b/lib/lp/services/webhooks/model.py @@ -573,6 +573,7 @@ class WebhookDeliveryJob(WebhookJobDerived): ) job = cls(webhook_job) job.celeryRunOnCommit() + IStore(WebhookJob).flush() log.info( "Scheduled %r (%s): %s" % (job, event_type, _redact_payload(event_type, payload)) diff --git a/lib/lp/snappy/model/snapbuildjob.py b/lib/lp/snappy/model/snapbuildjob.py index b16d2c8..539fb1c 100644 --- a/lib/lp/snappy/model/snapbuildjob.py +++ b/lib/lp/snappy/model/snapbuildjob.py @@ -188,6 +188,7 @@ class SnapStoreUploadJob(SnapBuildJobDerived): snap_build_job = SnapBuildJob(snapbuild, cls.class_job_type, {}) job = cls(snap_build_job) job.celeryRunOnCommit() + IStore(SnapBuildJob).flush() del get_property_cache(snapbuild).last_store_upload_job notify(SnapBuildStoreUploadStatusChangedEvent(snapbuild)) return job diff --git a/lib/lp/snappy/model/snapjob.py b/lib/lp/snappy/model/snapjob.py index 5ec3be9..69f4bb5 100644 --- a/lib/lp/snappy/model/snapjob.py +++ b/lib/lp/snappy/model/snapjob.py @@ -191,6 +191,7 @@ class SnapRequestBuildsJob(SnapJobDerived): snap_job = SnapJob(snap, cls.class_job_type, metadata) job = cls(snap_job) job.celeryRunOnCommit() + IStore(SnapJob).flush() return job @classmethod diff --git a/lib/lp/soyuz/model/initializedistroseriesjob.py b/lib/lp/soyuz/model/initializedistroseriesjob.py index 8c7775e..48a30eb 100644 --- a/lib/lp/soyuz/model/initializedistroseriesjob.py +++ b/lib/lp/soyuz/model/initializedistroseriesjob.py @@ -113,6 +113,7 @@ class 
InitializeDistroSeriesJob(DistributionJobDerived): store.add(distribution_job) derived_job = cls(distribution_job) derived_job.celeryRunOnCommit() + IStore(DistributionJob).flush() return derived_job @classmethod diff --git a/lib/lp/soyuz/model/processacceptedbugsjob.py b/lib/lp/soyuz/model/processacceptedbugsjob.py index c4a1515..e368155 100644 --- a/lib/lp/soyuz/model/processacceptedbugsjob.py +++ b/lib/lp/soyuz/model/processacceptedbugsjob.py @@ -260,6 +260,7 @@ class ProcessAcceptedBugsJob(StormBase, BaseRunnableJob): distroseries, sourcepackagerelease, bug_ids ) IPrimaryStore(ProcessAcceptedBugsJob).add(job) + IPrimaryStore(ProcessAcceptedBugsJob).flush() job.celeryRunOnCommit() return job diff --git a/lib/lp/translations/model/translationsharingjob.py b/lib/lp/translations/model/translationsharingjob.py index 950143b..d317d2a 100644 --- a/lib/lp/translations/model/translationsharingjob.py +++ b/lib/lp/translations/model/translationsharingjob.py @@ -173,6 +173,7 @@ class TranslationSharingJobDerived(metaclass=EnumeratedSubclass): ) derived = cls(context) derived.celeryRunOnCommit() + IStore(TranslationSharingJob).flush() return derived @classmethod diff --git a/lib/lp/translations/tests/test_pofilestatsjob.py b/lib/lp/translations/tests/test_pofilestatsjob.py index 517949f..c805674 100644 --- a/lib/lp/translations/tests/test_pofilestatsjob.py +++ b/lib/lp/translations/tests/test_pofilestatsjob.py @@ -23,7 +23,7 @@ class TestPOFileStatsJob(TestCaseWithFactory): def test_job_interface(self): # Instances of POFileStatsJob are runnable jobs. - verifyObject(IRunnableJob, POFileStatsJob(0)) + verifyObject(IRunnableJob, POFileStatsJob(self.factory.makePOFile())) def test_source_interface(self): # The POFileStatsJob class is a source of POFileStatsJobs. @@ -38,7 +38,7 @@ class TestPOFileStatsJob(TestCaseWithFactory): self.factory.makePOTMsgSet(pofile.potemplate, singular) # The statistics start at 0. 
self.assertEqual(pofile.potemplate.messageCount(), 0) - job = pofilestatsjob.schedule(pofile.id) + job = pofilestatsjob.schedule(pofile) # Just scheduling the job doesn't update the statistics. self.assertEqual(pofile.potemplate.messageCount(), 0) with dbuser("pofilestats"): @@ -58,7 +58,7 @@ class TestPOFileStatsJob(TestCaseWithFactory): self.factory.makePOTMsgSet(pofile.potemplate, singular) # The statistics are still at 0, even though there is a message. self.assertEqual(potemplate.messageCount(), 0) - job = pofilestatsjob.schedule(pofile.id) + job = pofilestatsjob.schedule(pofile) # Just scheduling the job doesn't update the statistics. self.assertEqual(pofile.potemplate.messageCount(), 0) with dbuser("pofilestats"): @@ -73,7 +73,7 @@ class TestPOFileStatsJob(TestCaseWithFactory): # We need a POFile to update. pofile = self.factory.makePOFile(side=TranslationSide.UPSTREAM) # If we schedule a job, then we'll get it back. - job = pofilestatsjob.schedule(pofile.id) + job = pofilestatsjob.schedule(pofile) self.assertIs(list(POFileStatsJob.iterReady())[0], job) def test_second_job_is_scheduled(self): @@ -83,11 +83,11 @@ class TestPOFileStatsJob(TestCaseWithFactory): # We need a POFile to update. pofile = self.factory.makePOFile(side=TranslationSide.UPSTREAM) # If we schedule a job, then there will be one scheduled. - pofilestatsjob.schedule(pofile.id) + pofilestatsjob.schedule(pofile) self.assertIs(len(list(POFileStatsJob.iterReady())), 1) # If we attempt to schedule another job for the same POFile, a new job # is added. - pofilestatsjob.schedule(pofile.id) + pofilestatsjob.schedule(pofile) self.assertIs(len(list(POFileStatsJob.iterReady())), 2) def assertJobUpdatesStats(self, pofile1, pofile2): @@ -97,7 +97,7 @@ class TestPOFileStatsJob(TestCaseWithFactory): # The statistics start at 0. 
self.assertEqual(pofile1.getStatistics(), (0, 0, 0, 0)) self.assertEqual(pofile2.getStatistics(), (0, 0, 0, 0)) - job = pofilestatsjob.schedule(pofile1.id) + job = pofilestatsjob.schedule(pofile1) # Just scheduling the job doesn't update the statistics. self.assertEqual(pofile1.getStatistics(), (0, 0, 0, 0)) self.assertEqual(pofile2.getStatistics(), (0, 0, 0, 0)) @@ -207,7 +207,7 @@ class TestViaCelery(TestCaseWithFactory): self.factory.makePOTMsgSet(pofile.potemplate, singular) # The statistics start at 0. self.assertEqual(pofile.potemplate.messageCount(), 0) - pofilestatsjob.schedule(pofile.id) + pofilestatsjob.schedule(pofile) with block_on_job(): transaction.commit() # Now that the job ran, the statistics have been updated.
_______________________________________________
Mailing list: https://launchpad.net/~launchpad-reviewers
Post to     : launchpad-reviewers@lists.launchpad.net
Unsubscribe : https://launchpad.net/~launchpad-reviewers
More help   : https://help.launchpad.net/ListHelp