Ori.livneh has submitted this change and it was merged.
Change subject: Add per-partition JobQueueRedis aggregation
......................................................................
Add per-partition JobQueueRedis aggregation
* Track queues with non-abandoned jobs per partition server.
The s-queuesWithJobs key can easily be queried to see which
queues need to have periodic tasks run (or for debugging).
* This is a requirement for the redis jobchron service to be able to
avoid hitting N=(no. types X no. wikis) queues for periodic tasks
when only a tiny fraction of those actually have any jobs. For WMF,
there are over 30K queues, most of them empty, so doing that can help
lower redis-server CPU (or at least make jobchron more responsive).
* This also allows jobchron to manage the aggregator by taking the
per-server aggregator sets and merging them. This scales much better
as there are only a modest number of these daemons (18 for WMF) but
vastly more web threads pushing jobs. This cuts down on the connections
to the active aggregator server (the one with the hash table).
* Use Lua unpack() more for stylistic consistency.
Change-Id: I1549f0edc78cc4004dd887b475dec4c0ebd306c6
(cherry picked from commit 6fe2f48df70e63cc0af1e2c100d2a5b10a6c6f71)
---
M includes/jobqueue/JobQueueRedis.php
M tests/phpunit/includes/jobqueue/JobQueueTest.php
2 files changed, 111 insertions(+), 12 deletions(-)
Approvals:
Ori.livneh: Verified; Looks good to me, approved
diff --git a/includes/jobqueue/JobQueueRedis.php
b/includes/jobqueue/JobQueueRedis.php
index 78d2a36..9ce9bf9 100644
--- a/includes/jobqueue/JobQueueRedis.php
+++ b/includes/jobqueue/JobQueueRedis.php
@@ -26,8 +26,9 @@
*
* This is a faster and less resource-intensive job queue than JobQueueDB.
* All data for a queue using this class is placed into one redis server.
+ * The mediawiki/services/jobrunner background service must be set up and running.
*
- * There are eight main redis keys used to track jobs:
+ * There are eight main redis keys (per queue) used to track jobs:
* - l-unclaimed : A list of job IDs used for ready unclaimed jobs
* - z-claimed : A sorted set of (job ID, UNIX timestamp as score) used
for job retries
* - z-abandoned : A sorted set of (job ID, UNIX timestamp as score) used
for broken jobs
@@ -42,6 +43,12 @@
* there should be no other such jobs with that SHA1. Every h-idBySha1 entry
has an h-sha1ById
* entry and every h-sha1ById must refer to an ID that is l-unclaimed. If a
job has its
* ID in z-claimed or z-abandoned, then it must also have an h-attempts entry
for its ID.
+ *
+ * The following keys are used to track queue states:
+ * - s-queuesWithJobs : A set of all queues with non-abandoned jobs
+ *
+ * The background service takes care of undelaying, recycling, and pruning jobs as well as
+ * removing s-queuesWithJobs entries as queues empty.
*
* Additionally, "rootjob:* keys track "root jobs" used for additional
de-duplication.
* Aside from root job keys, all keys have no expiry, and are only removed
when jobs are run.
@@ -239,7 +246,8 @@
* @throws RedisException
*/
protected function pushBlobs( RedisConnRef $conn, array $items ) {
- $args = array(); // ([id, sha1, rtime, blob [, id, sha1, rtime,
blob ... ] ] )
+ $args = array( $this->encodeQueueName() );
+ // Next args come in 4s ([id, sha1, rtime, blob [, id, sha1,
rtime, blob ... ] ] )
foreach ( $items as $item ) {
$args[] = (string)$item['uuid'];
$args[] = (string)$item['sha1'];
@@ -248,10 +256,17 @@
}
static $script =
<<<LUA
- local kUnclaimed, kSha1ById, kIdBySha1, kDelayed, kData =
unpack(KEYS)
- if #ARGV % 4 ~= 0 then return redis.error_reply('Unmatched
arguments') end
+ local kUnclaimed, kSha1ById, kIdBySha1, kDelayed, kData,
kQwJobs = unpack(KEYS)
+ -- First argument is the queue ID
+ local queueId = ARGV[1]
+ -- Next arguments all come in 4s (one per job)
+ local variadicArgCount = #ARGV - 1
+ if variadicArgCount % 4 ~= 0 then
+ return redis.error_reply('Unmatched arguments')
+ end
+ -- Insert each job into this queue as needed
local pushed = 0
- for i = 1,#ARGV,4 do
+ for i = 2,#ARGV,4 do
local id,sha1,rtimestamp,blob =
ARGV[i],ARGV[i+1],ARGV[i+2],ARGV[i+3]
if sha1 == '' or redis.call('hExists',kIdBySha1,sha1)
== 0 then
if 1*rtimestamp > 0 then
@@ -269,6 +284,8 @@
pushed = pushed + 1
end
end
+ -- Mark this queue as having jobs
+ redis.call('sAdd',kQwJobs,queueId)
return pushed
LUA;
return $conn->luaEval( $script,
@@ -279,10 +296,11 @@
$this->getQueueKey( 'h-idBySha1' ), #
KEYS[3]
$this->getQueueKey( 'z-delayed' ), #
KEYS[4]
$this->getQueueKey( 'h-data' ), #
KEYS[5]
+ $this->getGlobalKey( 's-queuesWithJobs'
), # KEYS[6]
),
$args
),
- 5 # number of first argument(s) that are keys
+ 6 # number of first argument(s) that are keys
);
}
@@ -328,15 +346,18 @@
static $script =
<<<LUA
local kUnclaimed, kSha1ById, kIdBySha1, kClaimed, kAttempts,
kData = unpack(KEYS)
+ local rTime = unpack(ARGV)
-- Pop an item off the queue
local id = redis.call('rPop',kUnclaimed)
- if not id then return false end
+ if not id then
+ return false
+ end
-- Allow new duplicates of this job
local sha1 = redis.call('hGet',kSha1ById,id)
if sha1 then redis.call('hDel',kIdBySha1,sha1) end
redis.call('hDel',kSha1ById,id)
-- Mark the jobs as claimed and return it
- redis.call('zAdd',kClaimed,ARGV[1],id)
+ redis.call('zAdd',kClaimed,rTime,id)
redis.call('hIncrBy',kAttempts,id,1)
return redis.call('hGet',kData,id)
LUA;
@@ -372,11 +393,12 @@
static $script =
<<<LUA
local kClaimed, kAttempts, kData = unpack(KEYS)
+ local uuid = unpack(ARGV)
-- Unmark the job as claimed
- redis.call('zRem',kClaimed,ARGV[1])
- redis.call('hDel',kAttempts,ARGV[1])
+ redis.call('zRem',kClaimed,uuid)
+ redis.call('hDel',kAttempts,uuid)
-- Delete the job data itself
- return redis.call('hDel',kData,ARGV[1])
+ return redis.call('hDel',kData,uuid)
LUA;
$res = $conn->luaEval( $script,
array(
@@ -472,7 +494,10 @@
$keys[] = $this->getQueueKey( $prop );
}
- return ( $conn->delete( $keys ) !== false );
+ $ok = ( $conn->delete( $keys ) !== false );
+ $conn->sRem( $this->getGlobalKey( 's-queuesWithJobs' ),
$this->encodeQueueName() );
+
+ return $ok;
} catch ( RedisException $e ) {
$this->throwRedisException( $conn, $e );
}
@@ -624,6 +649,27 @@
}
/**
+ * Get the queues on this queue's redis server that currently have non-abandoned jobs
+ *
+ * @return array List of (type,wiki) tuples for queues with non-abandoned jobs
+ * @throws JobQueueConnectionError
+ * @throws JobQueueError
+ */
+ public function getServerQueuesWithJobs() {
+ $queues = array();
+
+ $conn = $this->getConnection();
+ try {
+ $set = $conn->sMembers( $this->getGlobalKey( 's-queuesWithJobs' ) );
+ foreach ( $set as $queue ) {
+ // Set members are JSON-encoded (type, wiki) pairs added by the pushBlobs() script
+ $queues[] = $this->decodeQueueName( $queue );
+ }
+ } catch ( RedisException $e ) {
+ $this->throwRedisException( $conn, $e );
+ }
+
+ return $queues;
+ }
+
+ /**
* @param IJobSpecification $job
* @return array
*/
@@ -720,6 +766,40 @@
}
/**
+ * Build the member name that identifies this queue in s-queuesWithJobs
+ *
+ * @return string JSON-encoded (type, wiki) pair
+ */
+ private function encodeQueueName() {
+ return json_encode( array( $this->type, $this->wiki ) );
+ }
+
+ /**
+ * Inverse of encodeQueueName()
+ *
+ * @param string $name JSON-encoded (type, wiki) pair
+ * @return array|null (type, wiki) pair; json_decode() yields null if $name is not valid JSON
+ */
+ private function decodeQueueName( $name ) {
+ return json_decode( $name );
+ }
+
+ /**
+ * Get a redis key shared by all queues on a server rather than scoped to one queue
+ *
+ * @param string $name Key name (e.g. "s-queuesWithJobs")
+ * @return string Colon-separated key ("global:jobqueue:<name>[:<namespace>]")
+ * @throws InvalidArgumentException If a key part contains no allowed character
+ */
+ private function getGlobalKey( $name ) {
+ $parts = array( 'global', 'jobqueue', $name );
+ if ( strlen( $this->key ) ) { // namespaced queue (for testing)
+ $parts[] = $this->key;
+ }
+
+ foreach ( $parts as $part ) {
+ // '-' must come last in the class to be a literal: "_-." is a reversed range, which
+ // makes preg_match() fail to compile the pattern and return false, so every call threw.
+ // NOTE(review): the pattern is unanchored, so it only requires one allowed character
+ // per part; use '/^[a-zA-Z0-9_.-]+$/' if whole-part validation is intended.
+ if ( !preg_match( '/[a-zA-Z0-9_.-]+/', $part ) ) {
+ throw new InvalidArgumentException( "Key part characters are out of range." );
+ }
+ }
+
+ return implode( ':', $parts );
+ }
+
+ /**
* @param string $prop
* @param string|null $type
* @return string
diff --git a/tests/phpunit/includes/jobqueue/JobQueueTest.php
b/tests/phpunit/includes/jobqueue/JobQueueTest.php
index 3cb1af6..bb74e95 100644
--- a/tests/phpunit/includes/jobqueue/JobQueueTest.php
+++ b/tests/phpunit/includes/jobqueue/JobQueueTest.php
@@ -332,6 +332,25 @@
$this->assertEquals( 0, $queue->getAcquiredCount(), "No jobs
active ($desc)" );
}
+ /**
+ * Check that pushing a job registers the queue in the per-server s-queuesWithJobs
+ * set and that deleting the queue removes that registration again
+ *
+ * @covers JobQueue
+ */
+ public function testQueueAggregateTable() {
+ $queue = $this->queueFifo;
+ if ( !$queue || !method_exists( $queue, 'getServerQueuesWithJobs' ) ) {
+ $this->markTestSkipped();
+ }
+
+ // NOTE(review): assumes no other queue on this server has jobs at this point
+ $this->assertArrayEquals( array(), $queue->getServerQueuesWithJobs() );
+
+ $queue->push( $this->newJob( 0 ) );
+
+ $this->assertArrayEquals(
+ array( array( $queue->getType(), $queue->getWiki() ) ),
+ $queue->getServerQueuesWithJobs()
+ );
+
+ // Deleting the queue must also drop its s-queuesWithJobs entry (sRem in doDelete)
+ $queue->delete();
+ $this->assertArrayEquals( array(), $queue->getServerQueuesWithJobs() );
+ }
+
public static function provider_queueLists() {
return array(
array( 'queueRand', false, 'Random queue without ack()'
),
--
To view, visit https://gerrit.wikimedia.org/r/258392
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: merged
Gerrit-Change-Id: I1549f0edc78cc4004dd887b475dec4c0ebd306c6
Gerrit-PatchSet: 1
Gerrit-Project: mediawiki/core
Gerrit-Branch: wmf/1.27.0-wmf.8
Gerrit-Owner: Ori.livneh <[email protected]>
Gerrit-Reviewer: Aaron Schulz <[email protected]>
Gerrit-Reviewer: Ori.livneh <[email protected]>
Gerrit-Reviewer: jenkins-bot <>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits