Aaron Schulz has uploaded a new change for review.

  https://gerrit.wikimedia.org/r/69451


Change subject: jobqueue: cleaned up JobQueue exception handling.
......................................................................

jobqueue: cleaned up JobQueue exception handling.

* Added JobQueueError exceptions.
* Periodic tasks that fail are logged and skipped.
* JobQueueFederated properly fails over now.

Change-Id: I9d9f0dae548a9dde693a7cd25c255a8bfbf37899
---
M includes/AutoLoader.php
M includes/job/JobQueue.php
M includes/job/JobQueueDB.php
M includes/job/JobQueueFederated.php
M includes/job/JobQueueGroup.php
M includes/job/JobQueueRedis.php
6 files changed, 264 insertions(+), 166 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/mediawiki/core 
refs/changes/51/69451/1

diff --git a/includes/AutoLoader.php b/includes/AutoLoader.php
index 02c92df..705c51e 100644
--- a/includes/AutoLoader.php
+++ b/includes/AutoLoader.php
@@ -662,6 +662,7 @@
        'JobQueueAggregatorMemc' => 
'includes/job/aggregator/JobQueueAggregatorMemc.php',
        'JobQueueAggregatorRedis' => 
'includes/job/aggregator/JobQueueAggregatorRedis.php',
        'JobQueueDB' => 'includes/job/JobQueueDB.php',
+       'JobQueueError' => 'includes/job/JobQueue.php',
        'JobQueueGroup' => 'includes/job/JobQueueGroup.php',
        'JobQueueFederated' => 'includes/job/JobQueueFederated.php',
        'JobQueueRedis' => 'includes/job/JobQueueRedis.php',
diff --git a/includes/job/JobQueue.php b/includes/job/JobQueue.php
index 3295c24..9eb3ac7 100644
--- a/includes/job/JobQueue.php
+++ b/includes/job/JobQueue.php
@@ -168,7 +168,7 @@
         * not distinguishable from the race condition between isEmpty() and 
pop().
         *
         * @return bool
-        * @throws MWException
+        * @throws JobQueueError
         */
        final public function isEmpty() {
                wfProfileIn( __METHOD__ );
@@ -190,7 +190,7 @@
         * If caching is used, this number might be out of date for a minute.
         *
         * @return integer
-        * @throws MWException
+        * @throws JobQueueError
         */
        final public function getSize() {
                wfProfileIn( __METHOD__ );
@@ -212,7 +212,7 @@
         * If caching is used, this number might be out of date for a minute.
         *
         * @return integer
-        * @throws MWException
+        * @throws JobQueueError
         */
        final public function getAcquiredCount() {
                wfProfileIn( __METHOD__ );
@@ -234,7 +234,7 @@
         * If caching is used, this number might be out of date for a minute.
         *
         * @return integer
-        * @throws MWException
+        * @throws JobQueueError
         * @since 1.22
         */
        final public function getDelayedCount() {
@@ -259,7 +259,7 @@
         * If caching is used, this number might be out of date for a minute.
         *
         * @return integer
-        * @throws MWException
+        * @throws JobQueueError
         */
        final public function getAbandonedCount() {
                wfProfileIn( __METHOD__ );
@@ -284,7 +284,7 @@
         * @param $jobs Job|Array
         * @param $flags integer Bitfield (supports JobQueue::QOS_ATOMIC)
         * @return bool Returns false on failure
-        * @throws MWException
+        * @throws JobQueueError
         */
        final public function push( $jobs, $flags = 0 ) {
                return $this->batchPush( is_array( $jobs ) ? $jobs : array( 
$jobs ), $flags );
@@ -298,7 +298,7 @@
         * @param array $jobs List of Jobs
         * @param $flags integer Bitfield (supports JobQueue::QOS_ATOMIC)
         * @return bool Returns false on failure
-        * @throws MWException
+        * @throws JobQueueError
         */
        final public function batchPush( array $jobs, $flags = 0 ) {
                if ( !count( $jobs ) ) {
@@ -333,7 +333,7 @@
         * Outside callers should use JobQueueGroup::pop() instead of this 
function.
         *
         * @return Job|bool Returns false if there are no jobs
-        * @throws MWException
+        * @throws JobQueueError
         */
        final public function pop() {
                global $wgJobClasses;
@@ -374,7 +374,7 @@
         *
         * @param $job Job
         * @return bool
-        * @throws MWException
+        * @throws JobQueueError
         */
        final public function ack( Job $job ) {
                if ( $job->getType() !== $this->type ) {
@@ -421,7 +421,7 @@
         *
         * @param $job Job
         * @return bool
-        * @throws MWException
+        * @throws JobQueueError
         */
        final public function deduplicateRootJob( Job $job ) {
                if ( $job->getType() !== $this->type ) {
@@ -466,7 +466,7 @@
         *
         * @param $job Job
         * @return bool
-        * @throws MWException
+        * @throws JobQueueError
         */
        final protected function isRootJobOldDuplicate( Job $job ) {
                if ( $job->getType() !== $this->type ) {
@@ -511,7 +511,7 @@
         * Deleted all unclaimed and delayed jobs from the queue
         *
         * @return bool Success
-        * @throws MWException
+        * @throws JobQueueError
         * @since 1.22
         */
        final public function delete() {
@@ -535,7 +535,7 @@
         * This does nothing for certain queue classes.
         *
         * @return void
-        * @throws MWException
+        * @throws JobQueueError
         */
        final public function waitForBackups() {
                wfProfileIn( __METHOD__ );
@@ -600,7 +600,7 @@
         * This should only be called on a queue that is no longer being popped.
         *
         * @return Iterator
-        * @throws MWException
+        * @throws JobQueueError
         */
        abstract public function getAllQueuedJobs();
 
@@ -609,7 +609,7 @@
         * This should only be called on a queue that is no longer being popped.
         *
         * @return Iterator
-        * @throws MWException
+        * @throws JobQueueError
         * @since 1.22
         */
        public function getAllDelayedJobs() {
@@ -640,3 +640,10 @@
                throw new MWException( "Queue namespacing not supported for 
this queue type." );
        }
 }
+
+/**
+ * @ingroup JobQueue
+ * @since 1.22
+ */
+class JobQueueError extends MWException {}
+class JobQueueConnectionError extends JobQueueError {}
diff --git a/includes/job/JobQueueDB.php b/includes/job/JobQueueDB.php
index 56da4f3..3fa0655 100644
--- a/includes/job/JobQueueDB.php
+++ b/includes/job/JobQueueDB.php
@@ -80,9 +80,13 @@
                }
 
                list( $dbr, $scope ) = $this->getSlaveDB();
-               $found = $dbr->selectField( // unclaimed job
-                       'job', '1', array( 'job_cmd' => $this->type, 
'job_token' => '' ), __METHOD__
-               );
+               try {
+                       $found = $dbr->selectField( // unclaimed job
+                               'job', '1', array( 'job_cmd' => $this->type, 
'job_token' => '' ), __METHOD__
+                       );
+               } catch ( DBError $e ) {
+                       $this->throwDBException( $e );
+               }
                $this->cache->add( $key, $found ? 'false' : 'true', 
self::CACHE_TTL_LONG );
 
                return !$found;
@@ -100,11 +104,15 @@
                        return $size;
                }
 
-               list( $dbr, $scope ) = $this->getSlaveDB();
-               $size = (int)$dbr->selectField( 'job', 'COUNT(*)',
-                       array( 'job_cmd' => $this->type, 'job_token' => '' ),
-                       __METHOD__
-               );
+               try {
+                       list( $dbr, $scope ) = $this->getSlaveDB();
+                       $size = (int)$dbr->selectField( 'job', 'COUNT(*)',
+                               array( 'job_cmd' => $this->type, 'job_token' => 
'' ),
+                               __METHOD__
+                       );
+               } catch ( DBError $e ) {
+                       $this->throwDBException( $e );
+               }
                $this->cache->set( $key, $size, self::CACHE_TTL_SHORT );
 
                return $size;
@@ -127,10 +135,14 @@
                }
 
                list( $dbr, $scope ) = $this->getSlaveDB();
-               $count = (int)$dbr->selectField( 'job', 'COUNT(*)',
-                       array( 'job_cmd' => $this->type, "job_token != 
{$dbr->addQuotes( '' )}" ),
-                       __METHOD__
-               );
+               try {
+                       $count = (int)$dbr->selectField( 'job', 'COUNT(*)',
+                               array( 'job_cmd' => $this->type, "job_token != 
{$dbr->addQuotes( '' )}" ),
+                               __METHOD__
+                       );
+               } catch ( DBError $e ) {
+                       $this->throwDBException( $e );
+               }
                $this->cache->set( $key, $count, self::CACHE_TTL_SHORT );
 
                return $count;
@@ -156,14 +168,18 @@
                }
 
                list( $dbr, $scope ) = $this->getSlaveDB();
-               $count = (int)$dbr->selectField( 'job', 'COUNT(*)',
-                       array(
-                               'job_cmd' => $this->type,
-                               "job_token != {$dbr->addQuotes( '' )}",
-                               "job_attempts >= " . $dbr->addQuotes( 
$this->maxTries )
-                       ),
-                       __METHOD__
-               );
+               try {
+                       $count = (int)$dbr->selectField( 'job', 'COUNT(*)',
+                               array(
+                                       'job_cmd' => $this->type,
+                                       "job_token != {$dbr->addQuotes( '' )}",
+                                       "job_attempts >= " . $dbr->addQuotes( 
$this->maxTries )
+                               ),
+                               __METHOD__
+                       );
+               } catch ( DBError $e ) {
+                       $this->throwDBException( $e );
+               }
                $wgMemc->set( $key, $count, self::CACHE_TTL_SHORT );
 
                return $count;
@@ -269,43 +285,47 @@
                }
 
                list( $dbw, $scope ) = $this->getMasterDB();
-               $dbw->commit( __METHOD__, 'flush' ); // flush existing 
transaction
-               $autoTrx = $dbw->getFlag( DBO_TRX ); // get current setting
-               $dbw->clearFlag( DBO_TRX ); // make each query its own 
transaction
-               $scopedReset = new ScopedCallback( function() use ( $dbw, 
$autoTrx ) {
-                       $dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore old 
setting
-               } );
+               try {
+                       $dbw->commit( __METHOD__, 'flush' ); // flush existing 
transaction
+                       $autoTrx = $dbw->getFlag( DBO_TRX ); // get current 
setting
+                       $dbw->clearFlag( DBO_TRX ); // make each query its own 
transaction
+                       $scopedReset = new ScopedCallback( function() use ( 
$dbw, $autoTrx ) {
+                               $dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // 
restore old setting
+                       } );
 
-               $uuid = wfRandomString( 32 ); // pop attempt
-               $job = false; // job popped off
-               do { // retry when our row is invalid or deleted as a duplicate
-                       // Try to reserve a row in the DB...
-                       if ( in_array( $this->order, array( 'fifo', 'timestamp' 
) ) ) {
-                               $row = $this->claimOldest( $uuid );
-                       } else { // random first
-                               $rand = mt_rand( 0, self::MAX_JOB_RANDOM ); // 
encourage concurrent UPDATEs
-                               $gte = (bool)mt_rand( 0, 1 ); // find rows with 
rand before/after $rand
-                               $row = $this->claimRandom( $uuid, $rand, $gte );
-                       }
-                       // Check if we found a row to reserve...
-                       if ( !$row ) {
-                               $this->cache->set( $this->getCacheKey( 'empty' 
), 'true', self::CACHE_TTL_LONG );
-                               break; // nothing to do
-                       }
-                       JobQueue::incrStats( 'job-pop', $this->type );
-                       // Get the job object from the row...
-                       $title = Title::makeTitleSafe( $row->job_namespace, 
$row->job_title );
-                       if ( !$title ) {
-                               $dbw->delete( 'job', array( 'job_id' => 
$row->job_id ), __METHOD__ );
-                               wfDebug( "Row has invalid title 
'{$row->job_title}'." );
-                               continue; // try again
-                       }
-                       $job = Job::factory( $row->job_cmd, $title,
-                               self::extractBlob( $row->job_params ), 
$row->job_id );
-                       $job->metadata['id'] = $row->job_id;
-                       $job->id = $row->job_id; // XXX: work around broken 
subclasses
-                       break; // done
-               } while ( true );
+                       $uuid = wfRandomString( 32 ); // pop attempt
+                       $job = false; // job popped off
+                       do { // retry when our row is invalid or deleted as a 
duplicate
+                               // Try to reserve a row in the DB...
+                               if ( in_array( $this->order, array( 'fifo', 
'timestamp' ) ) ) {
+                                       $row = $this->claimOldest( $uuid );
+                               } else { // random first
+                                       $rand = mt_rand( 0, 
self::MAX_JOB_RANDOM ); // encourage concurrent UPDATEs
+                                       $gte = (bool)mt_rand( 0, 1 ); // find 
rows with rand before/after $rand
+                                       $row = $this->claimRandom( $uuid, 
$rand, $gte );
+                               }
+                               // Check if we found a row to reserve...
+                               if ( !$row ) {
+                                       $this->cache->set( $this->getCacheKey( 
'empty' ), 'true', self::CACHE_TTL_LONG );
+                                       break; // nothing to do
+                               }
+                               JobQueue::incrStats( 'job-pop', $this->type );
+                               // Get the job object from the row...
+                               $title = Title::makeTitleSafe( 
$row->job_namespace, $row->job_title );
+                               if ( !$title ) {
+                                       $dbw->delete( 'job', array( 'job_id' => 
$row->job_id ), __METHOD__ );
+                                       wfDebug( "Row has invalid title 
'{$row->job_title}'." );
+                                       continue; // try again
+                               }
+                               $job = Job::factory( $row->job_cmd, $title,
+                                       self::extractBlob( $row->job_params ), 
$row->job_id );
+                               $job->metadata['id'] = $row->job_id;
+                               $job->id = $row->job_id; // XXX: work around 
broken subclasses
+                               break; // done
+                       } while ( true );
+               } catch ( DBError $e ) {
+                       $this->throwDBException( $e );
+               }
 
                return $job;
        }
@@ -461,16 +481,20 @@
                }
 
                list( $dbw, $scope ) = $this->getMasterDB();
-               $dbw->commit( __METHOD__, 'flush' ); // flush existing 
transaction
-               $autoTrx = $dbw->getFlag( DBO_TRX ); // get current setting
-               $dbw->clearFlag( DBO_TRX ); // make each query its own 
transaction
-               $scopedReset = new ScopedCallback( function() use ( $dbw, 
$autoTrx ) {
-                       $dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore old 
setting
-               } );
+               try {
+                       $dbw->commit( __METHOD__, 'flush' ); // flush existing 
transaction
+                       $autoTrx = $dbw->getFlag( DBO_TRX ); // get current 
setting
+                       $dbw->clearFlag( DBO_TRX ); // make each query its own 
transaction
+                       $scopedReset = new ScopedCallback( function() use ( 
$dbw, $autoTrx ) {
+                               $dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // 
restore old setting
+                       } );
 
-               // Delete a row with a single DELETE without holding row locks 
over RTTs...
-               $dbw->delete( 'job',
-                       array( 'job_cmd' => $this->type, 'job_id' => 
$job->metadata['id'] ), __METHOD__ );
+                       // Delete a row with a single DELETE without holding 
row locks over RTTs...
+                       $dbw->delete( 'job',
+                               array( 'job_cmd' => $this->type, 'job_id' => 
$job->metadata['id'] ), __METHOD__ );
+               } catch ( DBError $e ) {
+                       $this->throwDBException( $e );
+               }
 
                return true;
        }
@@ -516,7 +540,11 @@
        protected function doDelete() {
                list( $dbw, $scope ) = $this->getMasterDB();
 
-               $dbw->delete( 'job', array( 'job_cmd' => $this->type ) );
+               try {
+                       $dbw->delete( 'job', array( 'job_cmd' => $this->type ) 
);
+               } catch ( DBError $e ) {
+                       $this->throwDBException( $e );
+               }
                return true;
        }
 
@@ -555,20 +583,25 @@
         */
        public function getAllQueuedJobs() {
                list( $dbr, $scope ) = $this->getSlaveDB();
-               return new MappedIterator(
-                       $dbr->select( 'job', '*', array( 'job_cmd' => 
$this->getType(), 'job_token' => '' ) ),
-                       function( $row ) use ( $scope ) {
-                               $job = Job::factory(
-                                       $row->job_cmd,
-                                       Title::makeTitle( $row->job_namespace, 
$row->job_title ),
-                                       strlen( $row->job_params ) ? 
unserialize( $row->job_params ) : false,
-                                       $row->job_id
-                               );
-                               $job->metadata['id'] = $row->job_id;
-                               $job->id = $row->job_id; // XXX: work around 
broken subclasses
-                               return $job;
-                       }
-               );
+               try {
+                       return new MappedIterator(
+                               $dbr->select( 'job', '*',
+                                       array( 'job_cmd' => $this->getType(), 
'job_token' => '' ) ),
+                               function( $row ) use ( $scope ) {
+                                       $job = Job::factory(
+                                               $row->job_cmd,
+                                               Title::makeTitle( 
$row->job_namespace, $row->job_title ),
+                                               strlen( $row->job_params ) ? 
unserialize( $row->job_params ) : false,
+                                               $row->job_id
+                                       );
+                                       $job->metadata['id'] = $row->job_id;
+                                       $job->id = $row->job_id; // XXX: work 
around broken subclasses
+                                       return $job;
+                               }
+                       );
+               } catch ( DBError $e ) {
+                       $this->throwDBException( $e );
+               }
        }
 
        /**
@@ -578,75 +611,79 @@
         */
        public function recycleAndDeleteStaleJobs() {
                $now = time();
-               list( $dbw, $scope ) = $this->getMasterDB();
                $count = 0; // affected rows
+               list( $dbw, $scope ) = $this->getMasterDB();
 
-               if ( !$dbw->lock( "jobqueue-recycle-{$this->type}", __METHOD__, 
1 ) ) {
-                       return $count; // already in progress
-               }
+               try {
+                       if ( !$dbw->lock( "jobqueue-recycle-{$this->type}", 
__METHOD__, 1 ) ) {
+                               return $count; // already in progress
+                       }
 
-               // Remove claims on jobs acquired for too long if enabled...
-               if ( $this->claimTTL > 0 ) {
-                       $claimCutoff = $dbw->timestamp( $now - $this->claimTTL 
);
-                       // Get the IDs of jobs that have be claimed but not 
finished after too long.
-                       // These jobs can be recycled into the queue by 
expiring the claim. Selecting
-                       // the IDs first means that the UPDATE can be done by 
primary key (less deadlocks).
-                       $res = $dbw->select( 'job', 'job_id',
-                               array(
-                                       'job_cmd' => $this->type,
-                                       "job_token != {$dbw->addQuotes( '' )}", 
// was acquired
-                                       "job_token_timestamp < 
{$dbw->addQuotes( $claimCutoff )}", // stale
-                                       "job_attempts < {$dbw->addQuotes( 
$this->maxTries )}" ), // retries left
-                               __METHOD__
+                       // Remove claims on jobs acquired for too long if 
enabled...
+                       if ( $this->claimTTL > 0 ) {
+                               $claimCutoff = $dbw->timestamp( $now - 
$this->claimTTL );
+                               // Get the IDs of jobs that have been claimed but 
not finished after too long.
+                               // These jobs can be recycled into the queue by 
expiring the claim. Selecting
+                               // the IDs first means that the UPDATE can be 
done by primary key (less deadlocks).
+                               $res = $dbw->select( 'job', 'job_id',
+                                       array(
+                                               'job_cmd' => $this->type,
+                                               "job_token != {$dbw->addQuotes( 
'' )}", // was acquired
+                                               "job_token_timestamp < 
{$dbw->addQuotes( $claimCutoff )}", // stale
+                                               "job_attempts < 
{$dbw->addQuotes( $this->maxTries )}" ), // retries left
+                                       __METHOD__
+                               );
+                               $ids = array_map(
+                                       function( $o ) {
+                                               return $o->job_id;
+                                       }, iterator_to_array( $res )
+                               );
+                               if ( count( $ids ) ) {
+                                       // Reset job_token for these jobs so 
that other runners will pick them up.
+                                       // Set the timestamp to the current 
time, as it is useful to know that the job
+                                       // was already tried before (the 
timestamp becomes the "released" time).
+                                       $dbw->update( 'job',
+                                               array(
+                                                       'job_token' => '',
+                                                       'job_token_timestamp' 
=> $dbw->timestamp( $now ) ), // time of release
+                                               array(
+                                                       'job_id' => $ids ),
+                                               __METHOD__
+                                       );
+                                       $count += $dbw->affectedRows();
+                                       JobQueue::incrStats( 'job-recycle', 
$this->type, $dbw->affectedRows() );
+                                       $this->cache->set( $this->getCacheKey( 
'empty' ), 'false', self::CACHE_TTL_LONG );
+                               }
+                       }
+
+                       // Just destroy any stale jobs...
+                       $pruneCutoff = $dbw->timestamp( $now - 
self::MAX_AGE_PRUNE );
+                       $conds = array(
+                               'job_cmd' => $this->type,
+                               "job_token != {$dbw->addQuotes( '' )}", // was 
acquired
+                               "job_token_timestamp < {$dbw->addQuotes( 
$pruneCutoff )}" // stale
                        );
+                       if ( $this->claimTTL > 0 ) { // only prune jobs 
attempted too many times...
+                               $conds[] = "job_attempts >= {$dbw->addQuotes( 
$this->maxTries )}";
+                       }
+                       // Get the IDs of jobs that are considered stale and 
should be removed. Selecting
+                       // the IDs first means that the DELETE can be done by 
primary key (less deadlocks).
+                       $res = $dbw->select( 'job', 'job_id', $conds, 
__METHOD__ );
                        $ids = array_map(
                                function( $o ) {
                                        return $o->job_id;
                                }, iterator_to_array( $res )
                        );
                        if ( count( $ids ) ) {
-                               // Reset job_token for these jobs so that other 
runners will pick them up.
-                               // Set the timestamp to the current time, as it 
is useful to now that the job
-                               // was already tried before (the timestamp 
becomes the "released" time).
-                               $dbw->update( 'job',
-                                       array(
-                                               'job_token' => '',
-                                               'job_token_timestamp' => 
$dbw->timestamp( $now ) ), // time of release
-                                       array(
-                                               'job_id' => $ids ),
-                                       __METHOD__
-                               );
+                               $dbw->delete( 'job', array( 'job_id' => $ids ), 
__METHOD__ );
                                $count += $dbw->affectedRows();
-                               JobQueue::incrStats( 'job-recycle', 
$this->type, $dbw->affectedRows() );
-                               $this->cache->set( $this->getCacheKey( 'empty' 
), 'false', self::CACHE_TTL_LONG );
+                               JobQueue::incrStats( 'job-abandon', 
$this->type, $dbw->affectedRows() );
                        }
-               }
 
-               // Just destroy any stale jobs...
-               $pruneCutoff = $dbw->timestamp( $now - self::MAX_AGE_PRUNE );
-               $conds = array(
-                       'job_cmd' => $this->type,
-                       "job_token != {$dbw->addQuotes( '' )}", // was acquired
-                       "job_token_timestamp < {$dbw->addQuotes( $pruneCutoff 
)}" // stale
-               );
-               if ( $this->claimTTL > 0 ) { // only prune jobs attempted too 
many times...
-                       $conds[] = "job_attempts >= {$dbw->addQuotes( 
$this->maxTries )}";
+                       $dbw->unlock( "jobqueue-recycle-{$this->type}", 
__METHOD__ );
+               } catch ( DBError $e ) {
+                       $this->throwDBException( $e );
                }
-               // Get the IDs of jobs that are considered stale and should be 
removed. Selecting
-               // the IDs first means that the UPDATE can be done by primary 
key (less deadlocks).
-               $res = $dbw->select( 'job', 'job_id', $conds, __METHOD__ );
-               $ids = array_map(
-                       function( $o ) {
-                               return $o->job_id;
-                       }, iterator_to_array( $res )
-               );
-               if ( count( $ids ) ) {
-                       $dbw->delete( 'job', array( 'job_id' => $ids ), 
__METHOD__ );
-                       $count += $dbw->affectedRows();
-                       JobQueue::incrStats( 'job-abandon', $this->type, 
$dbw->affectedRows() );
-               }
-
-               $dbw->unlock( "jobqueue-recycle-{$this->type}", __METHOD__ );
 
                return $count;
        }
@@ -655,14 +692,22 @@
         * @return Array (DatabaseBase, ScopedCallback)
         */
        protected function getSlaveDB() {
-               return $this->getDB( DB_SLAVE );
+               try {
+                       return $this->getDB( DB_SLAVE );
+               } catch ( DBConnectionError $e ) {
+                       throw new JobQueueConnectionError( "DBConnectionError:" 
. $e->getMessage() );
+               }
        }
 
        /**
         * @return Array (DatabaseBase, ScopedCallback)
         */
        protected function getMasterDB() {
-               return $this->getDB( DB_MASTER );
+               try {
+                       return $this->getDB( DB_MASTER );
+               } catch ( DBConnectionError $e ) {
+                       throw new JobQueueConnectionError( "DBConnectionError:" 
. $e->getMessage() );
+               }
        }
 
        /**
@@ -737,4 +782,12 @@
                        return false;
                }
        }
+
+       /**
+        * @param DBError $e
+        * @throws JobQueueError
+        */
+       protected function throwDBException( DBError $e ) {
+               throw new JobQueueError( get_class( $e ) . ": " . 
$e->getMessage() );
+       }
 }
diff --git a/includes/job/JobQueueFederated.php 
b/includes/job/JobQueueFederated.php
index db5b686..8ac4b6e 100644
--- a/includes/job/JobQueueFederated.php
+++ b/includes/job/JobQueueFederated.php
@@ -128,9 +128,13 @@
                }
 
                foreach ( $this->partitionQueues as $queue ) {
-                       if ( !$queue->doIsEmpty() ) {
-                               $this->cache->add( $key, 'false', 
self::CACHE_TTL_LONG );
-                               return false;
+                       try {
+                               if ( !$queue->doIsEmpty() ) {
+                                       $this->cache->add( $key, 'false', 
self::CACHE_TTL_LONG );
+                                       return false;
+                               }
+                       } catch ( JobQueueError $e ) {
+                               wfDebugLog( 'exception', $e->getLogMessage() );
                        }
                }
 
@@ -169,7 +173,11 @@
 
                $count = 0;
                foreach ( $this->partitionQueues as $queue ) {
-                       $count += $queue->$method();
+                       try {
+                               $count += $queue->$method();
+                       } catch ( JobQueueError $e ) {
+                               wfDebugLog( 'exception', $e->getLogMessage() );
+                       }
                }
 
                $this->cache->set( $key, $count, self::CACHE_TTL_SHORT );
@@ -234,7 +242,13 @@
                // Insert the de-duplicated jobs into the queues...
                foreach ( $uJobsByPartition as $partition => $jobBatch ) {
                        $queue = $this->partitionQueues[$partition];
-                       if ( $queue->doBatchPush( $jobBatch, $flags ) ) {
+                       try {
+                               $ok = $queue->doBatchPush( $jobBatch, $flags );
+                       } catch ( JobQueueError $e ) {
+                               $ok = false;
+                               wfDebugLog( 'exception', $e->getLogMessage() );
+                       }
+                       if ( $ok ) {
                                $key = $this->getCacheKey( 'empty' );
                                $this->cache->set( $key, 'false', JobQueueDB::CACHE_TTL_LONG );
                        } else {
@@ -249,7 +263,13 @@
                                $jobsLeft = array_merge( $jobsLeft, $jobBatch ); // not inserted
                        } else {
                                $queue = $this->partitionQueues[$partition];
-                               if ( $queue->doBatchPush( $jobBatch, $flags ) ) {
+                               try {
+                                       $ok = $queue->doBatchPush( $jobBatch, $flags );
+                               } catch ( JobQueueError $e ) {
+                                       $ok = false;
+                                       wfDebugLog( 'exception', $e->getLogMessage() );
+                               }
+                               if ( $ok ) {
                                        $key = $this->getCacheKey( 'empty' );
                                        $this->cache->set( $key, 'false', JobQueueDB::CACHE_TTL_LONG );
                                } else {
@@ -278,7 +298,12 @@
                                break; // all partitions at 0 weight
                        }
                        $queue = $this->partitionQueues[$partition];
-                       $job = $queue->pop();
+                       try {
+                               $job = $queue->pop();
+                       } catch ( JobQueueError $e ) {
+                               $job = false;
+                               wfDebugLog( 'exception', $e->getLogMessage() );
+                       }
                        if ( $job ) {
                                $job->metadata['QueuePartition'] = $partition;
                                return $job;
@@ -300,13 +325,21 @@
 
        protected function doDelete() {
                foreach ( $this->partitionQueues as $queue ) {
-                       $queue->doDelete();
+                       try {
+                               $queue->doDelete();
+                       } catch ( JobQueueError $e ) {
+                               wfDebugLog( 'exception', $e->getLogMessage() );
+                       }
                }
        }
 
        protected function doWaitForBackups() {
                foreach ( $this->partitionQueues as $queue ) {
-                       $queue->waitForBackups();
+                       try {
+                               $queue->waitForBackups();
+                       } catch ( JobQueueError $e ) {
+                               wfDebugLog( 'exception', $e->getLogMessage() );
+                       }
                }
        }
 
diff --git a/includes/job/JobQueueGroup.php b/includes/job/JobQueueGroup.php
index 85f99b7..e483e05 100644
--- a/includes/job/JobQueueGroup.php
+++ b/includes/job/JobQueueGroup.php
@@ -310,9 +310,13 @@
                                } elseif ( !isset( $lastRuns[$type][$task] )
                                        || $lastRuns[$type][$task] < ( time() - $definition['period'] ) )
                                {
-                                       if ( call_user_func( $definition['callback'] ) !== null ) {
-                                               $tasksRun[$type][$task] = time();
-                                               ++$count;
+                                       try {
+                                               if ( call_user_func( $definition['callback'] ) !== null ) {
+                                                       $tasksRun[$type][$task] = time();
+                                                       ++$count;
+                                               }
+                                       } catch ( JobQueueError $e ) {
+                                               wfDebugLog( 'exception', $e->getLogMessage() );
                                        }
                                }
                        }
diff --git a/includes/job/JobQueueRedis.php b/includes/job/JobQueueRedis.php
index 1f5b761..6a66e43 100644
--- a/includes/job/JobQueueRedis.php
+++ b/includes/job/JobQueueRedis.php
@@ -780,7 +780,7 @@
        protected function getConnection() {
                $conn = $this->redisPool->getConnection( $this->server );
                if ( !$conn ) {
-                       throw new MWException( "Unable to connect to redis server." );
+                       throw new JobQueueConnectionError( "Unable to connect to redis server." );
                }
                return $conn;
        }
@@ -793,7 +793,7 @@
         */
        protected function throwRedisException( $server, RedisConnRef $conn, $e ) {
                $this->redisPool->handleException( $server, $conn, $e );
-               throw new MWException( "Redis server error: {$e->getMessage()}\n" );
+               throw new JobQueueError( "Redis server error: {$e->getMessage()}\n" );
        }
 
        /**

-- 
To view, visit https://gerrit.wikimedia.org/r/69451
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I9d9f0dae548a9dde693a7cd25c255a8bfbf37899
Gerrit-PatchSet: 1
Gerrit-Project: mediawiki/core
Gerrit-Branch: master
Gerrit-Owner: Aaron Schulz <asch...@wikimedia.org>

_______________________________________________
MediaWiki-commits mailing list
MediaWiki-commits@lists.wikimedia.org
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to