jenkins-bot has submitted this change and it was merged.
Change subject: Fix job runner service availability during redis outages
......................................................................
Fix job runner service availability during redis outages
* Also cleaned up the pending map to use a constant for the TTL
Bug: T91835
Change-Id: Ib5e4083190998b296a0d560a4abea915fe70c4c5
---
M redisJobChronService
M redisJobRunnerService
M src/RedisJobService.php
3 files changed, 180 insertions(+), 68 deletions(-)
Approvals:
Ori.livneh: Looks good to me, approved
jenkins-bot: Verified
diff --git a/redisJobChronService b/redisJobChronService
index 927264a..1d133aa 100755
--- a/redisJobChronService
+++ b/redisJobChronService
@@ -35,6 +35,10 @@
}
}
+
+		// This is randomized to scale the liveness with the # of
runners.
+		$lastPeriodicTime = time() - mt_rand( 0, self::TASK_PERIOD_SEC
);
+
$memLast = memory_get_usage();
$this->incrStats( "start-chron.$host", 1 );
while ( true ) {
@@ -42,19 +46,72 @@
pcntl_signal_dispatch();
}
- // Do periodic queue tasks on all queues every 5 minutes
- $count = $this->executeReadyPeriodicTasks();
- if ( is_int( $count ) ) {
+ // Do tasks on a simple interval cycle...
+ if ( ( time() - $lastPeriodicTime ) >
self::TASK_PERIOD_SEC ) {
+ $lastPeriodicTime = time();
+
+ $count = $this->syncAggregatorRegistry();
+ $this->notice( "Synced $count aggregator(s) to
the current one." );
+ $this->incrStats( "periodictasks.sync.$host", 1
);
+
+ $count = $this->executePeriodicTasks();
$this->notice( "Updated the state of $count
job(s) (recycle/undelay/abandon)." );
$this->incrStats( "periodictasks.done.$host", 1
);
}
+
sleep( 1 );
+
// Track memory usage
$memCurrent = memory_get_usage();
$this->debug( "Memory usage: $memCurrent bytes." );
$this->incrStats( "memory.$host", $memCurrent -
$memLast );
$memLast = $memCurrent;
}
+ }
+
+ /**
+ * Merge wiki and queue type registration data into all aggregators
+ *
+ * This makes sure that the data is not lost on aggregator failure
+ *
+ * @return integer Servers fully updated
+ */
+ private function syncAggregatorRegistry() {
+ $count = count( $this->aggrSrvs );
+
+ try {
+ $types = $this->redisCmdHA(
+ $this->aggrSrvs,
+ 'hGetAll',
+ array( $this->getQueueTypesKey() )
+ );
+ if ( $types ) {
+ $okCount = $this->redisCmdBroadcast(
+ $this->aggrSrvs,
+ 'hMSet',
+ array( $this->getQueueTypesKey(),
$types )
+ );
+ $count = min( $count, $okCount );
+ }
+
+ $wikis = $this->redisCmdHA(
+ $this->aggrSrvs,
+ 'sMembers',
+ array( $this->getWikiSetKey() )
+ );
+ if ( $wikis ) {
+ $okCount = $this->redisCmdBroadcast(
+ $this->aggrSrvs,
+ 'sAdd',
+ array( $this->getWikiSetKey(), $wikis )
+ );
+ $count = min( $count, $okCount );
+ }
+ } catch ( RedisExceptionHA $e ) {
+ $count = 0;
+ }
+
+ return $count;
}
/**
@@ -66,38 +123,23 @@
* @note: similar to JobQueueRedis.php periodic tasks method
* @return int|bool Number of jobs recycled/deleted/undelayed/abandoned
(false if not run)
*/
- private function executeReadyPeriodicTasks() {
+ private function executePeriodicTasks() {
$jobs = 0;
-
- // Run no more than every 5 minutes.
- // This is randomized to scale the liveliness with the # of
runners.
- static $lastPeriodicTime = null;
- $lastPeriodicTime = $lastPeriodicTime ?: time() - mt_rand( 0,
self::TASK_PERIOD_SEC );
- if ( ( time() - $lastPeriodicTime ) <= self::TASK_PERIOD_SEC ) {
- return false;
- }
- $lastPeriodicTime = time();
$host = gethostname();
- $aConn = false;
- // Connect to the first working server on the list
- foreach ( $this->aggrSrvs as $aServer ) {
- $aConn = $this->getRedisConn( $aServer );
- if ( $aConn ) {
- break;
- }
- }
-
- if ( !$aConn ) {
- $this->incrStats( "periodictasks.failed.$host", 1 );
- return $jobs;
- }
-
$ok = true;
try {
- $types = $this->redisCmd( $aConn, 'hKeys', array(
$this->getQueueTypesKey() ) );
- $wikiIds = $this->redisCmd( $aConn, 'sMembers', array(
$this->getWikiSetKey() ) );
+ $types = $this->redisCmdHA(
+ $this->aggrSrvs,
+ 'hKeys',
+ array( $this->getQueueTypesKey() )
+ );
+ $wikiIds = $this->redisCmdHA(
+ $this->aggrSrvs,
+ 'sMembers',
+ array( $this->getWikiSetKey() )
+ );
if ( !is_array( $types ) || !is_array( $wikiIds ) ) {
$this->incrStats( "periodictasks.failed.$host",
1 );
return $jobs;
@@ -145,10 +187,13 @@
}
}
// Update the map in the aggregator as queues become
ready
- $this->redisCmd( $aConn, 'hMSet', array(
$this->getReadyQueueKey(), $mapSet ) );
- } catch ( RedisException $e ) {
+ $this->redisCmdHA(
+ $this->aggrSrvs,
+ 'hMSet',
+ array( $this->getReadyQueueKey(), $mapSet )
+ );
+ } catch ( RedisExceptionHA $e ) {
$ok = false;
- $this->handleRedisError( $e, $aServer );
}
if ( !$ok ) {
diff --git a/redisJobRunnerService b/redisJobRunnerService
index 6c59a4f..cdfa77c 100755
--- a/redisJobRunnerService
+++ b/redisJobRunnerService
@@ -11,6 +11,8 @@
require( __DIR__ . '/src/JobRunnerPipeline.php' );
class RedisJobRunnerService extends RedisJobService {
+ const AGGR_CACHE_TTL_SEC = 1;
+
public function main() {
$this->notice( "Starting job spawner loop(s)..." );
@@ -122,52 +124,60 @@
}
/**
- * @return array Map of (job type => wiki => UNIX timestamp)
+ * @return array Cached map of (job type => wiki => UNIX timestamp)
*/
private function &getReadyQueueMap() {
static $pendingDBs = array(); // cache
static $cacheTimestamp = 0; // UNIX timestamp
- if ( $cacheTimestamp < microtime( true ) - 1.0 ) {
- $conn = false;
- // Connect to the first working server on the list
- foreach ( $this->aggrSrvs as $server ) {
- $conn = $this->getRedisConn( $server );
- if ( $conn ) {
- break;
- }
- }
- if ( $conn ) {
- try {
- // Match JobQueueAggregatorRedis.php
- $map = $this->redisCmd( $conn,
'hGetAll', array( $this->getReadyQueueKey() ) );
- if ( $map === false ) {
- return $pendingDBs;
- }
- unset( $map['_epoch'] );
- $cacheTimestamp = microtime( true );
+ $now = microtime( true );
+ $age = ( $now - $cacheTimestamp );
+ if ( $age <= self::AGGR_CACHE_TTL_SEC ) {
+ return $pendingDBs; // process cache hit
+ }
- $wikiIds = array();
- $pendingDBs = array(); // (type => wiki
=> timestamp)
- foreach ( $map as $key => $time ) {
- list( $type, $wiki ) =
$this->dencQueueName( $key );
- $pendingDBs[$type][$wiki] =
$cacheTimestamp;
- $wikiIds[] = $wiki;
- }
-
- // Keep the list of wiki IDs up-to-date
- if ( count( $wikiIds ) ) {
- $params = array_merge( array(
$this->getWikiSetKey() ), $wikiIds );
- $this->redisCmd( $conn, 'sAdd',
$params );
- }
- } catch ( RedisException $e ) {
- $this->handleRedisError( $e, $server );
- }
+ try {
+ $latestPendingDBs = $this->loadReadyQueueMap();
+ if ( $latestPendingDBs === false ) {
+ return $pendingDBs; // use cache value
}
+
+ $pendingDBs = $latestPendingDBs;
+ $cacheTimestamp = $now;
+ } catch ( RedisExceptionHA $e ) {
+ // use stale/empty cache
}
return $pendingDBs;
}
+
+ /**
+	 * @return array|false Map of (job type => wiki => UNIX timestamp); false on
error
+ */
+ private function loadReadyQueueMap() {
+ $pendingByType = false;
+
+ try {
+ // Match JobQueueAggregatorRedis.php
+ $map = $this->redisCmdHA(
+ $this->aggrSrvs,
+ 'hGetAll',
+ array( $this->getReadyQueueKey() )
+ );
+ if ( is_array( $map ) ) {
+ unset( $map['_epoch'] );
+ $pendingByType = array();
+ foreach ( $map as $key => $time ) {
+ list( $type, $wiki ) =
$this->dencQueueName( $key );
+ $pendingByType[$type][$wiki] = $time;
+ }
+ }
+ } catch ( RedisExceptionHA $e ) {
+ // use stale/empty cache
+ }
+
+ return $pendingByType;
+ }
}
error_reporting( E_ALL | E_STRICT );
diff --git a/src/RedisJobService.php b/src/RedisJobService.php
index 0d08cd4..9a0face 100755
--- a/src/RedisJobService.php
+++ b/src/RedisJobService.php
@@ -222,7 +222,7 @@
/**
* @param string $server
- * @return Redis|boolean|array
+ * @return Redis|boolean
*/
public function getRedisConn( $server ) {
// Check the listing "dead" servers which have had a connection
errors.
@@ -310,6 +310,61 @@
}
/**
+ * Execute a command on the current working server in $servers
+ *
+ * @param array $servers Ordered list of servers to attempt
+ * @param string $cmd
+ * @param array $args
+ * @return mixed
+ * @throws RedisExceptionHA
+ */
+ public function redisCmdHA( array $servers, $cmd, array $args = array()
) {
+ foreach ( $servers as $server ) {
+ $conn = $this->getRedisConn( $server );
+ if ( $conn ) {
+ try {
+ return $this->redisCmd( $conn, $cmd,
$args );
+ } catch ( RedisException $e ) {
+ $this->handleRedisError( $e, $server );
+ }
+ }
+ }
+
+		throw new RedisExceptionHA( "Could not execute command on any
servers." );
+ }
+
+ /**
+ * Execute a command on all servers in $servers
+ *
+ * @param array $servers List of servers to attempt
+ * @param string $cmd
+ * @param array $args
+ * @return integer Number of servers updated
+ * @throws RedisExceptionHA
+ */
+ public function redisCmdBroadcast( array $servers, $cmd, array $args =
array() ) {
+ $updated = 0;
+
+ foreach ( $servers as $server ) {
+ $conn = $this->getRedisConn( $server );
+ if ( $conn ) {
+ try {
+ $this->redisCmd( $conn, $cmd, $args );
+ ++$updated;
+ } catch ( RedisException $e ) {
+ $this->handleRedisError( $e, $server );
+ }
+ }
+ }
+
+ if ( !$updated ) {
+			throw new RedisExceptionHA( "Could not execute command
on any servers." );
+ }
+
+ return $updated;
+ }
+
+ /**
* @param string $event
* @param integer $delta
* @return void
@@ -365,3 +420,5 @@
fwrite( STDERR, date( DATE_ISO8601 ) . ": $s\n" );
}
}
+
+class RedisExceptionHA extends Exception {}
--
To view, visit https://gerrit.wikimedia.org/r/195517
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: merged
Gerrit-Change-Id: Ib5e4083190998b296a0d560a4abea915fe70c4c5
Gerrit-PatchSet: 3
Gerrit-Project: mediawiki/services/jobrunner
Gerrit-Branch: master
Gerrit-Owner: Aaron Schulz <[email protected]>
Gerrit-Reviewer: Ori.livneh <[email protected]>
Gerrit-Reviewer: jenkins-bot <>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits