Aaron Schulz has uploaded a new change for review.
https://gerrit.wikimedia.org/r/192207
Change subject: [WIP] Split out chron tasks from job spawner as another daemon
......................................................................
[WIP] Split out chron tasks from job spawner as another daemon
Change-Id: Ie4308ae6db61e9d48b1af099b37f25ec3cd36e09
---
M .gitreview
M README
M jobrunner.sample.json
M redisJobRunnerService
A src/redisJobPipeline.php
A src/redisJobService.php
6 files changed, 644 insertions(+), 832 deletions(-)
git pull ssh://gerrit.wikimedia.org:29418/mediawiki/services/jobrunner refs/changes/07/192207/1
diff --git a/.gitreview b/.gitreview
old mode 100644
new mode 100755
diff --git a/README b/README
old mode 100644
new mode 100755
diff --git a/jobrunner.sample.json b/jobrunner.sample.json
old mode 100644
new mode 100755
diff --git a/redisJobRunnerService b/redisJobRunnerService
index 35c708a..4106cfa 100755
--- a/redisJobRunnerService
+++ b/redisJobRunnerService
@@ -4,12 +4,11 @@
if ( PHP_SAPI !== 'cli' ) {
die( "This is not a valid entry point.\n" );
} elseif ( !class_exists( 'Redis' ) ) {
- die( "The phpredis is not installed; aborting.\n" );
+ die( "The phpredis extension is not installed; aborting.\n" );
}
-if ( !function_exists( 'posix_kill' ) ) {
- function posix_kill() {} // no-op on Windows; procs killed with proc_terminate()
-}
+require( __DIR__ . '/src/RedisJobService.php' );
+require( __DIR__ . '/src/JobRunnerPipeline' );
error_reporting( E_ALL | E_STRICT );
ini_set( 'display_errors', 1 );
@@ -21,190 +20,9 @@
getopt( '', array( 'config-file::', 'help', 'verbose' ) )
)->main();
-class RedisJobRunnerService {
- /** @var array List of IP:<port> entries */
- protected $queueSrvs = array();
- /** @var array List of IP:<port> entries */
- protected $aggrSrvs = array();
- /** @var string Redis password */
- protected $password;
- /** @var string IP address or hostname */
- protected $statsdHost;
- /** @var integer Port number */
- protected $statsdPort;
-
- /** @var bool */
- protected $verbose;
- /** @var array Map of (job type => integer seconds) */
- protected $claimTTLMap = array();
- /** @var array Map of (job type => integer) */
- protected $attemptsMap = array();
-
- /** @var array Map of (id => (include,exclude,low-priority,count) */
- public $loopMap = array();
- /** @var array Map of (job type => integer) */
- public $maxRealMap = array();
- /** @var array Map of (job type => integer) */
- public $maxMemMap = array();
- /** @var array String command to run jobs and return the status JSON
blob */
- public $dispatcher;
-
- /**
- * How long can low priority jobs be run until some high priority
- * jobs should be checked for and run if they exist.
- * @var integer
- */
- public $hpMaxDelay = 120;
- /**
- * The maxtime parameter for runJobs.php for low priority jobs.
- * The lower this value is, the less jobs of one wiki can hog attention
- * from the jobs on other wikis, though more overhead is incurred.
- * This should be lower than hpmaxdelay.
- * @var integer
- */
- public $lpMaxTime = 60;
- /**
- * How long can high priority jobs be run until some low priority
- * jobs should be checked for and run if they exist.
- * @var integer
- */
- public $lpMaxDelay = 600;
- /**
- * The maxtime parameter for runJobs.php for high priority jobs.
- * The lower this value is, the less jobs of one wiki/type can hog
attention
- * from jobs of another wiki/type, though more overhead is incurred.
- * This should be lower than lpmaxdelay.
- * @var integer
- */
- public $hpMaxTime = 30;
-
- /** @var array Map of (server => Redis object) */
- protected $conns = array();
- /** @var array Map of (server => timestamp) */
- protected $downSrvs = array();
-
- /** @var RedisJobRunnerService */
- protected static $instance;
-
- /**
- * @param array $args
- * @return RedisJobRunnerService
- * @throws Exception
- */
- public static function init( array $args ) {
- if ( self::$instance ) {
- throw new Exception( 'RedisJobRunnerService already
initialized.' );
- }
-
- if ( !isset( $args['config-file'] ) || isset( $args['help'] ) )
{
- die( "Usage: php RedisJobRunnerService.php\n"
- . "--config-file=<path>\n"
- . "--help\n"
- );
- }
-
- $file = $args['config-file'];
- $content = file_get_contents( $file );
- if ( $content === false ) {
- throw new Exception( "Coudn't open configuration file
'{$file}''" );
- }
-
- // Remove comments and load into an array
- $content = trim( preg_replace( '/\/\/.*$/m', '', $content ) );
- $config = json_decode( $content, true );
- if ( !is_array( $config ) ) {
- throw new Exception( "Could not parse JSON file
'{$file}'." );
- }
-
- self::$instance = new self( $config );
- self::$instance->verbose = isset( $args['verbose'] );
-
- return self::$instance;
- }
-
- /**
- * @param array $config
- */
- protected function __construct( array $config ) {
- $this->aggrSrvs = $config['redis']['aggregators'];
- if ( !count( $this->aggrSrvs ) ) {
- throw new Exception( "Empty list for
'redis.aggregators'." );
- }
- $this->queueSrvs = $config['redis']['queues'];
- if ( !count( $this->queueSrvs ) ) {
- throw new Exception( "Empty list for 'redis.queues'." );
- }
- $this->dispatcher = $config['dispatcher'];
- if ( !$this->dispatcher ) {
- throw new Exception( "No command provided for
'dispatcher'." );
- }
-
- foreach ( $config['groups'] as $name => $group ) {
- if ( !is_int( $group['runners'] ) ) {
- throw new Exception( "Invalid 'runners' value
for runner group '$name'." );
- } elseif ( $group['runners'] == 0 ) {
- continue; // loop disabled
- }
-
- foreach ( array( 'include', 'exclude', 'low-priority' )
as $k ) {
- if ( !isset( $group[$k] ) ) {
- $group[$k] = array();
- } elseif ( !is_array( $group[$k] ) ) {
- throw new Exception( "Invalid '$k'
value for runner group '$name'." );
- }
- }
- $this->loopMap[] = $group;
- }
-
- if ( isset( $config['wrapper'] ) ) {
- $this->wrapper = $config['wrapper'];
- }
- if ( isset( $config['redis']['password'] ) ) {
- $this->password = $config['redis']['password'];
- }
-
- $this->claimTTLMap['*'] = 3600;
- if ( isset( $config['limits']['claimTTL'] ) ) {
- $this->claimTTLMap = $config['limits']['claimTTL'] +
$this->claimTTLMap;
- }
-
- $this->attemptsMap['*'] = 3;
- if ( isset( $config['limits']['attempts'] ) ) {
- $this->attemptsMap = $config['limits']['attempts'] +
$this->attemptsMap;
- }
-
- // Avoid killing processes before they get a fair chance to exit
- $this->maxRealMap['*'] = 3600;
- $minRealTime = 2 * max( $this->lpMaxTime, $this->hpMaxTime );
- if ( isset( $config['limits']['real'] ) ) {
- foreach ( $config['limits']['real'] as $type => $value
) {
- $this->maxRealMap[$type] = max( (int)$value,
$minRealTime );
- }
- }
-
- $this->maxMemMap['*'] = '300M';
- if ( isset( $config['limits']['memory'] ) ) {
- $this->maxMemMap = $config['limits']['memory'] +
$this->maxMemMap;
- }
-
- if ( isset( $config['statsd'] ) ) {
- if ( strpos( $config['statsd'], ':' ) !== false ) {
- $parts = explode( ':', $config['statsd'] );
- $this->statsdHost = $parts[0];
- $this->statsdPort = (int)$parts[1];
- } else {
- // Use default statsd port if not specified
- $this->statsdHost = $config['statsd'];
- $this->statsdPort = 8125;
- }
- }
- }
-
- /**
- * Entry point method that starts the service in earnest and keeps
running
- */
+class RedisJobRunnerService extends RedisJobService {
public function main() {
- $this->notice( "Starting job loop(s)..." );
+ $this->notice( "Starting job spawner loop(s)..." );
$host = gethostname();
$prioMap = array(); // map of (id => (current priority, since))
@@ -234,7 +52,7 @@
}
$memLast = memory_get_usage();
- $this->incrStats( "start.$host", 1 );
+ $this->incrStats( "start-runner.$host", 1 );
while ( true ) {
if ( $useSignals ) {
pcntl_signal_dispatch();
@@ -322,7 +140,7 @@
/**
* @return array Map of (job type => wiki => UNIX timestamp)
*/
- protected function &getReadyQueueMap() {
+ private function &getReadyQueueMap() {
static $pendingDBs = array(); // cache
static $cacheTimestamp = 0; // UNIX timestamp
@@ -365,648 +183,5 @@
}
return $pendingDBs;
- }
-
- /**
- * Recycle or destroy any jobs that have been claimed for too long
- * and release any ready delayed jobs into the queue. Also abandon
- * and prune out jobs that failed too many times. This updates the
- * aggregator server as necessary.
- *
- * @note: similar to JobQueueRedis.php periodic tasks method
- * @return int|bool Number of jobs recycled/deleted/undelayed/abandoned
(false if not run)
- */
- protected function executeReadyPeriodicTasks() {
- $jobs = 0;
-
- // Run no more than every 5 minutes.
- // This is randomized to scale the liveliness with the # of
runners.
- static $lastPeriodicTime = null;
- $lastPeriodicTime = $lastPeriodicTime ?: time() - mt_rand( 0,
300 );
- if ( ( time() - $lastPeriodicTime ) <= 300 ) {
- return false;
- }
- $lastPeriodicTime = time();
-
- $host = gethostname();
-
- $aConn = false;
- // Connect to the first working server on the list
- foreach ( $this->aggrSrvs as $aServer ) {
- $aConn = $this->getRedisConn( $aServer );
- if ( $aConn ) {
- break;
- }
- }
-
- if ( !$aConn ) {
- $this->incrStats( "periodictasks.failed.$host", 1 );
- return $jobs;
- }
-
- $ok = true;
- try {
- $types = $this->redisCmd( $aConn, 'hKeys', array(
$this->getQueueTypesKey() ) );
- $wikiIds = $this->redisCmd( $aConn, 'sMembers', array(
$this->getWikiSetKey() ) );
- if ( !is_array( $types ) || !is_array( $wikiIds ) ) {
- $this->incrStats( "periodictasks.failed.$host",
1 );
- return $jobs;
- }
-
- // Randomize to scale the liveliness with the # of
runners
- shuffle( $types );
- shuffle( $wikiIds );
-
- $now = time();
-
- // Build up the script arguments for each queue...
- $paramsByQueue = array();
- foreach ( $types as $type ) {
- $ttl = isset( $this->claimTTLMap[$type] )
- ? $this->claimTTLMap[$type]
- : $this->claimTTLMap['*'];
- $attempts = isset( $this->attemptsMap[$type] )
- ? $this->attemptsMap[$type]
- : $this->attemptsMap['*'];
- foreach ( $wikiIds as $wikiId ) {
- $paramsByQueue[] = array(
- 'queue' => array( $type,
$wikiId ),
- 'params' => array(
-
"{$wikiId}:jobqueue:{$type}:z-claimed", # KEYS[1]
-
"{$wikiId}:jobqueue:{$type}:h-attempts", # KEYS[2]
-
"{$wikiId}:jobqueue:{$type}:l-unclaimed", # KEYS[3]
-
"{$wikiId}:jobqueue:{$type}:h-data", # KEYS[4]
-
"{$wikiId}:jobqueue:{$type}:z-abandoned", # KEYS[5]
-
"{$wikiId}:jobqueue:{$type}:z-delayed", # KEYS[6]
- $now - $ttl, # ARGV[1]
- $now - 7 * 86400, #
ARGV[2]
- $attempts, # ARGV[3]
- $now # ARGV[4]
- ),
- 'keys' => 6 # number of first
argument(s) that are keys
- );
- }
- }
- // Batch the Lua queries to avoid client OOMs
- $paramsByQueueBatches = array_chunk( $paramsByQueue,
1000 );
-
- $mapSet = array(); // ready map of (queue name =>
timestamp)
- // Run the script on all job queue partitions...
- foreach ( $this->queueSrvs as $qServer ) {
- foreach ( $paramsByQueueBatches as
$paramsByQueueBatch ) {
- $ok = $this->updateQueueServer(
- $qServer, $paramsByQueueBatch,
$mapSet, $jobs ) && $ok;
- }
- }
- // Update the map in the aggregator as queues become
ready
- $this->redisCmd( $aConn, 'hMSet', array(
$this->getReadyQueueKey(), $mapSet ) );
- } catch ( RedisException $e ) {
- $ok = false;
- $this->handleRedisError( $e, $aServer );
- }
-
- if ( !$ok ) {
- $this->error( "Failed to do periodic tasks for some
queues." );
- $this->incrStats( "periodictasks.failed.$host", 1 );
- }
-
- return $jobs;
- }
-
- private function updateQueueServer( $qServer, array $paramsByQueue,
array &$mapSet, &$jobs ) {
- $qConn = $this->getRedisConn( $qServer );
- if ( !$qConn ) {
- return false; // partition down
- }
-
- static $script =
-<<<LUA
- local kClaimed, kAttempts, kUnclaimed, kData, kAbandoned,
kDelayed = unpack(KEYS)
- local rClaimCutoff, rPruneCutoff, rAttempts, rTime =
unpack(ARGV)
- local released,abandoned,pruned,undelayed,ready = 0,0,0,0,0
- -- Get all non-dead jobs that have an expired claim on them.
- -- The score for each item is the last claim timestamp (UNIX).
- local staleClaims =
redis.call('zRangeByScore',kClaimed,0,rClaimCutoff)
- for k,id in ipairs(staleClaims) do
- local timestamp = redis.call('zScore',kClaimed,id)
- local attempts = redis.call('hGet',kAttempts,id)
- if attempts < rAttempts then
- -- Claim expired and attempts left: re-enqueue
the job
- redis.call('lPush',kUnclaimed,id)
- released = released + 1
- else
- -- Claim expired and no attempts left: mark the
job as dead
- redis.call('zAdd',kAbandoned,timestamp,id)
- abandoned = abandoned + 1
- end
- redis.call('zRem',kClaimed,id)
- end
- -- Get all of the dead jobs that have been marked as dead for
too long.
- -- The score for each item is the last claim timestamp (UNIX).
- local deadClaims =
redis.call('zRangeByScore',kAbandoned,0,rPruneCutoff)
- for k,id in ipairs(deadClaims) do
- -- Stale and out of attempts: remove any traces of the
job
- redis.call('zRem',kAbandoned,id)
- redis.call('hDel',kAttempts,id)
- redis.call('hDel',kData,id)
- pruned = pruned + 1
- end
- -- Get the list of ready delayed jobs, sorted by readiness
(UNIX timestamp)
- local ids = redis.call('zRangeByScore',kDelayed,0,rTime)
- -- Migrate the jobs from the "delayed" set to the "unclaimed"
list
- for k,id in ipairs(ids) do
- redis.call('lPush',kUnclaimed,id)
- redis.call('zRem',kDelayed,id)
- end
- undelayed = #ids
- ready = redis.call('lLen',kUnclaimed)
- return {released,abandoned,pruned,undelayed,ready}
-LUA;
-
- $ok = true;
- try {
- $sha1 = $this->redisCmd( $qConn, 'script', array(
'load', $script ) );
-
- $this->redisCmd( $qConn, 'multi', array(
Redis::PIPELINE ) );
- foreach ( $paramsByQueue as $params ) {
- $this->redisCmd( $qConn, 'evalSha',
- array( $sha1, $params['params'],
$params['keys'] ) );
- }
- $res = $this->redisCmd( $qConn, 'exec' );
-
- $now = time();
- foreach ( $res as $i => $result ) {
- if ( !$result ) {
- $ok = false;
- continue;
- }
- list( $qType, $qWiki ) =
$paramsByQueue[$i]['queue'];
- list( $released, $abandoned, $pruned,
$undelayed, $ready ) = $result;
- if ( $released > 0 || $undelayed > 0 || $ready
> 0 ) {
- // This checks $ready to handle lost
aggregator updates as well as
- // to merge after network partitions
that caused aggregator fail-over.
- $mapSet[$this->encQueueName( $qType,
$qWiki )] = $now;
- }
- $jobs += ( array_sum( $result ) - $ready );
- $this->incrStats( "job-recycle.$qType.$qWiki",
$released );
- $this->incrStats( "job-abandon.$qType.$qWiki",
$abandoned );
- $this->incrStats( "job-undelay.$qType.$qWiki",
$undelayed );
- }
- } catch ( RedisException $e ) {
- $ok = false;
- $this->handleRedisError( $e, $qServer );
- }
-
- return $ok;
- }
-
- /**
- * @return string (per JobQueueAggregatorRedis.php)
- */
- private function getReadyQueueKey() {
- return "jobqueue:aggregator:h-ready-queues:v2"; // global
- }
-
- /**
- * @return string (per JobQueueAggregatorRedis.php)
- */
- private function getQueueTypesKey() {
- return "jobqueue:aggregator:h-queue-types:v2"; // global
- }
-
- /**
- * @return string
- */
- private function getWikiSetKey() {
- return "jobqueue:aggregator:s-wikis:v2"; // global
- }
-
- /**
- * @param string $type
- * @param string $wiki
- * @return string (per JobQueueAggregatorRedis.php)
- */
- private function encQueueName( $type, $wiki ) {
- return rawurlencode( $type ) . '/' . rawurlencode( $wiki );
- }
-
- /**
- * @param string $name
- * @return string (per JobQueueAggregatorRedis.php)
- */
- private function dencQueueName( $name ) {
- list( $type, $wiki ) = explode( '/', $name, 2 );
-
- return array( rawurldecode( $type ), rawurldecode( $wiki ) );
- }
-
- /**
- * @param string $server
- * @return Redis|boolean|array
- */
- protected function getRedisConn( $server ) {
- // Check the listing "dead" servers which have had a connection
errors.
- // Srvs are marked dead for a limited period of time, to
- // avoid excessive overhead from repeated connection timeouts.
- if ( isset( $this->downSrvs[$server] ) ) {
- $now = time();
- if ( $now > $this->downSrvs[$server] ) {
- // Dead time expired
- unset( $this->downSrvs[$server] );
- } else {
- // Server is dead
- return false;
- }
- }
-
- if ( isset( $this->conns[$server] ) ) {
- return $this->conns[$server];
- }
-
- try {
- $conn = new Redis();
- if ( strpos( $server, ':' ) === false ) {
- $host = $server;
- $port = null;
- } else {
- list( $host, $port ) = explode( ':', $server );
- }
- $result = $conn->connect( $host, $port, 5 );
- if ( !$result ) {
- $this->error( "Could not connect to Redis
server $host:$port." );
- // Mark server down for some time to avoid
further timeouts
- $this->downSrvs[$server] = time() + 30;
-
- return false;
- }
- if ( $this->password !== null ) {
- $conn->auth( $this->password );
- }
- } catch ( RedisException $e ) {
- $this->downSrvs[$server] = time() + 30;
-
- return false;
- }
-
- if ( $conn ) {
- $conn->setOption( Redis::OPT_READ_TIMEOUT, 5 );
- $conn->setOption( Redis::OPT_SERIALIZER,
Redis::SERIALIZER_NONE );
- $this->conns[$server] = $conn;
-
- return $conn;
- } else {
- return false;
- }
- }
-
- /**
- * @param RedisException $e
- * @param string $server
- */
- protected function handleRedisError( RedisException $e, $server ) {
- unset( $this->conns[$server] );
- $this->error( "Redis error: " . $e->getMessage() );
- $this->incrStats( "redis-error." . gethostname(), 1 );
- }
-
- /**
- * @param Redis $conn
- * @param string $cmd
- * @param array $args
- * @return mixed
- */
- protected function redisCmd( Redis $conn, $cmd, array $args = array() )
{
- $conn->clearLastError();
- // we had some job runners oom'ing on this call, log what we are
- // doing so there is relevant information next to the oom
- $this->debug( "Redis cmd: $cmd " . json_encode( $args ) );
- $res = call_user_func_array( array( $conn, $cmd ), $args );
- if ( $conn->getLastError() ) {
- // Make all errors be exceptions instead of "most but
not all".
- // This will let the caller know to reset the
connection to be safe.
- throw new RedisException( $conn->getLastError() );
- }
- return $res;
- }
-
- /**
- * @param string $event
- * @param integer $delta
- * @return void
- */
- public function incrStats( $event, $delta = 1 ) {
- if ( !$this->statsdHost || $delta == 0 ) {
- return; // nothing to do
- }
-
- static $format = "%s:%s|m\n";
- $packet = sprintf( $format, "jobrunner.$event", $delta );
-
- if ( !function_exists( 'socket_create' ) ) {
- $this->debug( 'No "socket_create" method available.' );
- return;
- }
-
- static $socket = null;
- if ( !$socket ) {
- $socket = socket_create( AF_INET, SOCK_DGRAM, SOL_UDP );
- }
-
- socket_sendto(
- $socket,
- $packet,
- strlen( $packet ),
- 0,
- $this->statsdHost,
- $this->statsdPort
- );
- }
-
- /**
- * @param string $s
- */
- public function debug( $s ) {
- if ( $this->verbose ) {
- print date( DATE_ISO8601 ) . ": $s\n";
- }
- }
-
- /**
- * @param string $s
- */
- public function notice( $s ) {
- print date( DATE_ISO8601 ) . ": $s\n";
- }
-
- /**
- * @param string $s
- */
- public function error( $s ) {
- fwrite( STDERR, date( DATE_ISO8601 ) . ": $s\n" );
- }
-}
-
-class JobRunnerPipeline {
- /** @var RedisJobRunnerService */
- protected $srvc;
- /** @var array (loop ID => slot ID => slot status array) */
- protected $procMap = array();
-
- /**
- * @param JobRunnerService $service
- */
- public function __construct( RedisJobRunnerService $service ) {
- $this->srvc = $service;
- }
-
- /**
- * @param string $loop
- * @param int $slot
- */
- public function initSlot( $loop, $slot ) {
- $this->procMap[$loop][$slot] = array(
- 'handle' => false,
- 'pipes' => array(),
- 'db' => null,
- 'type' => null,
- 'cmd' => null,
- 'stime' => 0,
- 'sigtime' => 0,
- );
- }
-
- /**
- * @param integer $loop
- * @param array $prioMap
- * @param array $pending
- * @return array
- */
- public function refillSlots( $loop, array $prioMap, array &$pending ) {
- $free = 0;
- $new = 0;
- $host = gethostname();
- $cTime = time();
- foreach ( $this->procMap[$loop] as $slot => &$procSlot ) {
- $status = $procSlot['handle']
- ? proc_get_status( $procSlot['handle'] )
- : false;
- if ( $status && $status['running'] ) {
- $maxReal = isset(
$this->srvc->maxRealMap[$procSlot['type']] )
- ?
$this->srvc->maxRealMap[$procSlot['type']]
- : $this->srvc->maxRealMap['*'];
- $age = $cTime - $procSlot['stime'];
- if ( $age >= $maxReal && !$procSlot['sigtime']
) {
- $cmd = $procSlot['cmd'];
- $this->srvc->error( "Runner loop $loop
process in slot $slot timed out " .
- "[{$age}s; max:
{$maxReal}s]:\n$cmd" );
- posix_kill( $status['pid'], SIGTERM );
// non-blocking
- $procSlot['sigtime'] = time();
- $this->srvc->incrStats(
'runner-status.timeout', 1 );
- } elseif ( $age >= $maxReal && ( $cTime -
$procSlot['sigtime'] ) > 5 ) {
- $this->srvc->error( "Runner loop $loop
process in slot $slot sent SIGKILL." );
- $this->closeRunner( $loop, $slot,
$procSlot, SIGKILL );
- $this->srvc->incrStats(
'runner-status.kill', 1 );
- } else {
- continue; // slot is busy
- }
- } elseif ( $status && !$status['running'] ) {
- $response = trim( stream_get_contents(
$procSlot['pipes'][1] ) );
- // $result will be an array if no exceptions
happened
- $result = json_decode( $response, true );
- if ( $status['exitcode'] == 0 && is_array(
$result ) ) {
- // If this finished early, lay off of
the queue for a while
- if ( ( $cTime - $procSlot['stime'] ) <
$this->srvc->hpMaxTime/2 ) {
- unset(
$pending[$procSlot['type']][$procSlot['db']] );
- $this->srvc->debug( "Queue
'{$procSlot['db']}/{$procSlot['type']}' emptied." );
- }
- $ok = 0; // jobs that ran OK
- foreach ( $result['jobs'] as $status ) {
- $ok += ( $status['status'] ===
'ok' ) ? 1 : 0;
- }
- $failed = count( $result['jobs'] ) -
$ok;
- $this->srvc->incrStats(
"pop.{$procSlot['type']}.ok.{$host}", $ok );
- $this->srvc->incrStats(
"pop.{$procSlot['type']}.failed.{$host}", $failed );
- } else {
- // Mention any serious errors that may
have occured
- $cmd = $procSlot['cmd'];
- $error = trim( stream_get_contents(
$procSlot['pipes'][2] ) ) ?: $response;
- if ( strlen( $error ) > 4096 ) { //
truncate long errors
- $error = mb_substr( $error, 0,
4096 ) . '...';
- }
- $this->srvc->error( "Runner loop $loop
process in slot $slot " .
- "gave status
'{$status['exitcode']}':\n$cmd\n\t$error" );
- $this->srvc->incrStats(
'runner-status.error', 1 );
- }
- $this->closeRunner( $loop, $slot, $procSlot );
- } elseif ( !$status && $procSlot['handle'] ) {
- $this->srvc->error( "Runner loop $loop process
in slot $slot gave no status." );
- $this->closeRunner( $loop, $slot, $procSlot );
- $this->srvc->incrStats( 'runner-status.none', 1
);
- }
- ++$free;
- $queue = $this->selectQueue( $loop, $prioMap, $pending
);
- if ( !$queue ) {
- break;
- }
- // Spawn a job runner for this loop ID
- $highPrio = $prioMap[$loop]['high'];
- $this->spawnRunner( $loop, $slot, $highPrio, $queue,
$procSlot );
- ++$new;
- }
- unset( $procSlot );
-
- return array( $free, $new );
- }
-
- /**
- * @param integer $loop
- * @param array $prioMap
- * @param array $pending
- * @return array|boolean
- */
- protected function selectQueue( $loop, array $prioMap, array $pending )
{
- $include = $this->srvc->loopMap[$loop]['include'];
- $exclude = $this->srvc->loopMap[$loop]['exclude'];
- if ( $prioMap[$loop]['high'] ) {
- $exclude = array_merge( $exclude,
$this->srvc->loopMap[$loop]['low-priority'] );
- } else {
- $include = array_merge( $include,
$this->srvc->loopMap[$loop]['low-priority'] );
- }
- if ( in_array( '*', $include ) ) {
- $include = array_merge( $include, array_keys( $pending
) );
- }
-
- $candidateTypes = array_diff( array_unique( $include ),
$exclude, array( '*' ) );
-
- $candidates = array(); // list of (type, db)
- // Flatten the tree of candidates into a flat list so that a
random
- // item can be selected, weighing each queue (type/db tuple)
equally.
- foreach ( $candidateTypes as $type ) {
- if ( isset( $pending[$type] ) ) {
- foreach ( $pending[$type] as $db => $since ) {
- $candidates[] = array( $type, $db );
- }
- }
- }
-
- if ( !count( $candidates ) ) {
- return false; // no jobs for this type
- }
-
- return $candidates[mt_rand( 0, count( $candidates ) - 1 )];
- }
-
- /**
- * @param integer $loop
- * @param integer $slot
- * @param bool $highPrio
- * @param array $queue
- * @param array $procSlot
- * @return bool
- */
- protected function spawnRunner( $loop, $slot, $highPrio, array $queue,
array &$procSlot ) {
- // Pick a random queue
- list( $type, $db ) = $queue;
- $maxtime = $highPrio ? $this->srvc->lpMaxTime :
$this->srvc->hpMaxTime;
- $maxmem = isset( $this->srvc->maxMemMap[$type] )
- ? $this->srvc->maxMemMap[$type]
- : $this->srvc->maxMemMap['*'];
-
- // Make sure the runner is launched with various time/memory
limits.
- // Nice the process so things like ssh and deployment scripts
are fine.
- $what = $with = array();
- foreach ( compact( 'db', 'type', 'maxtime', 'maxmem' ) as $k =>
$v ) {
- $what[] = "%($k)u";
- $with[] = rawurlencode( $v );
- $what[] = "%($k)x";
- $with[] = escapeshellarg( $v );
- }
- // The dispatcher might be runJobs.php, curl, or wget
- $cmd = str_replace( $what, $with, $this->srvc->dispatcher );
-
- $descriptors = array(
- 0 => array( "pipe", "r" ), // stdin (child)
- 1 => array( "pipe", "w" ), // stdout (child)
- 2 => array( "pipe", "w" ) // stderr (child)
- );
-
- $this->srvc->debug( "Spawning runner in loop $loop at slot
$slot ($type, $db):\n\t$cmd." );
-
- // Start the runner in the background
- $procSlot['handle'] = proc_open( $cmd, $descriptors,
$procSlot['pipes'] );
- if ( $procSlot['handle'] ) {
- // Set a timeout so stream_get_contents() won't block
for sanity
- stream_set_timeout( $procSlot['pipes'][1], 5 );
- stream_set_timeout( $procSlot['pipes'][2], 5 );
- // Close the unused STDIN pipe
- fclose( $procSlot['pipes'][0] );
- unset( $procSlot['pipes'][0] ); // unused
- }
-
- $procSlot['db'] = $db;
- $procSlot['type'] = $type;
- $procSlot['cmd'] = $cmd;
- $procSlot['stime'] = time();
- $procSlot['sigtime'] = 0;
-
- if ( $procSlot['handle'] ) {
- return true;
- } else {
- $this->srvc->error( "Could not spawn process in loop
$loop: $cmd" );
- $this->srvc->incrStats( 'runner-status.error', 1 );
- return false;
- }
- }
-
- /**
- * @param integer $loop
- * @param integer $slot
- * @param array $procSlot
- * @param integer $signal
- */
- protected function closeRunner( $loop, $slot, array &$procSlot, $signal
= null ) {
- if ( $procSlot['pipes'] ) {
- if ( $procSlot['pipes'][1] !== false ) {
- fclose( $procSlot['pipes'][1] );
- $procSlot['pipes'][1] = false;
- }
- if ( $procSlot['pipes'][2] !== false ) {
- fclose( $procSlot['pipes'][2] );
- $procSlot['pipes'][2] = false;
- }
- }
- if ( $procSlot['handle'] ) {
- $this->srvc->debug( "Closing process in loop $loop at
slot $slot." );
- if ( $signal !== null ) {
- // Tell the process to close with a signal
- proc_terminate( $procSlot['handle'], $signal );
- } else {
- // Wait for the process to finish on its own
- proc_close( $procSlot['handle'] );
- }
- }
- $procSlot['handle'] = false;
- $procSlot['db'] = null;
- $procSlot['type'] = null;
- $procSlot['stime'] = 0;
- $procSlot['sigtime'] = 0;
- $procSlot['cmd'] = null;
- }
-
- public function terminateSlots() {
- foreach ( $this->procMap as &$procSlots ) {
- foreach ( $procSlots as &$procSlot ) {
- if ( !$procSlot['handle'] ) {
- continue;
- }
- fclose( $procSlot['pipes'][1] );
- fclose( $procSlot['pipes'][2] );
- $status = proc_get_status( $procSlot['handle']
);
- print "Sending SIGTERM to {$status['pid']}.\n";
- proc_terminate( $procSlot['handle'] );
- }
- unset( $procSlot );
- }
- unset( $procSlots );
}
}
diff --git a/src/redisJobPipeline.php b/src/redisJobPipeline.php
new file mode 100755
index 0000000..310ca5d
--- /dev/null
+++ b/src/redisJobPipeline.php
@@ -0,0 +1,270 @@
+<?php
+
+if ( !function_exists( 'posix_kill' ) ) {
+ function posix_kill() {} // no-op on Windows; procs killed with proc_terminate()
+}
+
+class JobRunnerPipeline {
+ /** @var RedisJobService */
+ protected $srvc;
+ /** @var array (loop ID => slot ID => slot status array) */
+ protected $procMap = array();
+
+ /**
+ * @param RedisJobService $service
+ */
+ public function __construct( RedisJobService $service ) {
+ $this->srvc = $service;
+ }
+
+ /**
+ * @param string $loop
+ * @param int $slot
+ */
+ public function initSlot( $loop, $slot ) {
+ $this->procMap[$loop][$slot] = array(
+ 'handle' => false,
+ 'pipes' => array(),
+ 'db' => null,
+ 'type' => null,
+ 'cmd' => null,
+ 'stime' => 0,
+ 'sigtime' => 0,
+ );
+ }
+
+ /**
+  * Reap finished or overdue runner processes of a loop and start new
+  * runners in the slots that are free.
+  *
+  * A running process that exceeded its 'real' time limit is sent SIGTERM;
+  * if it is still running more than 5 seconds after that, it is sent
+  * SIGKILL and its slot is reused. A slot whose process was only just sent
+  * SIGTERM is treated as busy, so that the process handle is kept and the
+  * SIGKILL escalation remains reachable.
+  *
+  * @param integer $loop Loop ID
+  * @param array $prioMap Map of (loop ID => array with a boolean 'high' field)
+  * @param array $pending Map of (job type => (db => value)); the entry of a
+  *   queue whose runner finished early is removed from it
+  * @return array (number of free slots seen, number of runner spawn attempts)
+  */
+ public function refillSlots( $loop, array $prioMap, array &$pending ) {
+ 	$free = 0;
+ 	$new = 0;
+ 	$host = gethostname();
+ 	$cTime = time();
+ 	foreach ( $this->procMap[$loop] as $slot => &$procSlot ) {
+ 		$status = $procSlot['handle']
+ 			? proc_get_status( $procSlot['handle'] )
+ 			: false;
+ 		if ( $status && $status['running'] ) {
+ 			$maxReal = isset( $this->srvc->maxRealMap[$procSlot['type']] )
+ 				? $this->srvc->maxRealMap[$procSlot['type']]
+ 				: $this->srvc->maxRealMap['*'];
+ 			$age = $cTime - $procSlot['stime'];
+ 			if ( $age >= $maxReal && !$procSlot['sigtime'] ) {
+ 				$cmd = $procSlot['cmd'];
+ 				$this->srvc->error( "Runner loop $loop process in slot $slot timed out " .
+ 					"[{$age}s; max: {$maxReal}s]:\n$cmd" );
+ 				posix_kill( $status['pid'], SIGTERM ); // non-blocking
+ 				$procSlot['sigtime'] = time();
+ 				$this->srvc->incrStats( 'runner-status.timeout', 1 );
+ 				// The process is still running; keep its handle and 'sigtime' so
+ 				// a later pass can reap it or escalate to SIGKILL
+ 				continue;
+ 			} elseif ( $age >= $maxReal && ( $cTime - $procSlot['sigtime'] ) > 5 ) {
+ 				$this->srvc->error( "Runner loop $loop process in slot $slot sent SIGKILL." );
+ 				$this->closeRunner( $loop, $slot, $procSlot, SIGKILL );
+ 				$this->srvc->incrStats( 'runner-status.kill', 1 );
+ 			} else {
+ 				continue; // slot is busy
+ 			}
+ 		} elseif ( $status && !$status['running'] ) {
+ 			$response = trim( stream_get_contents( $procSlot['pipes'][1] ) );
+ 			// $result will be an array if no exceptions happened
+ 			$result = json_decode( $response, true );
+ 			if ( $status['exitcode'] == 0 && is_array( $result ) ) {
+ 				// If this finished early, lay off of the queue for a while
+ 				if ( ( $cTime - $procSlot['stime'] ) < $this->srvc->hpMaxTime / 2 ) {
+ 					unset( $pending[$procSlot['type']][$procSlot['db']] );
+ 					$this->srvc->debug( "Queue '{$procSlot['db']}/{$procSlot['type']}' emptied." );
+ 				}
+ 				// Tolerate a response without a usable 'jobs' list
+ 				$jobs = ( isset( $result['jobs'] ) && is_array( $result['jobs'] ) )
+ 					? $result['jobs']
+ 					: array();
+ 				$ok = 0; // jobs that ran OK
+ 				// Use a separate variable so the process $status is not overwritten
+ 				foreach ( $jobs as $jobStatus ) {
+ 					if ( isset( $jobStatus['status'] ) && $jobStatus['status'] === 'ok' ) {
+ 						++$ok;
+ 					}
+ 				}
+ 				$failed = count( $jobs ) - $ok;
+ 				$this->srvc->incrStats( "pop.{$procSlot['type']}.ok.{$host}", $ok );
+ 				$this->srvc->incrStats( "pop.{$procSlot['type']}.failed.{$host}", $failed );
+ 			} else {
+ 				// Mention any serious errors that may have occurred
+ 				$cmd = $procSlot['cmd'];
+ 				$error = trim( stream_get_contents( $procSlot['pipes'][2] ) ) ?: $response;
+ 				if ( strlen( $error ) > 4096 ) { // truncate long errors
+ 					$error = mb_substr( $error, 0, 4096 ) . '...';
+ 				}
+ 				$this->srvc->error( "Runner loop $loop process in slot $slot " .
+ 					"gave status '{$status['exitcode']}':\n$cmd\n\t$error" );
+ 				$this->srvc->incrStats( 'runner-status.error', 1 );
+ 			}
+ 			$this->closeRunner( $loop, $slot, $procSlot );
+ 		} elseif ( !$status && $procSlot['handle'] ) {
+ 			$this->srvc->error( "Runner loop $loop process in slot $slot gave no status." );
+ 			$this->closeRunner( $loop, $slot, $procSlot );
+ 			$this->srvc->incrStats( 'runner-status.none', 1 );
+ 		}
+ 		++$free;
+ 		$queue = $this->selectQueue( $loop, $prioMap, $pending );
+ 		if ( !$queue ) {
+ 			break;
+ 		}
+ 		// Spawn a job runner for this loop ID
+ 		$highPrio = $prioMap[$loop]['high'];
+ 		$this->spawnRunner( $loop, $slot, $highPrio, $queue, $procSlot );
+ 		++$new;
+ 	}
+ 	unset( $procSlot );
+
+ 	return array( $free, $new );
+ }
+
+ /**
+  * Pick one pending (type, db) queue at random for a loop, weighing each
+  * queue equally.
+  *
+  * The loop's 'low-priority' types are added to its exclusion list when the
+  * loop is in high priority mode and to its inclusion list otherwise. An
+  * inclusion entry of '*' pulls in every type that has pending queues.
+  *
+  * @param integer $loop Loop ID
+  * @param array $prioMap Map of (loop ID => array with a boolean 'high' field)
+  * @param array $pending Map of (job type => (db => value))
+  * @return array|boolean (type, db) tuple, or false if nothing is eligible
+  */
+ protected function selectQueue( $loop, array $prioMap, array $pending ) {
+ 	$loopInfo = $this->srvc->loopMap[$loop];
+ 	$wanted = $loopInfo['include'];
+ 	$skipped = $loopInfo['exclude'];
+ 	$lowPrioTypes = $loopInfo['low-priority'];
+
+ 	if ( $prioMap[$loop]['high'] ) {
+ 		$skipped = array_merge( $skipped, $lowPrioTypes );
+ 	} else {
+ 		$wanted = array_merge( $wanted, $lowPrioTypes );
+ 	}
+
+ 	if ( in_array( '*', $wanted ) ) {
+ 		$wanted = array_merge( $wanted, array_keys( $pending ) );
+ 	}
+
+ 	$eligibleTypes = array_diff( array_unique( $wanted ), $skipped, array( '*' ) );
+
+ 	// Build a flat list of (type, db) tuples so that the random pick
+ 	// below gives every queue the same chance.
+ 	$tuples = array();
+ 	foreach ( $eligibleTypes as $jobType ) {
+ 		if ( !isset( $pending[$jobType] ) ) {
+ 			continue;
+ 		}
+ 		foreach ( array_keys( $pending[$jobType] ) as $dbName ) {
+ 			$tuples[] = array( $jobType, $dbName );
+ 		}
+ 	}
+
+ 	$total = count( $tuples );
+ 	if ( $total === 0 ) {
+ 		return false; // no eligible queue has pending jobs
+ 	}
+
+ 	return $tuples[mt_rand( 0, $total - 1 )];
+ }
+
+ /**
+  * Start a runner process for the given queue and record it in the slot.
+  *
+  * The command is the service's dispatcher string with the placeholders
+  * %(db)u, %(type)u, %(maxtime)u, %(maxmem)u (URL-encoded values) and
+  * %(db)x, %(type)x, %(maxtime)x, %(maxmem)x (shell-escaped values) filled in.
+  *
+  * @param integer $loop Loop ID
+  * @param integer $slot Slot ID
+  * @param bool $highPrio Whether the loop is in high priority mode
+  * @param array $queue (type, db) tuple
+  * @param array $procSlot Slot status array; updated in place
+  * @return bool Whether proc_open() returned a process handle
+  */
+ protected function spawnRunner( $loop, $slot, $highPrio, array $queue, array &$procSlot ) {
+ 	// The queue was already selected by the caller
+ 	list( $type, $db ) = $queue;
+ 	// hpMaxTime is documented as the maxtime for high priority jobs and
+ 	// lpMaxTime as the maxtime for low priority jobs
+ 	$maxtime = $highPrio ? $this->srvc->hpMaxTime : $this->srvc->lpMaxTime;
+ 	$maxmem = isset( $this->srvc->maxMemMap[$type] )
+ 		? $this->srvc->maxMemMap[$type]
+ 		: $this->srvc->maxMemMap['*'];
+
+ 	// Make sure the runner is launched with various time/memory limits.
+ 	// Nice the process so things like ssh and deployment scripts are fine.
+ 	$what = $with = array();
+ 	foreach ( compact( 'db', 'type', 'maxtime', 'maxmem' ) as $k => $v ) {
+ 		$what[] = "%($k)u";
+ 		$with[] = rawurlencode( $v );
+ 		$what[] = "%($k)x";
+ 		$with[] = escapeshellarg( $v );
+ 	}
+ 	// The dispatcher might be runJobs.php, curl, or wget
+ 	$cmd = str_replace( $what, $with, $this->srvc->dispatcher );
+
+ 	$descriptors = array(
+ 		0 => array( "pipe", "r" ), // stdin (child)
+ 		1 => array( "pipe", "w" ), // stdout (child)
+ 		2 => array( "pipe", "w" ) // stderr (child)
+ 	);
+
+ 	$this->srvc->debug( "Spawning runner in loop $loop at slot $slot ($type, $db):\n\t$cmd." );
+
+ 	// Start the runner in the background
+ 	$procSlot['handle'] = proc_open( $cmd, $descriptors, $procSlot['pipes'] );
+ 	if ( $procSlot['handle'] ) {
+ 		// Set a timeout so stream_get_contents() won't block for sanity
+ 		stream_set_timeout( $procSlot['pipes'][1], 5 );
+ 		stream_set_timeout( $procSlot['pipes'][2], 5 );
+ 		// Close the unused STDIN pipe
+ 		fclose( $procSlot['pipes'][0] );
+ 		unset( $procSlot['pipes'][0] ); // unused
+ 	}
+
+ 	$procSlot['db'] = $db;
+ 	$procSlot['type'] = $type;
+ 	$procSlot['cmd'] = $cmd;
+ 	$procSlot['stime'] = time();
+ 	$procSlot['sigtime'] = 0;
+
+ 	if ( $procSlot['handle'] ) {
+ 		return true;
+ 	} else {
+ 		$this->srvc->error( "Could not spawn process in loop $loop: $cmd" );
+ 		$this->srvc->incrStats( 'runner-status.error', 1 );
+ 		return false;
+ 	}
+ }
+
+ /**
+  * Close the output pipes and the process of a slot and reset the slot.
+  *
+  * @param integer $loop Loop ID
+  * @param integer $slot Slot ID
+  * @param array $procSlot Slot status array; updated in place
+  * @param integer|null $signal Signal to pass to proc_terminate(); when
+  *   null, proc_close() is called instead
+  */
+ protected function closeRunner( $loop, $slot, array &$procSlot, $signal = null ) {
+ 	// Pipe 1 is the child's stdout and pipe 2 its stderr (see spawnRunner())
+ 	foreach ( array( 1, 2 ) as $fd ) {
+ 		if ( !isset( $procSlot['pipes'][$fd] ) ) {
+ 			continue; // never opened for this slot
+ 		}
+ 		// Skip pipes that are already marked or already closed
+ 		if ( is_resource( $procSlot['pipes'][$fd] ) ) {
+ 			fclose( $procSlot['pipes'][$fd] );
+ 		}
+ 		$procSlot['pipes'][$fd] = false;
+ 	}
+ 	if ( $procSlot['handle'] ) {
+ 		$this->srvc->debug( "Closing process in loop $loop at slot $slot." );
+ 		if ( $signal !== null ) {
+ 			// Tell the process to close with a signal
+ 			proc_terminate( $procSlot['handle'], $signal );
+ 		} else {
+ 			// Wait for the process to finish on its own
+ 			proc_close( $procSlot['handle'] );
+ 		}
+ 	}
+ 	$procSlot['handle'] = false;
+ 	$procSlot['db'] = null;
+ 	$procSlot['type'] = null;
+ 	$procSlot['stime'] = 0;
+ 	$procSlot['sigtime'] = 0;
+ 	$procSlot['cmd'] = null;
+ }
+
+ /**
+  * For every slot that has a process handle, close its stdout/stderr
+  * pipes, print the process ID and call proc_terminate() on the handle.
+  */
+ public function terminateSlots() {
+ 	foreach ( $this->procMap as $slotsOfLoop ) {
+ 		foreach ( $slotsOfLoop as $entry ) {
+ 			if ( $entry['handle'] ) {
+ 				fclose( $entry['pipes'][1] );
+ 				fclose( $entry['pipes'][2] );
+ 				$info = proc_get_status( $entry['handle'] );
+ 				print "Sending SIGTERM to {$info['pid']}.\n";
+ 				proc_terminate( $entry['handle'] );
+ 			}
+ 		}
+ 	}
+ }
+}
diff --git a/src/redisJobService.php b/src/redisJobService.php
new file mode 100755
index 0000000..0d08cd4
--- /dev/null
+++ b/src/redisJobService.php
@@ -0,0 +1,367 @@
+<?php
+
+/**
+ * Base class for job services with main() implemented by subclasses
+ */
+abstract class RedisJobService {
+ /** @var array List of IP:<port> entries */
+ protected $queueSrvs = array();
+ /** @var array List of IP:<port> entries */
+ protected $aggrSrvs = array();
+ /** @var string Redis password */
+ protected $password;
+ /** @var string IP address or hostname */
+ protected $statsdHost;
+ /** @var integer Port number */
+ protected $statsdPort;
+
+ /** @var bool */
+ protected $verbose;
+ /** @var array Map of (job type => integer seconds) */
+ protected $claimTTLMap = array();
+ /** @var array Map of (job type => integer) */
+ protected $attemptsMap = array();
+
+ /** @var array Map of (id => (include,exclude,low-priority,runners)) */
+ public $loopMap = array();
+ /** @var array Map of (job type => integer) */
+ public $maxRealMap = array();
+ /** @var array Map of (job type => integer) */
+ public $maxMemMap = array();
+ /** @var string Command to run jobs and return the status JSON blob */
+ public $dispatcher;
+
+ /**
+ * How long can low priority jobs be run until some high priority
+ * jobs should be checked for and run if they exist.
+ * @var integer
+ */
+ public $hpMaxDelay = 120;
+ /**
+ * The maxtime parameter for runJobs.php for low priority jobs.
+ * The lower this value is, the less jobs of one wiki can hog attention
+ * from the jobs on other wikis, though more overhead is incurred.
+ * This should be lower than hpmaxdelay.
+ * @var integer
+ */
+ public $lpMaxTime = 60;
+ /**
+ * How long can high priority jobs be run until some low priority
+ * jobs should be checked for and run if they exist.
+ * @var integer
+ */
+ public $lpMaxDelay = 600;
+ /**
+ * The maxtime parameter for runJobs.php for high priority jobs.
+ * The lower this value is, the less jobs of one wiki/type can hog attention
+ * from jobs of another wiki/type, though more overhead is incurred.
+ * This should be lower than lpmaxdelay.
+ * @var integer
+ */
+ public $hpMaxTime = 30;
+
+ /** @var array Map of (server => Redis object) */
+ protected $conns = array();
+ /** @var array Map of (server => timestamp) */
+ protected $downSrvs = array();
+
+ /**
+  * Create a service instance from command line arguments.
+  *
+  * Prints usage and exits if 'config-file' is missing or 'help' is given.
+  * The configuration file is JSON in which "//" comments are allowed.
+  *
+  * @param array $args Map of options; uses 'config-file', 'help' and 'verbose'
+  * @return static Instance of the class this method was called on
+  * @throws Exception If the file cannot be read or is not a JSON object/array
+  */
+ public static function init( array $args ) {
+ 	if ( !isset( $args['config-file'] ) || isset( $args['help'] ) ) {
+ 		die( "Usage: php RedisJobRunnerService.php\n"
+ 			. "--config-file=<path>\n"
+ 			. "--help\n"
+ 		);
+ 	}
+
+ 	$file = $args['config-file'];
+ 	$content = file_get_contents( $file );
+ 	if ( $content === false ) {
+ 		throw new Exception( "Couldn't open configuration file '{$file}'." );
+ 	}
+
+ 	// Remove "//" comments and load into an array. Double-quoted strings are
+ 	// matched first and put back untouched, so that a "//" inside a value
+ 	// (such as a URL in the dispatcher command) is not taken for a comment.
+ 	$content = trim( preg_replace(
+ 		'~("(?:[^"\\\\]|\\\\.)*")|//[^\r\n]*~',
+ 		'$1',
+ 		$content
+ 	) );
+ 	$config = json_decode( $content, true );
+ 	if ( !is_array( $config ) ) {
+ 		throw new Exception( "Could not parse JSON file '{$file}'." );
+ 	}
+
+ 	$instance = new static( $config );
+ 	$instance->verbose = isset( $args['verbose'] );
+
+ 	return $instance;
+ }
+
+ /**
+  * Load the service settings from a decoded configuration array.
+  *
+  * Keys read: 'redis' ('aggregators', 'queues', 'password'), 'dispatcher',
+  * 'groups', 'wrapper', 'limits' ('claimTTL', 'attempts', 'real', 'memory')
+  * and 'statsd' ("host" or "host:port").
+  *
+  * @param array $config
+  * @throws Exception If a required setting is missing, empty or invalid
+  */
+ protected function __construct( array $config ) {
+ 	$redis = ( isset( $config['redis'] ) && is_array( $config['redis'] ) )
+ 		? $config['redis']
+ 		: array();
+ 	$limits = ( isset( $config['limits'] ) && is_array( $config['limits'] ) )
+ 		? $config['limits']
+ 		: array();
+
+ 	// Missing keys are reported through the same exceptions as empty ones
+ 	$this->aggrSrvs = isset( $redis['aggregators'] ) ? $redis['aggregators'] : array();
+ 	if ( !is_array( $this->aggrSrvs ) || !count( $this->aggrSrvs ) ) {
+ 		throw new Exception( "Empty list for 'redis.aggregators'." );
+ 	}
+ 	$this->queueSrvs = isset( $redis['queues'] ) ? $redis['queues'] : array();
+ 	if ( !is_array( $this->queueSrvs ) || !count( $this->queueSrvs ) ) {
+ 		throw new Exception( "Empty list for 'redis.queues'." );
+ 	}
+ 	$this->dispatcher = isset( $config['dispatcher'] ) ? $config['dispatcher'] : null;
+ 	if ( !$this->dispatcher ) {
+ 		throw new Exception( "No command provided for 'dispatcher'." );
+ 	}
+
+ 	$groups = ( isset( $config['groups'] ) && is_array( $config['groups'] ) )
+ 		? $config['groups']
+ 		: array();
+ 	foreach ( $groups as $name => $group ) {
+ 		if ( !isset( $group['runners'] ) || !is_int( $group['runners'] ) ) {
+ 			throw new Exception( "Invalid 'runners' value for runner group '$name'." );
+ 		} elseif ( $group['runners'] == 0 ) {
+ 			continue; // loop disabled
+ 		}
+
+ 		foreach ( array( 'include', 'exclude', 'low-priority' ) as $k ) {
+ 			if ( !isset( $group[$k] ) ) {
+ 				$group[$k] = array();
+ 			} elseif ( !is_array( $group[$k] ) ) {
+ 				throw new Exception( "Invalid '$k' value for runner group '$name'." );
+ 			}
+ 		}
+ 		// Loops are keyed by position, not by group name
+ 		$this->loopMap[] = $group;
+ 	}
+
+ 	if ( isset( $config['wrapper'] ) ) {
+ 		// NOTE(review): no 'wrapper' property is declared on this class in
+ 		// the visible code, so this creates a dynamic property - confirm
+ 		$this->wrapper = $config['wrapper'];
+ 	}
+ 	if ( isset( $redis['password'] ) ) {
+ 		$this->password = $redis['password'];
+ 	}
+
+ 	// For the maps below, configured entries win over the '*' default
+ 	$this->claimTTLMap['*'] = 3600;
+ 	if ( isset( $limits['claimTTL'] ) ) {
+ 		$this->claimTTLMap = $limits['claimTTL'] + $this->claimTTLMap;
+ 	}
+
+ 	$this->attemptsMap['*'] = 3;
+ 	if ( isset( $limits['attempts'] ) ) {
+ 		$this->attemptsMap = $limits['attempts'] + $this->attemptsMap;
+ 	}
+
+ 	// Avoid killing processes before they get a fair chance to exit
+ 	$this->maxRealMap['*'] = 3600;
+ 	$minRealTime = 2 * max( $this->lpMaxTime, $this->hpMaxTime );
+ 	if ( isset( $limits['real'] ) ) {
+ 		foreach ( $limits['real'] as $type => $value ) {
+ 			$this->maxRealMap[$type] = max( (int)$value, $minRealTime );
+ 		}
+ 	}
+
+ 	$this->maxMemMap['*'] = '300M';
+ 	if ( isset( $limits['memory'] ) ) {
+ 		$this->maxMemMap = $limits['memory'] + $this->maxMemMap;
+ 	}
+
+ 	if ( isset( $config['statsd'] ) ) {
+ 		if ( strpos( $config['statsd'], ':' ) !== false ) {
+ 			$parts = explode( ':', $config['statsd'] );
+ 			$this->statsdHost = $parts[0];
+ 			$this->statsdPort = (int)$parts[1];
+ 		} else {
+ 			// Use default statsd port if not specified
+ 			$this->statsdHost = $config['statsd'];
+ 			$this->statsdPort = 8125;
+ 		}
+ 	}
+ }
+
+ /**
+ * Entry point method that starts the service in earnest and keeps running
+ */
+ abstract public function main();
+
+ /**
+  * Get the name of the global "ready queues" key.
+  *
+  * @return string (per JobQueueAggregatorRedis.php)
+  */
+ public function getReadyQueueKey() {
+ 	$globalKey = 'jobqueue:aggregator:h-ready-queues:v2';
+
+ 	return $globalKey;
+ }
+
+ /**
+  * Get the name of the global "queue types" key.
+  *
+  * @return string (per JobQueueAggregatorRedis.php)
+  */
+ public function getQueueTypesKey() {
+ 	$globalKey = 'jobqueue:aggregator:h-queue-types:v2';
+
+ 	return $globalKey;
+ }
+
+ /**
+  * Get the name of the global "wikis" key.
+  *
+  * @return string
+  */
+ public function getWikiSetKey() {
+ 	$globalKey = 'jobqueue:aggregator:s-wikis:v2';
+
+ 	return $globalKey;
+ }
+
+ /**
+  * Build a queue name of the form "<type>/<wiki>", with both parts
+  * URL-encoded by rawurlencode().
+  *
+  * @param string $type
+  * @param string $wiki
+  * @return string (per JobQueueAggregatorRedis.php)
+  */
+ public function encQueueName( $type, $wiki ) {
+ 	$parts = array( rawurlencode( $type ), rawurlencode( $wiki ) );
+
+ 	return implode( '/', $parts );
+ }
+
+ /**
+  * Split a queue name of the form "<type>/<wiki>" at its first slash and
+  * URL-decode both parts.
+  *
+  * @param string $name
+  * @return array (type, wiki) (per JobQueueAggregatorRedis.php); the wiki
+  *   is an empty string if $name contains no slash
+  */
+ public function dencQueueName( $name ) {
+ 	// Pad to two items so that a name without a slash raises no notice
+ 	list( $type, $wiki ) = array_pad( explode( '/', $name, 2 ), 2, '' );
+
+ 	return array( rawurldecode( $type ), rawurldecode( $wiki ) );
+ }
+
+ /**
+  * Get a connection to a Redis server, reusing a cached one if present.
+  *
+  * A server that could not be connected to or authenticated with is
+  * marked as down for 30 seconds; false is returned for it during that time.
+  *
+  * @param string $server "host" or "host:port"
+  * @return Redis|boolean Connection object, or false on failure
+  */
+ public function getRedisConn( $server ) {
+ 	// Check the listing of "dead" servers which have had connection errors.
+ 	// Srvs are marked dead for a limited period of time, to
+ 	// avoid excessive overhead from repeated connection timeouts.
+ 	if ( isset( $this->downSrvs[$server] ) ) {
+ 		$now = time();
+ 		if ( $now > $this->downSrvs[$server] ) {
+ 			// Dead time expired
+ 			unset( $this->downSrvs[$server] );
+ 		} else {
+ 			// Server is dead
+ 			return false;
+ 		}
+ 	}
+
+ 	if ( isset( $this->conns[$server] ) ) {
+ 		return $this->conns[$server];
+ 	}
+
+ 	if ( strpos( $server, ':' ) === false ) {
+ 		$host = $server;
+ 		$port = 6379; // default Redis port, instead of passing null
+ 	} else {
+ 		list( $host, $port ) = explode( ':', $server );
+ 	}
+
+ 	try {
+ 		$conn = new Redis();
+ 		$result = $conn->connect( $host, $port, 5 );
+ 		if ( !$result ) {
+ 			$this->error( "Could not connect to Redis server $host:$port." );
+ 			// Mark server down for some time to avoid further timeouts
+ 			$this->downSrvs[$server] = time() + 30;
+
+ 			return false;
+ 		}
+ 		if ( $this->password !== null && !$conn->auth( $this->password ) ) {
+ 			// Do not cache a connection that every command would fail on
+ 			$this->error( "Could not authenticate to Redis server $host:$port." );
+ 			$this->downSrvs[$server] = time() + 30;
+
+ 			return false;
+ 		}
+ 	} catch ( RedisException $e ) {
+ 		$this->error( "Redis error for server $host:$port: " . $e->getMessage() );
+ 		$this->downSrvs[$server] = time() + 30;
+
+ 		return false;
+ 	}
+
+ 	$conn->setOption( Redis::OPT_READ_TIMEOUT, 5 );
+ 	$conn->setOption( Redis::OPT_SERIALIZER, Redis::SERIALIZER_NONE );
+ 	$this->conns[$server] = $conn;
+
+ 	return $conn;
+ }
+
+ /**
+  * Drop the cached connection for a server, log the exception message
+  * and count the error in the stats.
+  *
+  * @param RedisException $e
+  * @param string $server
+  */
+ public function handleRedisError( RedisException $e, $server ) {
+ 	// Forget the connection so that the next getRedisConn() call reconnects
+ 	unset( $this->conns[$server] );
+
+ 	$message = $e->getMessage();
+ 	$this->error( "Redis error: $message" );
+
+ 	$localHost = gethostname();
+ 	$this->incrStats( "redis-error.$localHost", 1 );
+ }
+
+ /**
+  * Run a command on a Redis connection, turning any error reported by
+  * getLastError() into a RedisException.
+  *
+  * @param Redis $conn
+  * @param string $cmd Name of the Redis object method to call
+  * @param array $args Arguments for that method
+  * @return mixed Result of the method call
+  * @throws RedisException
+  */
+ public function redisCmd( Redis $conn, $cmd, array $args = array() ) {
+ 	$conn->clearLastError();
+
+ 	// Some job runners ran out of memory in this call; log what is being
+ 	// done so that the relevant information sits next to the OOM.
+ 	$encodedArgs = json_encode( $args );
+ 	$this->debug( "Redis cmd: $cmd " . $encodedArgs );
+
+ 	$callback = array( $conn, $cmd );
+ 	$outcome = call_user_func_array( $callback, $args );
+
+ 	$lastError = $conn->getLastError();
+ 	if ( $lastError ) {
+ 		// Raise every error as an exception instead of "most but not all",
+ 		// so the caller knows to reset the connection to be safe.
+ 		throw new RedisException( $lastError );
+ 	}
+
+ 	return $outcome;
+ }
+
+ /**
+  * Send a "jobrunner.<event>:<delta>|m" packet to the configured statsd
+  * host over UDP. Does nothing if no statsd host is set, if the delta is
+  * zero, or if no socket can be created.
+  *
+  * @param string $event
+  * @param integer $delta
+  * @return void
+  */
+ public function incrStats( $event, $delta = 1 ) {
+ 	if ( !$this->statsdHost || $delta == 0 ) {
+ 		return; // nothing to do
+ 	}
+
+ 	static $format = "%s:%s|m\n";
+ 	$packet = sprintf( $format, "jobrunner.$event", $delta );
+
+ 	if ( !function_exists( 'socket_create' ) ) {
+ 		$this->debug( 'No "socket_create" method available.' );
+ 		return;
+ 	}
+
+ 	// The socket is created once and reused by later calls
+ 	static $socket = null;
+ 	if ( !$socket ) {
+ 		$socket = socket_create( AF_INET, SOCK_DGRAM, SOL_UDP );
+ 		if ( !$socket ) {
+ 			// Do not hand a failed result to socket_sendto(); retry next call
+ 			$socket = null;
+ 			$this->debug( 'Could not create a UDP socket for statsd.' );
+ 			return;
+ 		}
+ 	}
+
+ 	socket_sendto(
+ 		$socket,
+ 		$packet,
+ 		strlen( $packet ),
+ 		0,
+ 		$this->statsdHost,
+ 		$this->statsdPort
+ 	);
+ }
+
+ /**
+  * Print a timestamped message, but only in verbose mode.
+  *
+  * @param string $s
+  */
+ public function debug( $s ) {
+ 	if ( !$this->verbose ) {
+ 		return;
+ 	}
+
+ 	$stamp = date( DATE_ISO8601 );
+ 	print "$stamp: $s\n";
+ }
+
+ /**
+  * Print a timestamped message.
+  *
+  * @param string $s
+  */
+ public function notice( $s ) {
+ 	$stamp = date( DATE_ISO8601 );
+ 	print "$stamp: $s\n";
+ }
+
+ /**
+  * Write a timestamped message to STDERR.
+  *
+  * @param string $s
+  */
+ public function error( $s ) {
+ 	$stamp = date( DATE_ISO8601 );
+ 	$line = "$stamp: $s\n";
+
+ 	fwrite( STDERR, $line );
+ }
+}
--
To view, visit https://gerrit.wikimedia.org/r/192207
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Ie4308ae6db61e9d48b1af099b37f25ec3cd36e09
Gerrit-PatchSet: 1
Gerrit-Project: mediawiki/services/jobrunner
Gerrit-Branch: master
Gerrit-Owner: Aaron Schulz <[email protected]>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits