jenkins-bot has submitted this change and it was merged.
Change subject: [JobQueue] Added support for delayed jobs with JobQueueRedis.
......................................................................
[JobQueue] Added support for delayed jobs with JobQueueRedis.
* The queue can handle delaying jobs until a given timestamp is reached.
* Added Job::getReleaseTimestamp() to let jobs specify delay amounts.
* Added a "checkDelay" option and a supportsDelayedJobs() function to JobQueue.
There are also getDelayedCount() and getAllDelayedJobs() functions.
* Simplified a bit of code in doBatchPush() and pushBlobs().
* Improved the logic in redisEval().
Change-Id: I40b3e3438e659f6844bdbdd5e9d3ccc6c4dc82b2
---
M includes/job/Job.php
M includes/job/JobQueue.php
M includes/job/JobQueueRedis.php
3 files changed, 247 insertions(+), 73 deletions(-)
Approvals:
Tim Starling: Looks good to me, approved
jenkins-bot: Verified
diff --git a/includes/job/Job.php b/includes/job/Job.php
index bcf582e..d8f55c3 100644
--- a/includes/job/Job.php
+++ b/includes/job/Job.php
@@ -177,6 +177,16 @@
}
/**
+ * @return integer|null UNIX timestamp to delay running this job until,
otherwise null
+ * @since 1.22
+ */
+ public function getReleaseTimestamp() {
+ return isset( $this->params['jobReleaseTimestamp'] )
+ ? wfTimestampOrNull( TS_UNIX,
$this->params['jobReleaseTimestamp'] )
+ : null;
+ }
+
+ /**
* @return bool Whether only one of each identical set of jobs should
be run
*/
public function ignoreDuplicates() {
diff --git a/includes/job/JobQueue.php b/includes/job/JobQueue.php
index 5ef52b5..9c152cd 100644
--- a/includes/job/JobQueue.php
+++ b/includes/job/JobQueue.php
@@ -34,6 +34,7 @@
protected $order; // string; job priority for pop()
protected $claimTTL; // integer; seconds
protected $maxTries; // integer; maximum number of times to try a job
+ protected $checkDelay; // boolean; allow delayed jobs
const QoS_Atomic = 1; // integer; "all-or-nothing" job insertions
@@ -55,28 +56,36 @@
if ( !in_array( $this->order, $this->supportedOrders() ) ) {
throw new MWException( __CLASS__ . " does not support
'{$this->order}' order." );
}
+ $this->checkDelay = !empty( $params['checkDelay'] );
+ if ( $this->checkDelay && !$this->supportsDelayedJobs() ) {
+ throw new MWException( __CLASS__ . " does not support
delayed jobs." );
+ }
}
/**
* Get a job queue object of the specified type.
* $params includes:
- * - class : What job class to use (determines job type)
- * - wiki : wiki ID of the wiki the jobs are for (defaults to
current wiki)
- * - type : The name of the job types this queue handles
- * - order : Order that pop() selects jobs, one of "fifo",
"timestamp" or "random".
- * If "fifo" is used, the queue will effectively be
FIFO. Note that
- * job completion will not appear to be exactly FIFO if
there are multiple
- * job runners since jobs can take different times to
finish once popped.
- * If "timestamp" is used, the queue will at least be
loosely ordered
- * by timestamp, allowing for some jobs to be popped off
out of order.
- * If "random" is used, pop() will pick jobs in random
order.
- * Note that it may only be weakly random (e.g. a
lottery of the oldest X).
- * If "any" is choosen, the queue will use whatever
order is the fastest.
- * This might be useful for improving concurrency for
job acquisition.
- * - claimTTL : If supported, the queue will recycle jobs that have
been popped
- * but not acknowledged as completed after this many
seconds. Recycling
- * of jobs simple means re-inserting them into the
queue. Jobs can be
- * attempted up to three times before being discarded.
+ * - class : What job class to use (determines job type)
+ * - wiki : wiki ID of the wiki the jobs are for (defaults to
current wiki)
+ * - type : The name of the job types this queue handles
+ * - order : Order that pop() selects jobs, one of "fifo",
"timestamp" or "random".
+ * If "fifo" is used, the queue will effectively be
FIFO. Note that job
+ * completion will not appear to be exactly FIFO if
there are multiple
+ * job runners since jobs can take different times to
finish once popped.
+ * If "timestamp" is used, the queue will at least be
loosely ordered
+ * by timestamp, allowing for some jobs to be popped
off out of order.
+ * If "random" is used, pop() will pick jobs in random
order.
+ * Note that it may only be weakly random (e.g. a
lottery of the oldest X).
+ * If "any" is chosen, the queue will use whatever
order is the fastest.
+ * This might be useful for improving concurrency for
job acquisition.
+ * - claimTTL : If supported, the queue will recycle jobs that have
been popped
+ * but not acknowledged as completed after this many
seconds. Recycling
+ * of jobs simply means re-inserting them into the
queue. Jobs can be
+ * attempted up to three times before being discarded.
+ * - checkDelay : If supported, respect Job::getReleaseTimestamp() in
the push functions.
+ * This lets delayed jobs wait in a staging area until
a given timestamp is
+ * reached, at which point they will enter the queue.
If this is not enabled
+ * or not supported, an exception will be thrown on
delayed job insertion.
*
* Queue classes should throw an exception if they do not support the
options given.
*
@@ -128,7 +137,14 @@
abstract protected function optimalOrder();
/**
- * Quickly check if the queue is empty (has no available jobs).
+ * @return boolean Whether delayed jobs are supported
+ */
+ protected function supportsDelayedJobs() {
+ return false; // not implemented
+ }
+
+ /**
+ * Quickly check if the queue has no available (unacquired,
non-delayed) jobs.
* Queue classes should use caching if they are any slower without
memcached.
*
* If caching is used, this might return false when there are actually
no jobs.
@@ -153,7 +169,7 @@
abstract protected function doIsEmpty();
/**
- * Get the number of available (unacquired) jobs in the queue.
+ * Get the number of available (unacquired, non-delayed) jobs in the
queue.
* Queue classes should use caching if they are any slower without
memcached.
*
* If caching is used, this number might be out of date for a minute.
@@ -197,6 +213,31 @@
abstract protected function doGetAcquiredCount();
/**
+ * Get the number of delayed jobs (these are temporarily out of the
queue).
+ * Queue classes should use caching if they are any slower without
memcached.
+ *
+ * If caching is used, this number might be out of date for a minute.
+ *
+ * @return integer
+ * @throws MWException
+ * @since 1.22
+ */
+ final public function getDelayedCount() {
+ wfProfileIn( __METHOD__ );
+ $res = $this->doGetDelayedCount();
+ wfProfileOut( __METHOD__ );
+ return $res;
+ }
+
+ /**
+ * @see JobQueue::getDelayedCount()
+ * @return integer
+ */
+ protected function doGetDelayedCount() {
+ return 0; // not implemented
+ }
+
+ /**
* Push a single jobs into the queue.
* This does not require $wgJobClasses to be set for the given job type.
* Outside callers should use JobQueueGroup::push() instead of this
function.
@@ -227,7 +268,11 @@
foreach ( $jobs as $job ) {
if ( $job->getType() !== $this->type ) {
- throw new MWException( "Got '{$job->getType()}'
job; expected '{$this->type}'." );
+ throw new MWException(
+ "Got '{$job->getType()}' job; expected
a '{$this->type}' job." );
+ } elseif ( $job->getReleaseTimestamp() &&
!$this->checkDelay ) {
+ throw new MWException(
+ "Got delayed '{$job->getType()}' job;
delays are not supported." );
}
}
@@ -493,9 +538,9 @@
protected function doFlushCaches() {}
/**
- * Get an iterator to traverse over all of the jobs in this queue.
- * This does not include jobs that are current acquired. In general,
- * this should only be called on a queue that is no longer being popped.
+ * Get an iterator to traverse over all available jobs in this queue.
+ * This does not include jobs that are currently acquired or delayed.
+ * This should only be called on a queue that is no longer being popped.
*
* @return Iterator|Traversable|Array
* @throws MWException
@@ -503,6 +548,18 @@
abstract public function getAllQueuedJobs();
/**
+ * Get an iterator to traverse over all delayed jobs in this queue.
+ * This should only be called on a queue that is no longer being popped.
+ *
+ * @return Iterator|Traversable|Array
+ * @throws MWException
+ * @since 1.22
+ */
+ public function getAllDelayedJobs() {
+ return array(); // not implemented
+ }
+
+ /**
* Namespace the queue with a key to isolate it for testing
*
* @param $key string
diff --git a/includes/job/JobQueueRedis.php b/includes/job/JobQueueRedis.php
index 338dc79..bd23174 100644
--- a/includes/job/JobQueueRedis.php
+++ b/includes/job/JobQueueRedis.php
@@ -27,22 +27,23 @@
* This is faster, less resource intensive, queue that JobQueueDB.
* All data for a queue using this class is placed into one redis server.
*
- * There are seven main redis keys used to track jobs:
- * - l-unclaimed : A list of job IDs used for push/pop
+ * There are eight main redis keys used to track jobs:
+ * - l-unclaimed : A list of job IDs used for ready unclaimed jobs
* - z-claimed : A sorted set of (job ID, UNIX timestamp as score) used
for job retries
* - z-abandoned : A sorted set of (job ID, UNIX timestamp as score) used
for broken jobs
+ * - z-delayed : A sorted set of (job ID, UNIX timestamp as score) used
for delayed jobs
* - h-idBySha1 : A hash of (SHA1 => job ID) for unclaimed jobs used for
de-duplication
* - h-sha1Byid : A hash of (job ID => SHA1) for unclaimed jobs used for
de-duplication
* - h-attempts : A hash of (job ID => attempt count) used for job
claiming/retries
* - h-data : A hash of (job ID => serialized blobs) for job storage
- * Any given job ID can be in only one of l-unclaimed, z-claimed, and
z-abandoned.
+ * A job ID can be in only one of z-delayed, l-unclaimed, z-claimed, and
z-abandoned.
* If an ID appears in any of those lists, it should have a h-data entry for
its ID.
- * If a job has a non-empty SHA1 de-duplication value and its ID is in
l-unclaimed,
- * then there should be no other such jobs. Every h-idBySha1 entry has an
h-sha1Byid
+ * If a job has a SHA1 de-duplication value and its ID is in l-unclaimed or
z-delayed, then
+ * there should be no other such jobs with that SHA1. Every h-idBySha1 entry
has an h-sha1Byid
* entry and every h-sha1Byid must refer to an ID that is l-unclaimed. If a
job has its
* ID in z-claimed or z-abandoned, then it must also have an h-attempts entry
for its ID.
*
- * Additionally, "rootjob:* keys to track "root jobs" used for additional
de-duplication.
+ * Additionally, "rootjob:* keys track "root jobs" used for additional
de-duplication.
* Aside from root job keys, all keys have no expiry, and are only removed
when jobs are run.
* All the keys are prefixed with the relevant wiki ID information.
*
@@ -89,6 +90,10 @@
return 'fifo';
}
+ protected function supportsDelayedJobs() {
+ return true;
+ }
+
/**
* @see JobQueue::doIsEmpty()
* @return bool
@@ -133,6 +138,23 @@
}
/**
+ * @see JobQueue::doGetDelayedCount()
+ * @return integer
+ * @throws MWException
+ */
+ protected function doGetDelayedCount() {
+ if ( !$this->checkDelay ) {
+ return 0; // no delayed jobs
+ }
+ $conn = $this->getConnection();
+ try {
+ return $conn->zSize( $this->getQueueKey( 'z-delayed' )
);
+ } catch ( RedisException $e ) {
+ $this->throwRedisException( $this->server, $conn, $e );
+ }
+ }
+
+ /**
* @see JobQueue::doBatchPush()
* @param array $jobs
* @param $flags
@@ -150,13 +172,8 @@
$items[$item['uuid']] = $item;
}
}
- // Convert the field maps into serialized blobs
- $tuples = array();
- foreach ( $items as $item ) {
- $tuples[] = array( $item['uuid'], $item['sha1'],
serialize( $item ) );
- }
- if ( !count( $tuples ) ) {
+ if ( !count( $items ) ) {
return true; // nothing to do
}
@@ -164,26 +181,26 @@
try {
// Actually push the non-duplicate jobs into the
queue...
if ( $flags & self::QoS_Atomic ) {
- $batches = array( $tuples ); // all or nothing
+ $batches = array( $items ); // all or nothing
} else {
- $batches = array_chunk( $tuples, 500 ); //
avoid tying up the server
+ $batches = array_chunk( $items, 500 ); // avoid
tying up the server
}
$failed = 0;
$pushed = 0;
- foreach ( $batches as $tupleBatch ) {
- $added = $this->pushBlobs( $conn, $tupleBatch );
+ foreach ( $batches as $itemBatch ) {
+ $added = $this->pushBlobs( $conn, $itemBatch );
if ( is_int( $added ) ) {
$pushed += $added;
} else {
- $failed += count( $tupleBatch );
+ $failed += count( $itemBatch );
}
}
if ( $failed > 0 ) {
wfDebugLog( 'JobQueueRedis', "Could not insert
{$failed} {$this->type} job(s)." );
return false;
}
- wfIncrStats( 'job-insert', count( $tuples ) );
- wfIncrStats( 'job-insert-duplicate', count( $tuples ) -
$failed - $pushed );
+ wfIncrStats( 'job-insert', count( $items ) );
+ wfIncrStats( 'job-insert-duplicate', count( $items ) -
$failed - $pushed );
} catch ( RedisException $e ) {
$this->throwRedisException( $this->server, $conn, $e );
}
@@ -193,30 +210,37 @@
/**
* @param RedisConnRef $conn
- * @param array $tuples List of tuples of (job ID, job SHA1 or '',
serialized blob)
+ * @param array $items List of results from
JobQueueRedis::getNewJobFields()
* @return integer Number of jobs inserted (duplicates are ignored)
* @throws RedisException
*/
- protected function pushBlobs( RedisConnRef $conn, array $tuples ) {
+ protected function pushBlobs( RedisConnRef $conn, array $items ) {
$args = array(); // ([id, sha1, blob [, id, sha1, blob ... ] ] )
- foreach ( $tuples as $tuple ) {
- $args[] = $tuple[0]; // id
- $args[] = $tuple[1]; // sha1
- $args[] = $tuple[2]; // blob
+ foreach ( $items as $item ) {
+ $args[] = (string)$item['uuid'];
+ $args[] = (string)$item['sha1'];
+ $args[] = (string)$item['rtimestamp'];
+ $args[] = (string)serialize( $item );
}
static $script =
<<<LUA
- if #ARGV % 3 ~= 0 then return redis.error_reply('Unmatched
arguments') end
+ if #ARGV % 4 ~= 0 then return redis.error_reply('Unmatched
arguments') end
local pushed = 0
- for i = 1,#ARGV,3 do
- local id,sha1,blob = ARGV[i],ARGV[i+1],ARGV[i+2]
+ for i = 1,#ARGV,4 do
+ local id,sha1,rtimestamp,blob =
ARGV[i],ARGV[i+1],ARGV[i+2],ARGV[i+3]
if sha1 == '' or redis.call('hExists',KEYS[3],sha1) ==
0 then
- redis.call('lPush',KEYS[1],id)
+ if 1*rtimestamp > 0 then
+ -- Insert into delayed queue (release
time as score)
+ redis.call('zAdd',KEYS[4],rtimestamp,id)
+ else
+ -- Insert into unclaimed queue
+ redis.call('lPush',KEYS[1],id)
+ end
if sha1 ~= '' then
redis.call('hSet',KEYS[2],id,sha1)
redis.call('hSet',KEYS[3],sha1,id)
end
- redis.call('hSet',KEYS[4],id,blob)
+ redis.call('hSet',KEYS[5],id,blob)
pushed = pushed + 1
end
end
@@ -228,11 +252,12 @@
$this->getQueueKey( 'l-unclaimed' ), #
KEYS[1]
$this->getQueueKey( 'h-sha1ById' ), #
KEYS[2]
$this->getQueueKey( 'h-idBySha1' ), #
KEYS[3]
- $this->getQueueKey( 'h-data' ), #
KEYS[4]
+ $this->getQueueKey( 'z-delayed' ), #
KEYS[4]
+ $this->getQueueKey( 'h-data' ), #
KEYS[5]
),
$args
),
- 4 # number of first argument(s) that are keys
+ 5 # number of first argument(s) that are keys
);
}
@@ -243,6 +268,12 @@
*/
protected function doPop() {
$job = false;
+
+ // Push ready delayed jobs into the queue every 10 jobs to
spread the load.
+ // This is also done as a periodic task, but we don't want too
much done at once.
+ if ( $this->checkDelay && mt_rand( 0, 9 ) == 0 ) {
+ $this->releaseReadyDelayedJobs();
+ }
$conn = $this->getConnection();
try {
@@ -463,7 +494,29 @@
}
/**
- * This function should not be called outside RedisJobQueue
+ * @see JobQueue::getAllDelayedJobs()
+ * @return Iterator
+ */
+ public function getAllDelayedJobs() {
+ $conn = $this->getConnection();
+ if ( !$conn ) {
+ throw new MWException( "Unable to connect to redis
server." );
+ }
+ try {
+ $that = $this;
+ return new MappedIterator( // delayed jobs
+ $conn->zRange( $this->getQueueKey( 'z-delayed'
), 0, -1 ),
+ function( $uid ) use ( $that, $conn ) {
+ return $that->getJobFromUidInternal(
$uid, $conn );
+ }
+ );
+ } catch ( RedisException $e ) {
+ $this->throwRedisException( $this->server, $conn, $e );
+ }
+ }
+
+ /**
+ * This function should not be called outside JobQueueRedis
*
* @param $uid string
* @param $conn RedisConnRef
@@ -483,6 +536,43 @@
} catch ( RedisException $e ) {
$this->throwRedisException( $this->server, $conn, $e );
}
+ }
+
+ /**
+ * Release any ready delayed jobs into the queue
+ *
+ * @return integer Number of jobs released
+ * @throws MWException
+ */
+ public function releaseReadyDelayedJobs() {
+ $count = 0;
+
+ $conn = $this->getConnection();
+ try {
+ static $script =
+<<<LUA
+ -- Get the list of ready delayed jobs, sorted by
readiness
+ local ids =
redis.call('zRangeByScore',KEYS[1],0,ARGV[1])
+ -- Migrate the jobs from the "delayed" set to the
"unclaimed" list
+ for k,id in ipairs(ids) do
+ redis.call('lPush',KEYS[2],id)
+ redis.call('zRem',KEYS[1],id)
+ end
+ return #ids
+LUA;
+ $count += (int)$this->redisEval( $conn, $script,
+ array(
+ $this->getQueueKey( 'z-delayed' ), //
KEYS[1]
+ $this->getQueueKey( 'l-unclaimed' ), //
KEYS[2]
+ time() // ARGV[1]; max "delay until"
UNIX timestamp
+ ),
+ 2 # first two arguments are keys
+ );
+ } catch ( RedisException $e ) {
+ $this->throwRedisException( $this->server, $conn, $e );
+ }
+
+ return $count;
}
/**
@@ -564,16 +654,20 @@
* @return Array
*/
protected function doGetPeriodicTasks() {
+ $tasks = array();
if ( $this->claimTTL > 0 ) {
- return array(
- 'recycleAndDeleteStaleJobs' => array(
- 'callback' => array( $this,
'recycleAndDeleteStaleJobs' ),
- 'period' => ceil( $this->claimTTL / 2
)
- )
+ $tasks['recycleAndDeleteStaleJobs'] = array(
+ 'callback' => array( $this,
'recycleAndDeleteStaleJobs' ),
+ 'period' => ceil( $this->claimTTL / 2 )
);
- } else {
- return array();
}
+ if ( $this->checkDelay ) {
+ $tasks['releaseReadyDelayedJobs'] = array(
+ 'callback' => array( $this,
'releaseReadyDelayedJobs' ),
+ 'period' => 300 // 5 minutes
+ );
+ }
+ return $tasks;
}
/**
@@ -584,11 +678,22 @@
* @return mixed
*/
protected function redisEval( RedisConnRef $conn, $script, array
$params, $numKeys ) {
- $res = $conn->evalSha( sha1( $script ), $params, $numKeys );
- if ( $res === false && $conn->getLastError() != '' ) { // not
in script cache?
- wfDebugLog( 'JobQueueRedis', "Lua script error: " .
$conn->getLastError() );
+ $sha1 = sha1( $script ); // 40 char hex
+
+ // Try to run the server-side cached copy of the script
+ $conn->clearLastError();
+ $res = $conn->evalSha( $sha1, $params, $numKeys );
+ // If the script is not in cache, use eval() to retry and cache
it
+ if ( $conn->getLastError() && $conn->script( 'exists', $sha1 )
=== array( 0 ) ) {
+ $conn->clearLastError();
$res = $conn->eval( $script, $params, $numKeys );
+ wfDebugLog( 'JobQueueRedis', "Used eval() for Lua
script $sha1." );
}
+
+ if ( $conn->getLastError() ) { // script bug?
+ wfDebugLog( 'JobQueueRedis', "Lua script error: " .
$conn->getLastError() );
+ }
+
return $res;
}
@@ -599,16 +704,18 @@
protected function getNewJobFields( Job $job ) {
return array(
// Fields that describe the nature of the job
- 'type' => $job->getType(),
- 'namespace' => $job->getTitle()->getNamespace(),
- 'title' => $job->getTitle()->getDBkey(),
- 'params' => $job->getParams(),
+ 'type' => $job->getType(),
+ 'namespace' => $job->getTitle()->getNamespace(),
+ 'title' => $job->getTitle()->getDBkey(),
+ 'params' => $job->getParams(),
+ // Some jobs cannot run until a "release timestamp"
+ 'rtimestamp' => $job->getReleaseTimestamp() ?: 0,
// Additional job metadata
- 'uuid' => UIDGenerator::newRawUUIDv4(
UIDGenerator::QUICK_RAND ),
- 'sha1' => $job->ignoreDuplicates()
+ 'uuid' => UIDGenerator::newRawUUIDv4(
UIDGenerator::QUICK_RAND ),
+ 'sha1' => $job->ignoreDuplicates()
? wfBaseConvert( sha1( serialize(
$job->getDeduplicationInfo() ) ), 16, 36, 31 )
: '',
- 'timestamp' => time() // UNIX timestamp
+ 'timestamp' => time() // UNIX timestamp
);
}
--
To view, visit https://gerrit.wikimedia.org/r/53315
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: merged
Gerrit-Change-Id: I40b3e3438e659f6844bdbdd5e9d3ccc6c4dc82b2
Gerrit-PatchSet: 7
Gerrit-Project: mediawiki/core
Gerrit-Branch: master
Gerrit-Owner: Aaron Schulz <[email protected]>
Gerrit-Reviewer: Aaron Schulz <[email protected]>
Gerrit-Reviewer: Lwelling <[email protected]>
Gerrit-Reviewer: Tim Starling <[email protected]>
Gerrit-Reviewer: Tychay <[email protected]>
Gerrit-Reviewer: jenkins-bot
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits