jenkins-bot has submitted this change and it was merged.
Change subject: jobqueue: cleaned up JobQueue exception handling
......................................................................
jobqueue: cleaned up JobQueue exception handling
* Added JobQueueError exceptions.
* Periodic tasks that fail are logged and skipped.
* JobQueueFederated properly fails over now.
Change-Id: I9d9f0dae548a9dde693a7cd25c255a8bfbf37899
---
M includes/AutoLoader.php
M includes/job/JobQueue.php
M includes/job/JobQueueDB.php
M includes/job/JobQueueFederated.php
M includes/job/JobQueueGroup.php
M includes/job/JobQueueRedis.php
6 files changed, 265 insertions(+), 166 deletions(-)
Approvals:
Tim Starling: Looks good to me, approved
jenkins-bot: Verified
diff --git a/includes/AutoLoader.php b/includes/AutoLoader.php
index dc9acf3..42d7d88 100644
--- a/includes/AutoLoader.php
+++ b/includes/AutoLoader.php
@@ -665,6 +665,8 @@
'JobQueueAggregatorMemc' =>
'includes/job/aggregator/JobQueueAggregatorMemc.php',
'JobQueueAggregatorRedis' =>
'includes/job/aggregator/JobQueueAggregatorRedis.php',
'JobQueueDB' => 'includes/job/JobQueueDB.php',
+ 'JobQueueConnectionError' => 'includes/job/JobQueue.php',
+ 'JobQueueError' => 'includes/job/JobQueue.php',
'JobQueueGroup' => 'includes/job/JobQueueGroup.php',
'JobQueueFederated' => 'includes/job/JobQueueFederated.php',
'JobQueueRedis' => 'includes/job/JobQueueRedis.php',
diff --git a/includes/job/JobQueue.php b/includes/job/JobQueue.php
index aa47432..3e94b13 100644
--- a/includes/job/JobQueue.php
+++ b/includes/job/JobQueue.php
@@ -168,7 +168,7 @@
* not distinguishable from the race condition between isEmpty() and
pop().
*
* @return bool
- * @throws MWException
+ * @throws JobQueueError
*/
final public function isEmpty() {
wfProfileIn( __METHOD__ );
@@ -190,7 +190,7 @@
* If caching is used, this number might be out of date for a minute.
*
* @return integer
- * @throws MWException
+ * @throws JobQueueError
*/
final public function getSize() {
wfProfileIn( __METHOD__ );
@@ -212,7 +212,7 @@
* If caching is used, this number might be out of date for a minute.
*
* @return integer
- * @throws MWException
+ * @throws JobQueueError
*/
final public function getAcquiredCount() {
wfProfileIn( __METHOD__ );
@@ -234,7 +234,7 @@
* If caching is used, this number might be out of date for a minute.
*
* @return integer
- * @throws MWException
+ * @throws JobQueueError
* @since 1.22
*/
final public function getDelayedCount() {
@@ -259,7 +259,7 @@
* If caching is used, this number might be out of date for a minute.
*
* @return integer
- * @throws MWException
+ * @throws JobQueueError
*/
final public function getAbandonedCount() {
wfProfileIn( __METHOD__ );
@@ -284,7 +284,7 @@
* @param $jobs Job|Array
* @param $flags integer Bitfield (supports JobQueue::QOS_ATOMIC)
* @return bool Returns false on failure
- * @throws MWException
+ * @throws JobQueueError
*/
final public function push( $jobs, $flags = 0 ) {
return $this->batchPush( is_array( $jobs ) ? $jobs : array(
$jobs ), $flags );
@@ -298,7 +298,7 @@
* @param array $jobs List of Jobs
* @param $flags integer Bitfield (supports JobQueue::QOS_ATOMIC)
* @return bool Returns false on failure
- * @throws MWException
+ * @throws JobQueueError
*/
final public function batchPush( array $jobs, $flags = 0 ) {
if ( !count( $jobs ) ) {
@@ -333,7 +333,7 @@
* Outside callers should use JobQueueGroup::pop() instead of this
function.
*
* @return Job|bool Returns false if there are no jobs
- * @throws MWException
+ * @throws JobQueueError
*/
final public function pop() {
global $wgJobClasses;
@@ -374,7 +374,7 @@
*
* @param $job Job
* @return bool
- * @throws MWException
+ * @throws JobQueueError
*/
final public function ack( Job $job ) {
if ( $job->getType() !== $this->type ) {
@@ -421,7 +421,7 @@
*
* @param $job Job
* @return bool
- * @throws MWException
+ * @throws JobQueueError
*/
final public function deduplicateRootJob( Job $job ) {
if ( $job->getType() !== $this->type ) {
@@ -466,7 +466,7 @@
*
* @param $job Job
* @return bool
- * @throws MWException
+ * @throws JobQueueError
*/
final protected function isRootJobOldDuplicate( Job $job ) {
if ( $job->getType() !== $this->type ) {
@@ -511,7 +511,7 @@
* Deleted all unclaimed and delayed jobs from the queue
*
* @return bool Success
- * @throws MWException
+ * @throws JobQueueError
* @since 1.22
*/
final public function delete() {
@@ -535,7 +535,7 @@
* This does nothing for certain queue classes.
*
* @return void
- * @throws MWException
+ * @throws JobQueueError
*/
final public function waitForBackups() {
wfProfileIn( __METHOD__ );
@@ -600,7 +600,7 @@
* Note: results may be stale if the queue is concurrently modified.
*
* @return Iterator
- * @throws MWException
+ * @throws JobQueueError
*/
abstract public function getAllQueuedJobs();
@@ -609,7 +609,7 @@
* Note: results may be stale if the queue is concurrently modified.
*
* @return Iterator
- * @throws MWException
+ * @throws JobQueueError
* @since 1.22
*/
public function getAllDelayedJobs() {
@@ -640,3 +640,10 @@
throw new MWException( "Queue namespacing not supported for
this queue type." );
}
}
+
+/**
+ * @ingroup JobQueue
+ * @since 1.22
+ */
+class JobQueueError extends MWException {}
+class JobQueueConnectionError extends JobQueueError {}
diff --git a/includes/job/JobQueueDB.php b/includes/job/JobQueueDB.php
index 56da4f3..3fa0655 100644
--- a/includes/job/JobQueueDB.php
+++ b/includes/job/JobQueueDB.php
@@ -80,9 +80,13 @@
}
list( $dbr, $scope ) = $this->getSlaveDB();
- $found = $dbr->selectField( // unclaimed job
- 'job', '1', array( 'job_cmd' => $this->type,
'job_token' => '' ), __METHOD__
- );
+ try {
+ $found = $dbr->selectField( // unclaimed job
+ 'job', '1', array( 'job_cmd' => $this->type,
'job_token' => '' ), __METHOD__
+ );
+ } catch ( DBError $e ) {
+ $this->throwDBException( $e );
+ }
$this->cache->add( $key, $found ? 'false' : 'true',
self::CACHE_TTL_LONG );
return !$found;
@@ -100,11 +104,15 @@
return $size;
}
- list( $dbr, $scope ) = $this->getSlaveDB();
- $size = (int)$dbr->selectField( 'job', 'COUNT(*)',
- array( 'job_cmd' => $this->type, 'job_token' => '' ),
- __METHOD__
- );
+ try {
+ list( $dbr, $scope ) = $this->getSlaveDB();
+ $size = (int)$dbr->selectField( 'job', 'COUNT(*)',
+ array( 'job_cmd' => $this->type, 'job_token' =>
'' ),
+ __METHOD__
+ );
+ } catch ( DBError $e ) {
+ $this->throwDBException( $e );
+ }
$this->cache->set( $key, $size, self::CACHE_TTL_SHORT );
return $size;
@@ -127,10 +135,14 @@
}
list( $dbr, $scope ) = $this->getSlaveDB();
- $count = (int)$dbr->selectField( 'job', 'COUNT(*)',
- array( 'job_cmd' => $this->type, "job_token !=
{$dbr->addQuotes( '' )}" ),
- __METHOD__
- );
+ try {
+ $count = (int)$dbr->selectField( 'job', 'COUNT(*)',
+ array( 'job_cmd' => $this->type, "job_token !=
{$dbr->addQuotes( '' )}" ),
+ __METHOD__
+ );
+ } catch ( DBError $e ) {
+ $this->throwDBException( $e );
+ }
$this->cache->set( $key, $count, self::CACHE_TTL_SHORT );
return $count;
@@ -156,14 +168,18 @@
}
list( $dbr, $scope ) = $this->getSlaveDB();
- $count = (int)$dbr->selectField( 'job', 'COUNT(*)',
- array(
- 'job_cmd' => $this->type,
- "job_token != {$dbr->addQuotes( '' )}",
- "job_attempts >= " . $dbr->addQuotes(
$this->maxTries )
- ),
- __METHOD__
- );
+ try {
+ $count = (int)$dbr->selectField( 'job', 'COUNT(*)',
+ array(
+ 'job_cmd' => $this->type,
+ "job_token != {$dbr->addQuotes( '' )}",
+ "job_attempts >= " . $dbr->addQuotes(
$this->maxTries )
+ ),
+ __METHOD__
+ );
+ } catch ( DBError $e ) {
+ $this->throwDBException( $e );
+ }
$wgMemc->set( $key, $count, self::CACHE_TTL_SHORT );
return $count;
@@ -269,43 +285,47 @@
}
list( $dbw, $scope ) = $this->getMasterDB();
- $dbw->commit( __METHOD__, 'flush' ); // flush existing
transaction
- $autoTrx = $dbw->getFlag( DBO_TRX ); // get current setting
- $dbw->clearFlag( DBO_TRX ); // make each query its own
transaction
- $scopedReset = new ScopedCallback( function() use ( $dbw,
$autoTrx ) {
- $dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore old
setting
- } );
+ try {
+ $dbw->commit( __METHOD__, 'flush' ); // flush existing
transaction
+ $autoTrx = $dbw->getFlag( DBO_TRX ); // get current
setting
+ $dbw->clearFlag( DBO_TRX ); // make each query its own
transaction
+ $scopedReset = new ScopedCallback( function() use (
$dbw, $autoTrx ) {
+ $dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); //
restore old setting
+ } );
- $uuid = wfRandomString( 32 ); // pop attempt
- $job = false; // job popped off
- do { // retry when our row is invalid or deleted as a duplicate
- // Try to reserve a row in the DB...
- if ( in_array( $this->order, array( 'fifo', 'timestamp'
) ) ) {
- $row = $this->claimOldest( $uuid );
- } else { // random first
- $rand = mt_rand( 0, self::MAX_JOB_RANDOM ); //
encourage concurrent UPDATEs
- $gte = (bool)mt_rand( 0, 1 ); // find rows with
rand before/after $rand
- $row = $this->claimRandom( $uuid, $rand, $gte );
- }
- // Check if we found a row to reserve...
- if ( !$row ) {
- $this->cache->set( $this->getCacheKey( 'empty'
), 'true', self::CACHE_TTL_LONG );
- break; // nothing to do
- }
- JobQueue::incrStats( 'job-pop', $this->type );
- // Get the job object from the row...
- $title = Title::makeTitleSafe( $row->job_namespace,
$row->job_title );
- if ( !$title ) {
- $dbw->delete( 'job', array( 'job_id' =>
$row->job_id ), __METHOD__ );
- wfDebug( "Row has invalid title
'{$row->job_title}'." );
- continue; // try again
- }
- $job = Job::factory( $row->job_cmd, $title,
- self::extractBlob( $row->job_params ),
$row->job_id );
- $job->metadata['id'] = $row->job_id;
- $job->id = $row->job_id; // XXX: work around broken
subclasses
- break; // done
- } while ( true );
+ $uuid = wfRandomString( 32 ); // pop attempt
+ $job = false; // job popped off
+ do { // retry when our row is invalid or deleted as a
duplicate
+ // Try to reserve a row in the DB...
+ if ( in_array( $this->order, array( 'fifo',
'timestamp' ) ) ) {
+ $row = $this->claimOldest( $uuid );
+ } else { // random first
+ $rand = mt_rand( 0,
self::MAX_JOB_RANDOM ); // encourage concurrent UPDATEs
+ $gte = (bool)mt_rand( 0, 1 ); // find
rows with rand before/after $rand
+ $row = $this->claimRandom( $uuid,
$rand, $gte );
+ }
+ // Check if we found a row to reserve...
+ if ( !$row ) {
+ $this->cache->set( $this->getCacheKey(
'empty' ), 'true', self::CACHE_TTL_LONG );
+ break; // nothing to do
+ }
+ JobQueue::incrStats( 'job-pop', $this->type );
+ // Get the job object from the row...
+ $title = Title::makeTitleSafe(
$row->job_namespace, $row->job_title );
+ if ( !$title ) {
+ $dbw->delete( 'job', array( 'job_id' =>
$row->job_id ), __METHOD__ );
+ wfDebug( "Row has invalid title
'{$row->job_title}'." );
+ continue; // try again
+ }
+ $job = Job::factory( $row->job_cmd, $title,
+ self::extractBlob( $row->job_params ),
$row->job_id );
+ $job->metadata['id'] = $row->job_id;
+ $job->id = $row->job_id; // XXX: work around
broken subclasses
+ break; // done
+ } while ( true );
+ } catch ( DBError $e ) {
+ $this->throwDBException( $e );
+ }
return $job;
}
@@ -461,16 +481,20 @@
}
list( $dbw, $scope ) = $this->getMasterDB();
- $dbw->commit( __METHOD__, 'flush' ); // flush existing
transaction
- $autoTrx = $dbw->getFlag( DBO_TRX ); // get current setting
- $dbw->clearFlag( DBO_TRX ); // make each query its own
transaction
- $scopedReset = new ScopedCallback( function() use ( $dbw,
$autoTrx ) {
- $dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore old
setting
- } );
+ try {
+ $dbw->commit( __METHOD__, 'flush' ); // flush existing
transaction
+ $autoTrx = $dbw->getFlag( DBO_TRX ); // get current
setting
+ $dbw->clearFlag( DBO_TRX ); // make each query its own
transaction
+ $scopedReset = new ScopedCallback( function() use (
$dbw, $autoTrx ) {
+ $dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); //
restore old setting
+ } );
- // Delete a row with a single DELETE without holding row locks
over RTTs...
- $dbw->delete( 'job',
- array( 'job_cmd' => $this->type, 'job_id' =>
$job->metadata['id'] ), __METHOD__ );
+ // Delete a row with a single DELETE without holding
row locks over RTTs...
+ $dbw->delete( 'job',
+ array( 'job_cmd' => $this->type, 'job_id' =>
$job->metadata['id'] ), __METHOD__ );
+ } catch ( DBError $e ) {
+ $this->throwDBException( $e );
+ }
return true;
}
@@ -516,7 +540,11 @@
protected function doDelete() {
list( $dbw, $scope ) = $this->getMasterDB();
- $dbw->delete( 'job', array( 'job_cmd' => $this->type ) );
+ try {
+ $dbw->delete( 'job', array( 'job_cmd' => $this->type )
);
+ } catch ( DBError $e ) {
+ $this->throwDBException( $e );
+ }
return true;
}
@@ -555,20 +583,25 @@
*/
public function getAllQueuedJobs() {
list( $dbr, $scope ) = $this->getSlaveDB();
- return new MappedIterator(
- $dbr->select( 'job', '*', array( 'job_cmd' =>
$this->getType(), 'job_token' => '' ) ),
- function( $row ) use ( $scope ) {
- $job = Job::factory(
- $row->job_cmd,
- Title::makeTitle( $row->job_namespace,
$row->job_title ),
- strlen( $row->job_params ) ?
unserialize( $row->job_params ) : false,
- $row->job_id
- );
- $job->metadata['id'] = $row->job_id;
- $job->id = $row->job_id; // XXX: work around
broken subclasses
- return $job;
- }
- );
+ try {
+ return new MappedIterator(
+ $dbr->select( 'job', '*',
+ array( 'job_cmd' => $this->getType(),
'job_token' => '' ) ),
+ function( $row ) use ( $scope ) {
+ $job = Job::factory(
+ $row->job_cmd,
+ Title::makeTitle(
$row->job_namespace, $row->job_title ),
+ strlen( $row->job_params ) ?
unserialize( $row->job_params ) : false,
+ $row->job_id
+ );
+ $job->metadata['id'] = $row->job_id;
+ $job->id = $row->job_id; // XXX: work
around broken subclasses
+ return $job;
+ }
+ );
+ } catch ( DBError $e ) {
+ $this->throwDBException( $e );
+ }
}
/**
@@ -578,75 +611,79 @@
*/
public function recycleAndDeleteStaleJobs() {
$now = time();
- list( $dbw, $scope ) = $this->getMasterDB();
$count = 0; // affected rows
+ list( $dbw, $scope ) = $this->getMasterDB();
- if ( !$dbw->lock( "jobqueue-recycle-{$this->type}", __METHOD__,
1 ) ) {
- return $count; // already in progress
- }
+ try {
+ if ( !$dbw->lock( "jobqueue-recycle-{$this->type}",
__METHOD__, 1 ) ) {
+ return $count; // already in progress
+ }
- // Remove claims on jobs acquired for too long if enabled...
- if ( $this->claimTTL > 0 ) {
- $claimCutoff = $dbw->timestamp( $now - $this->claimTTL
);
- // Get the IDs of jobs that have be claimed but not
finished after too long.
- // These jobs can be recycled into the queue by
expiring the claim. Selecting
- // the IDs first means that the UPDATE can be done by
primary key (less deadlocks).
- $res = $dbw->select( 'job', 'job_id',
- array(
- 'job_cmd' => $this->type,
- "job_token != {$dbw->addQuotes( '' )}",
// was acquired
- "job_token_timestamp <
{$dbw->addQuotes( $claimCutoff )}", // stale
- "job_attempts < {$dbw->addQuotes(
$this->maxTries )}" ), // retries left
- __METHOD__
+ // Remove claims on jobs acquired for too long if
enabled...
+ if ( $this->claimTTL > 0 ) {
+ $claimCutoff = $dbw->timestamp( $now -
$this->claimTTL );
+ // Get the IDs of jobs that have be claimed but
not finished after too long.
+ // These jobs can be recycled into the queue by
expiring the claim. Selecting
+ // the IDs first means that the UPDATE can be
done by primary key (less deadlocks).
+ $res = $dbw->select( 'job', 'job_id',
+ array(
+ 'job_cmd' => $this->type,
+ "job_token != {$dbw->addQuotes(
'' )}", // was acquired
+ "job_token_timestamp <
{$dbw->addQuotes( $claimCutoff )}", // stale
+ "job_attempts <
{$dbw->addQuotes( $this->maxTries )}" ), // retries left
+ __METHOD__
+ );
+ $ids = array_map(
+ function( $o ) {
+ return $o->job_id;
+ }, iterator_to_array( $res )
+ );
+ if ( count( $ids ) ) {
+ // Reset job_token for these jobs so
that other runners will pick them up.
+ // Set the timestamp to the current
time, as it is useful to now that the job
+ // was already tried before (the
timestamp becomes the "released" time).
+ $dbw->update( 'job',
+ array(
+ 'job_token' => '',
+ 'job_token_timestamp'
=> $dbw->timestamp( $now ) ), // time of release
+ array(
+ 'job_id' => $ids ),
+ __METHOD__
+ );
+ $count += $dbw->affectedRows();
+ JobQueue::incrStats( 'job-recycle',
$this->type, $dbw->affectedRows() );
+ $this->cache->set( $this->getCacheKey(
'empty' ), 'false', self::CACHE_TTL_LONG );
+ }
+ }
+
+ // Just destroy any stale jobs...
+ $pruneCutoff = $dbw->timestamp( $now -
self::MAX_AGE_PRUNE );
+ $conds = array(
+ 'job_cmd' => $this->type,
+ "job_token != {$dbw->addQuotes( '' )}", // was
acquired
+ "job_token_timestamp < {$dbw->addQuotes(
$pruneCutoff )}" // stale
);
+ if ( $this->claimTTL > 0 ) { // only prune jobs
attempted too many times...
+ $conds[] = "job_attempts >= {$dbw->addQuotes(
$this->maxTries )}";
+ }
+ // Get the IDs of jobs that are considered stale and
should be removed. Selecting
+ // the IDs first means that the UPDATE can be done by
primary key (less deadlocks).
+ $res = $dbw->select( 'job', 'job_id', $conds,
__METHOD__ );
$ids = array_map(
function( $o ) {
return $o->job_id;
}, iterator_to_array( $res )
);
if ( count( $ids ) ) {
- // Reset job_token for these jobs so that other
runners will pick them up.
- // Set the timestamp to the current time, as it
is useful to now that the job
- // was already tried before (the timestamp
becomes the "released" time).
- $dbw->update( 'job',
- array(
- 'job_token' => '',
- 'job_token_timestamp' =>
$dbw->timestamp( $now ) ), // time of release
- array(
- 'job_id' => $ids ),
- __METHOD__
- );
+ $dbw->delete( 'job', array( 'job_id' => $ids ),
__METHOD__ );
$count += $dbw->affectedRows();
- JobQueue::incrStats( 'job-recycle',
$this->type, $dbw->affectedRows() );
- $this->cache->set( $this->getCacheKey( 'empty'
), 'false', self::CACHE_TTL_LONG );
+ JobQueue::incrStats( 'job-abandon',
$this->type, $dbw->affectedRows() );
}
- }
- // Just destroy any stale jobs...
- $pruneCutoff = $dbw->timestamp( $now - self::MAX_AGE_PRUNE );
- $conds = array(
- 'job_cmd' => $this->type,
- "job_token != {$dbw->addQuotes( '' )}", // was acquired
- "job_token_timestamp < {$dbw->addQuotes( $pruneCutoff
)}" // stale
- );
- if ( $this->claimTTL > 0 ) { // only prune jobs attempted too
many times...
- $conds[] = "job_attempts >= {$dbw->addQuotes(
$this->maxTries )}";
+ $dbw->unlock( "jobqueue-recycle-{$this->type}",
__METHOD__ );
+ } catch ( DBError $e ) {
+ $this->throwDBException( $e );
}
- // Get the IDs of jobs that are considered stale and should be
removed. Selecting
- // the IDs first means that the UPDATE can be done by primary
key (less deadlocks).
- $res = $dbw->select( 'job', 'job_id', $conds, __METHOD__ );
- $ids = array_map(
- function( $o ) {
- return $o->job_id;
- }, iterator_to_array( $res )
- );
- if ( count( $ids ) ) {
- $dbw->delete( 'job', array( 'job_id' => $ids ),
__METHOD__ );
- $count += $dbw->affectedRows();
- JobQueue::incrStats( 'job-abandon', $this->type,
$dbw->affectedRows() );
- }
-
- $dbw->unlock( "jobqueue-recycle-{$this->type}", __METHOD__ );
return $count;
}
@@ -655,14 +692,22 @@
* @return Array (DatabaseBase, ScopedCallback)
*/
protected function getSlaveDB() {
- return $this->getDB( DB_SLAVE );
+ try {
+ return $this->getDB( DB_SLAVE );
+ } catch ( DBConnectionError $e ) {
+ throw new JobQueueConnectionError( "DBConnectionError:"
. $e->getMessage() );
+ }
}
/**
* @return Array (DatabaseBase, ScopedCallback)
*/
protected function getMasterDB() {
- return $this->getDB( DB_MASTER );
+ try {
+ return $this->getDB( DB_MASTER );
+ } catch ( DBConnectionError $e ) {
+ throw new JobQueueConnectionError( "DBConnectionError:"
. $e->getMessage() );
+ }
}
/**
@@ -737,4 +782,12 @@
return false;
}
}
+
+ /**
+ * @param DBError $e
+ * @throws JobQueueError
+ */
+ protected function throwDBException( DBError $e ) {
+ throw new JobQueueError( get_class( $e ) . ": " .
$e->getMessage() );
+ }
}
diff --git a/includes/job/JobQueueFederated.php
b/includes/job/JobQueueFederated.php
index 19de8bb..35b80ca 100644
--- a/includes/job/JobQueueFederated.php
+++ b/includes/job/JobQueueFederated.php
@@ -138,9 +138,13 @@
}
foreach ( $this->partitionQueues as $queue ) {
- if ( !$queue->doIsEmpty() ) {
- $this->cache->add( $key, 'false',
self::CACHE_TTL_LONG );
- return false;
+ try {
+ if ( !$queue->doIsEmpty() ) {
+ $this->cache->add( $key, 'false',
self::CACHE_TTL_LONG );
+ return false;
+ }
+ } catch ( JobQueueError $e ) {
+ wfDebugLog( 'exception', $e->getLogMessage() );
}
}
@@ -179,7 +183,11 @@
$count = 0;
foreach ( $this->partitionQueues as $queue ) {
- $count += $queue->$method();
+ try {
+ $count += $queue->$method();
+ } catch ( JobQueueError $e ) {
+ wfDebugLog( 'exception', $e->getLogMessage() );
+ }
}
$this->cache->set( $key, $count, self::CACHE_TTL_SHORT );
@@ -244,7 +252,13 @@
// Insert the de-duplicated jobs into the queues...
foreach ( $uJobsByPartition as $partition => $jobBatch ) {
$queue = $this->partitionQueues[$partition];
- if ( $queue->doBatchPush( $jobBatch, $flags ) ) {
+ try {
+ $ok = $queue->doBatchPush( $jobBatch, $flags );
+ } catch ( JobQueueError $e ) {
+ $ok = false;
+ wfDebugLog( 'exception', $e->getLogMessage() );
+ }
+ if ( $ok ) {
$key = $this->getCacheKey( 'empty' );
$this->cache->set( $key, 'false',
JobQueueDB::CACHE_TTL_LONG );
} else {
@@ -259,7 +273,13 @@
$jobsLeft = array_merge( $jobsLeft, $jobBatch
); // not inserted
} else {
$queue = $this->partitionQueues[$partition];
- if ( $queue->doBatchPush( $jobBatch, $flags ) )
{
+ try {
+ $ok = $queue->doBatchPush( $jobBatch,
$flags );
+ } catch ( JobQueueError $e ) {
+ $ok = false;
+ wfDebugLog( 'exception',
$e->getLogMessage() );
+ }
+ if ( $ok ) {
$key = $this->getCacheKey( 'empty' );
$this->cache->set( $key, 'false',
JobQueueDB::CACHE_TTL_LONG );
} else {
@@ -288,7 +308,12 @@
break; // all partitions at 0 weight
}
$queue = $this->partitionQueues[$partition];
- $job = $queue->pop();
+ try {
+ $job = $queue->pop();
+ } catch ( JobQueueError $e ) {
+ $job = false;
+ wfDebugLog( 'exception', $e->getLogMessage() );
+ }
if ( $job ) {
$job->metadata['QueuePartition'] = $partition;
return $job;
@@ -336,13 +361,21 @@
protected function doDelete() {
foreach ( $this->partitionQueues as $queue ) {
- $queue->doDelete();
+ try {
+ $queue->doDelete();
+ } catch ( JobQueueError $e ) {
+ wfDebugLog( 'exception', $e->getLogMessage() );
+ }
}
}
protected function doWaitForBackups() {
foreach ( $this->partitionQueues as $queue ) {
- $queue->waitForBackups();
+ try {
+ $queue->waitForBackups();
+ } catch ( JobQueueError $e ) {
+ wfDebugLog( 'exception', $e->getLogMessage() );
+ }
}
}
diff --git a/includes/job/JobQueueGroup.php b/includes/job/JobQueueGroup.php
index 85f99b7..e483e05 100644
--- a/includes/job/JobQueueGroup.php
+++ b/includes/job/JobQueueGroup.php
@@ -310,9 +310,13 @@
} elseif ( !isset( $lastRuns[$type][$task] )
|| $lastRuns[$type][$task] < ( time() -
$definition['period'] ) )
{
- if ( call_user_func(
$definition['callback'] ) !== null ) {
- $tasksRun[$type][$task] =
time();
- ++$count;
+ try {
+ if ( call_user_func(
$definition['callback'] ) !== null ) {
+ $tasksRun[$type][$task]
= time();
+ ++$count;
+ }
+ } catch ( JobQueueError $e ) {
+ wfDebugLog( 'exception',
$e->getLogMessage() );
}
}
}
diff --git a/includes/job/JobQueueRedis.php b/includes/job/JobQueueRedis.php
index 939fa42..57189a5 100644
--- a/includes/job/JobQueueRedis.php
+++ b/includes/job/JobQueueRedis.php
@@ -786,7 +786,7 @@
protected function getConnection() {
$conn = $this->redisPool->getConnection( $this->server );
if ( !$conn ) {
- throw new MWException( "Unable to connect to redis
server." );
+ throw new JobQueueConnectionError( "Unable to connect
to redis server." );
}
return $conn;
}
@@ -799,7 +799,7 @@
*/
protected function throwRedisException( $server, RedisConnRef $conn, $e
) {
$this->redisPool->handleException( $server, $conn, $e );
- throw new MWException( "Redis server error:
{$e->getMessage()}\n" );
+ throw new JobQueueError( "Redis server error:
{$e->getMessage()}\n" );
}
/**
--
To view, visit https://gerrit.wikimedia.org/r/69451
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: merged
Gerrit-Change-Id: I9d9f0dae548a9dde693a7cd25c255a8bfbf37899
Gerrit-PatchSet: 4
Gerrit-Project: mediawiki/core
Gerrit-Branch: master
Gerrit-Owner: Aaron Schulz <[email protected]>
Gerrit-Reviewer: Tim Starling <[email protected]>
Gerrit-Reviewer: jenkins-bot
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits