Lwelling has uploaded a new change for review. https://gerrit.wikimedia.org/r/52601
Change subject: Allow job queues to optionally request jobs not be processed immediately Jobs can be given either a notBefore timestamp, or a delay from now in seconds ...................................................................... Allow job queues to optionally request jobs not be processed immediately Jobs can be given either a notBefore timestamp, or a delay from now in seconds Most jobs don't need this, so unless the queue specifically has delays enabled, the not_before column is not joined so performance of normal queues should be unaffected Commit includes DB patches and unit tests for the new job behavior Change-Id: I05df4efe7c82b8785b47b5e384879bdf6e4edb7b --- M includes/installer/MysqlUpdater.php M includes/installer/PostgresUpdater.php M includes/installer/SqliteUpdater.php M includes/job/Job.php M includes/job/JobQueue.php M includes/job/JobQueueDB.php A maintenance/archives/patch-delayed-job-queue.sql M maintenance/postgres/tables.sql M maintenance/tables.sql M tests/phpunit/includes/jobqueue/JobQueueTest.php 10 files changed, 204 insertions(+), 15 deletions(-) git pull ssh://gerrit.wikimedia.org:29418/mediawiki/core refs/changes/01/52601/1 diff --git a/includes/installer/MysqlUpdater.php b/includes/installer/MysqlUpdater.php index 6a2d50f..a2a8075 100644 --- a/includes/installer/MysqlUpdater.php +++ b/includes/installer/MysqlUpdater.php @@ -229,6 +229,8 @@ array( 'modifyField', 'user_groups', 'ug_group', 'patch-ug_group-length-increase-255.sql' ), array( 'modifyField', 'user_former_groups', 'ufg_group', 'patch-ufg_group-length-increase-255.sql' ), array( 'addIndex', 'page_props', 'pp_propname_page', 'patch-page_props-propname-page-index.sql' ), + array( 'addField', 'job', 'job_not_before', 'patch-delayed-job-queue.sql' ), + array( 'addIndex', 'job', 'job_not_before', 'patch-delayed-job-queue.sql' ), ); } diff --git a/includes/installer/PostgresUpdater.php b/includes/installer/PostgresUpdater.php index 0a4b5e6..ebec1b5 100644 --- 
a/includes/installer/PostgresUpdater.php +++ b/includes/installer/PostgresUpdater.php @@ -160,6 +160,7 @@ array( 'addPgField', 'job', 'job_token', "TEXT NOT NULL DEFAULT ''" ), array( 'addPgField', 'job', 'job_token_timestamp', "TIMESTAMPTZ" ), array( 'addPgField', 'job', 'job_sha1', "TEXT NOT NULL DEFAULT ''" ), + array( 'addPgField', 'job', 'job_not_before', "TIMESTAMPTZ DEFAULT NULL" ), # type changes array( 'changeField', 'archive', 'ar_deleted', 'smallint', '' ), @@ -238,6 +239,7 @@ array( 'addPgIndex', 'job', 'job_cmd_token', '(job_cmd, job_token, job_random)' ), array( 'addPgIndex', 'job', 'job_cmd_token_id', '(job_cmd, job_token, job_id)' ), array( 'addPgIndex', 'filearchive', 'fa_sha1', '(fa_sha1)' ), + array( 'addPgIndex', 'job', 'job_not_before_idx', '(job_not_before)' ), array( 'checkIndex', 'pagelink_unique', array( array( 'pl_from', 'int4_ops', 'btree', 0 ), diff --git a/includes/installer/SqliteUpdater.php b/includes/installer/SqliteUpdater.php index cd7a2c9..2ebb490 100644 --- a/includes/installer/SqliteUpdater.php +++ b/includes/installer/SqliteUpdater.php @@ -109,6 +109,9 @@ array( 'modifyField', 'user_groups', 'ug_group', 'patch-ug_group-length-increase-255.sql' ), array( 'modifyField', 'user_former_groups', 'ufg_group', 'patch-ufg_group-length-increase-255.sql' ), array( 'addIndex', 'page_props', 'pp_propname_page', 'patch-page_props-propname-page-index.sql' ), + + array( 'addField', 'job', 'job_not_before', 'patch-delayed-job-queue.sql' ), + array( 'addIndex', 'job', 'job_not_before', 'patch-delayed-job-queue.sql' ), ); } diff --git a/includes/job/Job.php b/includes/job/Job.php index 9ec58c9..4be70bf 100644 --- a/includes/job/Job.php +++ b/includes/job/Job.php @@ -170,6 +170,23 @@ } /** + * @return null or integer + * notBefore is a timestamp in seconds + * It can be set directly or calculated from the current timestamp and a 'delay' (also in seconds) + */ + public function getNotBefore() { + if( isset( $this->params['notBefore'] ) && + intval( 
$this->params['notBefore'] ) > 0 ) { + return intval( $this->params['notBefore'] ); + } else if ( isset( $this->params['delay'] ) && + intval( $this->params['delay'] ) > 0 ) { + return time() + intval( $this->params['delay'] ); + } else { + return null; + } + } + + /** * @return array */ public function getParams() { @@ -207,6 +224,10 @@ unset( $info['params']['rootJobSignature'] ); unset( $info['params']['rootJobTimestamp'] ); } + // Identical jobs with different not before times should count as duplicates + if ( is_array( $info['params'] ) ) { + unset( $info['params']['notBefore'] ); + } return $info; } diff --git a/includes/job/JobQueue.php b/includes/job/JobQueue.php index 4996a9e..185ae0f 100644 --- a/includes/job/JobQueue.php +++ b/includes/job/JobQueue.php @@ -34,7 +34,7 @@ protected $order; // string; job priority for pop() protected $claimTTL; // integer; seconds protected $maxTries; // integer; maximum number of times to try a job - + protected $delay; // boolean support deliberately delayed jobs with a minimum start time const QoS_Atomic = 1; // integer; "all-or-nothing" job insertions /** @@ -46,6 +46,7 @@ $this->order = isset( $params['order'] ) ? $params['order'] : 'random'; $this->claimTTL = isset( $params['claimTTL'] ) ? $params['claimTTL'] : 0; $this->maxTries = isset( $params['maxTries'] ) ? $params['maxTries'] : 3; + $this->delay = isset( $params['delay'] ) ? ( ( bool ) ( $params['delay'] ) ) : false; } /** @@ -68,6 +69,12 @@ * but not acknowledged as completed after this many seconds. Recycling * of jobs simple means re-inserting them into the queue. Jobs can be * attempted up to three times before being discarded. 
+ * - delay : Request support for a minimum start time for jobs + * so a job can be queued with "do not process before <timestamp>" logic + * In practice few jobs want delay so by default don't look for or honor delay requests + * so query performance is not harmed for the 99% of other jobs + * Not all queue types will support delays so may be silently ignored + * To use, pass a notBefore or delay integer in each Job's params array * * Queue classes should throw an exception if they do not support the options given. * diff --git a/includes/job/JobQueueDB.php b/includes/job/JobQueueDB.php index fd64895..788bd0d 100644 --- a/includes/job/JobQueueDB.php +++ b/includes/job/JobQueueDB.php @@ -19,6 +19,7 @@ * * @file * @author Aaron Schulz + * @author Luke Welling */ /** @@ -351,6 +352,13 @@ // same table being changed in an UPDATE query in MySQL (gives Error: 1093). // Oracle and Postgre have no such limitation. However, MySQL offers an // alternative here by supporting ORDER BY + LIMIT for UPDATE queries. + $delayQuery = ''; + if ( $this->delay === true ) { + $now = time(); + $delayQuery = "AND ( + job_not_before IS NULL OR + job_not_before <= {$dbw->addQuotes( $now )} ) "; + } $dbw->query( "UPDATE {$dbw->tableName( 'job' )} " . "SET " . "job_token = {$dbw->addQuotes( $uuid ) }, " . @@ -359,12 +367,22 @@ "WHERE ( " . "job_cmd = {$dbw->addQuotes( $this->type )} " . "AND job_token = {$dbw->addQuotes( '' )} " . + $delayQuery . ") ORDER BY job_id ASC LIMIT 1", __METHOD__ ); } else { // Use a subquery to find the job, within an UPDATE to claim it. // This uses as much of the DB wrapper functions as possible. + $whereConds = array( 'job_cmd' => $this->type, 'job_token' => '' ); + + if ( $this->delay ) { + $now = time(); + $whereConds += array( "( + job_not_before IS NULL OR + job_not_before <= {$dbw->addQuotes( $now )} ) "); + } + $dbw->update( 'job', array( 'job_token' => $uuid, @@ -372,7 +390,7 @@ 'job_attempts = job_attempts+1' ), array( 'job_id = (' . 
$dbw->selectSQLText( 'job', 'job_id', - array( 'job_cmd' => $this->type, 'job_token' => '' ), + $whereConds, __METHOD__, array( 'ORDER BY' => 'job_id ASC', 'LIMIT' => 1 ) ) . ')' @@ -619,18 +637,19 @@ list( $dbw, $scope ) = $this->getMasterDB(); return array( // Fields that describe the nature of the job - 'job_cmd' => $job->getType(), - 'job_namespace' => $job->getTitle()->getNamespace(), - 'job_title' => $job->getTitle()->getDBkey(), - 'job_params' => self::makeBlob( $job->getParams() ), + 'job_cmd' => $job->getType(), + 'job_namespace' => $job->getTitle()->getNamespace(), + 'job_title' => $job->getTitle()->getDBkey(), + 'job_params' => self::makeBlob( $job->getParams() ), // Additional job metadata - 'job_id' => $dbw->nextSequenceValue( 'job_job_id_seq' ), - 'job_timestamp' => $dbw->timestamp(), - 'job_sha1' => wfBaseConvert( + 'job_id' => $dbw->nextSequenceValue( 'job_job_id_seq' ), + 'job_timestamp' => $dbw->timestamp(), + 'job_sha1' => wfBaseConvert( sha1( serialize( $job->getDeduplicationInfo() ) ), 16, 36, 31 ), - 'job_random' => mt_rand( 0, self::MAX_JOB_RANDOM ) + 'job_random' => mt_rand( 0, self::MAX_JOB_RANDOM ), + 'job_not_before' => $job->getNotBefore() ); } diff --git a/maintenance/archives/patch-delayed-job-queue.sql b/maintenance/archives/patch-delayed-job-queue.sql new file mode 100644 index 0000000..266f594 --- /dev/null +++ b/maintenance/archives/patch-delayed-job-queue.sql @@ -0,0 +1,11 @@ +-- +-- patch-delayed-queue.sql +-- Adds a column to job to allow jobs to be deliberately delayed on a job queue +-- 2013-2-27 +-- + +ALTER TABLE /*$wgDBprefix*/job + ADD COLUMN `job_not_before` varbinary(14) DEFAULT NULL; + +ALTER TABLE /*$wgDBprefix*/job + ADD INDEX `job_not_before` (`job_not_before`); diff --git a/maintenance/postgres/tables.sql b/maintenance/postgres/tables.sql index 9cbabfd..6eaae62 100644 --- a/maintenance/postgres/tables.sql +++ b/maintenance/postgres/tables.sql @@ -535,12 +535,14 @@ job_token TEXT NOT NULL DEFAULT '', 
job_token_timestamp TIMESTAMPTZ, job_sha1 TEXT NOT NULL DEFAULT '' + job_not_before TIMESTAMPTZ ); CREATE INDEX job_sha1 ON job (job_sha1); CREATE INDEX job_cmd_token ON job (job_cmd, job_token, job_random); CREATE INDEX job_cmd_token_id ON job (job_cmd, job_token, job_id); CREATE INDEX job_cmd_namespace_title ON job (job_cmd, job_namespace, job_title); CREATE INDEX job_timestamp_idx ON job (job_timestamp); +CREATE INDEX job_not_before_idx ON job (job_not_before); -- Tsearch2 2 stuff. Will fail if we don't have proper access to the tsearch2 tables -- Version 8.3 or higher only. Previous versions would need another parmeter for to_tsvector. diff --git a/maintenance/tables.sql b/maintenance/tables.sql index 97d6ff2..949c303 100644 --- a/maintenance/tables.sql +++ b/maintenance/tables.sql @@ -1307,7 +1307,12 @@ job_token_timestamp varbinary(14) NULL default NULL, -- Base 36 SHA1 of the job parameters relevant to detecting duplicates - job_sha1 varbinary(32) NOT NULL default '' + job_sha1 varbinary(32) NOT NULL default '', + + -- Timestamp of the earliest time the job is requested to be executed + -- NULL for jobs that can be performed immediately + -- NULL for all jobs in queues that don't support delay + job_not_before varbinary(14) NULL default NULL ) /*$wgDBTableOptions*/; CREATE INDEX /*i*/job_sha1 ON /*_*/job (job_sha1); diff --git a/tests/phpunit/includes/jobqueue/JobQueueTest.php b/tests/phpunit/includes/jobqueue/JobQueueTest.php index 453cec3..b6c5fbb 100644 --- a/tests/phpunit/includes/jobqueue/JobQueueTest.php +++ b/tests/phpunit/includes/jobqueue/JobQueueTest.php @@ -7,7 +7,7 @@ */ class JobQueueTest extends MediaWikiTestCase { protected $key; - protected $queueRand, $queueRandTTL, $queueFifo, $queueFifoTTL; + protected $queueRand, $queueRandTTL, $queueFifo, $queueFifoTTL, $queueDelay; protected $old = array(); function __construct( $name = null, array $data = array(), $dataName = '' ) { @@ -40,8 +40,11 @@ array( 'order' => 'fifo', 'claimTTL' => 0 ) + 
$baseConfig ); $this->queueFifoTTL = JobQueue::factory( array( 'order' => 'fifo', 'claimTTL' => 10 ) + $baseConfig ); + $this->queueDelay = JobQueue::factory( + array( 'order' => 'fifo', 'delay' => true ) + $baseConfig ); + if ( $baseConfig['class'] !== 'JobQueueDB' ) { // DB namespace with prefix or temp tables - foreach ( array( 'queueRand', 'queueRandTTL', 'queueFifo', 'queueFifoTTL' ) as $q ) { + foreach ( array( 'queueRand', 'queueRandTTL', 'queueFifo', 'queueFifoTTL', 'queueDelay' ) as $q ) { $this->$q->setTestingPrefix( 'unittests-' . wfRandomString( 32 ) ); } } @@ -50,7 +53,7 @@ protected function tearDown() { global $wgMemc; parent::tearDown(); - foreach ( array( 'queueRand', 'queueRandTTL', 'queueFifo', 'queueFifoTTL' ) as $q ) { + foreach ( array( 'queueRand', 'queueRandTTL', 'queueFifo', 'queueFifoTTL', 'queueDelay' ) as $q ) { do { $job = $this->$q->pop(); if ( $job ) { @@ -62,6 +65,7 @@ $this->queueRandTTL = null; $this->queueFifo = null; $this->queueFifoTTL = null; + $this->queueDelay = null; $wgMemc = $this->old['wgMemc']; } @@ -132,6 +136,15 @@ $queue->flushCaches(); $this->assertEquals( 0, $queue->getAcquiredCount(), "Active job count ($desc)" ); + + $queue->flushCaches(); + $this->assertEquals( 0, $queue->getSize(), "Queue is empty ($desc)" ); + $this->assertEquals( 0, $queue->getAcquiredCount(), "Queue is empty ($desc)" ); + + $this->assertTrue( $queue->push( $this->newDelayedJob() ), "Push of delayed job worked regardless of support($desc)" ); + + $this->assertFalse( $queue->isEmpty(), "Queue is not empty ($desc)" ); + $queue->flushCaches(); } /** @@ -262,6 +275,91 @@ $queue->flushCaches(); $this->assertEquals( 0, $queue->getSize(), "Queue is empty ($desc)" ); $this->assertEquals( 0, $queue->getAcquiredCount(), "No jobs active ($desc)" ); + + //Test that delays have no effect on a non-delayed queue + $this->assertTrue( $queue->push( $this->newDelayedJob( 1, array(), 1 ) ), "Push worked ($desc)" ); + $job = $queue->pop(); + $this->assertTrue( 
$job instanceof Job, "Immediate pop should succeed with delay request ignored ($desc)" ); + $queue->ack( $job ); + $this->assertTrue( $queue->push( $this->newDelayedJob( 2, array(), 0 ) ), "Push worked ($desc)" ); + $this->assertTrue( $queue->push( $this->newDelayedJob( 3, array(), 999 ) ), "Push worked ($desc)" ); + $this->assertTrue( $queue->push( $this->newDelayedJob( 4, array(), 0 ) ), "Push worked ($desc)" ); + + // pop order should be FIFO - 2, 3, 4 + for ( $i = 2; $i < 5 ; ++$i ) { + $job = $queue->pop(); + $this->assertTrue( $job instanceof Job, "Jobs popped from queue ($desc)" ); + $params = $job->getParams(); + $this->assertEquals( $i, $params['i'], "Job with ignored delay $i popped from queue in position $i ($desc)" ); + $queue->ack( $job ); + } + } + + /** + * @dataProvider provider_delayedQueueLists + * Note these tests rely on magic numbers and timing so are susceptible to spurious failures + */ + function testDelayedJobOrder( $queue, $recycles, $desc ) { + $queue = $this->$queue; + + $this->assertTrue( $queue->isEmpty(), "Queue is empty ($desc)" ); + + $queue->flushCaches(); + $this->assertEquals( 0, $queue->getSize(), "Queue is empty ($desc)" ); + $this->assertEquals( 0, $queue->getAcquiredCount(), "Queue is empty ($desc)" ); + + for ( $i = 0; $i < 10; ++$i ) { + $this->assertTrue( $queue->push( $this->newDelayedJob( $i, array(), 0 ) ), "Push worked ($desc)" ); + } + + for ( $i = 0; $i < 10; ++$i ) { + $job = $queue->pop(); + $this->assertTrue( $job instanceof Job, "Jobs popped from queue ($desc)" ); + $params = $job->getParams(); + $this->assertEquals( $i, $params['i'], "Job with zero delay popped from queue is FIFO ($desc)" ); + $queue->ack( $job ); + } + + $this->assertFalse( $queue->pop(), "Queue is empty ($desc)" ); + + // test setting delay param + $this->assertTrue( $queue->push( $this->newDelayedJob( 1, array(), 1 ) ), "Push worked ($desc)" ); + $this->assertFalse( $queue->pop(), "Immediate pop should fail with delay ($desc)" ); + 
$this->assertTrue( $queue->push( $this->newDelayedJob( 2, array(), 0 ) ), "Push worked ($desc)" ); + $this->assertTrue( $queue->push( $this->newDelayedJob( 3, array(), 999 ) ), "Push worked ($desc)" ); + $this->assertTrue( $queue->push( $this->newDelayedJob( 4, array(), 0 ) ), "Push worked ($desc)" ); + + // after waiting more than a second, pop order should be 1,2,4,false + $correct = array( 1, 2, 4 ); + sleep(2); + for ( $i = 0; $i < 3 ; ++$i ) { + $job = $queue->pop(); + $this->assertTrue( $job instanceof Job, "Jobs popped from queue ($desc)" ); + $params = $job->getParams(); + $this->assertEquals( $correct[$i], $params['i'], "Job with delay {$correct[$i]} popped from queue in position $i ($desc)" ); + $queue->ack( $job ); + } + $this->assertFalse( $queue->pop(), "Long delayed item should not be popable during this test ($desc)" ); + + + // test setting notBefore param + $this->assertTrue( $queue->push( $this->newDelayedJob( 1, array(), 0, time() + 1 ) ), "Push worked ($desc)" ); + $this->assertFalse( $queue->pop(), "Immediate pop should fail with notBefore ($desc)" ); + $this->assertTrue( $queue->push( $this->newDelayedJob( 2, array(), 0, time() + 0 ) ), "Push worked ($desc)" ); + $this->assertTrue( $queue->push( $this->newDelayedJob( 3, array(), 0, time() + 999 ) ), "Push worked ($desc)" ); + $this->assertTrue( $queue->push( $this->newDelayedJob( 4, array(), 0, time() + 0 ) ), "Push worked ($desc)" ); + + // after waiting more than a second, pop order should be 1,2,4,false + $correct = array( 1, 2, 4 ); + sleep(2); + for ( $i = 0; $i < 3 ; ++$i ) { + $job = $queue->pop(); + $this->assertTrue( $job instanceof Job, "Jobs popped from queue ($desc)" ); + $params = $job->getParams(); + $this->assertEquals( $correct[$i], $params['i'], "Job with delay {$correct[$i]} popped from queue in position $i ($desc)" ); + $queue->ack( $job ); + } + $this->assertFalse( $queue->pop(), "Long delayed item should not be popable during this test ($desc)" ); } function 
provider_queueLists() { @@ -269,7 +367,8 @@ array( 'queueRand', 'rand', false, 'Random queue without ack()' ), array( 'queueRandTTL', 'rand', true, 'Random queue with ack()' ), array( 'queueFifo', 'fifo', false, 'Ordered queue without ack()' ), - array( 'queueFifoTTL', 'fifo', true, 'Ordered queue with ack()' ) + array( 'queueFifoTTL', 'fifo', true, 'Ordered queue with ack()' ), + array( 'queueDelay', 'fifo', false, 'Ordered queue with delayed start times' ) ); } @@ -277,6 +376,12 @@ return array( array( 'queueFifo', false, 'Ordered queue without ack()' ), array( 'queueFifoTTL', true, 'Ordered queue with ack()' ) + ); + } + + function provider_delayedQueueLists() { + return array( + array( 'queueDelay', 'fifo', false, 'Ordered queue with delayed start times' ) ); } @@ -289,4 +394,16 @@ return new NullJob( Title::newMainPage(), array( 'lives' => 0, 'usleep' => 0, 'removeDuplicates' => 1, 'i' => $i ) + $rootJob ); } + + function newDelayedJob( $i = 0, $rootJob = array(), $delay = 0, $notBefore = 0 ) { + $params = array( 'lives' => 0, 'usleep' => 0, 'removeDuplicates' => 0, 'i' => $i ) + $rootJob; + if( $delay !== 0 ) { + $params['delay'] = $delay; + } + if( $notBefore !== 0 ) { + $params['notBefore'] = $notBefore; + } + return new NullJob( Title::newMainPage(), $params ); + } + } -- To view, visit https://gerrit.wikimedia.org/r/52601 To unsubscribe, visit https://gerrit.wikimedia.org/r/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I05df4efe7c82b8785b47b5e384879bdf6e4edb7b Gerrit-PatchSet: 1 Gerrit-Project: mediawiki/core Gerrit-Branch: master Gerrit-Owner: Lwelling <lwell...@wikimedia.org> _______________________________________________ MediaWiki-commits mailing list MediaWiki-commits@lists.wikimedia.org https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits