Aaron Schulz has uploaded a new change for review.
https://gerrit.wikimedia.org/r/53315
Change subject: [JobQueue] Added support for delayed jobs with JobQueueRedis.
......................................................................
[JobQueue] Added support for delayed jobs with JobQueueRedis.
Change-Id: I40b3e3438e659f6844bdbdd5e9d3ccc6c4dc82b2
---
M includes/job/Job.php
M includes/job/JobQueue.php
M includes/job/JobQueueRedis.php
3 files changed, 123 insertions(+), 17 deletions(-)
git pull ssh://gerrit.wikimedia.org:29418/mediawiki/core
refs/changes/15/53315/1
diff --git a/includes/job/Job.php b/includes/job/Job.php
index 0f42611..4b54f91 100644
--- a/includes/job/Job.php
+++ b/includes/job/Job.php
@@ -177,6 +177,15 @@
}
/**
+ * @return integer|null UNIX timestamp to delay running this job until,
otherwise null
+ */
+ public function delayUntilTimestamp() {
+ return isset( $this->params['delayUntilTimestamp'] )
+ ? wfTimestampOrNull( TS_UNIX,
$this->params['delayUntilTimestamp'] )
+ : null;
+ }
+
+ /**
* @return bool Whether only one of each identical set of jobs should
be run
*/
public function ignoreDuplicates() {
diff --git a/includes/job/JobQueue.php b/includes/job/JobQueue.php
index b0dd925..f9b9454 100644
--- a/includes/job/JobQueue.php
+++ b/includes/job/JobQueue.php
@@ -126,7 +126,7 @@
abstract protected function optimalOrder();
/**
- * Quickly check if the queue is empty (has no available jobs).
+ * Quickly check if the queue has no available (unacquired,
non-delayed) jobs.
* Queue classes should use caching if they are any slower without
memcached.
*
* If caching is used, this might return false when there are actually
no jobs.
@@ -151,7 +151,7 @@
abstract protected function doIsEmpty();
/**
- * Get the number of available (unacquired) jobs in the queue.
+ * Get the number of available (unacquired, non-delayed) jobs in the
queue.
* Queue classes should use caching if they are any slower without
memcached.
*
* If caching is used, this number might be out of date for a minute.
@@ -193,6 +193,30 @@
* @return integer
*/
abstract protected function doGetAcquiredCount();
+
+ /**
+ * Get the number of delayed jobs (these are temporarily out of the
queue).
+ * Queue classes should use caching if they are any slower without
memcached.
+ *
+ * If caching is used, this number might be out of date for a minute.
+ *
+ * @return integer
+ * @throws MWException
+ */
+ final public function getDelayedCount() {
+ wfProfileIn( __METHOD__ );
+ $res = $this->doGetDelayedCount();
+ wfProfileOut( __METHOD__ );
+ return $res;
+ }
+
+ /**
+ * @see JobQueue::getDelayedCount()
+ * @return integer
+ */
+ protected function doGetDelayedCount() {
+ return 0; // not implemented
+ }
/**
* Push a single jobs into the queue.
@@ -413,7 +437,7 @@
protected function doFlushCaches() {}
/**
- * Get an iterator to traverse over all of the jobs in this queue.
+ * Get an iterator to traverse over all available jobs in this queue.
* This does not include jobs that are current acquired. In general,
* this should only be called on a queue that is no longer being popped.
*
diff --git a/includes/job/JobQueueRedis.php b/includes/job/JobQueueRedis.php
index 3db8260..4746520 100644
--- a/includes/job/JobQueueRedis.php
+++ b/includes/job/JobQueueRedis.php
@@ -106,6 +106,20 @@
}
/**
+ * @see JobQueue::doGetDelayedCount()
+ * @return integer
+ * @throws MWException
+ */
+ protected function doGetDelayedCount() {
+ $conn = $this->getConnection();
+ try {
+ return $conn->zSize( $this->getQueueKey( 'z-delayed' )
);
+ } catch ( RedisException $e ) {
+ $this->throwRedisException( $this->server, $conn, $e );
+ }
+ }
+
+ /**
* @see JobQueue::doBatchPush()
* @param array $jobs
* @param $flags
@@ -157,13 +171,30 @@
}
// Actually push the non-duplicate jobs into the
queue...
if ( count( $items ) ) {
- $uids = array_keys( $items );
+ $lParams = array(); // list push params
+ $zParams = array(); // sorted-set add params
+ foreach ( $items as $item ) {
+ if ( $item['delayUntil'] > 0 ) { // use
sorted-set
+ $zParams[] =
$item['delayUntil']; // score
+ $zParams[] = $item['uid']; //
value
+ } else { // use list
+ $lParams[] = $item['uid'];
+ }
+ }
$conn->multi( Redis::MULTI ); // begin (atomic
trx)
$conn->mSet( $this->prefixKeysWithQueueKey(
'data', $items ) );
- call_user_func_array(
- array( $conn, 'lPush' ),
- array_merge( array( $this->getQueueKey(
'l-unclaimed' ) ), $uids )
- );
+ if ( count( $lParams ) ) {
+ call_user_func_array(
+ array( $conn, 'lPush' ),
+ array_merge( array(
$this->getQueueKey( 'l-unclaimed' ) ), $lParams )
+ );
+ }
+ if ( count( $zParams ) ) {
+ call_user_func_array(
+ array( $conn, 'zAdd' ),
+ array_merge( array(
$this->getQueueKey( 'z-delayed' ) ), $zParams )
+ );
+ }
$res = $conn->exec(); // commit (atomic trx)
if ( in_array( false, $res, true ) ) {
wfDebugLog( 'JobQueueRedis', "Could not
insert {$this->type} job(s)." );
@@ -187,8 +218,11 @@
protected function doPop() {
$job = false;
- if ( $this->claimTTL <= 0 && mt_rand( 0, 99 ) == 0 ) {
- $this->cleanupClaimedJobs(); // prune jobs and IDs from
the "garbage" list
+ if ( mt_rand( 0, 99 ) == 0 ) {
+ $this->releaseReadyDelayedJobs(); // push ready delayed
jobs into the queue
+ if ( $this->claimTTL <= 0 ) { // no acknowledgements
+ $this->cleanupClaimedJobs(); // prune jobs and
IDs from the "garbage" list
+ }
}
$conn = $this->getConnection();
@@ -360,7 +394,7 @@
}
/**
- * This function should not be called outside RedisJobQueue
+ * This function should not be called outside JobQueueRedis
*
* @param $uid string
* @param $conn RedisConnRef
@@ -476,6 +510,39 @@
}
/**
+ * Release any ready delayed jobs into the queue
+ *
+ * @return integer Number of jobs released
+ * @throws MWException
+ */
+ public function releaseReadyDelayedJobs() {
+ $count = 0;
+ // Move ready jobs from the "delayed" set to the "unclaimed"
list
+ $conn = $this->getConnection();
+ try {
+ // Get the list of ready delayed jobs
+ $uids = $conn->zRangeByScore(
+ $this->getQueueKey( 'z-delayed' ), 0, time(),
array( 'limit' => 500 )
+ );
+ $count = count( $uids );
+ if ( $count ) {
+ // Migrate the jobs to the "unclaimed" list
+ $conn->multi( Redis::MULTI ); // begin (atomic
trx)
+ call_user_func_array(
+ array( $conn, 'lPush' ),
+ array_merge( array( $this->getQueueKey(
'l-unclaimed' ) ), $uids )
+ );
+ $conn->zRem( $this->getQueueKey( 'z-delayed' ),
$uids );
+ $conn->exec(); // commit (atomic trx)
+ }
+ } catch ( RedisException $e ) {
+ $this->throwRedisException( $this->server, $conn, $e );
+ }
+
+ return $count;
+ }
+
+ /**
* Destroy any jobs that have been claimed
*
* @return integer Number of jobs deleted
@@ -528,6 +595,10 @@
protected function doGetPeriodicTasks() {
if ( $this->claimTTL > 0 ) {
return array(
+ 'releaseReadyDelayedJobs' => array(
+ 'callback' => array( $this,
'releaseReadyDelayedJobs' ),
+ 'period' => 300 // 5 minutes
+ ),
'recycleAndDeleteStaleJobs' => array(
'callback' => array( $this,
'recycleAndDeleteStaleJobs' ),
'period' => ceil( $this->claimTTL / 2
)
@@ -545,15 +616,17 @@
protected function getNewJobFields( Job $job ) {
return array(
// Fields that describe the nature of the job
- 'type' => $job->getType(),
- 'namespace' => $job->getTitle()->getNamespace(),
- 'title' => $job->getTitle()->getDBkey(),
- 'params' => $job->getParams(),
+ 'type' => $job->getType(),
+ 'namespace' => $job->getTitle()->getNamespace(),
+ 'title' => $job->getTitle()->getDBkey(),
+ 'params' => $job->getParams(),
+ // Some jobs cannot run until a certain timestamp
+ 'delayUntil' => $job->delayUntilTimestamp() ?: null,
// Additional metadata
- 'uid' => $job->ignoreDuplicates()
+ 'uid' => $job->ignoreDuplicates()
? wfBaseConvert( sha1( serialize(
$job->getDeduplicationInfo() ) ), 16, 36, 31 )
: wfRandomString( 32 ),
- 'timestamp' => time() // UNIX timestamp
+ 'timestamp' => time() // UNIX timestamp
);
}
--
To view, visit https://gerrit.wikimedia.org/r/53315
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I40b3e3438e659f6844bdbdd5e9d3ccc6c4dc82b2
Gerrit-PatchSet: 1
Gerrit-Project: mediawiki/core
Gerrit-Branch: master
Gerrit-Owner: Aaron Schulz <[email protected]>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits