Aaron Schulz has uploaded a new change for review.
https://gerrit.wikimedia.org/r/104475
Change subject: Merged redis queue periodic tasks into
recyclePruneAndUndelayJobs()
......................................................................
Merged redis queue periodic tasks into recyclePruneAndUndelayJobs()
* This avoids a few extra Redis round trips for queues with delayed jobs:
  the periodic recycle, prune and undelay work now runs in one Lua script call
Change-Id: I457512360a445c234cfeba7a716eedeb37273467
---
M includes/job/JobQueueRedis.php
1 file changed, 32 insertions(+), 64 deletions(-)
git pull ssh://gerrit.wikimedia.org:29418/mediawiki/core
refs/changes/75/104475/1
diff --git a/includes/job/JobQueueRedis.php b/includes/job/JobQueueRedis.php
index 9b9fe2d..80b5f4e 100644
--- a/includes/job/JobQueueRedis.php
+++ b/includes/job/JobQueueRedis.php
@@ -300,7 +300,7 @@
// Push ready delayed jobs into the queue every 10 jobs to
spread the load.
// This is also done as a periodic task, but we don't want too
much done at once.
if ( $this->checkDelay && mt_rand( 0, 9 ) == 0 ) {
- $this->releaseReadyDelayedJobs();
+ $this->recyclePruneAndUndelayJobs();
}
$conn = $this->getConnection();
@@ -309,7 +309,7 @@
if ( $this->claimTTL > 0 ) {
// Keep the claimed job list down for
high-traffic queues
if ( mt_rand( 0, 99 ) == 0 ) {
-
$this->recycleAndDeleteStaleJobs();
+
$this->recyclePruneAndUndelayJobs();
}
$blob = $this->popAndAcquireBlob( $conn
);
} else {
@@ -326,7 +326,7 @@
continue;
}
- // If $item is invalid,
recycleAndDeleteStaleJobs() will cleanup as needed
+ // If $item is invalid,
recyclePruneAndUndelayJobs() will cleanup as needed
$job = $this->getJobFromFields( $item ); // may
be false
} while ( !$job ); // job may be false if invalid
} catch ( RedisException $e ) {
@@ -628,53 +628,13 @@
}
/**
- * Release any ready delayed jobs into the queue
- *
- * @return int Number of jobs released
- * @throws JobQueueError
- */
- public function releaseReadyDelayedJobs() {
- $count = 0;
-
- $conn = $this->getConnection();
- try {
- static $script =
-<<<LUA
- local kDelayed, kUnclaimed = unpack(KEYS)
- -- Get the list of ready delayed jobs, sorted by
readiness
- local ids =
redis.call('zRangeByScore',kDelayed,0,ARGV[1])
- -- Migrate the jobs from the "delayed" set to the
"unclaimed" list
- for k,id in ipairs(ids) do
- redis.call('lPush',kUnclaimed,id)
- redis.call('zRem',kDelayed,id)
- end
- return #ids
-LUA;
- $count += (int)$conn->luaEval( $script,
- array(
- $this->getQueueKey( 'z-delayed' ), //
KEYS[1]
- $this->getQueueKey( 'l-unclaimed' ), //
KEYS[2]
- time() // ARGV[1]; max "delay until"
UNIX timestamp
- ),
- 2 # first two arguments are keys
- );
- } catch ( RedisException $e ) {
- $this->throwRedisException( $this->server, $conn, $e );
- }
-
- return $count;
- }
-
- /**
* Recycle or destroy any jobs that have been claimed for too long
+ * and release any ready delayed jobs into the queue
*
- * @return int Number of jobs recycled/deleted
+ * @return int Number of jobs recycled/deleted/undelayed
* @throws MWException|JobQueueError
*/
- public function recycleAndDeleteStaleJobs() {
- if ( $this->claimTTL <= 0 ) { // sanity
- throw new MWException( "Cannot recycle jobs since
acknowledgements are disabled." );
- }
+ public function recyclePruneAndUndelayJobs() {
$count = 0;
// For each job item that can be retried, we need to add it
back to the
// main queue and remove it from the list of currenty claimed
job items.
@@ -685,8 +645,8 @@
$now = time();
static $script =
<<<LUA
- local kClaimed, kAttempts, kUnclaimed, kData,
kAbandoned = unpack(KEYS)
- local released,abandoned,pruned = 0,0,0
+ local kClaimed, kAttempts, kUnclaimed, kData,
kAbandoned, kDelayed = unpack(KEYS)
+ local released,abandoned,pruned,undelayed = 0,0,0,0
-- Get all non-dead jobs that have an expired claim on
them.
-- The score for each item is the last claim timestamp
(UNIX).
local staleClaims =
redis.call('zRangeByScore',kClaimed,0,ARGV[1])
@@ -715,7 +675,15 @@
redis.call('hDel',kData,id)
pruned = pruned + 1
end
- return {released,abandoned,pruned}
+ -- Get the list of ready delayed jobs, sorted by
readiness (UNIX timestamp)
+ local ids =
redis.call('zRangeByScore',kDelayed,0,ARGV[4])
+ -- Migrate the jobs from the "delayed" set to the
"unclaimed" list
+ for k,id in ipairs(ids) do
+ redis.call('lPush',kUnclaimed,id)
+ redis.call('zRem',kDelayed,id)
+ end
+ undelayed = #ids
+ return {released,abandoned,pruned,undelayed}
LUA;
$res = $conn->luaEval( $script,
array(
@@ -724,15 +692,17 @@
$this->getQueueKey( 'l-unclaimed' ), #
KEYS[3]
$this->getQueueKey( 'h-data' ), #
KEYS[4]
$this->getQueueKey( 'z-abandoned' ), #
KEYS[5]
+ $this->getQueueKey( 'z-delayed' ), #
KEYS[6]
$now - $this->claimTTL, # ARGV[1]
$now - self::MAX_AGE_PRUNE, # ARGV[2]
- $this->maxTries # ARGV[3]
+ $this->maxTries, # ARGV[3]
+ $now # ARGV[4]
),
- 5 # number of first argument(s) that are keys
+ 6 # number of first argument(s) that are keys
);
if ( $res ) {
- list( $released, $abandoned, $pruned ) = $res;
- $count += $released + $pruned;
+ list( $released, $abandoned, $pruned,
$undelayed ) = $res;
+ $count += $released + $pruned + $undelayed;
JobQueue::incrStats( 'job-recycle',
$this->type, $released );
JobQueue::incrStats( 'job-abandon',
$this->type, $abandoned );
}
@@ -747,21 +717,19 @@
* @return array
*/
protected function doGetPeriodicTasks() {
- $tasks = array();
+ $periods = array( 3600 ); // standard cleanup (useful on config
change)
if ( $this->claimTTL > 0 ) {
- $tasks['recycleAndDeleteStaleJobs'] = array(
- 'callback' => array( $this,
'recycleAndDeleteStaleJobs' ),
- 'period' => ceil( $this->claimTTL / 2 )
- );
+ $periods[] = ceil( $this->claimTTL / 2 ); // avoid bad
timing
}
if ( $this->checkDelay ) {
- $tasks['releaseReadyDelayedJobs'] = array(
- 'callback' => array( $this,
'releaseReadyDelayedJobs' ),
- 'period' => 300 // 5 minutes
- );
+ $periods[] = 300; // 5 minutes
}
-
- return $tasks;
+ return array(
+ 'recyclePruneAndUndelayJobs' => array(
+ 'callback' => array( $this,
'recyclePruneAndUndelayJobs' ),
+ 'period' => max( min( $periods ), 30 ) //
sanity
+ )
+ );
}
/**
--
To view, visit https://gerrit.wikimedia.org/r/104475
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I457512360a445c234cfeba7a716eedeb37273467
Gerrit-PatchSet: 1
Gerrit-Project: mediawiki/core
Gerrit-Branch: master
Gerrit-Owner: Aaron Schulz <[email protected]>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits