Aaron Schulz has uploaded a new change for review.

  https://gerrit.wikimedia.org/r/192207

Change subject: [WIP] Split out cron tasks from job spawner as another daemon
......................................................................

[WIP] Split out cron tasks from job spawner as another daemon

Change-Id: Ie4308ae6db61e9d48b1af099b37f25ec3cd36e09
---
M .gitreview
M README
M jobrunner.sample.json
M redisJobRunnerService
A src/redisJobPipeline.php
A src/redisJobService.php
6 files changed, 644 insertions(+), 832 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/mediawiki/services/jobrunner refs/changes/07/192207/1

diff --git a/.gitreview b/.gitreview
old mode 100644
new mode 100755
diff --git a/README b/README
old mode 100644
new mode 100755
diff --git a/jobrunner.sample.json b/jobrunner.sample.json
old mode 100644
new mode 100755
diff --git a/redisJobRunnerService b/redisJobRunnerService
index 35c708a..4106cfa 100755
--- a/redisJobRunnerService
+++ b/redisJobRunnerService
@@ -4,12 +4,11 @@
 if ( PHP_SAPI !== 'cli' ) {
       die( "This is not a valid entry point.\n" );
 } elseif ( !class_exists( 'Redis' ) ) {
-       die( "The phpredis is not installed; aborting.\n" );
+       die( "The phpredis extension is not installed; aborting.\n" );
 }
 
-if ( !function_exists( 'posix_kill' ) ) {
-       function posix_kill() {} // no-op on Windows; procs killed with proc_terminate()
-}
+require( __DIR__ . '/src/redisJobService.php' );
+require( __DIR__ . '/src/redisJobPipeline.php' );
 
 error_reporting( E_ALL | E_STRICT );
 ini_set( 'display_errors', 1 );
@@ -21,190 +20,9 @@
        getopt( '', array( 'config-file::', 'help', 'verbose' ) )
 )->main();
 
-class RedisJobRunnerService {
-       /** @var array List of IP:<port> entries */
-       protected $queueSrvs = array();
-       /** @var array List of IP:<port> entries */
-       protected $aggrSrvs = array();
-       /** @var string Redis password */
-       protected $password;
-       /** @var string IP address or hostname */
-       protected $statsdHost;
-       /** @var integer Port number */
-       protected $statsdPort;
-
-       /** @var bool */
-       protected $verbose;
-       /** @var array Map of (job type => integer seconds) */
-       protected $claimTTLMap = array();
-       /** @var array Map of (job type => integer) */
-       protected $attemptsMap = array();
-
-       /** @var array Map of (id => (include,exclude,low-priority,count) */
-       public $loopMap = array();
-       /** @var array Map of (job type => integer) */
-       public $maxRealMap = array();
-       /** @var array Map of (job type => integer) */
-       public $maxMemMap = array();
-       /** @var array String command to run jobs and return the status JSON 
blob */
-       public $dispatcher;
-
-       /**
-        * How long can low priority jobs be run until some high priority
-        * jobs should be checked for and run if they exist.
-        * @var integer
-        */
-       public $hpMaxDelay = 120;
-       /**
-        * The maxtime parameter for runJobs.php for low priority jobs.
-        * The lower this value is, the less jobs of one wiki can hog attention
-        * from the jobs on other wikis, though more overhead is incurred.
-        * This should be lower than hpmaxdelay.
-        * @var integer
-        */
-       public $lpMaxTime = 60;
-       /**
-        * How long can high priority jobs be run until some low priority
-        * jobs should be checked for and run if they exist.
-        * @var integer
-        */
-       public $lpMaxDelay = 600;
-       /**
-        * The maxtime parameter for runJobs.php for high priority jobs.
-        * The lower this value is, the less jobs of one wiki/type can hog 
attention
-        * from jobs of another wiki/type, though more overhead is incurred.
-        * This should be lower than lpmaxdelay.
-        * @var integer
-        */
-       public $hpMaxTime = 30;
-
-       /** @var array Map of (server => Redis object) */
-       protected $conns = array();
-       /** @var array Map of (server => timestamp) */
-       protected $downSrvs = array();
-
-       /** @var RedisJobRunnerService */
-       protected static $instance;
-
-       /**
-        * @param array $args
-        * @return RedisJobRunnerService
-        * @throws Exception
-        */
-       public static function init( array $args ) {
-               if ( self::$instance ) {
-                       throw new Exception( 'RedisJobRunnerService already 
initialized.' );
-               }
-
-               if ( !isset( $args['config-file'] ) || isset( $args['help'] ) ) 
{
-                       die( "Usage: php RedisJobRunnerService.php\n"
-                               . "--config-file=<path>\n"
-                               . "--help\n"
-                       );
-               }
-
-               $file = $args['config-file'];
-               $content = file_get_contents( $file );
-               if ( $content === false ) {
-                       throw new Exception( "Coudn't open configuration file 
'{$file}''" );
-               }
-
-               // Remove comments and load into an array
-               $content = trim( preg_replace( '/\/\/.*$/m', '',  $content ) );
-               $config = json_decode( $content, true );
-               if ( !is_array( $config ) ) {
-                       throw new Exception( "Could not parse JSON file 
'{$file}'." );
-               }
-
-               self::$instance = new self( $config );
-               self::$instance->verbose = isset( $args['verbose'] );
-
-               return self::$instance;
-       }
-
-       /**
-        * @param array $config
-        */
-       protected function __construct( array $config ) {
-               $this->aggrSrvs = $config['redis']['aggregators'];
-               if ( !count( $this->aggrSrvs ) ) {
-                       throw new Exception( "Empty list for 
'redis.aggregators'." );
-               }
-               $this->queueSrvs = $config['redis']['queues'];
-               if ( !count( $this->queueSrvs ) ) {
-                       throw new Exception( "Empty list for 'redis.queues'." );
-               }
-               $this->dispatcher = $config['dispatcher'];
-               if ( !$this->dispatcher ) {
-                       throw new Exception( "No command provided for 
'dispatcher'." );
-               }
-
-               foreach ( $config['groups'] as $name => $group ) {
-                       if ( !is_int( $group['runners'] ) ) {
-                               throw new Exception( "Invalid 'runners' value 
for runner group '$name'." );
-                       } elseif ( $group['runners'] == 0 ) {
-                               continue; // loop disabled
-                       }
-
-                       foreach ( array( 'include', 'exclude', 'low-priority' ) 
as $k ) {
-                               if ( !isset( $group[$k] ) ) {
-                                       $group[$k] = array();
-                               } elseif ( !is_array( $group[$k] ) ) {
-                                       throw new Exception( "Invalid '$k' 
value for runner group '$name'." );
-                               }
-                       }
-                       $this->loopMap[] = $group;
-               }
-
-               if ( isset( $config['wrapper'] ) ) {
-                       $this->wrapper = $config['wrapper'];
-               }
-               if ( isset( $config['redis']['password'] ) ) {
-                       $this->password = $config['redis']['password'];
-               }
-
-               $this->claimTTLMap['*'] = 3600;
-               if ( isset( $config['limits']['claimTTL'] ) ) {
-                       $this->claimTTLMap = $config['limits']['claimTTL'] + 
$this->claimTTLMap;
-               }
-
-               $this->attemptsMap['*'] = 3;
-               if ( isset( $config['limits']['attempts'] ) ) {
-                       $this->attemptsMap = $config['limits']['attempts'] + 
$this->attemptsMap;
-               }
-
-               // Avoid killing processes before they get a fair chance to exit
-               $this->maxRealMap['*'] = 3600;
-               $minRealTime = 2 * max( $this->lpMaxTime, $this->hpMaxTime );
-               if ( isset( $config['limits']['real'] ) ) {
-                       foreach ( $config['limits']['real'] as $type => $value 
) {
-                               $this->maxRealMap[$type] = max( (int)$value, 
$minRealTime );
-                       }
-               }
-
-               $this->maxMemMap['*'] = '300M';
-               if ( isset( $config['limits']['memory'] ) ) {
-                       $this->maxMemMap = $config['limits']['memory'] + 
$this->maxMemMap;
-               }
-
-               if ( isset( $config['statsd'] ) ) {
-                       if ( strpos( $config['statsd'], ':' ) !== false ) {
-                               $parts = explode( ':', $config['statsd'] );
-                               $this->statsdHost = $parts[0];
-                               $this->statsdPort = (int)$parts[1];
-                       } else {
-                               // Use default statsd port if not specified
-                               $this->statsdHost = $config['statsd'];
-                               $this->statsdPort = 8125;
-                       }
-               }
-       }
-
-       /**
-        * Entry point method that starts the service in earnest and keeps 
running
-        */
+class RedisJobRunnerService extends RedisJobService {
        public function main() {
-               $this->notice( "Starting job loop(s)..." );
+               $this->notice( "Starting job spawner loop(s)..." );
 
                $host = gethostname();
                $prioMap = array(); // map of (id => (current priority, since))
@@ -234,7 +52,7 @@
                }
 
                $memLast = memory_get_usage();
-               $this->incrStats( "start.$host", 1 );
+               $this->incrStats( "start-runner.$host", 1 );
                while ( true ) {
                        if ( $useSignals ) {
                                pcntl_signal_dispatch();
@@ -322,7 +140,7 @@
        /**
         * @return array Map of (job type => wiki => UNIX timestamp)
         */
-       protected function &getReadyQueueMap() {
+       private function &getReadyQueueMap() {
                static $pendingDBs = array(); // cache
                static $cacheTimestamp = 0; // UNIX timestamp
 
@@ -365,648 +183,5 @@
                }
 
                return $pendingDBs;
-       }
-
-       /**
-        * Recycle or destroy any jobs that have been claimed for too long
-        * and release any ready delayed jobs into the queue. Also abandon
-        * and prune out jobs that failed too many times. This updates the
-        * aggregator server as necessary.
-        *
-        * @note: similar to JobQueueRedis.php periodic tasks method
-        * @return int|bool Number of jobs recycled/deleted/undelayed/abandoned 
(false if not run)
-        */
-       protected function executeReadyPeriodicTasks() {
-               $jobs = 0;
-
-               // Run no more than every 5 minutes.
-               // This is randomized to scale the liveliness with the # of 
runners.
-               static $lastPeriodicTime = null;
-               $lastPeriodicTime = $lastPeriodicTime ?: time() - mt_rand( 0, 
300 );
-               if ( ( time() - $lastPeriodicTime ) <= 300 ) {
-                       return false;
-               }
-               $lastPeriodicTime = time();
-
-               $host = gethostname();
-
-               $aConn = false;
-               // Connect to the first working server on the list
-               foreach ( $this->aggrSrvs as $aServer ) {
-                       $aConn = $this->getRedisConn( $aServer );
-                       if ( $aConn ) {
-                               break;
-                       }
-               }
-
-               if ( !$aConn ) {
-                       $this->incrStats( "periodictasks.failed.$host", 1 );
-                       return $jobs;
-               }
-
-               $ok = true;
-               try {
-                       $types = $this->redisCmd( $aConn, 'hKeys', array( 
$this->getQueueTypesKey() ) );
-                       $wikiIds = $this->redisCmd( $aConn, 'sMembers', array( 
$this->getWikiSetKey() ) );
-                       if ( !is_array( $types ) || !is_array( $wikiIds ) ) {
-                               $this->incrStats( "periodictasks.failed.$host", 
1 );
-                               return $jobs;
-                       }
-
-                       // Randomize to scale the liveliness with the # of 
runners
-                       shuffle( $types );
-                       shuffle( $wikiIds );
-
-                       $now = time();
-
-                       // Build up the script arguments for each queue...
-                       $paramsByQueue = array();
-                       foreach ( $types as $type ) {
-                               $ttl = isset( $this->claimTTLMap[$type] )
-                                       ? $this->claimTTLMap[$type]
-                                       : $this->claimTTLMap['*'];
-                               $attempts = isset( $this->attemptsMap[$type] )
-                                       ? $this->attemptsMap[$type]
-                                       : $this->attemptsMap['*'];
-                               foreach ( $wikiIds as $wikiId ) {
-                                       $paramsByQueue[] = array(
-                                               'queue'  => array( $type, 
$wikiId ),
-                                               'params' => array(
-                                                       
"{$wikiId}:jobqueue:{$type}:z-claimed", # KEYS[1]
-                                                       
"{$wikiId}:jobqueue:{$type}:h-attempts", # KEYS[2]
-                                                       
"{$wikiId}:jobqueue:{$type}:l-unclaimed", # KEYS[3]
-                                                       
"{$wikiId}:jobqueue:{$type}:h-data", # KEYS[4]
-                                                       
"{$wikiId}:jobqueue:{$type}:z-abandoned", # KEYS[5]
-                                                       
"{$wikiId}:jobqueue:{$type}:z-delayed", # KEYS[6]
-                                                       $now - $ttl, # ARGV[1]
-                                                       $now - 7 * 86400, # 
ARGV[2]
-                                                       $attempts, # ARGV[3]
-                                                       $now # ARGV[4]
-                                               ),
-                                               'keys'   => 6 # number of first 
argument(s) that are keys
-                                       );
-                               }
-                       }
-                       // Batch the Lua queries to avoid client OOMs
-                       $paramsByQueueBatches = array_chunk( $paramsByQueue, 
1000 );
-
-                       $mapSet = array(); // ready map of (queue name => 
timestamp)
-                       // Run the script on all job queue partitions...
-                       foreach ( $this->queueSrvs as $qServer ) {
-                               foreach ( $paramsByQueueBatches as 
$paramsByQueueBatch ) {
-                                       $ok = $this->updateQueueServer(
-                                               $qServer, $paramsByQueueBatch, 
$mapSet, $jobs ) && $ok;
-                               }
-                       }
-                       // Update the map in the aggregator as queues become 
ready
-                       $this->redisCmd( $aConn, 'hMSet', array( 
$this->getReadyQueueKey(), $mapSet ) );
-               } catch ( RedisException $e ) {
-                       $ok = false;
-                       $this->handleRedisError( $e, $aServer );
-               }
-
-               if ( !$ok ) {
-                       $this->error( "Failed to do periodic tasks for some 
queues." );
-                       $this->incrStats( "periodictasks.failed.$host", 1 );
-               }
-
-               return $jobs;
-       }
-
-       private function updateQueueServer( $qServer, array $paramsByQueue, 
array &$mapSet, &$jobs ) {
-               $qConn = $this->getRedisConn( $qServer );
-               if ( !$qConn ) {
-                       return false; // partition down
-               }
-
-               static $script =
-<<<LUA
-               local kClaimed, kAttempts, kUnclaimed, kData, kAbandoned, 
kDelayed = unpack(KEYS)
-               local rClaimCutoff, rPruneCutoff, rAttempts, rTime = 
unpack(ARGV)
-               local released,abandoned,pruned,undelayed,ready = 0,0,0,0,0
-               -- Get all non-dead jobs that have an expired claim on them.
-               -- The score for each item is the last claim timestamp (UNIX).
-               local staleClaims = 
redis.call('zRangeByScore',kClaimed,0,rClaimCutoff)
-               for k,id in ipairs(staleClaims) do
-                       local timestamp = redis.call('zScore',kClaimed,id)
-                       local attempts = redis.call('hGet',kAttempts,id)
-                       if attempts < rAttempts then
-                               -- Claim expired and attempts left: re-enqueue 
the job
-                               redis.call('lPush',kUnclaimed,id)
-                               released = released + 1
-                       else
-                               -- Claim expired and no attempts left: mark the 
job as dead
-                               redis.call('zAdd',kAbandoned,timestamp,id)
-                               abandoned = abandoned + 1
-                       end
-                       redis.call('zRem',kClaimed,id)
-               end
-               -- Get all of the dead jobs that have been marked as dead for 
too long.
-               -- The score for each item is the last claim timestamp (UNIX).
-               local deadClaims = 
redis.call('zRangeByScore',kAbandoned,0,rPruneCutoff)
-               for k,id in ipairs(deadClaims) do
-                       -- Stale and out of attempts: remove any traces of the 
job
-                       redis.call('zRem',kAbandoned,id)
-                       redis.call('hDel',kAttempts,id)
-                       redis.call('hDel',kData,id)
-                       pruned = pruned + 1
-               end
-               -- Get the list of ready delayed jobs, sorted by readiness 
(UNIX timestamp)
-               local ids = redis.call('zRangeByScore',kDelayed,0,rTime)
-               -- Migrate the jobs from the "delayed" set to the "unclaimed" 
list
-               for k,id in ipairs(ids) do
-                       redis.call('lPush',kUnclaimed,id)
-                       redis.call('zRem',kDelayed,id)
-               end
-               undelayed = #ids
-               ready = redis.call('lLen',kUnclaimed)
-               return {released,abandoned,pruned,undelayed,ready}
-LUA;
-
-               $ok = true;
-               try {
-                       $sha1 = $this->redisCmd( $qConn, 'script', array( 
'load', $script ) );
-
-                       $this->redisCmd( $qConn, 'multi', array( 
Redis::PIPELINE ) );
-                       foreach ( $paramsByQueue as $params ) {
-                               $this->redisCmd( $qConn, 'evalSha',
-                                       array( $sha1, $params['params'], 
$params['keys'] ) );
-                       }
-                       $res = $this->redisCmd( $qConn, 'exec' );
-
-                       $now = time();
-                       foreach ( $res as $i => $result ) {
-                               if ( !$result ) {
-                                       $ok = false;
-                                       continue;
-                               }
-                               list( $qType, $qWiki ) = 
$paramsByQueue[$i]['queue'];
-                               list( $released, $abandoned, $pruned, 
$undelayed, $ready ) = $result;
-                               if ( $released > 0 || $undelayed > 0 || $ready 
> 0 ) {
-                                       // This checks $ready to handle lost 
aggregator updates as well as
-                                       // to merge after network partitions 
that caused aggregator fail-over.
-                                       $mapSet[$this->encQueueName( $qType, 
$qWiki )] = $now;
-                               }
-                               $jobs += ( array_sum( $result ) - $ready );
-                               $this->incrStats( "job-recycle.$qType.$qWiki", 
$released );
-                               $this->incrStats( "job-abandon.$qType.$qWiki", 
$abandoned );
-                               $this->incrStats( "job-undelay.$qType.$qWiki", 
$undelayed );
-                       }
-               } catch ( RedisException $e ) {
-                       $ok = false;
-                       $this->handleRedisError( $e, $qServer );
-               }
-
-               return $ok;
-       }
-
-       /**
-        * @return string (per JobQueueAggregatorRedis.php)
-        */
-       private function getReadyQueueKey() {
-               return "jobqueue:aggregator:h-ready-queues:v2"; // global
-       }
-
-       /**
-        * @return string (per JobQueueAggregatorRedis.php)
-        */
-       private function getQueueTypesKey() {
-               return "jobqueue:aggregator:h-queue-types:v2"; // global
-       }
-
-       /**
-        * @return string
-        */
-       private function getWikiSetKey() {
-               return "jobqueue:aggregator:s-wikis:v2"; // global
-       }
-
-       /**
-        * @param string $type
-        * @param string $wiki
-        * @return string (per JobQueueAggregatorRedis.php)
-        */
-       private function encQueueName( $type, $wiki ) {
-               return rawurlencode( $type ) . '/' . rawurlencode( $wiki );
-       }
-
-       /**
-        * @param string $name
-        * @return string (per JobQueueAggregatorRedis.php)
-        */
-       private function dencQueueName( $name ) {
-               list( $type, $wiki ) = explode( '/', $name, 2 );
-
-               return array( rawurldecode( $type ), rawurldecode( $wiki ) );
-       }
-
-       /**
-        * @param string $server
-        * @return Redis|boolean|array
-        */
-       protected function getRedisConn( $server ) {
-               // Check the listing "dead" servers which have had a connection 
errors.
-               // Srvs are marked dead for a limited period of time, to
-               // avoid excessive overhead from repeated connection timeouts.
-               if ( isset( $this->downSrvs[$server] ) ) {
-                       $now = time();
-                       if ( $now > $this->downSrvs[$server] ) {
-                               // Dead time expired
-                               unset( $this->downSrvs[$server] );
-                       } else {
-                               // Server is dead
-                               return false;
-                       }
-               }
-
-               if ( isset( $this->conns[$server] ) ) {
-                       return $this->conns[$server];
-               }
-
-               try {
-                       $conn = new Redis();
-                       if ( strpos( $server, ':' ) === false ) {
-                               $host = $server;
-                               $port = null;
-                       } else {
-                               list( $host, $port ) = explode( ':', $server );
-                       }
-                       $result = $conn->connect( $host, $port, 5 );
-                       if ( !$result ) {
-                               $this->error( "Could not connect to Redis 
server $host:$port." );
-                               // Mark server down for some time to avoid 
further timeouts
-                               $this->downSrvs[$server] = time() + 30;
-
-                               return false;
-                       }
-                       if ( $this->password !== null ) {
-                               $conn->auth( $this->password );
-                       }
-               } catch ( RedisException $e ) {
-                       $this->downSrvs[$server] = time() + 30;
-
-                       return false;
-               }
-
-               if ( $conn ) {
-                       $conn->setOption( Redis::OPT_READ_TIMEOUT, 5 );
-                       $conn->setOption( Redis::OPT_SERIALIZER, 
Redis::SERIALIZER_NONE );
-                       $this->conns[$server] = $conn;
-
-                       return $conn;
-               } else {
-                       return false;
-               }
-       }
-
-       /**
-        * @param RedisException $e
-        * @param string $server
-        */
-       protected function handleRedisError( RedisException $e, $server ) {
-               unset( $this->conns[$server] );
-               $this->error( "Redis error: " . $e->getMessage() );
-               $this->incrStats( "redis-error." . gethostname(), 1 );
-       }
-
-       /**
-        * @param Redis $conn
-        * @param string $cmd
-        * @param array $args
-        * @return mixed
-        */
-       protected function redisCmd( Redis $conn, $cmd, array $args = array() ) 
{
-               $conn->clearLastError();
-               // we had some job runners oom'ing on this call, log what we are
-               // doing so there is relevant information next to the oom
-               $this->debug( "Redis cmd: $cmd " . json_encode( $args ) );
-               $res = call_user_func_array( array( $conn, $cmd ), $args );
-               if ( $conn->getLastError() ) {
-                       // Make all errors be exceptions instead of "most but 
not all".
-                       // This will let the caller know to reset the 
connection to be safe.
-                       throw new RedisException( $conn->getLastError() );
-               }
-               return $res;
-       }
-
-       /**
-        * @param string $event
-        * @param integer $delta
-        * @return void
-        */
-       public function incrStats( $event, $delta = 1 ) {
-               if ( !$this->statsdHost || $delta == 0 ) {
-                       return; // nothing to do
-               }
-
-               static $format = "%s:%s|m\n";
-               $packet = sprintf( $format, "jobrunner.$event", $delta );
-
-               if ( !function_exists( 'socket_create' ) ) {
-                       $this->debug( 'No "socket_create" method available.' );
-                       return;
-               }
-
-               static $socket = null;
-               if ( !$socket ) {
-                       $socket = socket_create( AF_INET, SOCK_DGRAM, SOL_UDP );
-               }
-
-               socket_sendto(
-                       $socket,
-                       $packet,
-                       strlen( $packet ),
-                       0,
-                       $this->statsdHost,
-                       $this->statsdPort
-               );
-       }
-
-       /**
-        * @param string $s
-        */
-       public function debug( $s ) {
-               if ( $this->verbose ) {
-                       print date( DATE_ISO8601 ) . ": $s\n";
-               }
-       }
-
-       /**
-        * @param string $s
-        */
-       public function notice( $s ) {
-               print date( DATE_ISO8601 ) . ": $s\n";
-       }
-
-       /**
-        * @param string $s
-        */
-       public function error( $s ) {
-               fwrite( STDERR, date( DATE_ISO8601 ) . ": $s\n" );
-       }
-}
-
-class JobRunnerPipeline {
-       /** @var RedisJobRunnerService */
-       protected $srvc;
-       /** @var array (loop ID => slot ID => slot status array) */
-       protected $procMap = array();
-
-       /**
-        * @param JobRunnerService $service
-        */
-       public function __construct( RedisJobRunnerService $service ) {
-               $this->srvc = $service;
-       }
-
-       /**
-        * @param string $loop
-        * @param int $slot
-        */
-       public function initSlot( $loop, $slot ) {
-               $this->procMap[$loop][$slot] = array(
-                       'handle'  => false,
-                       'pipes'   => array(),
-                       'db'      => null,
-                       'type'    => null,
-                       'cmd'     => null,
-                       'stime'   => 0,
-                       'sigtime' => 0,
-               );
-       }
-
-       /**
-        * @param integer $loop
-        * @param array $prioMap
-        * @param array $pending
-        * @return array
-        */
-       public function refillSlots( $loop, array $prioMap, array &$pending ) {
-               $free = 0;
-               $new = 0;
-               $host = gethostname();
-               $cTime = time();
-               foreach ( $this->procMap[$loop] as $slot => &$procSlot ) {
-                       $status = $procSlot['handle']
-                               ? proc_get_status( $procSlot['handle'] )
-                               : false;
-                       if ( $status && $status['running'] ) {
-                               $maxReal = isset( 
$this->srvc->maxRealMap[$procSlot['type']] )
-                                       ? 
$this->srvc->maxRealMap[$procSlot['type']]
-                                       : $this->srvc->maxRealMap['*'];
-                               $age = $cTime - $procSlot['stime'];
-                               if ( $age >= $maxReal && !$procSlot['sigtime'] 
) {
-                                       $cmd = $procSlot['cmd'];
-                                       $this->srvc->error( "Runner loop $loop 
process in slot $slot timed out " .
-                                               "[{$age}s; max: 
{$maxReal}s]:\n$cmd" );
-                                       posix_kill( $status['pid'], SIGTERM ); 
// non-blocking
-                                       $procSlot['sigtime'] = time();
-                                       $this->srvc->incrStats( 
'runner-status.timeout', 1 );
-                               } elseif ( $age >= $maxReal && ( $cTime - 
$procSlot['sigtime'] ) > 5 ) {
-                                       $this->srvc->error( "Runner loop $loop 
process in slot $slot sent SIGKILL." );
-                                       $this->closeRunner( $loop, $slot, 
$procSlot, SIGKILL );
-                                       $this->srvc->incrStats( 
'runner-status.kill', 1 );
-                               } else {
-                                       continue; // slot is busy
-                               }
-                       } elseif ( $status && !$status['running'] ) {
-                               $response = trim( stream_get_contents( 
$procSlot['pipes'][1] ) );
-                               // $result will be an array if no exceptions 
happened
-                               $result = json_decode( $response, true );
-                               if ( $status['exitcode'] == 0 && is_array( 
$result ) ) {
-                                       // If this finished early, lay off of 
the queue for a while
-                                       if ( ( $cTime - $procSlot['stime'] ) < 
$this->srvc->hpMaxTime/2 ) {
-                                               unset( 
$pending[$procSlot['type']][$procSlot['db']] );
-                                               $this->srvc->debug( "Queue 
'{$procSlot['db']}/{$procSlot['type']}' emptied." );
-                                       }
-                                       $ok = 0; // jobs that ran OK
-                                       foreach ( $result['jobs'] as $status ) {
-                                               $ok += ( $status['status'] === 
'ok' ) ? 1 : 0;
-                                       }
-                                       $failed = count( $result['jobs'] ) - 
$ok;
-                                       $this->srvc->incrStats( 
"pop.{$procSlot['type']}.ok.{$host}", $ok );
-                                       $this->srvc->incrStats( 
"pop.{$procSlot['type']}.failed.{$host}", $failed );
-                               } else {
-                                       // Mention any serious errors that may 
have occured
-                                       $cmd = $procSlot['cmd'];
-                                       $error = trim( stream_get_contents( 
$procSlot['pipes'][2] ) ) ?: $response;
-                                       if ( strlen( $error ) > 4096 ) { // 
truncate long errors
-                                               $error = mb_substr( $error, 0, 
4096 ) . '...';
-                                       }
-                                       $this->srvc->error( "Runner loop $loop 
process in slot $slot " .
-                                               "gave status 
'{$status['exitcode']}':\n$cmd\n\t$error" );
-                                       $this->srvc->incrStats( 
'runner-status.error', 1 );
-                               }
-                               $this->closeRunner( $loop, $slot, $procSlot );
-                       } elseif ( !$status && $procSlot['handle'] ) {
-                               $this->srvc->error( "Runner loop $loop process 
in slot $slot gave no status." );
-                               $this->closeRunner( $loop, $slot, $procSlot );
-                               $this->srvc->incrStats( 'runner-status.none', 1 
);
-                       }
-                       ++$free;
-                       $queue = $this->selectQueue( $loop, $prioMap, $pending 
);
-                       if ( !$queue ) {
-                               break;
-                       }
-                       // Spawn a job runner for this loop ID
-                       $highPrio = $prioMap[$loop]['high'];
-                       $this->spawnRunner( $loop, $slot, $highPrio, $queue, 
$procSlot );
-                       ++$new;
-               }
-               unset( $procSlot );
-
-               return array( $free, $new );
-       }
-
-       /**
-        * @param integer $loop
-        * @param array $prioMap
-        * @param array $pending
-        * @return array|boolean
-        */
-       protected function selectQueue( $loop, array $prioMap, array $pending ) 
{
-               $include = $this->srvc->loopMap[$loop]['include'];
-               $exclude = $this->srvc->loopMap[$loop]['exclude'];
-               if ( $prioMap[$loop]['high'] ) {
-                       $exclude = array_merge( $exclude, 
$this->srvc->loopMap[$loop]['low-priority'] );
-               } else {
-                       $include = array_merge( $include, 
$this->srvc->loopMap[$loop]['low-priority'] );
-               }
-               if ( in_array( '*', $include ) ) {
-                       $include = array_merge( $include, array_keys( $pending 
) );
-               }
-
-               $candidateTypes = array_diff( array_unique( $include ), 
$exclude, array( '*' ) );
-
-               $candidates = array(); // list of (type, db)
-               // Flatten the tree of candidates into a flat list so that a 
random
-               // item can be selected, weighing each queue (type/db tuple) 
equally.
-               foreach ( $candidateTypes as $type ) {
-                       if ( isset( $pending[$type] ) ) {
-                               foreach ( $pending[$type] as $db => $since ) {
-                                       $candidates[] = array( $type, $db );
-                               }
-                       }
-               }
-
-               if ( !count( $candidates ) ) {
-                       return false; // no jobs for this type
-               }
-
-               return $candidates[mt_rand( 0, count( $candidates ) - 1 )];
-       }
-
-       /**
-        * @param integer $loop
-        * @param integer $slot
-        * @param bool $highPrio
-        * @param array $queue
-        * @param array $procSlot
-        * @return bool
-        */
-       protected function spawnRunner( $loop, $slot, $highPrio, array $queue, 
array &$procSlot ) {
-               // Pick a random queue
-               list( $type, $db ) = $queue;
-               $maxtime = $highPrio ? $this->srvc->lpMaxTime : 
$this->srvc->hpMaxTime;
-               $maxmem = isset( $this->srvc->maxMemMap[$type] )
-                       ? $this->srvc->maxMemMap[$type]
-                       : $this->srvc->maxMemMap['*'];
-
-               // Make sure the runner is launched with various time/memory 
limits.
-               // Nice the process so things like ssh and deployment scripts 
are fine.
-               $what = $with = array();
-               foreach ( compact( 'db', 'type', 'maxtime', 'maxmem' ) as $k => 
$v ) {
-                       $what[] = "%($k)u";
-                       $with[] = rawurlencode( $v );
-                       $what[] = "%($k)x";
-                       $with[] = escapeshellarg( $v );
-               }
-               // The dispatcher might be runJobs.php, curl, or wget
-               $cmd = str_replace( $what, $with, $this->srvc->dispatcher );
-
-               $descriptors = array(
-                       0 => array( "pipe", "r" ), // stdin (child)
-                       1 => array( "pipe", "w" ), // stdout (child)
-                       2 => array( "pipe", "w" ) // stderr (child)
-               );
-
-               $this->srvc->debug( "Spawning runner in loop $loop at slot 
$slot ($type, $db):\n\t$cmd." );
-
-               // Start the runner in the background
-               $procSlot['handle'] = proc_open( $cmd, $descriptors, 
$procSlot['pipes'] );
-               if ( $procSlot['handle'] ) {
-                       // Set a timeout so stream_get_contents() won't block 
for sanity
-                       stream_set_timeout( $procSlot['pipes'][1], 5 );
-                       stream_set_timeout( $procSlot['pipes'][2], 5 );
-                       // Close the unused STDIN pipe
-                       fclose( $procSlot['pipes'][0] );
-                       unset( $procSlot['pipes'][0] ); // unused
-               }
-
-               $procSlot['db'] = $db;
-               $procSlot['type'] = $type;
-               $procSlot['cmd'] = $cmd;
-               $procSlot['stime'] = time();
-               $procSlot['sigtime'] = 0;
-
-               if ( $procSlot['handle'] ) {
-                       return true;
-               } else {
-                       $this->srvc->error( "Could not spawn process in loop 
$loop: $cmd" );
-                       $this->srvc->incrStats( 'runner-status.error', 1 );
-                       return false;
-               }
-       }
-
-       /**
-        * @param integer $loop
-        * @param integer $slot
-        * @param array $procSlot
-        * @param integer $signal
-        */
-       protected function closeRunner( $loop, $slot, array &$procSlot, $signal 
= null ) {
-               if ( $procSlot['pipes'] ) {
-                       if ( $procSlot['pipes'][1] !== false ) {
-                               fclose( $procSlot['pipes'][1] );
-                               $procSlot['pipes'][1] = false;
-                       }
-                       if ( $procSlot['pipes'][2] !== false ) {
-                               fclose( $procSlot['pipes'][2] );
-                               $procSlot['pipes'][2] = false;
-                       }
-               }
-               if ( $procSlot['handle'] ) {
-                       $this->srvc->debug( "Closing process in loop $loop at 
slot $slot." );
-                       if ( $signal !== null ) {
-                               // Tell the process to close with a signal
-                               proc_terminate( $procSlot['handle'], $signal );
-                       } else {
-                               // Wait for the process to finish on its own
-                               proc_close( $procSlot['handle'] );
-                       }
-               }
-               $procSlot['handle'] = false;
-               $procSlot['db'] = null;
-               $procSlot['type'] = null;
-               $procSlot['stime'] = 0;
-               $procSlot['sigtime'] = 0;
-               $procSlot['cmd'] = null;
-       }
-
-       public function terminateSlots() {
-               foreach ( $this->procMap as &$procSlots ) {
-                       foreach ( $procSlots as &$procSlot ) {
-                               if ( !$procSlot['handle'] ) {
-                                       continue;
-                               }
-                               fclose( $procSlot['pipes'][1] );
-                               fclose( $procSlot['pipes'][2] );
-                               $status = proc_get_status( $procSlot['handle'] 
);
-                               print "Sending SIGTERM to {$status['pid']}.\n";
-                               proc_terminate( $procSlot['handle'] );
-                       }
-                       unset( $procSlot );
-               }
-               unset( $procSlots );
        }
 }
diff --git a/src/redisJobPipeline.php b/src/redisJobPipeline.php
new file mode 100755
index 0000000..310ca5d
--- /dev/null
+++ b/src/redisJobPipeline.php
@@ -0,0 +1,270 @@
+<?php
+
if ( !function_exists( 'posix_kill' ) ) {
	/**
	 * Fallback for PHP builds without the posix extension (e.g. Windows).
	 *
	 * Sends nothing; on such platforms runner processes are only stopped
	 * through proc_terminate().
	 *
	 * @param int $pid Ignored
	 * @param int $sig Ignored
	 * @return bool Always false, since no signal was delivered
	 */
	function posix_kill( $pid = 0, $sig = 0 ) {
		return false;
	}
}

// SIGTERM/SIGKILL are defined by the pcntl extension, not by posix, so they can
// be missing even when posix_kill() exists. Define the standard POSIX values so
// code that passes them to posix_kill()/proc_terminate() does not fatal.
if ( !defined( 'SIGTERM' ) ) {
	define( 'SIGTERM', 15 );
}
if ( !defined( 'SIGKILL' ) ) {
	define( 'SIGKILL', 9 );
}
+
/**
 * Manages pools of job runner child processes ("slots") grouped by loop ID.
 *
 * Each slot holds at most one dispatcher process, built from the command
 * template in the service's $dispatcher. refillSlots() reaps finished or
 * overdue processes and starts new ones for queues that have pending jobs.
 */
class JobRunnerPipeline {
	/** @var RedisJobService Provides configuration, logging and stats */
	protected $srvc;
	/** @var array (loop ID => slot ID => slot status array) */
	protected $procMap = array();

	/**
	 * @param RedisJobService $service
	 */
	public function __construct( RedisJobService $service ) {
		$this->srvc = $service;
	}

	/**
	 * Register an idle runner slot for a loop.
	 *
	 * @param string $loop
	 * @param int $slot
	 */
	public function initSlot( $loop, $slot ) {
		$this->procMap[$loop][$slot] = array(
			'handle'  => false,
			'pipes'   => array(),
			'db'      => null,
			'type'    => null,
			'cmd'     => null,
			'stime'   => 0,
			'sigtime' => 0,
			'maxtime' => 0, // time budget (seconds) the runner was launched with
		);
	}

	/**
	 * Reap finished/overdue runners of a loop and start new ones in free slots.
	 *
	 * A running process that exceeds its real-time limit is first sent SIGTERM
	 * and keeps its slot. If it is still alive more than 5 seconds later, it is
	 * sent SIGKILL and the slot is freed.
	 *
	 * @param integer $loop
	 * @param array $prioMap Map of (loop ID => array( 'high' => bool, ... ))
	 * @param array $pending Map of (job type => db => since); queues whose
	 *  runner finished early are removed from it
	 * @return array (number of free slots handled, number of runners spawned)
	 */
	public function refillSlots( $loop, array $prioMap, array &$pending ) {
		$free = 0;
		$new = 0;
		$host = gethostname();
		$cTime = time();
		foreach ( $this->procMap[$loop] as $slot => &$procSlot ) {
			$status = $procSlot['handle']
				? proc_get_status( $procSlot['handle'] )
				: false;
			if ( $status && $status['running'] ) {
				$maxReal = isset( $this->srvc->maxRealMap[$procSlot['type']] )
					? $this->srvc->maxRealMap[$procSlot['type']]
					: $this->srvc->maxRealMap['*'];
				$age = $cTime - $procSlot['stime'];
				if ( $age >= $maxReal && !$procSlot['sigtime'] ) {
					$cmd = $procSlot['cmd'];
					$this->srvc->error( "Runner loop $loop process in slot $slot timed out " .
						"[{$age}s; max: {$maxReal}s]:\n$cmd" );
					posix_kill( $status['pid'], SIGTERM ); // non-blocking
					$procSlot['sigtime'] = time();
					$this->srvc->incrStats( 'runner-status.timeout', 1 );
					// The process is still alive: keep the slot busy so it is either
					// reaped once it exits or escalated to SIGKILL later. Spawning a
					// new runner here would overwrite (and leak) the live handle.
					continue;
				} elseif ( $age >= $maxReal && ( $cTime - $procSlot['sigtime'] ) > 5 ) {
					$this->srvc->error( "Runner loop $loop process in slot $slot sent SIGKILL." );
					$this->closeRunner( $loop, $slot, $procSlot, SIGKILL );
					$this->srvc->incrStats( 'runner-status.kill', 1 );
				} else {
					continue; // slot is busy
				}
			} elseif ( $status && !$status['running'] ) {
				$response = trim( stream_get_contents( $procSlot['pipes'][1] ) );
				// $result will be an array if no exceptions happened
				$result = json_decode( $response, true );
				if ( $status['exitcode'] == 0 && is_array( $result ) ) {
					// If this finished well within its time budget, the queue was
					// likely drained, so lay off of it for a while
					$budget = $procSlot['maxtime'] ? $procSlot['maxtime'] : $this->srvc->hpMaxTime;
					if ( ( $cTime - $procSlot['stime'] ) < $budget / 2 ) {
						unset( $pending[$procSlot['type']][$procSlot['db']] );
						$this->srvc->debug( "Queue '{$procSlot['db']}/{$procSlot['type']}' emptied." );
					}
					$jobs = ( isset( $result['jobs'] ) && is_array( $result['jobs'] ) )
						? $result['jobs']
						: array();
					$ok = 0; // jobs that ran OK
					foreach ( $jobs as $job ) {
						if ( isset( $job['status'] ) && $job['status'] === 'ok' ) {
							++$ok;
						}
					}
					$failed = count( $jobs ) - $ok;
					$this->srvc->incrStats( "pop.{$procSlot['type']}.ok.{$host}", $ok );
					$this->srvc->incrStats( "pop.{$procSlot['type']}.failed.{$host}", $failed );
				} else {
					// Mention any serious errors that may have occurred
					$cmd = $procSlot['cmd'];
					$error = trim( stream_get_contents( $procSlot['pipes'][2] ) ) ?: $response;
					// Measure in characters, matching the mb_substr() truncation
					if ( mb_strlen( $error ) > 4096 ) { // truncate long errors
						$error = mb_substr( $error, 0, 4096 ) . '...';
					}
					$this->srvc->error( "Runner loop $loop process in slot $slot " .
						"gave status '{$status['exitcode']}':\n$cmd\n\t$error" );
					$this->srvc->incrStats( 'runner-status.error', 1 );
				}
				$this->closeRunner( $loop, $slot, $procSlot );
			} elseif ( !$status && $procSlot['handle'] ) {
				$this->srvc->error( "Runner loop $loop process in slot $slot gave no status." );
				$this->closeRunner( $loop, $slot, $procSlot );
				$this->srvc->incrStats( 'runner-status.none', 1 );
			}
			++$free;
			$queue = $this->selectQueue( $loop, $prioMap, $pending );
			if ( !$queue ) {
				break;
			}
			// Spawn a job runner for this loop ID; only count it if it started
			$highPrio = $prioMap[$loop]['high'];
			if ( $this->spawnRunner( $loop, $slot, $highPrio, $queue, $procSlot ) ) {
				++$new;
			}
		}
		unset( $procSlot );

		return array( $free, $new );
	}

	/**
	 * Pick a random (type, db) queue with pending jobs that this loop may run.
	 *
	 * In high-priority mode the loop's 'low-priority' types are excluded;
	 * otherwise they are included. A '*' include matches every pending type.
	 *
	 * @param integer $loop
	 * @param array $prioMap
	 * @param array $pending Map of (job type => db => since)
	 * @return array|boolean (type, db) or false if nothing is runnable
	 */
	protected function selectQueue( $loop, array $prioMap, array $pending ) {
		$include = $this->srvc->loopMap[$loop]['include'];
		$exclude = $this->srvc->loopMap[$loop]['exclude'];
		if ( $prioMap[$loop]['high'] ) {
			$exclude = array_merge( $exclude, $this->srvc->loopMap[$loop]['low-priority'] );
		} else {
			$include = array_merge( $include, $this->srvc->loopMap[$loop]['low-priority'] );
		}
		if ( in_array( '*', $include, true ) ) {
			$include = array_merge( $include, array_keys( $pending ) );
		}

		$candidateTypes = array_diff( array_unique( $include ), $exclude, array( '*' ) );

		$candidates = array(); // list of (type, db)
		// Flatten the tree of candidates into a flat list so that a random
		// item can be selected, weighing each queue (type/db tuple) equally.
		foreach ( $candidateTypes as $type ) {
			if ( isset( $pending[$type] ) ) {
				foreach ( $pending[$type] as $db => $since ) {
					$candidates[] = array( $type, $db );
				}
			}
		}

		if ( !count( $candidates ) ) {
			return false; // no jobs for this type
		}

		return $candidates[mt_rand( 0, count( $candidates ) - 1 )];
	}

	/**
	 * Launch the dispatcher command for a queue in the given slot.
	 *
	 * @param integer $loop
	 * @param integer $slot
	 * @param bool $highPrio Whether the loop is currently in high-priority mode
	 * @param array $queue (type, db) as returned by selectQueue()
	 * @param array $procSlot Slot status array, updated in place
	 * @return bool Whether the process was started
	 */
	protected function spawnRunner( $loop, $slot, $highPrio, array $queue, array &$procSlot ) {
		list( $type, $db ) = $queue;
		// $hpMaxTime is the runner time budget for high-priority jobs and
		// $lpMaxTime the one for low-priority jobs
		$maxtime = $highPrio ? $this->srvc->hpMaxTime : $this->srvc->lpMaxTime;
		$maxmem = isset( $this->srvc->maxMemMap[$type] )
			? $this->srvc->maxMemMap[$type]
			: $this->srvc->maxMemMap['*'];

		// Fill the dispatcher template so the runner gets its time/memory limits.
		// %(name)u placeholders are URL-encoded, %(name)x ones are shell-escaped.
		$what = $with = array();
		foreach ( compact( 'db', 'type', 'maxtime', 'maxmem' ) as $k => $v ) {
			$what[] = "%($k)u";
			$with[] = rawurlencode( $v );
			$what[] = "%($k)x";
			$with[] = escapeshellarg( $v );
		}
		// The dispatcher might be runJobs.php, curl, or wget
		$cmd = str_replace( $what, $with, $this->srvc->dispatcher );

		$descriptors = array(
			0 => array( "pipe", "r" ), // stdin (child)
			1 => array( "pipe", "w" ), // stdout (child)
			2 => array( "pipe", "w" ) // stderr (child)
		);

		$this->srvc->debug( "Spawning runner in loop $loop at slot $slot ($type, $db):\n\t$cmd." );

		// Start the runner in the background
		$procSlot['handle'] = proc_open( $cmd, $descriptors, $procSlot['pipes'] );
		if ( $procSlot['handle'] ) {
			// Set a timeout so stream_get_contents() won't block for sanity
			stream_set_timeout( $procSlot['pipes'][1], 5 );
			stream_set_timeout( $procSlot['pipes'][2], 5 );
			// Close the unused STDIN pipe
			fclose( $procSlot['pipes'][0] );
			unset( $procSlot['pipes'][0] ); // unused
		}

		$procSlot['db'] = $db;
		$procSlot['type'] = $type;
		$procSlot['cmd'] = $cmd;
		$procSlot['stime'] = time();
		$procSlot['sigtime'] = 0;
		$procSlot['maxtime'] = $maxtime;

		if ( $procSlot['handle'] ) {
			return true;
		} else {
			$this->srvc->error( "Could not spawn process in loop $loop: $cmd" );
			$this->srvc->incrStats( 'runner-status.error', 1 );
			return false;
		}
	}

	/**
	 * Close a slot's pipes and process handle and reset it to idle.
	 *
	 * @param integer $loop
	 * @param integer $slot
	 * @param array $procSlot Slot status array, reset in place
	 * @param integer $signal If given, proc_terminate() with this signal instead
	 *  of waiting on the process with proc_close()
	 */
	protected function closeRunner( $loop, $slot, array &$procSlot, $signal = null ) {
		if ( $procSlot['pipes'] ) {
			foreach ( array( 1, 2 ) as $fd ) {
				// is_resource() avoids double-closing an already closed pipe
				if ( isset( $procSlot['pipes'][$fd] ) && is_resource( $procSlot['pipes'][$fd] ) ) {
					fclose( $procSlot['pipes'][$fd] );
				}
				$procSlot['pipes'][$fd] = false;
			}
		}
		if ( $procSlot['handle'] ) {
			$this->srvc->debug( "Closing process in loop $loop at slot $slot." );
			if ( $signal !== null ) {
				// Tell the process to close with a signal
				proc_terminate( $procSlot['handle'], $signal );
			} else {
				// Wait for the process to finish on its own
				proc_close( $procSlot['handle'] );
			}
		}
		$procSlot['handle'] = false;
		$procSlot['db'] = null;
		$procSlot['type'] = null;
		$procSlot['stime'] = 0;
		$procSlot['sigtime'] = 0;
		$procSlot['cmd'] = null;
		$procSlot['maxtime'] = 0;
	}

	/**
	 * Send SIGTERM to every running runner process (e.g. on shutdown).
	 */
	public function terminateSlots() {
		foreach ( $this->procMap as &$procSlots ) {
			foreach ( $procSlots as &$procSlot ) {
				if ( !$procSlot['handle'] ) {
					continue;
				}
				foreach ( array( 1, 2 ) as $fd ) {
					if ( isset( $procSlot['pipes'][$fd] ) && is_resource( $procSlot['pipes'][$fd] ) ) {
						fclose( $procSlot['pipes'][$fd] );
					}
					$procSlot['pipes'][$fd] = false;
				}
				$status = proc_get_status( $procSlot['handle'] );
				if ( $status ) {
					print "Sending SIGTERM to {$status['pid']}.\n";
				}
				proc_terminate( $procSlot['handle'] );
			}
			unset( $procSlot );
		}
		unset( $procSlots );
	}
}
diff --git a/src/redisJobService.php b/src/redisJobService.php
new file mode 100755
index 0000000..0d08cd4
--- /dev/null
+++ b/src/redisJobService.php
@@ -0,0 +1,367 @@
+<?php
+
+/**
+ * Base class for job services with main() implemented by subclasses
+ */
+abstract class RedisJobService {
+       /** @var array List of IP:<port> entries */
+       protected $queueSrvs = array();
+       /** @var array List of IP:<port> entries */
+       protected $aggrSrvs = array();
+       /** @var string Redis password */
+       protected $password;
+       /** @var string IP address or hostname */
+       protected $statsdHost;
+       /** @var integer Port number */
+       protected $statsdPort;
+
+       /** @var bool */
+       protected $verbose;
+       /** @var array Map of (job type => integer seconds) */
+       protected $claimTTLMap = array();
+       /** @var array Map of (job type => integer) */
+       protected $attemptsMap = array();
+
+       /** @var array Map of (id => (include,exclude,low-priority,count) */
+       public $loopMap = array();
+       /** @var array Map of (job type => integer) */
+       public $maxRealMap = array();
+       /** @var array Map of (job type => integer) */
+       public $maxMemMap = array();
+       /** @var array String command to run jobs and return the status JSON 
blob */
+       public $dispatcher;
+
+       /**
+        * How long can low priority jobs be run until some high priority
+        * jobs should be checked for and run if they exist.
+        * @var integer
+        */
+       public $hpMaxDelay = 120;
+       /**
+        * The maxtime parameter for runJobs.php for low priority jobs.
+        * The lower this value is, the less jobs of one wiki can hog attention
+        * from the jobs on other wikis, though more overhead is incurred.
+        * This should be lower than hpmaxdelay.
+        * @var integer
+        */
+       public $lpMaxTime = 60;
+       /**
+        * How long can high priority jobs be run until some low priority
+        * jobs should be checked for and run if they exist.
+        * @var integer
+        */
+       public $lpMaxDelay = 600;
+       /**
+        * The maxtime parameter for runJobs.php for high priority jobs.
+        * The lower this value is, the less jobs of one wiki/type can hog 
attention
+        * from jobs of another wiki/type, though more overhead is incurred.
+        * This should be lower than lpmaxdelay.
+        * @var integer
+        */
+       public $hpMaxTime = 30;
+
+       /** @var array Map of (server => Redis object) */
+       protected $conns = array();
+       /** @var array Map of (server => timestamp) */
+       protected $downSrvs = array();
+
+       /**
+        * @param array $args
+        * @return RedisJobRunnerService
+        * @throws Exception
+        */
+       public static function init( array $args ) {
+               if ( !isset( $args['config-file'] ) || isset( $args['help'] ) ) 
{
+                       die( "Usage: php RedisJobRunnerService.php\n"
+                               . "--config-file=<path>\n"
+                               . "--help\n"
+                       );
+               }
+
+               $file = $args['config-file'];
+               $content = file_get_contents( $file );
+               if ( $content === false ) {
+                       throw new Exception( "Coudn't open configuration file 
'{$file}''" );
+               }
+
+               // Remove comments and load into an array
+               $content = trim( preg_replace( '/\/\/.*$/m', '',  $content ) );
+               $config = json_decode( $content, true );
+               if ( !is_array( $config ) ) {
+                       throw new Exception( "Could not parse JSON file 
'{$file}'." );
+               }
+
+               $instance = new static( $config );
+               $instance->verbose = isset( $args['verbose'] );
+
+               return $instance;
+       }
+
+       /**
+        * @param array $config
+        */
+       protected function __construct( array $config ) {
+               $this->aggrSrvs = $config['redis']['aggregators'];
+               if ( !count( $this->aggrSrvs ) ) {
+                       throw new Exception( "Empty list for 
'redis.aggregators'." );
+               }
+               $this->queueSrvs = $config['redis']['queues'];
+               if ( !count( $this->queueSrvs ) ) {
+                       throw new Exception( "Empty list for 'redis.queues'." );
+               }
+               $this->dispatcher = $config['dispatcher'];
+               if ( !$this->dispatcher ) {
+                       throw new Exception( "No command provided for 
'dispatcher'." );
+               }
+
+               foreach ( $config['groups'] as $name => $group ) {
+                       if ( !is_int( $group['runners'] ) ) {
+                               throw new Exception( "Invalid 'runners' value 
for runner group '$name'." );
+                       } elseif ( $group['runners'] == 0 ) {
+                               continue; // loop disabled
+                       }
+
+                       foreach ( array( 'include', 'exclude', 'low-priority' ) 
as $k ) {
+                               if ( !isset( $group[$k] ) ) {
+                                       $group[$k] = array();
+                               } elseif ( !is_array( $group[$k] ) ) {
+                                       throw new Exception( "Invalid '$k' 
value for runner group '$name'." );
+                               }
+                       }
+                       $this->loopMap[] = $group;
+               }
+
+               if ( isset( $config['wrapper'] ) ) {
+                       $this->wrapper = $config['wrapper'];
+               }
+               if ( isset( $config['redis']['password'] ) ) {
+                       $this->password = $config['redis']['password'];
+               }
+
+               $this->claimTTLMap['*'] = 3600;
+               if ( isset( $config['limits']['claimTTL'] ) ) {
+                       $this->claimTTLMap = $config['limits']['claimTTL'] + 
$this->claimTTLMap;
+               }
+
+               $this->attemptsMap['*'] = 3;
+               if ( isset( $config['limits']['attempts'] ) ) {
+                       $this->attemptsMap = $config['limits']['attempts'] + 
$this->attemptsMap;
+               }
+
+               // Avoid killing processes before they get a fair chance to exit
+               $this->maxRealMap['*'] = 3600;
+               $minRealTime = 2 * max( $this->lpMaxTime, $this->hpMaxTime );
+               if ( isset( $config['limits']['real'] ) ) {
+                       foreach ( $config['limits']['real'] as $type => $value 
) {
+                               $this->maxRealMap[$type] = max( (int)$value, 
$minRealTime );
+                       }
+               }
+
+               $this->maxMemMap['*'] = '300M';
+               if ( isset( $config['limits']['memory'] ) ) {
+                       $this->maxMemMap = $config['limits']['memory'] + 
$this->maxMemMap;
+               }
+
+               if ( isset( $config['statsd'] ) ) {
+                       if ( strpos( $config['statsd'], ':' ) !== false ) {
+                               $parts = explode( ':', $config['statsd'] );
+                               $this->statsdHost = $parts[0];
+                               $this->statsdPort = (int)$parts[1];
+                       } else {
+                               // Use default statsd port if not specified
+                               $this->statsdHost = $config['statsd'];
+                               $this->statsdPort = 8125;
+                       }
+               }
+       }
+
+       /**
+        * Entry point method that starts the service in earnest and keeps running
+        *
+        * Declared abstract: each concrete subclass supplies its own service loop.
+        */
+       abstract public function main();
+
+       /**
+        * Get the global Redis key name under which ready queues are listed
+        *
+        * The name must stay in sync with JobQueueAggregatorRedis.php.
+        *
+        * @return string
+        */
+       public function getReadyQueueKey() {
+               // Not wiki-specific: one key shared by all wikis
+               return 'jobqueue:aggregator:h-ready-queues:v2';
+       }
+
+       /**
+        * Get the global Redis key name under which queue types are listed
+        *
+        * The name must stay in sync with JobQueueAggregatorRedis.php.
+        *
+        * @return string
+        */
+       public function getQueueTypesKey() {
+               // Not wiki-specific: one key shared by all wikis
+               return 'jobqueue:aggregator:h-queue-types:v2';
+       }
+
+       /**
+        * Get the global Redis key name for the set of wikis
+        *
+        * @return string
+        */
+       public function getWikiSetKey() {
+               // Not wiki-specific: one key shared by all wikis
+               return 'jobqueue:aggregator:s-wikis:v2';
+       }
+
+       /**
+        * Build a queue name of the form "<type>/<wiki>"
+        *
+        * Each part is URL-encoded so that the '/' separator is unambiguous
+        * (encoding per JobQueueAggregatorRedis.php).
+        *
+        * @param string $type Job type
+        * @param string $wiki Wiki identifier
+        * @return string
+        */
+       public function encQueueName( $type, $wiki ) {
+               $parts = array( rawurlencode( $type ), rawurlencode( $wiki ) );
+
+               return implode( '/', $parts );
+       }
+
+       /**
+        * Split a "<type>/<wiki>" queue name back into its decoded parts
+        *
+        * This is the inverse of encQueueName(). Only the first '/' separates
+        * the two parts. A name without any '/' yields an empty wiki string
+        * instead of raising an undefined-offset notice.
+        *
+        * @param string $name Encoded queue name
+        * @return array (job type, wiki) pair of decoded strings (per JobQueueAggregatorRedis.php)
+        */
+       public function dencQueueName( $name ) {
+               // Pad to two elements so list() never reads a missing offset
+               list( $type, $wiki ) = array_pad( explode( '/', $name, 2 ), 2, '' );
+
+               return array( rawurldecode( $type ), rawurldecode( $wiki ) );
+       }
+
+       /**
+        * Get a (cached) connection handle to a Redis server
+        *
+        * Servers that recently failed to connect or authenticate are marked dead
+        * for 30 seconds and skipped, to avoid excessive overhead from repeated
+        * connection timeouts.
+        *
+        * @param string $server Host, optionally followed by ":<port>"
+        * @return Redis|bool Connection handle, or false if the server is unavailable
+        */
+       public function getRedisConn( $server ) {
+               // Check the listing of "dead" servers which have had connection errors
+               if ( isset( $this->downSrvs[$server] ) ) {
+                       if ( time() > $this->downSrvs[$server] ) {
+                               unset( $this->downSrvs[$server] ); // dead time expired
+                       } else {
+                               return false; // server is still considered dead
+                       }
+               }
+
+               if ( isset( $this->conns[$server] ) ) {
+                       return $this->conns[$server];
+               }
+
+               if ( strpos( $server, ':' ) === false ) {
+                       $host = $server;
+                       $port = null;
+               } else {
+                       list( $host, $port ) = explode( ':', $server );
+               }
+
+               try {
+                       $conn = new Redis();
+                       if ( !$conn->connect( $host, $port, 5 ) ) {
+                               $this->error( "Could not connect to Redis server $server." );
+                               // Mark server down for some time to avoid further timeouts
+                               $this->downSrvs[$server] = time() + 30;
+
+                               return false;
+                       }
+                       // A connection that failed AUTH would only produce errors later; don't cache it
+                       if ( $this->password !== null && !$conn->auth( $this->password ) ) {
+                               $this->error( "Could not authenticate to Redis server $server." );
+                               $this->downSrvs[$server] = time() + 30;
+
+                               return false;
+                       }
+               } catch ( RedisException $e ) {
+                       $this->error( "Could not connect to Redis server $server: " . $e->getMessage() );
+                       $this->downSrvs[$server] = time() + 30;
+
+                       return false;
+               }
+
+               $conn->setOption( Redis::OPT_READ_TIMEOUT, 5 );
+               $conn->setOption( Redis::OPT_SERIALIZER, Redis::SERIALIZER_NONE );
+               $this->conns[$server] = $conn;
+
+               return $conn;
+       }
+
+       /**
+        * Log a Redis failure, count it in stats, and forget the cached connection
+        *
+        * Removing the cached handle means a later getRedisConn() call for this
+        * server opens a fresh connection.
+        *
+        * @param RedisException $e
+        * @param string $server
+        */
+       public function handleRedisError( RedisException $e, $server ) {
+               $hostName = gethostname();
+               unset( $this->conns[$server] );
+               $this->error( 'Redis error: ' . $e->getMessage() );
+               $this->incrStats( "redis-error.$hostName", 1 );
+       }
+
+       /**
+        * Run a command on a Redis connection, turning any reported error into an exception
+        *
+        * Some failures are only visible via getLastError() rather than as a
+        * thrown exception. Those are rethrown so the caller knows to reset the
+        * connection to be safe.
+        *
+        * @param Redis $conn
+        * @param string $cmd Name of the Redis method to call
+        * @param array $args Positional arguments for that method
+        * @return mixed Result of the command
+        * @throws RedisException
+        */
+       public function redisCmd( Redis $conn, $cmd, array $args = array() ) {
+               $conn->clearLastError();
+               // We had some job runners OOM'ing on this call; log what we are doing
+               // so there is relevant information next to the OOM. The arguments are
+               // only encoded when verbose output is on, since debug() prints nothing otherwise.
+               if ( $this->verbose ) {
+                       $this->debug( "Redis cmd: $cmd " . json_encode( $args ) );
+               }
+               $res = call_user_func_array( array( $conn, $cmd ), $args );
+               if ( $conn->getLastError() ) {
+                       // Make all errors be exceptions instead of "most but not all"
+                       throw new RedisException( $conn->getLastError() );
+               }
+
+               return $res;
+       }
+
+       /**
+        * Send a "jobrunner.<event>:<delta>|m" metric packet to statsd over UDP
+        *
+        * This does nothing when no statsd host is configured or when $delta is 0.
+        * Sending is best-effort: the socket_sendto() result is not checked.
+        *
+        * @param string $event Metric name, without the "jobrunner." prefix
+        * @param integer $delta
+        * @return void
+        */
+       public function incrStats( $event, $delta = 1 ) {
+               if ( !$this->statsdHost || $delta == 0 ) {
+                       return; // nothing to do
+               }
+
+               if ( !function_exists( 'socket_create' ) ) {
+                       $this->debug( 'No "socket_create" method available.' );
+                       return;
+               }
+
+               // Lazily create one UDP socket and reuse it on later calls
+               static $socket = null;
+               if ( !$socket ) {
+                       $socket = socket_create( AF_INET, SOCK_DGRAM, SOL_UDP );
+                       if ( !$socket ) {
+                               // socket_create() returns false on failure; don't pass that to socket_sendto()
+                               $this->debug( 'Could not create statsd socket: ' .
+                                       socket_strerror( socket_last_error() ) );
+                               $socket = null; // retry on the next call
+                               return;
+                       }
+               }
+
+               $packet = sprintf( "%s:%s|m\n", "jobrunner.$event", $delta );
+               socket_sendto(
+                       $socket,
+                       $packet,
+                       strlen( $packet ),
+                       0,
+                       $this->statsdHost,
+                       $this->statsdPort
+               );
+       }
+
+       /**
+        * Print a timestamped message to stdout, but only in verbose mode
+        *
+        * @param string $s
+        */
+       public function debug( $s ) {
+               if ( !$this->verbose ) {
+                       return;
+               }
+               $timestamp = date( DATE_ISO8601 );
+               print "$timestamp: $s\n";
+       }
+
+       /**
+        * Print a timestamped message to stdout regardless of verbosity
+        *
+        * @param string $s
+        */
+       public function notice( $s ) {
+               echo date( DATE_ISO8601 ), ": $s\n";
+       }
+
+       /**
+        * Write a timestamped message to stderr
+        *
+        * @param string $s
+        */
+       public function error( $s ) {
+               $line = date( DATE_ISO8601 ) . ': ' . $s . "\n";
+               fwrite( STDERR, $line );
+       }
+}

-- 
To view, visit https://gerrit.wikimedia.org/r/192207
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Ie4308ae6db61e9d48b1af099b37f25ec3cd36e09
Gerrit-PatchSet: 1
Gerrit-Project: mediawiki/services/jobrunner
Gerrit-Branch: master
Gerrit-Owner: Aaron Schulz <[email protected]>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to