Tim Starling has submitted this change and it was merged.

Change subject: [JobQueue] Added aggregate empty/non-empty queue caching.
......................................................................


[JobQueue] Added aggregate empty/non-empty queue caching.

* The default class is JobQueueAggregatorMemc.
  This essentially has the logic that nextJobDB.php used.
* Also created a JobQueueAggregatorRedis class.
  This is much more efficient and more responsive.
* This can speed up calls to getQueuesWithJobs().
* Removed unused getDefaultQueuesWithJobs() function.

Change-Id: Ifb3c6c881decd643da1b662956ded69db4b39431
---
M includes/AutoLoader.php
M includes/DefaultSettings.php
M includes/job/JobQueue.php
A includes/job/JobQueueAggregator.php
A includes/job/JobQueueAggregatorMemc.php
A includes/job/JobQueueAggregatorRedis.php
M includes/job/JobQueueDB.php
M includes/job/JobQueueGroup.php
M maintenance/nextJobDB.php
M maintenance/runJobs.php
10 files changed, 488 insertions(+), 120 deletions(-)

Approvals:
  Tim Starling: Verified; Looks good to me, approved



diff --git a/includes/AutoLoader.php b/includes/AutoLoader.php
index 23cf411..e0b7c8f 100644
--- a/includes/AutoLoader.php
+++ b/includes/AutoLoader.php
@@ -661,6 +661,9 @@
        # includes/job
        'Job' => 'includes/job/Job.php',
        'JobQueue' => 'includes/job/JobQueue.php',
+       'JobQueueAggregator' => 'includes/job/JobQueueAggregator.php',
+       'JobQueueAggregatorMemc' => 'includes/job/JobQueueAggregatorMemc.php',
+       'JobQueueAggregatorRedis' => 'includes/job/JobQueueAggregatorRedis.php',
        'JobQueueDB' => 'includes/job/JobQueueDB.php',
        'JobQueueGroup' => 'includes/job/JobQueueGroup.php',
        'JobQueueRedis' => 'includes/job/JobQueueRedis.php',
diff --git a/includes/DefaultSettings.php b/includes/DefaultSettings.php
index 173f31e..1652031 100644
--- a/includes/DefaultSettings.php
+++ b/includes/DefaultSettings.php
@@ -5537,6 +5537,14 @@
 );
 
 /**
+ * Which aggregator to use for tracking which queues have jobs.
+ * These settings should be global to all wikis.
+ */
+$wgJobQueueAggregator = array(
+       'class' => 'JobQueueAggregatorMemc'
+);
+
+/**
  * Additional functions to be performed with updateSpecialPages.
  * Expensive Querypages are already updated.
  */
diff --git a/includes/job/JobQueue.php b/includes/job/JobQueue.php
index 7ce654b..acc0c49 100644
--- a/includes/job/JobQueue.php
+++ b/includes/job/JobQueue.php
@@ -169,9 +169,7 @@
         * @throws MWException
         */
        final public function push( $jobs, $flags = 0 ) {
-               $jobs = is_array( $jobs ) ? $jobs : array( $jobs );
-
-               return $this->batchPush( $jobs, $flags );
+               return $this->batchPush( is_array( $jobs ) ? $jobs : array( 
$jobs ), $flags );
        }
 
        /**
@@ -184,11 +182,15 @@
         * @throws MWException
         */
        final public function batchPush( array $jobs, $flags = 0 ) {
+               if ( !count( $jobs ) ) {
+                       return true; // nothing to do
+               }
                foreach ( $jobs as $job ) {
                        if ( $job->getType() !== $this->type ) {
                                throw new MWException( "Got '{$job->getType()}' 
job; expected '{$this->type}'." );
                        }
                }
+
                wfProfileIn( __METHOD__ );
                $ok = $this->doBatchPush( $jobs, $flags );
                wfProfileOut( __METHOD__ );
@@ -205,7 +207,7 @@
         * Pop a job off of the queue.
         * This requires $wgJobClasses to be set for the given job type.
         *
-        * @return Job|bool Returns false on failure
+        * @return Job|bool Returns false if there are no jobs
         * @throws MWException
         */
        final public function pop() {
diff --git a/includes/job/JobQueueAggregator.php 
b/includes/job/JobQueueAggregator.php
new file mode 100644
index 0000000..3dba3c5
--- /dev/null
+++ b/includes/job/JobQueueAggregator.php
@@ -0,0 +1,139 @@
+<?php
+/**
+ * Job queue aggregator code.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @author Aaron Schulz
+ */
+
+/**
+ * Class to handle tracking information about all queues
+ *
+ * @ingroup JobQueue
+ * @since 1.21
+ */
+abstract class JobQueueAggregator {
+       /** @var JobQueueAggregator */
+       protected static $instance = null;
+
+       /**
+        * @param array $params
+        */
+       protected function __construct( array $params ) {}
+
+       /**
+        * @return JobQueueAggregator
+        */
+       final public static function singleton() {
+               global $wgJobQueueAggregator;
+
+               if ( !isset( self::$instance ) ) {
+                       $class = $wgJobQueueAggregator['class'];
+                       $obj = new $class( $wgJobQueueAggregator );
+                       if ( !( $obj instanceof JobQueueAggregator ) ) {
+                               throw new MWException( "Class '$class' is not a 
JobQueueAggregator class." );
+                       }
+                       self::$instance = $obj;
+               }
+
+               return self::$instance;
+       }
+
+       /**
+        * Destroy the singleton instance
+        *
+        * @return void
+        */
+       final public static function destroySingleton() {
+               self::$instance = null;
+       }
+
+       /**
+        * Mark a queue as being empty
+        *
+        * @param string $wiki
+        * @param string $type
+        * @return bool Success
+        */
+       final public function notifyQueueEmpty( $wiki, $type ) {
+               wfProfileIn( __METHOD__ );
+               $ok = $this->doNotifyQueueEmpty( $wiki, $type );
+               wfProfileOut( __METHOD__ );
+               return $ok;
+       }
+
+       /**
+        * @see JobQueueAggregator::notifyQueueEmpty()
+        */
+       abstract protected function doNotifyQueueEmpty( $wiki, $type );
+
+       /**
+        * Mark a queue as being non-empty
+        *
+        * @param string $wiki
+        * @param string $type
+        * @return bool Success
+        */
+       final public function notifyQueueNonEmpty( $wiki, $type ) {
+               wfProfileIn( __METHOD__ );
+               $ok = $this->doNotifyQueueNonEmpty( $wiki, $type );
+               wfProfileOut( __METHOD__ );
+               return $ok;
+       }
+
+       /**
+        * @see JobQueueAggregator::notifyQueueNonEmpty()
+        */
+       abstract protected function doNotifyQueueNonEmpty( $wiki, $type );
+
+       /**
+        * Get the list of all of the queues with jobs
+        *
+        * @return Array (job type => (list of wiki IDs))
+        */
+       final public function getAllReadyWikiQueues() {
+               wfProfileIn( __METHOD__ );
+               $res = $this->doGetAllReadyWikiQueues();
+               wfProfileOut( __METHOD__ );
+               return $res;
+       }
+
+       /**
+        * @see JobQueueAggregator::getAllReadyWikiQueues()
+        */
+       abstract protected function doGetAllReadyWikiQueues();
+
+       /**
+        * Get all databases that have a pending job.
+        * This polls all the queues and is thus expensive.
+        *
+        * @return Array (job type => (list of wiki IDs))
+        */
+       protected function findPendingWikiQueues() {
+               global $wgLocalDatabases;
+
+               $pendingDBs = array(); // (job type => (db list))
+               foreach ( $wgLocalDatabases as $db ) {
+                       foreach ( JobQueueGroup::singleton( $db 
)->getQueuesWithJobs() as $type ) {
+                               $pendingDBs[$type][] = $db;
+                       }
+               }
+
+               return $pendingDBs;
+       }
+}
diff --git a/includes/job/JobQueueAggregatorMemc.php 
b/includes/job/JobQueueAggregatorMemc.php
new file mode 100644
index 0000000..4b82cf9
--- /dev/null
+++ b/includes/job/JobQueueAggregatorMemc.php
@@ -0,0 +1,117 @@
+<?php
+/**
+ * Job queue aggregator code that uses BagOStuff.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @author Aaron Schulz
+ */
+
+/**
+ * Class to handle tracking information about all queues using BagOStuff
+ *
+ * @ingroup JobQueue
+ * @since 1.21
+ */
+class JobQueueAggregatorMemc extends JobQueueAggregator {
+       /** @var BagOStuff */
+       protected $cache;
+
+       protected $cacheTTL; // integer; seconds
+
+       /**
+        * @params include:
+        *   - objectCache : Name of an object cache registered in 
$wgObjectCaches.
+        *                   This defaults to the one specified by 
$wgMainCacheType.
+        *   - cacheTTL    : Seconds to cache the aggregate data before 
regenerating.
+        * @param array $params
+        */
+       protected function __construct( array $params ) {
+               parent::__construct( $params );
+               $this->cache = isset( $params['objectCache'] )
+                       ? wfGetCache( $params['objectCache'] )
+                       : wfGetMainCache();
+               $this->cacheTTL = isset( $params['cacheTTL'] ) ? 
$params['cacheTTL'] : 180; // 3 min
+       }
+
+       /**
+        * @see JobQueueAggregator::doNotifyQueueEmpty()
+        */
+       protected function doNotifyQueueEmpty( $wiki, $type ) {
+               $key = $this->getReadyQueueCacheKey();
+               // Delist the queue from the "ready queue" list
+               if ( $this->cache->add( "$key:lock", 1, 60 ) ) { // lock
+                       $curInfo = $this->cache->get( $key );
+                       if ( is_array( $curInfo ) && isset( 
$curInfo['pendingDBs'][$type] ) ) {
+                               if ( in_array( $wiki, 
$curInfo['pendingDBs'][$type] ) ) {
+                                       $curInfo['pendingDBs'][$type] = 
array_diff(
+                                               $curInfo['pendingDBs'][$type], 
array( $wiki ) );
+                                       $this->cache->set( $key, $curInfo );
+                               }
+                       }
+                       $this->cache->delete( "$key:lock" ); // unlock
+               }
+               return true;
+       }
+
+       /**
+        * @see JobQueueAggregator::doNotifyQueueNonEmpty()
+        */
+       protected function doNotifyQueueNonEmpty( $wiki, $type ) {
+               return true; // updated periodically
+       }
+
+       /**
+        * @see JobQueueAggregator::doGetAllReadyWikiQueues()
+        */
+       protected function doGetAllReadyWikiQueues() {
+               $key = $this->getReadyQueueCacheKey();
+               // If the cache entry wasn't present, is stale, or in .1% of 
cases otherwise,
+               // regenerate the cache. Use any available stale cache if 
another process is
+               // currently regenerating the pending DB information.
+               $pendingDbInfo = $this->cache->get( $key );
+               if ( !is_array( $pendingDbInfo )
+                       || ( time() - $pendingDbInfo['timestamp'] ) > 
$this->cacheTTL
+                       || mt_rand( 0, 999 ) == 0
+               ) {
+                       if ( $this->cache->add( "$key:rebuild", 1, 1800 ) ) { 
// lock
+                               $pendingDbInfo = array(
+                                       'pendingDBs' => 
$this->findPendingWikiQueues(),
+                                       'timestamp'  => time()
+                               );
+                               for ( $attempts=1; $attempts <= 25; ++$attempts 
) {
+                                       if ( $this->cache->add( "$key:lock", 1, 
60 ) ) { // lock
+                                               $this->cache->set( $key, 
$pendingDbInfo );
+                                               $this->cache->delete( 
"$key:lock" ); // unlock
+                                               break;
+                                       }
+                               }
+                               $this->cache->delete( "$key:rebuild" ); // 
unlock
+                       }
+               }
+               return is_array( $pendingDbInfo )
+                       ? $pendingDbInfo['pendingDBs']
+                       : array(); // cache is both empty and locked
+       }
+
+       /**
+        * @return string
+        */
+       private function getReadyQueueCacheKey() {
+               return "jobqueue:aggregator:ready-queues:v1"; // global
+       }
+}
diff --git a/includes/job/JobQueueAggregatorRedis.php 
b/includes/job/JobQueueAggregatorRedis.php
new file mode 100644
index 0000000..74e9171
--- /dev/null
+++ b/includes/job/JobQueueAggregatorRedis.php
@@ -0,0 +1,165 @@
+<?php
+/**
+ * Job queue aggregator code that uses PhpRedis.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @author Aaron Schulz
+ */
+
+/**
+ * Class to handle tracking information about all queues using PhpRedis
+ *
+ * @ingroup JobQueue
+ * @since 1.21
+ */
+class JobQueueAggregatorRedis extends JobQueueAggregator {
+       /** @var RedisConnectionPool */
+       protected $redisPool;
+
+       /**
+        * @params include:
+        *   - redisConfig : An array of parameters to 
RedisConnectionPool::__construct().
+        *   - redisServer : A hostname/port combination or the absolute path 
of a UNIX socket.
+        *                   If a hostname is specified but no port, the 
standard port number
+        *                   6379 will be used. Required.
+        * @param array $params
+        */
+       protected function __construct( array $params ) {
+               parent::__construct( $params );
+               $this->server = $params['redisServer'];
+               $this->redisPool = RedisConnectionPool::singleton( 
$params['redisConfig'] );
+       }
+
+       /**
+        * @see JobQueueAggregator::doNotifyQueueEmpty()
+        */
+       protected function doNotifyQueueEmpty( $wiki, $type ) {
+               $conn = $this->getConnection();
+               if ( !$conn ) {
+                       return false;
+               }
+               try {
+                       $conn->hDel( $this->getReadyQueueKey(), 
$this->encQueueName( $type, $wiki ) );
+                       return true;
+               } catch ( RedisException $e ) {
+                       $this->handleException( $conn, $e );
+                       return false;
+               }
+       }
+
+       /**
+        * @see JobQueueAggregator::doNotifyQueueNonEmpty()
+        */
+       protected function doNotifyQueueNonEmpty( $wiki, $type ) {
+               $conn = $this->getConnection();
+               if ( !$conn ) {
+                       return false;
+               }
+               try {
+                       $conn->hSet( $this->getReadyQueueKey(), 
$this->encQueueName( $type, $wiki ), time() );
+                       return true;
+               } catch ( RedisException $e ) {
+                       $this->handleException( $conn, $e );
+                       return false;
+               }
+       }
+
+       /**
+        * @see JobQueueAggregator::doGetAllReadyWikiQueues()
+        */
+       protected function doGetAllReadyWikiQueues() {
+               $conn = $this->getConnection();
+               if ( !$conn ) {
+                       return array();
+               }
+               try {
+                       $conn->multi( Redis::PIPELINE );
+                       $conn->exists( $this->getReadyQueueKey() );
+                       $conn->hGetAll( $this->getReadyQueueKey() );
+                       list( $exists, $map ) = $conn->exec();
+
+                       if ( $exists ) { // cache hit
+                               $pendingDBs = array(); // (type => list of 
wikis)
+                               foreach ( $map as $key => $time ) {
+                                       list( $type, $wiki ) = 
$this->dencQueueName( $key );
+                                       $pendingDBs[$type][] = $wiki;
+                               }
+                       } else { // cache miss
+                               $pendingDBs = $this->findPendingWikiQueues(); 
// (type => list of wikis)
+
+                               $now = time();
+                               $map = array();
+                               foreach ( $pendingDBs as $type => $wikis ) {
+                                       foreach ( $wikis as $wiki ) {
+                                               $map[$this->encQueueName( 
$type, $wiki )] = $now;
+                                       }
+                               }
+                               $conn->hMSet( $this->getReadyQueueKey(), $map );
+                       }
+
+                       return $pendingDBs;
+               } catch ( RedisException $e ) {
+                       $this->handleException( $conn, $e );
+                       return array();
+               }
+       }
+
+       /**
+        * Get a connection to the Redis server given by the 'redisServer' parameter
+        *
+        * @return RedisConnRef|bool Returns false on failure
+        * @throws MWException
+        */
+       protected function getConnection() {
+               return $this->redisPool->getConnection( $this->server );
+       }
+
+       /**
+        * @param RedisConnRef $conn
+        * @param RedisException $e
+        * @return void
+        */
+       protected function handleException( RedisConnRef $conn, $e ) {
+               $this->redisPool->handleException( $this->server, $conn, $e );
+       }
+
+       /**
+        * @return string
+        */
+       private function getReadyQueueKey() {
+               return "jobqueue:aggregator:h-ready-queues:v1"; // global
+       }
+
+       /**
+        * @param string $type
+        * @param string $wiki
+        * @return string
+        */
+       private function encQueueName( $type, $wiki ) {
+               return rawurlencode( $type ) . '/' . rawurlencode( $wiki );
+       }
+
+       /**
+        * @param string $name
+        * @return string
+        */
+       private function dencQueueName( $name ) {
+               list( $type, $wiki ) = explode( '/', $name, 2 );
+               return array( rawurldecode( $type ), rawurldecode( $wiki ) );
+       }
+}
diff --git a/includes/job/JobQueueDB.php b/includes/job/JobQueueDB.php
index 6e42305..f583c52 100644
--- a/includes/job/JobQueueDB.php
+++ b/includes/job/JobQueueDB.php
@@ -150,12 +150,11 @@
                                }
                        }
 
+                       $key = $this->getCacheKey( 'empty' );
                        $atomic = ( $flags & self::QoS_Atomic );
-                       $key    = $this->getCacheKey( 'empty' );
-                       $ttl    = self::CACHE_TTL_LONG;
 
                        $dbw->onTransactionIdle(
-                               function() use ( $dbw, $rowSet, $rowList, 
$atomic, $key, $ttl, $scope
+                               function() use ( $dbw, $rowSet, $rowList, 
$atomic, $key, $scope
                        ) {
                                global $wgMemc;
 
@@ -197,7 +196,7 @@
                                        $dbw->commit( __METHOD__ );
                                }
 
-                               $wgMemc->set( $key, 'false', $ttl ); // queue 
is not empty
+                               $wgMemc->set( $key, 'false', 
JobQueueDB::CACHE_TTL_LONG );
                        } );
                }
 
diff --git a/includes/job/JobQueueGroup.php b/includes/job/JobQueueGroup.php
index 6d9d590..0118853 100644
--- a/includes/job/JobQueueGroup.php
+++ b/includes/job/JobQueueGroup.php
@@ -39,16 +39,18 @@
        const TYPE_DEFAULT = 1; // integer; jobs popped by default
        const TYPE_ANY     = 2; // integer; any job
 
-       const USE_CACHE = 1; // integer; use process cache
+       const USE_CACHE = 1; // integer; use process or persistent cache
 
        const PROC_CACHE_TTL = 15; // integer; seconds
+
+       const CACHE_VERSION = 1; // integer; cache version
 
        /**
         * @param $wiki string Wiki ID
         */
        protected function __construct( $wiki ) {
                $this->wiki = $wiki;
-               $this->cache = new ProcessCacheLRU( 1 );
+               $this->cache = new ProcessCacheLRU( 10 );
        }
 
        /**
@@ -111,7 +113,9 @@
 
                $ok = true;
                foreach ( $jobsByType as $type => $jobs ) {
-                       if ( !$this->get( $type )->push( $jobs ) ) {
+                       if ( $this->get( $type )->push( $jobs ) ) {
+                               
JobQueueAggregator::singleton()->notifyQueueNonEmpty( $this->wiki, $type );
+                       } else {
                                $ok = false;
                        }
                }
@@ -129,35 +133,44 @@
        /**
         * Pop a job off one of the job queues
         *
-        * @param $queueType integer JobQueueGroup::TYPE_* constant
+        * @param $qtype integer|string JobQueueGroup::TYPE_DEFAULT or type 
string
         * @param $flags integer Bitfield of JobQueueGroup::USE_* constants
         * @return Job|bool Returns false on failure
         */
-       public function pop( $queueType = self::TYPE_DEFAULT, $flags = 0 ) {
-               if ( $flags & self::USE_CACHE ) {
-                       if ( !$this->cache->has( 'queues-ready', 'list', 
self::PROC_CACHE_TTL ) ) {
-                               $this->cache->set( 'queues-ready', 'list', 
$this->getQueuesWithJobs() );
+       public function pop( $qtype = self::TYPE_DEFAULT, $flags = 0 ) {
+               if ( is_string( $qtype ) ) { // specific job type
+                       $job = $this->get( $qtype )->pop();
+                       if ( !$job ) {
+                               
JobQueueAggregator::singleton()->notifyQueueEmpty( $this->wiki, $qtype );
                        }
-                       $types = $this->cache->get( 'queues-ready', 'list' );
-               } else {
-                       $types = $this->getQueuesWithJobs();
-               }
-
-               if ( $queueType == self::TYPE_DEFAULT ) {
-                       $types = array_intersect( $types, 
$this->getDefaultQueueTypes() );
-               }
-               shuffle( $types ); // avoid starvation
-
-               foreach ( $types as $type ) { // for each queue...
-                       $job = $this->get( $type )->pop();
-                       if ( $job ) { // found
-                               return $job;
-                       } else { // not found
-                               $this->cache->clear( 'queues-ready' );
+                       return $job;
+               } else { // any job in the "default" jobs types
+                       if ( $flags & self::USE_CACHE ) {
+                               if ( !$this->cache->has( 'queues-ready', 
'list', self::PROC_CACHE_TTL ) ) {
+                                       $this->cache->set( 'queues-ready', 
'list', $this->getQueuesWithJobs() );
+                               }
+                               $types = $this->cache->get( 'queues-ready', 
'list' );
+                       } else {
+                               $types = $this->getQueuesWithJobs();
                        }
-               }
 
-               return false; // no jobs found
+                       if ( $qtype == self::TYPE_DEFAULT ) {
+                               $types = array_intersect( $types, 
$this->getDefaultQueueTypes() );
+                       }
+                       shuffle( $types ); // avoid starvation
+
+                       foreach ( $types as $type ) { // for each queue...
+                               $job = $this->get( $type )->pop();
+                               if ( $job ) { // found
+                                       return $job;
+                               } else { // not found
+                                       
JobQueueAggregator::singleton()->notifyQueueEmpty( $this->wiki, $type );
+                                       $this->cache->clear( 'queues-ready' );
+                               }
+                       }
+
+                       return false; // no jobs found
+               }
        }
 
        /**
@@ -204,24 +217,13 @@
        }
 
        /**
+        * Get the list of job types that have non-empty queues
+        *
         * @return Array List of job types that have non-empty queues
         */
        public function getQueuesWithJobs() {
                $types = array();
                foreach ( $this->getQueueTypes() as $type ) {
-                       if ( !$this->get( $type )->isEmpty() ) {
-                               $types[] = $type;
-                       }
-               }
-               return $types;
-       }
-
-       /**
-        * @return Array List of default job types that have non-empty queues
-        */
-       public function getDefaultQueuesWithJobs() {
-               $types = array();
-               foreach ( $this->getDefaultQueueTypes() as $type ) {
                        if ( !$this->get( $type )->isEmpty() ) {
                                $types[] = $type;
                        }
diff --git a/maintenance/nextJobDB.php b/maintenance/nextJobDB.php
index fc38938..cf90cf6 100644
--- a/maintenance/nextJobDB.php
+++ b/maintenance/nextJobDB.php
@@ -51,37 +51,12 @@
                // Handle any required periodic queue maintenance
                $this->executeReadyPeriodicTasks();
 
-               $memcKey = 'jobqueue:dbs:v3';
-               $pendingDbInfo = $wgMemc->get( $memcKey );
-
-               // If the cache entry wasn't present, is stale, or in .1% of 
cases otherwise,
-               // regenerate the cache. Use any available stale cache if 
another process is
-               // currently regenerating the pending DB information.
-               if ( !is_array( $pendingDbInfo )
-                       || ( time() - $pendingDbInfo['timestamp'] ) > 300 // 5 
minutes
-                       || mt_rand( 0, 999 ) == 0
-               ) {
-                       if ( $wgMemc->add( "$memcKey:rebuild", 1, 1800 ) ) { // 
lock
-                               $pendingDbInfo = array(
-                                       'pendingDBs' => $this->getPendingDbs(),
-                                       'timestamp'  => time()
-                               );
-                               for ( $attempts=1; $attempts <= 25; ++$attempts 
) {
-                                       if ( $wgMemc->add( "$memcKey:lock", 1, 
60 ) ) { // lock
-                                               $wgMemc->set( $memcKey, 
$pendingDbInfo );
-                                               $wgMemc->delete( 
"$memcKey:lock" ); // unlock
-                                               break;
-                                       }
-                               }
-                               $wgMemc->delete( "$memcKey:rebuild" ); // unlock
-                       }
-               }
-
-               if ( !is_array( $pendingDbInfo ) || 
!$pendingDbInfo['pendingDBs'] ) {
+               // Get all the queues with jobs in them
+               $pendingDBs = 
JobQueueAggregator::singleton()->getAllReadyWikiQueues();
+               if ( !count( $pendingDBs ) ) {
                        return; // no DBs with jobs or cache is both empty and 
locked
                }
 
-               $pendingDBs = $pendingDbInfo['pendingDBs']; // convenience
                do {
                        $again = false;
 
@@ -101,20 +76,8 @@
 
                        list( $type, $db ) = $candidates[ mt_rand( 0, count( 
$candidates ) - 1 ) ];
                        if ( !$this->checkJob( $type, $db ) ) { // queue is 
actually empty?
-                               $pendingDBs = $this->delistDB( $pendingDBs, 
$db, $type );
-                               // Update the cache to remove the outdated 
information.
-                               // Make sure that this does not race 
(especially with full rebuilds).
-                               if ( $wgMemc->add( "$memcKey:lock", 1, 60 ) ) { 
// lock
-                                       $curInfo = $wgMemc->get( $memcKey );
-                                       if ( is_array( $curInfo ) ) {
-                                               $curInfo['pendingDBs'] =
-                                                       $this->delistDB( 
$curInfo['pendingDBs'], $db, $type );
-                                               $wgMemc->set( $memcKey, 
$curInfo );
-                                               // May as well make use of this 
newer information
-                                               $pendingDBs = 
$curInfo['pendingDBs'];
-                                       }
-                                       $wgMemc->delete( "$memcKey:lock" ); // 
unlock
-                               }
+                               $pendingDBs[$type] = array_diff( 
$pendingDBs[$type], $db );
+                               
JobQueueAggregator::singleton()->notifyQueueEmpty( $db, $type );
                                $again = true;
                        }
                } while ( $again );
@@ -127,19 +90,6 @@
        }
 
        /**
-        * Remove a type/DB entry from the list of queues with jobs
-        *
-        * @param $pendingDBs array
-        * @param $db string
-        * @param $type string
-        * @return Array
-        */
-       private function delistDB( array $pendingDBs, $db, $type ) {
-               $pendingDBs[$type] = array_diff( $pendingDBs[$type], array( $db 
) );
-               return $pendingDBs;
-       }
-
-       /**
         * Check if the specified database has a job of the specified type in 
it.
         * The type may be false to indicate "all".
         * @param $type string
@@ -148,23 +98,6 @@
         */
        private function checkJob( $type, $dbName ) {
                return !JobQueueGroup::singleton( $dbName )->get( $type 
)->isEmpty();
-       }
-
-       /**
-        * Get all databases that have a pending job
-        * @return array
-        */
-       private function getPendingDbs() {
-               global $wgLocalDatabases;
-
-               $pendingDBs = array(); // (job type => (db list))
-               foreach ( $wgLocalDatabases as $db ) {
-                       foreach ( JobQueueGroup::singleton( $db 
)->getQueuesWithJobs() as $type ) {
-                               $pendingDBs[$type][] = $db;
-                       }
-               }
-
-               return $pendingDBs;
        }
 
        /**
diff --git a/maintenance/runJobs.php b/maintenance/runJobs.php
index ff09683..d582f51 100644
--- a/maintenance/runJobs.php
+++ b/maintenance/runJobs.php
@@ -85,7 +85,7 @@
                do {
                        $job = ( $type === false )
                                ? $group->pop( JobQueueGroup::TYPE_DEFAULT, 
JobQueueGroup::USE_CACHE )
-                               : $group->get( $type )->pop(); // job from a 
single queue
+                               : $group->pop( $type ); // job from a single 
queue
                        if ( $job ) { // found a job
                                // Perform the job (logging success/failure and 
runtime)...
                                $t = microtime( true );

-- 
To view, visit https://gerrit.wikimedia.org/r/48223
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: Ifb3c6c881decd643da1b662956ded69db4b39431
Gerrit-PatchSet: 5
Gerrit-Project: mediawiki/core
Gerrit-Branch: master
Gerrit-Owner: Aaron Schulz <asch...@wikimedia.org>
Gerrit-Reviewer: Aaron Schulz <asch...@wikimedia.org>
Gerrit-Reviewer: Tim Starling <tstarl...@wikimedia.org>
Gerrit-Reviewer: jenkins-bot

_______________________________________________
MediaWiki-commits mailing list
MediaWiki-commits@lists.wikimedia.org
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to