jenkins-bot has submitted this change and it was merged.

Change subject: Fix job runner service availability during redis outages
......................................................................


Fix job runner service availability during redis outages

* Also cleaned up the pending map to use a constant for the TTL

Bug: T91835
Change-Id: Ib5e4083190998b296a0d560a4abea915fe70c4c5
---
M redisJobChronService
M redisJobRunnerService
M src/RedisJobService.php
3 files changed, 180 insertions(+), 68 deletions(-)

Approvals:
  Ori.livneh: Looks good to me, approved
  jenkins-bot: Verified



diff --git a/redisJobChronService b/redisJobChronService
index 927264a..1d133aa 100755
--- a/redisJobChronService
+++ b/redisJobChronService
@@ -35,6 +35,10 @@
                        }
                }
 
+
+               // This is randomized to scale the liveliness with the # of 
runners.
+               $lastPeriodicTime =  time() - mt_rand( 0, self::TASK_PERIOD_SEC 
);
+
                $memLast = memory_get_usage();
                $this->incrStats( "start-chron.$host", 1 );
                while ( true ) {
@@ -42,19 +46,72 @@
                                pcntl_signal_dispatch();
                        }
 
-                       // Do periodic queue tasks on all queues every 5 minutes
-                       $count = $this->executeReadyPeriodicTasks();
-                       if ( is_int( $count ) ) {
+                       // Do tasks on a simple interval cycle...
+                       if ( ( time() - $lastPeriodicTime ) > 
self::TASK_PERIOD_SEC ) {
+                               $lastPeriodicTime = time();
+
+                               $count = $this->syncAggregatorRegistry();
+                               $this->notice( "Synced $count aggregator(s) to 
the current one." );
+                               $this->incrStats( "periodictasks.sync.$host", 1 
);
+
+                               $count = $this->executePeriodicTasks();
                                $this->notice( "Updated the state of $count 
job(s) (recycle/undelay/abandon)." );
                                $this->incrStats( "periodictasks.done.$host", 1 
);
                        }
+
                        sleep( 1 );
+
                        // Track memory usage
                        $memCurrent = memory_get_usage();
                        $this->debug( "Memory usage: $memCurrent bytes." );
                        $this->incrStats( "memory.$host", $memCurrent - 
$memLast );
                        $memLast = $memCurrent;
                }
+       }
+
+       /**
+        * Merge wiki and queue type registration data into all aggregators
+        *
+        * This makes sure that the data is not lost on aggregator failure.
+        * The registry is read from the first working aggregator and then
+        * written to every server in $this->aggrSrvs.
+        *
+        * @return integer Servers fully updated (0 if the registry could not
+        *  be read from, or written to, any aggregator)
+        */
+       private function syncAggregatorRegistry() {
+               $count = count( $this->aggrSrvs );
+
+               try {
+                       $types = $this->redisCmdHA(
+                               $this->aggrSrvs,
+                               'hGetAll',
+                               array( $this->getQueueTypesKey() )
+                       );
+                       if ( $types ) {
+                               // hMSet takes the whole (field => value) map as a single argument
+                               $okCount = $this->redisCmdBroadcast(
+                                       $this->aggrSrvs,
+                                       'hMSet',
+                                       array( $this->getQueueTypesKey(), $types )
+                               );
+                               $count = min( $count, $okCount );
+                       }
+
+                       $wikis = $this->redisCmdHA(
+                               $this->aggrSrvs,
+                               'sMembers',
+                               array( $this->getWikiSetKey() )
+                       );
+                       if ( $wikis ) {
+                               // sAdd is variadic (key, member1, member2, ...), so the members
+                               // must be flattened into the argument list rather than passed
+                               // as one array argument.
+                               $okCount = $this->redisCmdBroadcast(
+                                       $this->aggrSrvs,
+                                       'sAdd',
+                                       array_merge( array( $this->getWikiSetKey() ), $wikis )
+                               );
+                               $count = min( $count, $okCount );
+                       }
+               } catch ( RedisExceptionHA $e ) {
+                       $count = 0;
+               }
+
+               return $count;
        }
 
        /**
@@ -66,38 +123,23 @@
         * @note: similar to JobQueueRedis.php periodic tasks method
         * @return int|bool Number of jobs recycled/deleted/undelayed/abandoned 
(false if not run)
         */
-       private function executeReadyPeriodicTasks() {
+       private function executePeriodicTasks() {
                $jobs = 0;
-
-               // Run no more than every 5 minutes.
-               // This is randomized to scale the liveliness with the # of 
runners.
-               static $lastPeriodicTime = null;
-               $lastPeriodicTime = $lastPeriodicTime ?: time() - mt_rand( 0, 
self::TASK_PERIOD_SEC );
-               if ( ( time() - $lastPeriodicTime ) <= self::TASK_PERIOD_SEC ) {
-                       return false;
-               }
-               $lastPeriodicTime = time();
 
                $host = gethostname();
 
-               $aConn = false;
-               // Connect to the first working server on the list
-               foreach ( $this->aggrSrvs as $aServer ) {
-                       $aConn = $this->getRedisConn( $aServer );
-                       if ( $aConn ) {
-                               break;
-                       }
-               }
-
-               if ( !$aConn ) {
-                       $this->incrStats( "periodictasks.failed.$host", 1 );
-                       return $jobs;
-               }
-
                $ok = true;
                try {
-                       $types = $this->redisCmd( $aConn, 'hKeys', array( 
$this->getQueueTypesKey() ) );
-                       $wikiIds = $this->redisCmd( $aConn, 'sMembers', array( 
$this->getWikiSetKey() ) );
+                       $types = $this->redisCmdHA(
+                               $this->aggrSrvs,
+                               'hKeys',
+                               array( $this->getQueueTypesKey() )
+                       );
+                       $wikiIds = $this->redisCmdHA(
+                               $this->aggrSrvs,
+                               'sMembers',
+                               array( $this->getWikiSetKey() )
+                       );
                        if ( !is_array( $types ) || !is_array( $wikiIds ) ) {
                                $this->incrStats( "periodictasks.failed.$host", 
1 );
                                return $jobs;
@@ -145,10 +187,13 @@
                                }
                        }
                        // Update the map in the aggregator as queues become 
ready
-                       $this->redisCmd( $aConn, 'hMSet', array( 
$this->getReadyQueueKey(), $mapSet ) );
-               } catch ( RedisException $e ) {
+                       $this->redisCmdHA(
+                               $this->aggrSrvs,
+                               'hMSet',
+                               array( $this->getReadyQueueKey(), $mapSet )
+                       );
+               } catch ( RedisExceptionHA $e ) {
                        $ok = false;
-                       $this->handleRedisError( $e, $aServer );
                }
 
                if ( !$ok ) {
diff --git a/redisJobRunnerService b/redisJobRunnerService
index 6c59a4f..cdfa77c 100755
--- a/redisJobRunnerService
+++ b/redisJobRunnerService
@@ -11,6 +11,8 @@
 require( __DIR__ . '/src/JobRunnerPipeline.php' );
 
 class RedisJobRunnerService extends RedisJobService {
+       const AGGR_CACHE_TTL_SEC = 1;
+
        public function main() {
                $this->notice( "Starting job spawner loop(s)..." );
 
@@ -122,52 +124,60 @@
        }
 
        /**
-        * @return array Map of (job type => wiki => UNIX timestamp)
+        * Get the map of ready queues, refreshed from the aggregators at most
+        * once every AGGR_CACHE_TTL_SEC seconds (process-level static cache).
+        * If a refresh fails, the previously cached (possibly empty) map is
+        * returned and the cache timestamp is left unchanged.
+        *
+        * NOTE(review): this returns a reference to the static cache; a caller
+        * that binds the result by reference could modify the cached map.
+        * Confirm this is intended.
+        *
+        * @return array Cached map of (job type => wiki => UNIX timestamp)
         */
        private function &getReadyQueueMap() {
                static $pendingDBs = array(); // cache
                static $cacheTimestamp = 0; // UNIX timestamp
 
-               if ( $cacheTimestamp < microtime( true ) - 1.0 ) {
-                       $conn = false;
-                       // Connect to the first working server on the list
-                       foreach ( $this->aggrSrvs as $server ) {
-                               $conn = $this->getRedisConn( $server );
-                               if ( $conn ) {
-                                       break;
-                               }
-                       }
-                       if ( $conn ) {
-                               try {
-                                       // Match JobQueueAggregatorRedis.php
-                                       $map = $this->redisCmd( $conn, 
'hGetAll', array( $this->getReadyQueueKey() ) );
-                                       if ( $map === false ) {
-                                               return $pendingDBs;
-                                       }
-                                       unset( $map['_epoch'] );
-                                       $cacheTimestamp = microtime( true );
+               $now = microtime( true );
+               $age = ( $now - $cacheTimestamp );
+               // Serve the static cache while it is no older than the TTL
+               if ( $age <= self::AGGR_CACHE_TTL_SEC ) {
+                       return $pendingDBs; // process cache hit
+               }
 
-                                       $wikiIds = array();
-                                       $pendingDBs = array(); // (type => wiki 
=> timestamp)
-                                       foreach ( $map as $key => $time ) {
-                                               list( $type, $wiki ) = 
$this->dencQueueName( $key );
-                                               $pendingDBs[$type][$wiki] = 
$cacheTimestamp;
-                                               $wikiIds[] = $wiki;
-                                       }
-
-                                       // Keep the list of wiki IDs up-to-date
-                                       if ( count( $wikiIds ) ) {
-                                               $params = array_merge( array( 
$this->getWikiSetKey() ), $wikiIds );
-                                               $this->redisCmd( $conn, 'sAdd', 
$params );
-                                       }
-                               } catch ( RedisException $e ) {
-                                       $this->handleRedisError( $e, $server );
-                               }
+               try {
+                       $latestPendingDBs = $this->loadReadyQueueMap();
+                       if ( $latestPendingDBs === false ) {
+                               // Refresh failed: keep serving the old map. $cacheTimestamp is
+                               // not bumped, so the next call will try the aggregators again.
+                               return $pendingDBs; // use cache value
                        }
+
+                       $pendingDBs = $latestPendingDBs;
+                       $cacheTimestamp = $now;
+               } catch ( RedisExceptionHA $e ) {
+                       // use stale/empty cache (defensive: loadReadyQueueMap() already
+                       // catches RedisExceptionHA and returns false instead)
                }
 
                return $pendingDBs;
        }
+
+       /**
+        * Fetch the ready-queue map from the first working aggregator
+        *
+        * This does no caching itself; failures are reported by returning false.
+        *
+        * @return array|bool Map of (job type => wiki => UNIX timestamp); false on error
+        */
+       private function loadReadyQueueMap() {
+               $pendingByType = false;
+
+               try {
+                       // Match JobQueueAggregatorRedis.php
+                       $map = $this->redisCmdHA(
+                               $this->aggrSrvs,
+                               'hGetAll',
+                               array( $this->getReadyQueueKey() )
+                       );
+                       if ( is_array( $map ) ) {
+                               // Drop the '_epoch' field before decoding; it is not
+                               // treated as an encoded queue name here
+                               unset( $map['_epoch'] );
+                               $pendingByType = array();
+                               foreach ( $map as $key => $time ) {
+                                       list( $type, $wiki ) = $this->dencQueueName( $key );
+                                       $pendingByType[$type][$wiki] = $time;
+                               }
+                       }
+               } catch ( RedisExceptionHA $e ) {
+                       // No aggregator could serve the command; return false so the
+                       // caller can fall back to its previously cached map
+               }
+
+               return $pendingByType;
+       }
 }
 
 error_reporting( E_ALL | E_STRICT );
diff --git a/src/RedisJobService.php b/src/RedisJobService.php
index 0d08cd4..9a0face 100755
--- a/src/RedisJobService.php
+++ b/src/RedisJobService.php
@@ -222,7 +222,7 @@
 
        /**
         * @param string $server
-        * @return Redis|boolean|array
+        * @return Redis|boolean
         */
        public function getRedisConn( $server ) {
                // Check the listing "dead" servers which have had a connection 
errors.
@@ -310,6 +310,61 @@
        }
 
        /**
+        * Execute a command on the first working server in $servers
+        *
+        * Servers are tried in order. A server whose connection cannot be
+        * obtained is skipped; a RedisException from a server is passed to
+        * handleRedisError() and the next server is tried.
+        *
+        * @param array $servers Ordered list of servers to attempt
+        * @param string $cmd Redis command (method) name
+        * @param array $args Arguments for the command
+        * @return mixed Result of the command on the first server that succeeded
+        * @throws RedisExceptionHA If no server could execute the command
+        */
+       public function redisCmdHA( array $servers, $cmd, array $args = array() ) {
+               foreach ( $servers as $server ) {
+                       $conn = $this->getRedisConn( $server );
+                       if ( !$conn ) {
+                               continue; // no usable connection; try the next server
+                       }
+                       try {
+                               return $this->redisCmd( $conn, $cmd, $args );
+                       } catch ( RedisException $e ) {
+                               $this->handleRedisError( $e, $server );
+                       }
+               }
+
+               throw new RedisExceptionHA( "Could not execute command on any servers." );
+       }
+
+       /**
+        * Execute a command on all servers in $servers
+        *
+        * A server whose connection cannot be obtained is skipped; a
+        * RedisException from a server is passed to handleRedisError() and
+        * does not stop the command from being sent to the remaining servers.
+        *
+        * @param array $servers List of servers to attempt
+        * @param string $cmd Redis command (method) name
+        * @param array $args Arguments for the command
+        * @return integer Number of servers updated (always at least 1)
+        * @throws RedisExceptionHA If the command failed on every server
+        */
+       public function redisCmdBroadcast( array $servers, $cmd, array $args = array() ) {
+               $updated = 0;
+
+               foreach ( $servers as $server ) {
+                       $conn = $this->getRedisConn( $server );
+                       if ( $conn ) {
+                               try {
+                                       $this->redisCmd( $conn, $cmd, $args );
+                                       ++$updated;
+                               } catch ( RedisException $e ) {
+                                       $this->handleRedisError( $e, $server );
+                               }
+                       }
+               }
+
+               if ( !$updated ) {
+                       throw new RedisExceptionHA( "Could not execute command on any servers." );
+               }
+
+               return $updated;
+       }
+
+       /**
         * @param string $event
         * @param integer $delta
         * @return void
@@ -365,3 +420,5 @@
                fwrite( STDERR, date( DATE_ISO8601 ) . ": $s\n" );
        }
 }
+
+/**
+ * Thrown when a Redis command could not be executed on any of the given servers
+ */
+class RedisExceptionHA extends Exception {
+}

-- 
To view, visit https://gerrit.wikimedia.org/r/195517
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: Ib5e4083190998b296a0d560a4abea915fe70c4c5
Gerrit-PatchSet: 3
Gerrit-Project: mediawiki/services/jobrunner
Gerrit-Branch: master
Gerrit-Owner: Aaron Schulz <[email protected]>
Gerrit-Reviewer: Ori.livneh <[email protected]>
Gerrit-Reviewer: jenkins-bot <>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to