Aaron Schulz has uploaded a new change for review.

  https://gerrit.wikimedia.org/r/174214

Change subject: Batch executeReadyPeriodicTasks() updates to avoid OOMs
......................................................................

Batch executeReadyPeriodicTasks() updates to avoid OOMs

Change-Id: I70aaeddef972d370a8e9d81e60dec7ae0fda928c
---
M redisJobRunnerService
1 file changed, 103 insertions(+), 90 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/mediawiki/services/jobrunner refs/changes/14/174214/1

diff --git a/redisJobRunnerService b/redisJobRunnerService
index 69ec126..3832e27 100755
--- a/redisJobRunnerService
+++ b/redisJobRunnerService
@@ -375,15 +375,18 @@
         * @return int|bool Number of jobs recycled/deleted/undelayed/abandoned 
(false if not run)
         */
        protected function executeReadyPeriodicTasks() {
-               $tasksRun = 0;
+               $jobs = 0;
 
-               // Run no more than every 5 minutes
+               // Run no more than every 5 minutes.
+               // This is randomized to scale the liveliness with the # of 
runners.
                static $lastPeriodicTime = null;
                $lastPeriodicTime = $lastPeriodicTime ?: time() - mt_rand( 0, 
300 );
                if ( ( time() - $lastPeriodicTime ) <= 300 ) {
                        return false;
                }
                $lastPeriodicTime = time();
+
+               $host = gethostname();
 
                $aConn = false;
                // Connect to the first working server on the list
@@ -394,11 +397,84 @@
                        }
                }
 
-               $host = gethostname();
-
                if ( !$aConn ) {
                        $this->incrStats( "periodictasks.failed.$host", 1 );
-                       return $tasksRun;
+                       return $jobs;
+               }
+
+               $ok = true;
+               try {
+                       $types = $this->redisCmd( $aConn, 'hKeys', array( 
$this->getQueueTypesKey() ) );
+                       $wikiIds = $this->redisCmd( $aConn, 'sMembers', array( 
$this->getWikiSetKey() ) );
+                       if ( !is_array( $types ) || !is_array( $wikiIds ) ) {
+                               $this->incrStats( "periodictasks.failed.$host", 
1 );
+                               return $jobs;
+                       }
+
+                       // Randomize to scale the liveliness with the # of 
runners
+                       shuffle( $types );
+                       shuffle( $wikiIds );
+
+                       $now = time();
+
+                       // Build up the script arguments for each queue...
+                       $paramsByQueue = array();
+                       foreach ( $types as $type ) {
+                               $ttl = isset( $this->claimTTLMap[$type] )
+                                       ? $this->claimTTLMap[$type]
+                                       : $this->claimTTLMap['*'];
+                               $attempts = isset( $this->attemptsMap[$type] )
+                                       ? $this->attemptsMap[$type]
+                                       : $this->attemptsMap['*'];
+                               foreach ( $wikiIds as $wikiId ) {
+                                       $paramsByQueue[] = array(
+                                               'queue'  => array( $type, 
$wikiId ),
+                                               'params' => array(
+                                                       
"{$wikiId}:jobqueue:{$type}:z-claimed", # KEYS[1]
+                                                       
"{$wikiId}:jobqueue:{$type}:h-attempts", # KEYS[2]
+                                                       
"{$wikiId}:jobqueue:{$type}:l-unclaimed", # KEYS[3]
+                                                       
"{$wikiId}:jobqueue:{$type}:h-data", # KEYS[4]
+                                                       
"{$wikiId}:jobqueue:{$type}:z-abandoned", # KEYS[5]
+                                                       
"{$wikiId}:jobqueue:{$type}:z-delayed", # KEYS[6]
+                                                       $now - $ttl, # ARGV[1]
+                                                       $now - 7 * 86400, # 
ARGV[2]
+                                                       $attempts, # ARGV[3]
+                                                       $now # ARGV[4]
+                                               ),
+                                               'keys'   => 6 # number of first 
argument(s) that are keys
+                                       );
+                               }
+                       }
+                       // Batch the Lua queries to avoid client OOMs
+                       $paramsByQueueBatches = array_chunk( $paramsByQueue, 
1000 );
+
+                       $mapSet = array(); // ready map of (queue name => 
timestamp)
+                       // Run the script on all job queue partitions...
+                       foreach ( $this->queueSrvs as $qServer ) {
+                               foreach ( $paramsByQueueBatches as 
$paramsByQueueBatch ) {
+                                       $ok = $this->updateQueueServer(
+                                               $qServer, $paramsByQueueBatch, 
$mapSet, $jobs ) && $ok;
+                               }
+                       }
+                       // Update the map in the aggregator as queues become 
ready
+                       $this->redisCmd( $aConn, 'hMSet', array( 
$this->getReadyQueueKey(), $mapSet ) );
+               } catch ( RedisException $e ) {
+                       $ok = false;
+                       $this->handleRedisError( $e, $aServer );
+               }
+
+               if ( !$ok ) {
+                       $this->error( "Failed to do periodic tasks for some 
queues." );
+                       $this->incrStats( "periodictasks.failed.$host", 1 );
+               }
+
+               return $jobs;
+       }
+
+       private function updateQueueServer( $qServer, array $paramsByQueue, 
array &$mapSet, &$jobs ) {
+               $qConn = $this->getRedisConn( $qServer );
+               if ( !$qConn ) {
+                       return false; // partition down
                }
 
                static $script =
@@ -447,102 +523,39 @@
 
                $ok = true;
                try {
-                       $types = $this->redisCmd( $aConn, 'hKeys', array( 
$this->getQueueTypesKey() ) );
-                       $wikiIds = $this->redisCmd( $aConn, 'sMembers', array( 
$this->getWikiSetKey() ) );
-                       if ( !is_array( $types ) || !is_array( $wikiIds ) ) {
-                               $this->incrStats( "periodictasks.failed.$host", 
1 );
-                               return $tasksRun;
-                       }
+                       $sha1 = $this->redisCmd( $qConn, 'script', array( 
'load', $script ) );
 
-                       shuffle( $types );
-                       shuffle( $wikiIds );
+                       $this->redisCmd( $qConn, 'multi', array( 
Redis::PIPELINE ) );
+                       foreach ( $paramsByQueue as $params ) {
+                               $this->redisCmd( $qConn, 'evalSha',
+                                       array( $sha1, $params['params'], 
$params['keys'] ) );
+                       }
+                       $res = $this->redisCmd( $qConn, 'exec' );
 
                        $now = time();
-
-                       $paramsByQueue = array();
-                       foreach ( $types as $type ) {
-                               $ttl = isset( $this->claimTTLMap[$type] )
-                                       ? $this->claimTTLMap[$type]
-                                       : $this->claimTTLMap['*'];
-                               $attempts = isset( $this->attemptsMap[$type] )
-                                       ? $this->attemptsMap[$type]
-                                       : $this->attemptsMap['*'];
-                               foreach ( $wikiIds as $wikiId ) {
-                                       $paramsByQueue[] = array(
-                                               'queue'  => array( $type, 
$wikiId ),
-                                               'params' => array(
-                                                       
"{$wikiId}:jobqueue:{$type}:z-claimed", # KEYS[1]
-                                                       
"{$wikiId}:jobqueue:{$type}:h-attempts", # KEYS[2]
-                                                       
"{$wikiId}:jobqueue:{$type}:l-unclaimed", # KEYS[3]
-                                                       
"{$wikiId}:jobqueue:{$type}:h-data", # KEYS[4]
-                                                       
"{$wikiId}:jobqueue:{$type}:z-abandoned", # KEYS[5]
-                                                       
"{$wikiId}:jobqueue:{$type}:z-delayed", # KEYS[6]
-                                                       $now - $ttl, # ARGV[1]
-                                                       $now - 7 * 86400, # 
ARGV[2]
-                                                       $attempts, # ARGV[3]
-                                                       $now # ARGV[4]
-                                               ),
-                                               'keys'   => 6 # number of first 
argument(s) that are keys
-                                       );
-                               }
-                       }
-
-                       // Run the script on all job queue partitions...
-                       foreach ( $this->queueSrvs as $qServer ) {
-                               $qConn = $this->getRedisConn( $qServer );
-                               if ( !$qConn ) {
+                       foreach ( $res as $i => $result ) {
+                               if ( !$result ) {
                                        $ok = false;
-                                       continue; // partition down
+                                       continue;
                                }
-
-                               $mapSet = array(); // map of (queue name => 
timestamp)
-                               try {
-                                       $sha1 = $this->redisCmd( $qConn, 
'script', array( 'load', $script ) );
-
-                                       $this->redisCmd( $qConn, 'multi', 
array( Redis::PIPELINE ) );
-                                       foreach ( $paramsByQueue as $params ) {
-                                               $this->redisCmd( $qConn, 
'evalSha',
-                                                       array( $sha1, 
$params['params'], $params['keys'] ) );
-                                       }
-                                       $res = $this->redisCmd( $qConn, 'exec' 
);
-
-                                       $now = time();
-                                       foreach ( $res as $i => $result ) {
-                                               if ( !$result ) {
-                                                       $ok = false;
-                                                       continue;
-                                               }
-                                               list( $qType, $qWiki ) = 
$paramsByQueue[$i]['queue'];
-                                               $tasksRun += array_sum( $result 
);
-                                               list( $released, $abandoned, 
$pruned, $undelayed, $ready ) = $result;
-                                               if ( $released > 0 || 
$undelayed > 0 || $ready > 0 ) {
-                                                       // Update the map in 
the aggregator as needed. This checks
-                                                       // $ready to handle 
lost aggregator updates as well as to
-                                                       // merge after network 
partitions that caused fail-over.
-                                                       
$mapSet[$this->encQueueName( $qType, $qWiki )] = $now;
-                                               }
-                                               $this->incrStats( 
"job-recycle.$qType.$qWiki", $released );
-                                               $this->incrStats( 
"job-abandon.$qType.$qWiki", $abandoned );
-                                               $this->incrStats( 
"job-undelay.$qType.$qWiki", $undelayed );
-                                       }
-                               } catch ( RedisException $e ) {
-                                       $ok = false;
-                                       $this->handleRedisError( $e, $qServer );
+                               list( $qType, $qWiki ) = 
$paramsByQueue[$i]['queue'];
+                               list( $released, $abandoned, $pruned, 
$undelayed, $ready ) = $result;
+                               if ( $released > 0 || $undelayed > 0 || $ready 
> 0 ) {
+                                       // This checks $ready to handle lost 
aggregator updates as well as
+                                       // to merge after network partitions 
that caused aggregator fail-over.
+                                       $mapSet[$this->encQueueName( $qType, 
$qWiki )] = $now;
                                }
-
-                               $this->redisCmd( $aConn, 'hMSet', array( 
$this->getReadyQueueKey(), $mapSet ) );
+                               $jobs += array_sum( $result );
+                               $this->incrStats( "job-recycle.$qType.$qWiki", 
$released );
+                               $this->incrStats( "job-abandon.$qType.$qWiki", 
$abandoned );
+                               $this->incrStats( "job-undelay.$qType.$qWiki", 
$undelayed );
                        }
                } catch ( RedisException $e ) {
                        $ok = false;
-                       $this->handleRedisError( $e, $aServer );
+                       $this->handleRedisError( $e, $qServer );
                }
 
-               if ( !$ok ) {
-                       $this->error( "Failed to do periodic tasks for some 
queues." );
-                       $this->incrStats( "periodictasks.failed.$host", 1 );
-               }
-
-               return $tasksRun;
+               return $ok;
        }
 
        /**

-- 
To view, visit https://gerrit.wikimedia.org/r/174214
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I70aaeddef972d370a8e9d81e60dec7ae0fda928c
Gerrit-PatchSet: 1
Gerrit-Project: mediawiki/services/jobrunner
Gerrit-Branch: master
Gerrit-Owner: Aaron Schulz <asch...@wikimedia.org>

_______________________________________________
MediaWiki-commits mailing list
MediaWiki-commits@lists.wikimedia.org
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to