jenkins-bot has submitted this change and it was merged.

Change subject: Add per-partition JobQueueRedis aggregation
......................................................................


Add per-partition JobQueueRedis aggregation

* Track queues with non-abandoned jobs per partition server.
  The s-queuesWithJobs key can easily be queried to see which
  queues need to have periodic tasks run (or for debugging).
* This is a requirement for the redis jobchron service to be able to
  avoid hitting N=(no. types X no. wikis) queues for periodic tasks
  when only a tiny fraction of those actually have any jobs. For WMF,
  there are over 30K queues, most of them empty, so doing that can help
  lower redis-server CPU (or at least make jobchron more responsive).
* This also allows for jobchron to manage the aggregator by taking the
  per-server aggregator sets and merging them. This scales much better
  as there are only a modest number of these daemons (18 for WMF) but
  vastly more web threads pushing jobs. This cuts down on the connections
  to the active aggregator server (the one with the hash table).
* Use Lua unpack() more for stylistic consistency.
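
The merge step described in the bullets above can be sketched roughly as follows. This is an illustration only, not the jobchron implementation: the function name mergeQueuesWithJobs() and the server names are hypothetical, and the input arrays stand in for what SMEMBERS on each partition server's s-queuesWithJobs key might return (JSON-encoded [type, wiki] pairs, as produced by encodeQueueName() in this change).

```php
<?php
/**
 * Merge per-partition "queues with jobs" sets into one de-duplicated list.
 * Hypothetical helper for illustration; not part of this change.
 *
 * @param array $setsByServer Map of (server name => list of JSON-encoded [type, wiki] names)
 * @return array List of array( type, wiki ) pairs, each queue listed once
 */
function mergeQueuesWithJobs( array $setsByServer ) {
	$seen = array();
	foreach ( $setsByServer as $encodedNames ) {
		foreach ( $encodedNames as $encoded ) {
			// Keying by the encoded name drops duplicates across servers
			$seen[$encoded] = json_decode( $encoded, true );
		}
	}

	return array_values( $seen );
}

// Made-up data standing in for the sets read from two partition servers
$merged = mergeQueuesWithJobs( array(
	'server-a' => array( '["refreshLinks","enwiki"]', '["htmlCacheUpdate","dewiki"]' ),
	'server-b' => array( '["refreshLinks","enwiki"]' ),
) );
```

Because only the small number of daemons would perform this merge, the aggregator server sees one writer per daemon rather than one per web thread.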

Change-Id: I1549f0edc78cc4004dd887b475dec4c0ebd306c6
---
M includes/jobqueue/JobQueueRedis.php
M tests/phpunit/includes/jobqueue/JobQueueTest.php
2 files changed, 111 insertions(+), 12 deletions(-)

Approvals:
  Ori.livneh: Looks good to me, approved
  jenkins-bot: Verified



diff --git a/includes/jobqueue/JobQueueRedis.php b/includes/jobqueue/JobQueueRedis.php
index 78d2a36..9ce9bf9 100644
--- a/includes/jobqueue/JobQueueRedis.php
+++ b/includes/jobqueue/JobQueueRedis.php
@@ -26,8 +26,9 @@
  *
  * This is a faster and less resource-intensive job queue than JobQueueDB.
  * All data for a queue using this class is placed into one redis server.
+ * The mediawiki/services/jobrunner background service must be set up and running.
  *
- * There are eight main redis keys used to track jobs:
+ * There are eight main redis keys (per queue) used to track jobs:
  *   - l-unclaimed  : A list of job IDs used for ready unclaimed jobs
  *   - z-claimed    : A sorted set of (job ID, UNIX timestamp as score) used for job retries
  *   - z-abandoned  : A sorted set of (job ID, UNIX timestamp as score) used for broken jobs
@@ -42,6 +43,12 @@
  * there should be no other such jobs with that SHA1. Every h-idBySha1 entry has an h-sha1ById
  * entry and every h-sha1ById must refer to an ID that is l-unclaimed. If a job has its
  * ID in z-claimed or z-abandoned, then it must also have an h-attempts entry for its ID.
+ *
+ * The following keys are used to track queue states:
+ *   - s-queuesWithJobs : A set of all queues with non-abandoned jobs
+ *
+ * The background service takes care of undelaying, recycling, and pruning jobs as well as
+ * removing s-queuesWithJobs entries as queues empty.
  *
  * Additionally, "rootjob:* keys track "root jobs" used for additional de-duplication.
  * Aside from root job keys, all keys have no expiry, and are only removed when jobs are run.
@@ -239,7 +246,8 @@
         * @throws RedisException
         */
        protected function pushBlobs( RedisConnRef $conn, array $items ) {
-               $args = array(); // ([id, sha1, rtime, blob [, id, sha1, rtime, blob ... ] ] )
+               $args = array( $this->encodeQueueName() );
+               // Next args come in 4s ([id, sha1, rtime, blob [, id, sha1, rtime, blob ... ] ] )
                foreach ( $items as $item ) {
                        $args[] = (string)$item['uuid'];
                        $args[] = (string)$item['sha1'];
@@ -248,10 +256,17 @@
                }
                static $script =
 <<<LUA
-               local kUnclaimed, kSha1ById, kIdBySha1, kDelayed, kData = unpack(KEYS)
-               if #ARGV % 4 ~= 0 then return redis.error_reply('Unmatched arguments') end
+               local kUnclaimed, kSha1ById, kIdBySha1, kDelayed, kData, kQwJobs = unpack(KEYS)
+               -- First argument is the queue ID
+               local queueId = ARGV[1]
+               -- Next arguments all come in 4s (one per job)
+               local variadicArgCount = #ARGV - 1
+               if variadicArgCount % 4 ~= 0 then
+                       return redis.error_reply('Unmatched arguments')
+               end
+               -- Insert each job into this queue as needed
                local pushed = 0
-               for i = 1,#ARGV,4 do
+               for i = 2,#ARGV,4 do
                         local id,sha1,rtimestamp,blob = ARGV[i],ARGV[i+1],ARGV[i+2],ARGV[i+3]
                         if sha1 == '' or redis.call('hExists',kIdBySha1,sha1) == 0 then
                                if 1*rtimestamp > 0 then
@@ -269,6 +284,8 @@
                                pushed = pushed + 1
                        end
                end
+               -- Mark this queue as having jobs
+               redis.call('sAdd',kQwJobs,queueId)
                return pushed
 LUA;
                return $conn->luaEval( $script,
@@ -279,10 +296,11 @@
                                         $this->getQueueKey( 'h-idBySha1' ), # KEYS[3]
                                         $this->getQueueKey( 'z-delayed' ), # KEYS[4]
                                         $this->getQueueKey( 'h-data' ), # KEYS[5]
+                                       $this->getGlobalKey( 's-queuesWithJobs' ), # KEYS[6]
                                ),
                                $args
                        ),
-                       5 # number of first argument(s) that are keys
+                       6 # number of first argument(s) that are keys
                );
        }
 
@@ -328,15 +346,18 @@
                static $script =
 <<<LUA
                 local kUnclaimed, kSha1ById, kIdBySha1, kClaimed, kAttempts, kData = unpack(KEYS)
+               local rTime = unpack(ARGV)
                -- Pop an item off the queue
                local id = redis.call('rPop',kUnclaimed)
-               if not id then return false end
+               if not id then
+                       return false
+               end
                -- Allow new duplicates of this job
                local sha1 = redis.call('hGet',kSha1ById,id)
                if sha1 then redis.call('hDel',kIdBySha1,sha1) end
                redis.call('hDel',kSha1ById,id)
                -- Mark the jobs as claimed and return it
-               redis.call('zAdd',kClaimed,ARGV[1],id)
+               redis.call('zAdd',kClaimed,rTime,id)
                redis.call('hIncrBy',kAttempts,id,1)
                return redis.call('hGet',kData,id)
 LUA;
@@ -372,11 +393,12 @@
                        static $script =
 <<<LUA
                        local kClaimed, kAttempts, kData = unpack(KEYS)
+                       local uuid = unpack(ARGV)
                        -- Unmark the job as claimed
-                       redis.call('zRem',kClaimed,ARGV[1])
-                       redis.call('hDel',kAttempts,ARGV[1])
+                       redis.call('zRem',kClaimed,uuid)
+                       redis.call('hDel',kAttempts,uuid)
                        -- Delete the job data itself
-                       return redis.call('hDel',kData,ARGV[1])
+                       return redis.call('hDel',kData,uuid)
 LUA;
                        $res = $conn->luaEval( $script,
                                array(
@@ -472,7 +494,10 @@
                                $keys[] = $this->getQueueKey( $prop );
                        }
 
-                       return ( $conn->delete( $keys ) !== false );
+                       $ok = ( $conn->delete( $keys ) !== false );
+                       $conn->sRem( $this->getGlobalKey( 's-queuesWithJobs' ), $this->encodeQueueName() );
+
+                       return $ok;
                } catch ( RedisException $e ) {
                        $this->throwRedisException( $conn, $e );
                }
@@ -624,6 +649,27 @@
        }
 
        /**
+        * @return array List of (type,wiki) tuples for queues with non-abandoned jobs
+        * @throws JobQueueConnectionError
+        * @throws JobQueueError
+        */
+       public function getServerQueuesWithJobs() {
+               $queues = array();
+
+               $conn = $this->getConnection();
+               try {
+                       $set = $conn->sMembers( $this->getGlobalKey( 's-queuesWithJobs' ) );
+                       foreach ( $set as $queue ) {
+                               $queues[] = $this->decodeQueueName( $queue );
+                       }
+               } catch ( RedisException $e ) {
+                       $this->throwRedisException( $conn, $e );
+               }
+
+               return $queues;
+       }
+
+       /**
         * @param IJobSpecification $job
         * @return array
         */
@@ -720,6 +766,40 @@
        }
 
        /**
+        * @return string JSON
+        */
+       private function encodeQueueName() {
+               return json_encode( array( $this->type, $this->wiki ) );
+       }
+
+       /**
+        * @param string $name JSON
+        * @return array (type, wiki)
+        */
+       private function decodeQueueName( $name ) {
+               return json_decode( $name );
+       }
+
+       /**
+        * @param string $name
+        * @return string
+        */
+       private function getGlobalKey( $name ) {
+               $parts = array( 'global', 'jobqueue', $name );
+               if ( strlen( $this->key ) ) { // namespaced queue (for testing)
+                       $parts[] = $this->key;
+               }
+
+               foreach ( $parts as $part ) {
+                   if ( !preg_match( '/^[a-zA-Z0-9_.-]+$/', $part ) ) {
+                       throw new InvalidArgumentException( "Key part characters are out of range." );
+                   }
+               }
+
+               return implode( ':', $parts );
+       }
+
+       /**
         * @param string $prop
         * @param string|null $type
         * @return string
diff --git a/tests/phpunit/includes/jobqueue/JobQueueTest.php b/tests/phpunit/includes/jobqueue/JobQueueTest.php
index 3cb1af6..bb74e95 100644
--- a/tests/phpunit/includes/jobqueue/JobQueueTest.php
+++ b/tests/phpunit/includes/jobqueue/JobQueueTest.php
@@ -332,6 +332,25 @@
                 $this->assertEquals( 0, $queue->getAcquiredCount(), "No jobs active ($desc)" );
        }
 
+       /**
+        * @covers JobQueue
+        */
+       public function testQueueAggregateTable() {
+               $queue = $this->queueFifo;
+               if ( !$queue || !method_exists( $queue, 'getServerQueuesWithJobs' ) ) {
+                       $this->markTestSkipped();
+               }
+
+               $this->assertArrayEquals( array(), $queue->getServerQueuesWithJobs() );
+
+               $queue->push( $this->newJob( 0 ) );
+
+               $this->assertArrayEquals(
+                       array( array( $queue->getType(), $queue->getWiki() ) ),
+                       $queue->getServerQueuesWithJobs()
+               );
+       }
+
        public static function provider_queueLists() {
                return array(
                         array( 'queueRand', false, 'Random queue without ack()' ),
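
For readers following the diff, here is a minimal standalone sketch of how the queue name encoding and the global key composition fit together. The function names encodeName(), decodeName(), and buildGlobalKey() are hypothetical stand-ins for the private methods in JobQueueRedis, and the sketch assumes the intent of the character check is to reject any key part containing a character outside the listed set (hence the anchors, and the hyphen placed last in the class so it is read literally).

```php
<?php
// Hypothetical standalone versions of the helpers, for illustration only

/** @return string JSON list of (type, wiki) */
function encodeName( $type, $wiki ) {
	return json_encode( array( $type, $wiki ) );
}

/** @return array (type, wiki) */
function decodeName( $name ) {
	return json_decode( $name );
}

/**
 * @param string $name Key name, such as 's-queuesWithJobs'
 * @param string $namespace Optional namespace (assumed to be used for testing)
 * @return string
 * @throws InvalidArgumentException
 */
function buildGlobalKey( $name, $namespace = '' ) {
	$parts = array( 'global', 'jobqueue', $name );
	if ( strlen( $namespace ) ) {
		$parts[] = $namespace;
	}

	foreach ( $parts as $part ) {
		// Anchored at both ends; "-" is last in the class so it is a literal
		if ( !preg_match( '/^[a-zA-Z0-9_.-]+$/', $part ) ) {
			throw new InvalidArgumentException( "Key part characters are out of range." );
		}
	}

	return implode( ':', $parts );
}

$key = buildGlobalKey( 's-queuesWithJobs' );
$roundTrip = decodeName( encodeName( 'refreshLinks', 'enwiki' ) );
```

Encoding the queue name as a JSON list keeps the (type, wiki) pair unambiguous even if either value contains the ':' separator used in the key itself.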

-- 
To view, visit https://gerrit.wikimedia.org/r/252608
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I1549f0edc78cc4004dd887b475dec4c0ebd306c6
Gerrit-PatchSet: 9
Gerrit-Project: mediawiki/core
Gerrit-Branch: master
Gerrit-Owner: Aaron Schulz
Gerrit-Reviewer: Aaron Schulz
Gerrit-Reviewer: Ori.livneh
Gerrit-Reviewer: jenkins-bot <>

_______________________________________________
MediaWiki-commits mailing list
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits