Aaron Schulz has uploaded a new change for review.

  https://gerrit.wikimedia.org/r/174214

Change subject: Batch executeReadyPeriodicTasks() updates to avoid OOMs
......................................................................

Batch executeReadyPeriodicTasks() updates to avoid OOMs

Change-Id: I70aaeddef972d370a8e9d81e60dec7ae0fda928c
---
M redisJobRunnerService
1 file changed, 103 insertions(+), 90 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/mediawiki/services/jobrunner refs/changes/14/174214/1

diff --git a/redisJobRunnerService b/redisJobRunnerService
index 69ec126..3832e27 100755
--- a/redisJobRunnerService
+++ b/redisJobRunnerService
@@ -375,15 +375,18 @@
 	 * @return int|bool Number of jobs recycled/deleted/undelayed/abandoned (false if not run)
 	 */
 	protected function executeReadyPeriodicTasks() {
-		$tasksRun = 0;
+		$jobs = 0;
 
-		// Run no more than every 5 minutes
+		// Run no more than every 5 minutes.
+		// This is randomized to scale the liveliness with the # of runners.
 		static $lastPeriodicTime = null;
 		$lastPeriodicTime = $lastPeriodicTime ?: time() - mt_rand( 0, 300 );
 		if ( ( time() - $lastPeriodicTime ) <= 300 ) {
 			return false;
 		}
 		$lastPeriodicTime = time();
+
+		$host = gethostname();
 
 		$aConn = false;
 		// Connect to the first working server on the list
@@ -394,11 +397,84 @@
 			}
 		}
 
-		$host = gethostname();
-
 		if ( !$aConn ) {
 			$this->incrStats( "periodictasks.failed.$host", 1 );
-			return $tasksRun;
+			return $jobs;
+		}
+
+		$ok = true;
+		try {
+			$types = $this->redisCmd( $aConn, 'hKeys', array( $this->getQueueTypesKey() ) );
+			$wikiIds = $this->redisCmd( $aConn, 'sMembers', array( $this->getWikiSetKey() ) );
+			if ( !is_array( $types ) || !is_array( $wikiIds ) ) {
+				$this->incrStats( "periodictasks.failed.$host", 1 );
+				return $jobs;
+			}
+
+			// Randomize to scale the liveliness with the # of runners
+			shuffle( $types );
+			shuffle( $wikiIds );
+
+			$now = time();
+
+			// Build up the script arguments for each queue...
+			$paramsByQueue = array();
+			foreach ( $types as $type ) {
+				$ttl = isset( $this->claimTTLMap[$type] )
+					? $this->claimTTLMap[$type]
+					: $this->claimTTLMap['*'];
+				$attempts = isset( $this->attemptsMap[$type] )
+					? $this->attemptsMap[$type]
+					: $this->attemptsMap['*'];
+				foreach ( $wikiIds as $wikiId ) {
+					$paramsByQueue[] = array(
+						'queue' => array( $type, $wikiId ),
+						'params' => array(
+							"{$wikiId}:jobqueue:{$type}:z-claimed", # KEYS[1]
+							"{$wikiId}:jobqueue:{$type}:h-attempts", # KEYS[2]
+							"{$wikiId}:jobqueue:{$type}:l-unclaimed", # KEYS[3]
+							"{$wikiId}:jobqueue:{$type}:h-data", # KEYS[4]
+							"{$wikiId}:jobqueue:{$type}:z-abandoned", # KEYS[5]
+							"{$wikiId}:jobqueue:{$type}:z-delayed", # KEYS[6]
+							$now - $ttl, # ARGV[1]
+							$now - 7 * 86400, # ARGV[2]
+							$attempts, # ARGV[3]
+							$now # ARGV[4]
+						),
+						'keys' => 6 # number of first argument(s) that are keys
+					);
+				}
+			}
+			// Batch the Lua queries to avoid client OOMs
+			$paramsByQueueBatches = array_chunk( $paramsByQueue, 1000 );
+
+			$mapSet = array(); // ready map of (queue name => timestamp)
+			// Run the script on all job queue partitions...
+			foreach ( $this->queueSrvs as $qServer ) {
+				foreach ( $paramsByQueueBatches as $paramsByQueueBatch ) {
+					$ok = $this->updateQueueServer(
+						$qServer, $paramsByQueueBatch, $mapSet, $jobs ) && $ok;
+				}
+			}
+			// Update the map in the aggregator as queues become ready
+			$this->redisCmd( $aConn, 'hMSet', array( $this->getReadyQueueKey(), $mapSet ) );
+		} catch ( RedisException $e ) {
+			$ok = false;
+			$this->handleRedisError( $e, $aServer );
+		}
+
+		if ( !$ok ) {
+			$this->error( "Failed to do periodic tasks for some queues." );
+			$this->incrStats( "periodictasks.failed.$host", 1 );
+		}
+
+		return $jobs;
+	}
+
+	private function updateQueueServer( $qServer, array $paramsByQueue, array &$mapSet, &$jobs ) {
+		$qConn = $this->getRedisConn( $qServer );
+		if ( !$qConn ) {
+			return false; // partition down
 		}
 
 		static $script =
@@ -447,102 +523,39 @@
 
 		$ok = true;
 		try {
-			$types = $this->redisCmd( $aConn, 'hKeys', array( $this->getQueueTypesKey() ) );
-			$wikiIds = $this->redisCmd( $aConn, 'sMembers', array( $this->getWikiSetKey() ) );
-			if ( !is_array( $types ) || !is_array( $wikiIds ) ) {
-				$this->incrStats( "periodictasks.failed.$host", 1 );
-				return $tasksRun;
-			}
+			$sha1 = $this->redisCmd( $qConn, 'script', array( 'load', $script ) );
 
-			shuffle( $types );
-			shuffle( $wikiIds );
+			$this->redisCmd( $qConn, 'multi', array( Redis::PIPELINE ) );
+			foreach ( $paramsByQueue as $params ) {
+				$this->redisCmd( $qConn, 'evalSha',
+					array( $sha1, $params['params'], $params['keys'] ) );
+			}
+			$res = $this->redisCmd( $qConn, 'exec' );
 
 			$now = time();
-
-			$paramsByQueue = array();
-			foreach ( $types as $type ) {
-				$ttl = isset( $this->claimTTLMap[$type] )
-					? $this->claimTTLMap[$type]
-					: $this->claimTTLMap['*'];
-				$attempts = isset( $this->attemptsMap[$type] )
-					? $this->attemptsMap[$type]
-					: $this->attemptsMap['*'];
-				foreach ( $wikiIds as $wikiId ) {
-					$paramsByQueue[] = array(
-						'queue' => array( $type, $wikiId ),
-						'params' => array(
-							"{$wikiId}:jobqueue:{$type}:z-claimed", # KEYS[1]
-							"{$wikiId}:jobqueue:{$type}:h-attempts", # KEYS[2]
-							"{$wikiId}:jobqueue:{$type}:l-unclaimed", # KEYS[3]
-							"{$wikiId}:jobqueue:{$type}:h-data", # KEYS[4]
-							"{$wikiId}:jobqueue:{$type}:z-abandoned", # KEYS[5]
-							"{$wikiId}:jobqueue:{$type}:z-delayed", # KEYS[6]
-							$now - $ttl, # ARGV[1]
-							$now - 7 * 86400, # ARGV[2]
-							$attempts, # ARGV[3]
-							$now # ARGV[4]
-						),
-						'keys' => 6 # number of first argument(s) that are keys
-					);
-				}
-			}
-
-			// Run the script on all job queue partitions...
-			foreach ( $this->queueSrvs as $qServer ) {
-				$qConn = $this->getRedisConn( $qServer );
-				if ( !$qConn ) {
+			foreach ( $res as $i => $result ) {
+				if ( !$result ) {
 					$ok = false;
-					continue; // partition down
+					continue;
 				}
-
-				$mapSet = array(); // map of (queue name => timestamp)
-				try {
-					$sha1 = $this->redisCmd( $qConn, 'script', array( 'load', $script ) );
-
-					$this->redisCmd( $qConn, 'multi', array( Redis::PIPELINE ) );
-					foreach ( $paramsByQueue as $params ) {
-						$this->redisCmd( $qConn, 'evalSha',
-							array( $sha1, $params['params'], $params['keys'] ) );
-					}
-					$res = $this->redisCmd( $qConn, 'exec' );
-
-					$now = time();
-					foreach ( $res as $i => $result ) {
-						if ( !$result ) {
-							$ok = false;
-							continue;
-						}
-						list( $qType, $qWiki ) = $paramsByQueue[$i]['queue'];
-						$tasksRun += array_sum( $result );
-						list( $released, $abandoned, $pruned, $undelayed, $ready ) = $result;
-						if ( $released > 0 || $undelayed > 0 || $ready > 0 ) {
-							// Update the map in the aggregator as needed. This checks
-							// $ready to handle lost aggregator updates as well as to
-							// merge after network partitions that caused fail-over.
-							$mapSet[$this->encQueueName( $qType, $qWiki )] = $now;
-						}
-						$this->incrStats( "job-recycle.$qType.$qWiki", $released );
-						$this->incrStats( "job-abandon.$qType.$qWiki", $abandoned );
-						$this->incrStats( "job-undelay.$qType.$qWiki", $undelayed );
-					}
-				} catch ( RedisException $e ) {
-					$ok = false;
-					$this->handleRedisError( $e, $qServer );
+				list( $qType, $qWiki ) = $paramsByQueue[$i]['queue'];
+				list( $released, $abandoned, $pruned, $undelayed, $ready ) = $result;
+				if ( $released > 0 || $undelayed > 0 || $ready > 0 ) {
+					// This checks $ready to handle lost aggregator updates as well as
+					// to merge after network partitions that caused aggregator fail-over.
+					$mapSet[$this->encQueueName( $qType, $qWiki )] = $now;
 				}
-
-				$this->redisCmd( $aConn, 'hMSet', array( $this->getReadyQueueKey(), $mapSet ) );
+				$jobs += array_sum( $result );
+				$this->incrStats( "job-recycle.$qType.$qWiki", $released );
+				$this->incrStats( "job-abandon.$qType.$qWiki", $abandoned );
+				$this->incrStats( "job-undelay.$qType.$qWiki", $undelayed );
 			}
 		} catch ( RedisException $e ) {
 			$ok = false;
-			$this->handleRedisError( $e, $aServer );
+			$this->handleRedisError( $e, $qServer );
 		}
 
-		if ( !$ok ) {
-			$this->error( "Failed to do periodic tasks for some queues." );
-			$this->incrStats( "periodictasks.failed.$host", 1 );
-		}
-
-		return $tasksRun;
+		return $ok;
 	}
 
 	/**

-- 
To view, visit https://gerrit.wikimedia.org/r/174214
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I70aaeddef972d370a8e9d81e60dec7ae0fda928c
Gerrit-PatchSet: 1
Gerrit-Project: mediawiki/services/jobrunner
Gerrit-Branch: master
Gerrit-Owner: Aaron Schulz <asch...@wikimedia.org>

_______________________________________________
MediaWiki-commits mailing list
MediaWiki-commits@lists.wikimedia.org
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits