Aaron Schulz has uploaded a new change for review.

  https://gerrit.wikimedia.org/r/53315


Change subject: [JobQueue] Added support for delayed jobs with JobQueueRedis.
......................................................................

[JobQueue] Added support for delayed jobs with JobQueueRedis.

Change-Id: I40b3e3438e659f6844bdbdd5e9d3ccc6c4dc82b2
---
M includes/job/Job.php
M includes/job/JobQueue.php
M includes/job/JobQueueRedis.php
3 files changed, 123 insertions(+), 17 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/mediawiki/core 
refs/changes/15/53315/1

diff --git a/includes/job/Job.php b/includes/job/Job.php
index 0f42611..4b54f91 100644
--- a/includes/job/Job.php
+++ b/includes/job/Job.php
@@ -177,6 +177,15 @@
        }
 
        /**
+        * @return integer|null UNIX timestamp to delay running this job until, 
otherwise null
+        */
+       public function delayUntilTimestamp() {
+               return isset( $this->params['delayUntilTimestamp'] )
+                       ? wfTimestampOrNull( TS_UNIX, 
$this->params['delayUntilTimestamp'] )
+                       : null;
+       }
+
+       /**
         * @return bool Whether only one of each identical set of jobs should 
be run
         */
        public function ignoreDuplicates() {
diff --git a/includes/job/JobQueue.php b/includes/job/JobQueue.php
index b0dd925..f9b9454 100644
--- a/includes/job/JobQueue.php
+++ b/includes/job/JobQueue.php
@@ -126,7 +126,7 @@
        abstract protected function optimalOrder();
 
        /**
-        * Quickly check if the queue is empty (has no available jobs).
+        * Quickly check if the queue has no available (unacquired, 
non-delayed) jobs.
         * Queue classes should use caching if they are any slower without 
memcached.
         *
         * If caching is used, this might return false when there are actually 
no jobs.
@@ -151,7 +151,7 @@
        abstract protected function doIsEmpty();
 
        /**
-        * Get the number of available (unacquired) jobs in the queue.
+        * Get the number of available (unacquired, non-delayed) jobs in the 
queue.
         * Queue classes should use caching if they are any slower without 
memcached.
         *
         * If caching is used, this number might be out of date for a minute.
@@ -193,6 +193,30 @@
         * @return integer
         */
        abstract protected function doGetAcquiredCount();
+
+       /**
+        * Get the number of delayed jobs (these are temporarily out of the 
queue).
+        * Queue classes should use caching if they are any slower without 
memcached.
+        *
+        * If caching is used, this number might be out of date for a minute.
+        *
+        * @return integer
+        * @throws MWException
+        */
+       final public function getDelayedCount() {
+               wfProfileIn( __METHOD__ );
+               $res = $this->doGetDelayedCount();
+               wfProfileOut( __METHOD__ );
+               return $res;
+       }
+
+       /**
+        * @see JobQueue::getDelayedCount()
+        * @return integer
+        */
+       protected function doGetDelayedCount() {
+               return 0; // not implemented
+       }
 
        /**
         * Push a single jobs into the queue.
@@ -413,7 +437,7 @@
        protected function doFlushCaches() {}
 
        /**
-        * Get an iterator to traverse over all of the jobs in this queue.
+        * Get an iterator to traverse over all available jobs in this queue.
         * This does not include jobs that are current acquired. In general,
         * this should only be called on a queue that is no longer being popped.
         *
diff --git a/includes/job/JobQueueRedis.php b/includes/job/JobQueueRedis.php
index 3db8260..4746520 100644
--- a/includes/job/JobQueueRedis.php
+++ b/includes/job/JobQueueRedis.php
@@ -106,6 +106,20 @@
        }
 
        /**
+        * @see JobQueue::doGetDelayedCount()
+        * @return integer
+        * @throws MWException
+        */
+       protected function doGetDelayedCount() {
+               $conn = $this->getConnection();
+               try {
+                       return $conn->zSize( $this->getQueueKey( 'z-delayed' ) 
);
+               } catch ( RedisException $e ) {
+                       $this->throwRedisException( $this->server, $conn, $e );
+               }
+       }
+
+       /**
         * @see JobQueue::doBatchPush()
         * @param array $jobs
         * @param $flags
@@ -157,13 +171,30 @@
                        }
                        // Actually push the non-duplicate jobs into the 
queue...
                        if ( count( $items ) ) {
-                               $uids = array_keys( $items );
+                               $lParams = array(); // list push params
+                               $zParams = array(); // sorted-set add params
+                               foreach ( $items as $item ) {
+                                       if ( $item['delayUntil'] > 0 ) { // use 
sorted-set
+                                               $zParams[] = 
$item['delayUntil']; // score
+                                               $zParams[] = $item['uid']; // 
value
+                                       } else { // use list
+                                               $lParams[] = $item['uid'];
+                                       }
+                               }
                                $conn->multi( Redis::MULTI ); // begin (atomic 
trx)
                                $conn->mSet( $this->prefixKeysWithQueueKey( 
'data', $items ) );
-                               call_user_func_array(
-                                       array( $conn, 'lPush' ),
-                                       array_merge( array( $this->getQueueKey( 
'l-unclaimed' ) ), $uids )
-                               );
+                               if ( count( $lParams ) ) {
+                                       call_user_func_array(
+                                               array( $conn, 'lPush' ),
+                                               array_merge( array( 
$this->getQueueKey( 'l-unclaimed' ) ), $lParams )
+                                       );
+                               }
+                               if ( count( $zParams ) ) {
+                                       call_user_func_array(
+                                               array( $conn, 'zAdd' ),
+                                               array_merge( array( 
$this->getQueueKey( 'z-delayed' ) ), $zParams )
+                                       );
+                               }
                                $res = $conn->exec(); // commit (atomic trx)
                                if ( in_array( false, $res, true ) ) {
                                        wfDebugLog( 'JobQueueRedis', "Could not 
insert {$this->type} job(s)." );
@@ -187,8 +218,11 @@
        protected function doPop() {
                $job = false;
 
-               if ( $this->claimTTL <= 0 && mt_rand( 0, 99 ) == 0 ) {
-                       $this->cleanupClaimedJobs(); // prune jobs and IDs from 
the "garbage" list
+               if ( mt_rand( 0, 99 ) == 0 ) {
+                       $this->releaseReadyDelayedJobs(); // push ready delayed 
jobs into the queue
+                       if ( $this->claimTTL <= 0 ) { // no acknowledgements
+                               $this->cleanupClaimedJobs(); // prune jobs and 
IDs from the "garbage" list
+                       }
                }
 
                $conn = $this->getConnection();
@@ -360,7 +394,7 @@
        }
 
        /**
-        * This function should not be called outside RedisJobQueue
+        * This function should not be called outside JobQueueRedis
         *
         * @param $uid string
         * @param $conn RedisConnRef
@@ -476,6 +510,39 @@
        }
 
        /**
+        * Release any ready delayed jobs into the queue
+        *
+        * @return integer Number of jobs released
+        * @throws MWException
+        */
+       public function releaseReadyDelayedJobs() {
+               $count = 0;
+               // Move ready jobs from the "delayed" set to the "unclaimed" 
list
+               $conn = $this->getConnection();
+               try {
+                       // Get the list of ready delayed jobs
+                       $uids = $conn->zRangeByScore(
+                               $this->getQueueKey( 'z-delayed' ), 0, time(), 
array( 'limit' => 500 )
+                       );
+                       $count = count( $uids );
+                       if ( $count ) {
+                               // Migrate the jobs to the "unclaimed" list
+                               $conn->multi( Redis::MULTI ); // begin (atomic 
trx)
+                               call_user_func_array(
+                                       array( $conn, 'lPush' ),
+                                       array_merge( array( $this->getQueueKey( 
'l-unclaimed' ) ), $uids )
+                               );
+                               $conn->zRem( $this->getQueueKey( 'z-delayed' ), 
$uids );
+                               $conn->exec(); // commit (atomic trx)
+                       }
+               } catch ( RedisException $e ) {
+                       $this->throwRedisException( $this->server, $conn, $e );
+               }
+
+               return $count;
+       }
+
+       /**
         * Destroy any jobs that have been claimed
         *
         * @return integer Number of jobs deleted
@@ -528,6 +595,10 @@
        protected function doGetPeriodicTasks() {
                if ( $this->claimTTL > 0 ) {
                        return array(
+                               'releaseReadyDelayedJobs' => array(
+                                       'callback' => array( $this, 
'releaseReadyDelayedJobs' ),
+                                       'period'   => 300 // 5 minutes
+                               ),
                                'recycleAndDeleteStaleJobs' => array(
                                        'callback' => array( $this, 
'recycleAndDeleteStaleJobs' ),
                                        'period'   => ceil( $this->claimTTL / 2 
)
@@ -545,15 +616,17 @@
        protected function getNewJobFields( Job $job ) {
                return array(
                        // Fields that describe the nature of the job
-                       'type'      => $job->getType(),
-                       'namespace' => $job->getTitle()->getNamespace(),
-                       'title'     => $job->getTitle()->getDBkey(),
-                       'params'    => $job->getParams(),
+                       'type'       => $job->getType(),
+                       'namespace'  => $job->getTitle()->getNamespace(),
+                       'title'      => $job->getTitle()->getDBkey(),
+                       'params'     => $job->getParams(),
+                       // Some jobs cannot run until a certain timestamp
+                       'delayUntil' => $job->delayUntilTimestamp() ?: null,
                        // Additional metadata
-                       'uid'       => $job->ignoreDuplicates()
+                       'uid'        => $job->ignoreDuplicates()
                                ? wfBaseConvert( sha1( serialize( 
$job->getDeduplicationInfo() ) ), 16, 36, 31 )
                                : wfRandomString( 32 ),
-                       'timestamp' => time() // UNIX timestamp
+                       'timestamp'  => time() // UNIX timestamp
                );
        }
 

-- 
To view, visit https://gerrit.wikimedia.org/r/53315
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I40b3e3438e659f6844bdbdd5e9d3ccc6c4dc82b2
Gerrit-PatchSet: 1
Gerrit-Project: mediawiki/core
Gerrit-Branch: master
Gerrit-Owner: Aaron Schulz <[email protected]>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to