Ori.livneh has submitted this change and it was merged.

Change subject: Factored out JobRunnerPipeline and added curl dispatcher support
......................................................................


Factored out JobRunnerPipeline and added curl dispatcher support

* Adds a 'dispatcher' param that lets the runner curl to that base URL to run 
jobs instead of starting runJobs.php

Change-Id: Id73180ab3baea9e5e5013a9ea5cefeba43f057e6
---
M redisJobRunnerService
1 file changed, 350 insertions(+), 298 deletions(-)

Approvals:
  Ori.livneh: Looks good to me, approved
  jenkins-bot: Verified



diff --git a/redisJobRunnerService b/redisJobRunnerService
index cf75580..aee0e46 100755
--- a/redisJobRunnerService
+++ b/redisJobRunnerService
@@ -24,12 +24,41 @@
 )->main();
 
 class RedisJobRunnerService {
+       /** @var array List of IP:<port> entries */
+       protected $queueSrvs = array();
+       /** @var array List of IP:<port> entries */
+       protected $aggrSrvs = array();
+       /** @var string Redis password */
+       protected $password;
+       /** @var string IP address or hostname */
+       protected $statsdHost;
+       /** @var integer Port number */
+       protected $statsdPort;
+
+       /** @var bool */
+       protected $verbose;
+       /** @var array Map of (job type => integer seconds) */
+       protected $claimTTLMap = array();
+       /** @var array Map of (job type => integer) */
+       protected $attemptsMap = array();
+
+       /** @var string MediaWiki script wrapper */
+       public $wrapper;
+       /** @var array Map of (id => (job type list,count)) */
+       public $loopMap = array();
+       /** @var array Map of (job type => integer) */
+       public $maxRealMap = array();
+       /** @var array Map of (job type => integer) */
+       public $maxMemMap = array();
+       /** @var string URL of dispatcher */
+       public $dispatcher;
+
        /**
         * How long can low priority jobs be run until some high priority
         * jobs should be checked for and run if they exist.
         * @var integer
         */
-       protected $hpMaxDelay = 120;
+       public $hpMaxDelay = 120;
        /**
         * The maxtime parameter for runJobs.php for low priority jobs.
         * The lower this value is, the less jobs of one wiki can hog attention
@@ -37,13 +66,13 @@
         * This should be lower than hpmaxdelay.
         * @var integer
         */
-       protected $lpMaxTime = 60;
+       public $lpMaxTime = 60;
        /**
         * How long can high priority jobs be run until some low priority
         * jobs should be checked for and run if they exist.
         * @var integer
         */
-       protected $lpMaxDelay = 600;
+       public $lpMaxDelay = 600;
        /**
         * The maxtime parameter for runJobs.php for high priority jobs.
         * The lower this value is, the less jobs of one wiki/type can hog 
attention
@@ -51,33 +80,7 @@
         * This should be lower than lpmaxdelay.
         * @var integer
         */
-       protected $hpMaxTime = 30;
-
-       /** @var array List of IP:<port> entries */
-       protected $queueSrvs = array();
-       /** @var array List of IP:<port> entries */
-       protected $aggrSrvs = array();
-       /** @var string Redis password */
-       protected $password;
-
-       /** @var bool */
-       protected $verbose;
-       /** @var string MediaWiki script wrapper */
-       protected $wrapper;
-       /** @var array Map of (id => (job type list,count) */
-       protected $loopMap = array();
-       /** @var array Map of (job type => integer seconds) */
-       protected $claimTTLMap = array();
-       /** @var array Map of (job type => integer) */
-       protected $attemptsMap = array();
-       /** @var array Map of (job type => integer) */
-       protected $maxRealMap = array();
-       /** @var array Map of (job type => integer) */
-       protected $maxMemMap = array();
-       /** @var string IP address or hostname */
-       protected $statsdHost;
-       /** @var integer Port number */
-       protected $statsdPort;
+       public $hpMaxTime = 30;
 
        /** @var array Map of (server => Redis object) */
        protected $conns = array();
@@ -206,6 +209,9 @@
                                $this->maxMemMap[$type] = $value;
                        }
                }
+               if ( isset( $config['dispatcher'] ) ) {
+                       $this->dispatcher = $config['dispatcher'];
+               }
                if ( isset( $config['statsdAddr'] ) ) {
                        $m = array();
                        if ( preg_match( '/^(.+):(\d+)$/', 
$config['statsdAddr'], $m ) ) {
@@ -226,38 +232,19 @@
 
                $host = gethostname();
                $prioMap = array(); // map of (id => (current priority, since))
-               $procMap = array(); // map of (id => slot # => process handle, 
descriptors)
-               foreach ( $this->loopMap as $id => $info ) {
+               $pipeline = new JobRunnerPipeline( $this );
+               foreach ( $this->loopMap as $loop => $info ) {
                        for ( $i=0; $i < $info['runners']; ++$i ) {
-                               $procMap[$id][$i] = array(
-                                       'handle'  => false,
-                                       'pipes'   => array(),
-                                       'db'      => null,
-                                       'type'    => null,
-                                       'cmd'     => null,
-                                       'stime'   => 0,
-                                       'sigtime' => 0
-                               );
-                               $prioMap[$id] = array( 'high' => true, 'since' 
=> time() );
+                               $pipeline->initSlot( $loop, $i );
+                               $prioMap[$loop] = array( 'high' => true, 
'since' => time() );
                        }
-                       $this->notice( "Initialized loop $id with 
{$info['runners']} runner(s)." );
+                       $this->notice( "Initialized loop $loop with 
{$info['runners']} runner(s)." );
                }
 
                // Setup signal handlers
-               $handlerFunc = function( $signo ) use ( &$procMap ) {
+               $handlerFunc = function( $signo ) use ( $pipeline ) {
                        print "Caught signal ($signo)\n";
-                       foreach ( $procMap as &$procSlots ) {
-                               foreach ( $procSlots as &$procSlot ) {
-                                       if ( !$procSlot['handle'] ) {
-                                               continue;
-                                       }
-                                       fclose( $procSlot['pipes'][1] );
-                                       fclose( $procSlot['pipes'][2] );
-                                       $status = proc_get_status( 
$procSlot['handle'] );
-                                       print "Sending SIGTERM to 
{$status['pid']}.\n";
-                                       proc_terminate( $procSlot['handle'] );
-                               }
-                       }
+                       $pipeline->terminateSlots();
                        exit( 128 + $signo );
                };
                $useSignals = function_exists( 'pcntl_signal' );
@@ -295,47 +282,47 @@
                                continue;
                        }
                        // Spawn new runners as slots become available
-                       foreach ( $procMap as $id => &$procSlots ) {
-                               $this->debug( "Checking runner loop $id..." );
+                       foreach ( $prioMap as $loop => &$loopPriority ) {
+                               $this->debug( "Checking runner loop $loop..." );
                                // Implement high/low priority via time-sharing
-                               if ( $prioMap[$id]['high']
-                                       && ( time() - $prioMap[$id]['since'] ) 
> $this->lpMaxDelay
+                               if ( $loopPriority['high']
+                                       && ( time() - $loopPriority['since'] ) 
> $this->lpMaxDelay
                                ) {
-                                       $prioMap[$id]['high'] = false;
-                                       $prioMap[$id]['since'] = time();
-                                       $this->debug( "Runner loop $id now in 
low priority." );
+                                       $loopPriority['high'] = false;
+                                       $loopPriority['since'] = time();
+                                       $this->debug( "Runner loop $loop now in 
low priority." );
                                        ++$prioSwitches;
-                               } elseif ( !$prioMap[$id]['high']
-                                       && ( time() - $prioMap[$id]['since'] ) 
> $this->hpMaxDelay
+                               } elseif ( !$loopPriority['high']
+                                       && ( time() - $loopPriority['since'] ) 
> $this->hpMaxDelay
                                ) {
-                                       $prioMap[$id]['high'] = true;
-                                       $prioMap[$id]['since'] = time();
-                                       $this->debug( "Runner loop $id now in 
high priority." );
+                                       $loopPriority['high'] = true;
+                                       $loopPriority['since'] = time();
+                                       $this->debug( "Runner loop $loop now in 
high priority." );
                                        ++$prioSwitches;
                                }
                                // Find any free slots and replace them with 
new processes
-                               list( $free, $new ) = $this->refillSlots( $id, 
$procSlots, $prioMap, $pending );
+                               list( $free, $new ) = $pipeline->refillSlots( 
$loop, $prioMap, $pending );
                                $anyFree += $free;
                                $anyNew += $new;
                                // Rotate the priority from high/low and back 
if no jobs were found
                                if ( !$free ) {
-                                       $this->debug( "Runner loop $id is 
full." );
+                                       $this->debug( "Runner loop $loop is 
full." );
                                } elseif ( !$new ) {
-                                       if ( $prioMap[$id]['high'] ) {
-                                               $prioMap[$id]['high'] = false;
-                                               $this->debug( "Runner loop $id 
now in low priority." );
+                                       if ( $loopPriority['high'] ) {
+                                               $loopPriority['high'] = false;
+                                               $this->debug( "Runner loop 
$loop now in low priority." );
                                        } else {
-                                               $prioMap[$id]['high'] = true;
-                                               $this->debug( "Runner loop $id 
now in high priority." );
+                                               $loopPriority['high'] = true;
+                                               $this->debug( "Runner loop 
$loop now in high priority." );
                                        }
-                                       $prioMap[$id]['since'] = time();
-                                       $this->debug( "Runner loop $id has no 
jobs." );
+                                       $loopPriority['since'] = time();
+                                       $this->debug( "Runner loop $loop has no 
jobs." );
                                        ++$prioSwitches;
                                } else {
-                                       $this->debug( "Done checking loop $id." 
);
+                                       $this->debug( "Done checking loop 
$loop." );
                                }
                        }
-                       unset( $procSlots );
+                       unset( $loopPriority );
                        $this->incrStats( "spawn.$host", $anyNew );
                        $this->incrStats( "prioritychange.$host", $prioSwitches 
);
                        // Backoff if there is nothing to do
@@ -354,221 +341,6 @@
                        $this->incrStats( "memory.$host", $memCurrent - 
$memLast );
                        $memLast = $memCurrent;
                }
-       }
-
-       /**
-        * @param integer $id
-        * @param array $procSlots
-        * @param array $prioMap
-        * @param array $pending
-        * @return array
-        */
-       protected function refillSlots( $id, array &$procSlots, array $prioMap, 
array &$pending ) {
-               $free = 0;
-               $new = 0;
-               $cTime = time();
-               foreach ( $procSlots as $slot => &$procSlot ) {
-                       $status = $procSlot['handle']
-                               ? proc_get_status( $procSlot['handle'] )
-                               : false;
-                       if ( $status && $status['running'] ) {
-                               $maxReal = isset( 
$this->maxRealMap[$procSlot['type']] )
-                                       ? $this->maxRealMap[$procSlot['type']]
-                                       : $this->maxRealMap['*'];
-                               $age = $cTime - $procSlot['stime'];
-                               if ( $age >= $maxReal && !$procSlot['sigtime'] 
) {
-                                       $cmd = $procSlot['cmd'];
-                                       $this->error( "Runner loop $id process 
in slot $slot timed out " .
-                                               "[{$age}s; max: 
{$maxReal}s]:\n$cmd" );
-                                       posix_kill( $status['pid'], SIGTERM ); 
// non-blocking
-                                       $procSlot['sigtime'] = time();
-                                       $this->incrStats( 
'runner-status.timeout', 1 );
-                               } elseif ( $age >= $maxReal && ( $cTime - 
$procSlot['sigtime'] ) > 5 ) {
-                                       $this->error( "Runner loop $id process 
in slot $slot sent SIGKILL." );
-                                       $this->closeRunner( $id, $slot, 
$procSlot, SIGKILL );
-                                       $this->incrStats( 'runner-status.kill', 
1 );
-                               } else {
-                                       continue; // slot is busy
-                               }
-                       } elseif ( $status && !$status['running'] ) {
-                               if ( $status['exitcode'] == 0 ) {
-                                       // If this finished early, lay off of 
the queue for a while
-                                       if ( ( $cTime - $procSlot['stime'] ) < 
$this->hpMaxTime/2 ) {
-                                               unset( 
$pending[$procSlot['type']][$procSlot['db']] );
-                                               $this->debug( "Queue 
'{$procSlot['db']}/{$procSlot['type']}' emptied." );
-                                       }
-                               } else {
-                                       // Mention any serious errors that may 
have occured
-                                       $cmd = $procSlot['cmd'];
-                                       $error = stream_get_contents( 
$procSlot['pipes'][2] );
-                                       if ( $error == '' ) {
-                                               $error = stream_get_contents( 
$procSlot['pipes'][1] );
-                                       }
-                                       $this->error( "Runner loop $id process 
in slot $slot " .
-                                               "gave status 
'{$status['exitcode']}':\n$cmd\n\t$error" );
-                                       $this->incrStats( 
'runner-status.error', 1 );
-                               }
-                               $this->closeRunner( $id, $slot, $procSlot );
-                       } elseif ( !$status && $procSlot['handle'] ) {
-                               $this->error( "Runner loop $id process in slot 
$slot gave no status." );
-                               $this->closeRunner( $id, $slot, $procSlot );
-                               $this->incrStats( 'runner-status.none', 1 );
-                       }
-                       ++$free;
-                       $queue = $this->selectQueue( $id, $slot, $prioMap, 
$pending );
-                       if ( !$queue ) {
-                               break;
-                       }
-                       // Spawn a job runner for this loop ID
-                       $highPrio = $prioMap[$id]['high'];
-                       $this->spawnRunner( $id, $slot, $highPrio, $queue, 
$procSlot );
-                       ++$new;
-               }
-
-               return array( $free, $new );
-       }
-
-       /**
-        * @param integer $id
-        * @param integer $slot
-        * @param array $prioMap
-        * @param array $pending
-        * @return array|boolean
-        */
-       protected function selectQueue( $id, $slot, array $prioMap, array 
$pending ) {
-               $include = array();
-               $exclude = array();
-               foreach ( $this->loopMap[$id]['types'] as $type ) {
-                       // "*" means "all current queues"
-                       if ( $type === '*' ) {
-                               $include = array_merge( $include, array_keys( 
$pending ) );
-                       // "-" as a prefix means "exclude this queue" from "*"
-                       } elseif ( $type[0] === '-' ) {
-                               $exclude[] = substr( $type, 1 );
-                       // "~" as a prefix means "low priority"
-                       } elseif ( $type[0] === '~' ) {
-                               if ( $prioMap[$id]['high'] ) {
-                                       // In high priority mode, exclude this 
type
-                                       $exclude[] = substr( $type, 1 );
-                               } else {
-                                       // In low priority mode, allow this type
-                                       $include[] = substr( $type, 1 );
-                               }
-                       } else {
-                               $include[] = $type;
-                       }
-               }
-               $candidateTypes = array_diff( $include, $exclude );
-
-               $candidates = array(); // list of (type, db)
-               // Flatten the tree of candidates into a flat list so that a 
random
-               // item can be selected, weighing each queue (type/db tuple) 
equally.
-               foreach ( $candidateTypes as $type ) {
-                       if ( isset( $pending[$type] ) ) {
-                               foreach ( $pending[$type] as $db => $since ) {
-                                       $candidates[] = array( $type, $db );
-                               }
-                       }
-               }
-
-               if ( !count( $candidates ) ) {
-                       return false; // no jobs for this type
-               }
-
-               return $candidates[mt_rand( 0, count( $candidates ) - 1 )];
-       }
-
-       /**
-        * @param integer $id
-        * @param integer $slot
-        * @param bool $highPrio
-        * @param array $queue
-        * @param array $procSlot
-        * @return bool
-        */
-       protected function spawnRunner( $id, $slot, $highPrio, array $queue, 
array &$procSlot ) {
-               // Pick a random queue
-               list( $type, $db ) = $queue;
-               $encType = escapeshellarg( $type );
-               $encWiki = escapeshellarg( $db );
-               $encMaxTime = escapeshellarg( $highPrio ? $this->lpMaxTime : 
$this->hpMaxTime );
-               $encMaxMem = escapeshellarg(
-                       isset( $this->maxMemMap[$type] ) ? 
$this->maxMemMap[$type] : $this->maxMemMap['*']
-               );
-
-               // Apply any HET-deploy style wrapper
-               $script = $this->wrapper
-                       ? "{$this->wrapper} runJobs.php"
-                       : "php maintenance/runJobs.php";
-
-               // Make sure the runner is launched with various time/memory 
limits.
-               // Nice the process so things like ssh and deployment scripts 
are fine.
-               $cmd = "$script --wiki=$encWiki --type=$encType " .
-                       "--maxtime=$encMaxTime --memory-limit=$encMaxMem";
-               if ( substr( php_uname(), 0, 7 ) !== 'Windows' ) {
-                       $cmd = "nice -n 19 $cmd";
-               }
-
-               $descriptors = array(
-                       0 => array( "pipe", "r" ), // stdin (child)
-                       1 => array( "pipe", "w" ), // stdout (child)
-                       2 => array( "pipe", "w" ) // stderr (child)
-               );
-
-               $this->debug( "Spawning runner in loop $id at slot $slot 
($type, $db):\n\t$cmd." );
-
-               // Start the runner in the background
-               $procSlot['handle'] = proc_open( $cmd, $descriptors, 
$procSlot['pipes'] );
-               fclose( $procSlot['pipes'][0] );
-               unset( $procSlot['pipes'][0] ); // unused
-               $procSlot['db'] = $db;
-               $procSlot['type'] = $type;
-               $procSlot['cmd'] = $cmd;
-               $procSlot['stime'] = time();
-               $procSlot['sigtime'] = 0;
-
-               if ( $procSlot['handle'] ) {
-                       return true;
-               } else {
-                       $this->error( "Could not spawn process in loop $id: 
$cmd" );
-                       $this->incrStats( 'runner-status.error', 1 );
-                       return false;
-               }
-       }
-
-       /**
-        * @param integer $id
-        * @param integer $slot
-        * @param array $procSlot
-        * @param integer $signal
-        */
-       protected function closeRunner( $id, $slot, array &$procSlot, $signal = 
null ) {
-               if ( $procSlot['pipes'] ) {
-                       if ( $procSlot['pipes'][1] !== false ) {
-                               fclose( $procSlot['pipes'][1] );
-                               $procSlot['pipes'][1] = false;
-                       }
-                       if ( $procSlot['pipes'][2] !== false ) {
-                               fclose( $procSlot['pipes'][2] );
-                               $procSlot['pipes'][2] = false;
-                       }
-               }
-               if ( $procSlot['handle'] ) {
-                       $this->debug( "Closing process in loop $id at slot 
$slot" );
-                       if ( $signal !== null ) {
-                               // Tell the process to close with a signal
-                               proc_terminate( $procSlot['handle'], $signal );
-                       } else {
-                               // Wait for the process to finish on its own
-                               proc_close( $procSlot['handle'] );
-                       }
-               }
-               $procSlot['handle'] = false;
-               $procSlot['db'] = null;
-               $procSlot['type'] = null;
-               $procSlot['stime'] = 0;
-               $procSlot['sigtime'] = 0;
-               $procSlot['cmd'] = null;
        }
 
        /**
@@ -927,7 +699,7 @@
         * @param integer $delta
         * @return void
         */
-       protected function incrStats( $event, $delta = 1 ) {
+       public function incrStats( $event, $delta = 1 ) {
                if ( !$this->statsdHost || $delta == 0 ) {
                        return; // nothing to do
                }
@@ -958,7 +730,7 @@
        /**
         * @param string $s
         */
-       protected function debug( $s ) {
+       public function debug( $s ) {
                if ( $this->verbose ) {
                        print "$s\n";
                }
@@ -967,14 +739,294 @@
        /**
         * @param string $s
         */
-       protected function notice( $s ) {
+       public function notice( $s ) {
                print date( DATE_ISO8601 ) . ": $s\n";
        }
 
        /**
         * @param string $s
         */
-       protected function error( $s ) {
+       public function error( $s ) {
                fwrite( STDERR, date( DATE_ISO8601 ) . ": $s\n" );
        }
 }
+
+class JobRunnerPipeline {
+       /** @var RedisJobRunnerService */
+       protected $srvc;
+       /** @var array (loop ID => slot ID => slot status array) */
+       protected $procMap = array();
+
+       /**
+        * @param RedisJobRunnerService $service
+        */
+       public function __construct( RedisJobRunnerService $service ) {
+               $this->srvc = $service;
+       }
+
+       /**
+        * @param string $loop
+        * @param int $slot
+        */
+       public function initSlot( $loop, $slot ) {
+               $this->procMap[$loop][$slot] = array(
+                       'handle'  => false,
+                       'pipes'   => array(),
+                       'db'      => null,
+                       'type'    => null,
+                       'cmd'     => null,
+                       'stime'   => 0,
+                       'sigtime' => 0,
+                       'isCurl'  => false
+               );
+       }
+
+       /**
+        * Reap finished or stuck runner processes of a loop and start new
+        * runners in the slots that are free
+        *
+        * A runner that exceeds its maximum real time is sent SIGTERM; if it is
+        * still running more than 5 seconds later it is sent SIGKILL and its slot
+        * is reclaimed.
+        *
+        * @param integer $loop Runner loop ID
+        * @param array $prioMap Map of (loop ID => map with a 'high' flag)
+        * @param array $pending Map of (job type => (db => value)); queues whose
+        *   runner finished early are removed from it
+        * @return array (number of free slots seen, number of spawn attempts)
+        */
+       public function refillSlots( $loop, array $prioMap, array &$pending ) {
+               $free = 0;
+               $new = 0;
+               $cTime = time();
+               foreach ( $this->procMap[$loop] as $slot => &$procSlot ) {
+                       $status = $procSlot['handle']
+                               ? proc_get_status( $procSlot['handle'] )
+                               : false;
+                       if ( $status && $status['running'] ) {
+                               $maxReal = isset( $this->srvc->maxRealMap[$procSlot['type']] )
+                                       ? $this->srvc->maxRealMap[$procSlot['type']]
+                                       : $this->srvc->maxRealMap['*'];
+                               $age = $cTime - $procSlot['stime'];
+                               if ( $age >= $maxReal && !$procSlot['sigtime'] ) {
+                                       $cmd = $procSlot['cmd'];
+                                       $this->srvc->error( "Runner loop $loop process in slot $slot timed out " .
+                                               "[{$age}s; max: {$maxReal}s]:\n$cmd" );
+                                       posix_kill( $status['pid'], SIGTERM ); // non-blocking
+                                       $procSlot['sigtime'] = time();
+                                       $this->srvc->incrStats( 'runner-status.timeout', 1 );
+                                       // The process still occupies this slot until it exits or is
+                                       // sent SIGKILL; spawning here would overwrite its handle and
+                                       // reset 'sigtime', so the SIGKILL escalation could never run.
+                                       continue;
+                               } elseif ( $age >= $maxReal && ( $cTime - $procSlot['sigtime'] ) > 5 ) {
+                                       $this->srvc->error( "Runner loop $loop process in slot $slot sent SIGKILL." );
+                                       $this->closeRunner( $loop, $slot, $procSlot, SIGKILL );
+                                       $this->srvc->incrStats( 'runner-status.kill', 1 );
+                               } else {
+                                       continue; // slot is busy
+                               }
+                       } elseif ( $status && !$status['running'] ) {
+                               $error = null;
+                               if ( $procSlot['isCurl'] && $status['exitcode'] == 0 ) {
+                                       // curl can exit with 0 even if the dispatcher failed, so
+                                       // require the response body to be valid (non-empty) JSON
+                                       $error = stream_get_contents( $procSlot['pipes'][1] );
+                                       if ( !json_decode( $error ) ) {
+                                               $status['exitcode'] = 1; // probably PHP exception
+                                       }
+                               }
+                               if ( $status['exitcode'] == 0 ) {
+                                       // If this finished early, lay off of the queue for a while.
+                                       // hpMaxTime is a setting of the service object, which is
+                                       // where spawnRunner() reads it from as well.
+                                       if ( ( $cTime - $procSlot['stime'] ) < $this->srvc->hpMaxTime / 2 ) {
+                                               unset( $pending[$procSlot['type']][$procSlot['db']] );
+                                               $this->srvc->debug( "Queue '{$procSlot['db']}/{$procSlot['type']}' emptied." );
+                                       }
+                               } else {
+                                       // Mention any serious errors that may have occurred
+                                       $cmd = $procSlot['cmd'];
+                                       if ( $error === null ) {
+                                               // Prefer stderr; fall back to stdout if stderr is empty
+                                               $error = stream_get_contents( $procSlot['pipes'][2] )
+                                                       ?: stream_get_contents( $procSlot['pipes'][1] );
+                                       }
+                                       $this->srvc->error( "Runner loop $loop process in slot $slot " .
+                                               "gave status '{$status['exitcode']}':\n$cmd\n\t$error" );
+                                       $this->srvc->incrStats( 'runner-status.error', 1 );
+                               }
+                               $this->closeRunner( $loop, $slot, $procSlot );
+                       } elseif ( !$status && $procSlot['handle'] ) {
+                               $this->srvc->error( "Runner loop $loop process in slot $slot gave no status." );
+                               $this->closeRunner( $loop, $slot, $procSlot );
+                               $this->srvc->incrStats( 'runner-status.none', 1 );
+                       }
+                       // The slot is free at this point; try to fill it
+                       ++$free;
+                       $queue = $this->selectQueue( $loop, $prioMap, $pending );
+                       if ( !$queue ) {
+                               break;
+                       }
+                       // Spawn a job runner for this loop ID
+                       $highPrio = $prioMap[$loop]['high'];
+                       $this->spawnRunner( $loop, $slot, $highPrio, $queue, $procSlot );
+                       ++$new;
+               }
+               unset( $procSlot );
+
+               return array( $free, $new );
+       }
+
+       /**
+        * Pick, at random, one pending queue that the given loop may run
+        *
+        * Every (job type, db) pair is weighted equally.
+        *
+        * @param integer $loop Runner loop ID (key of the service's loopMap)
+        * @param array $prioMap Map of (loop ID => map with a 'high' flag)
+        * @param array $pending Map of (job type => (db => value))
+        * @return array|boolean (job type, db) or false if no queue qualifies
+        */
+       protected function selectQueue( $loop, array $prioMap, array $pending ) {
+               $allowed = array();
+               $blocked = array();
+               foreach ( $this->srvc->loopMap[$loop]['types'] as $spec ) {
+                       if ( $spec === '*' ) {
+                               // Wildcard: every job type that currently has pending queues
+                               $allowed = array_merge( $allowed, array_keys( $pending ) );
+                               continue;
+                       }
+                       if ( $spec[0] === '-' ) {
+                               // "-" prefix: keep this type out, even if "*" matched it
+                               $blocked[] = substr( $spec, 1 );
+                               continue;
+                       }
+                       if ( $spec[0] !== '~' ) {
+                               // Plain job type name
+                               $allowed[] = $spec;
+                               continue;
+                       }
+                       // "~" prefix: low priority type, which is only allowed
+                       // while the loop is not in high priority mode
+                       $bare = substr( $spec, 1 );
+                       if ( $prioMap[$loop]['high'] ) {
+                               $blocked[] = $bare;
+                       } else {
+                               $allowed[] = $bare;
+                       }
+               }
+
+               // Flatten the (type => db) tree into a list of (type, db) tuples
+               $queues = array();
+               foreach ( array_diff( $allowed, $blocked ) as $type ) {
+                       if ( !isset( $pending[$type] ) ) {
+                               continue;
+                       }
+                       foreach ( $pending[$type] as $db => $unused ) {
+                               $queues[] = array( $type, $db );
+                       }
+               }
+
+               $total = count( $queues );
+               if ( $total === 0 ) {
+                       return false; // nothing pending that this loop may run
+               }
+
+               return $queues[mt_rand( 0, $total - 1 )];
+       }
+
+       /**
+        * Start a background process that runs jobs of one queue and record it
+        * in the given slot
+        *
+        * If the service has a dispatcher URL set, the runner is a curl POST to
+        * that URL; otherwise it is a runJobs.php process.
+        *
+        * @param integer $loop Runner loop ID
+        * @param integer $slot Slot number within the loop
+        * @param bool $highPrio Whether the loop is in high priority mode
+        * @param array $queue (job type, db) tuple
+        * @param array $procSlot Slot entry to fill in
+        * @return bool Whether the process could be started
+        */
+       protected function spawnRunner( $loop, $slot, $highPrio, array $queue, array &$procSlot ) {
+               list( $type, $db ) = $queue;
+               // NOTE(review): high priority mode picks lpMaxTime and low priority mode
+               // picks hpMaxTime; this looks swapped -- confirm against the service settings.
+               $maxTime = $highPrio ? $this->srvc->lpMaxTime : $this->srvc->hpMaxTime;
+               $maxMem = isset( $this->srvc->maxMemMap[$type] )
+                       ? $this->srvc->maxMemMap[$type]
+                       : $this->srvc->maxMemMap['*'];
+
+               $isCurl = (bool)$this->srvc->dispatcher;
+               if ( $isCurl ) {
+                       $url = $this->srvc->dispatcher .
+                               "?wiki=" . rawurlencode( $db ) .
+                               "&type=" . rawurlencode( $type ) .
+                               "&maxtime=" . rawurlencode( $maxTime );
+                       // NOTE(review): curl documents -a as --append (an upload option);
+                       // confirm that it is really wanted here.
+                       $cmd = "curl -s -XPOST -a " . escapeshellarg( $url );
+               } else {
+                       // Apply any HET-deploy style wrapper
+                       $script = $this->srvc->wrapper
+                               ? "{$this->srvc->wrapper} runJobs.php"
+                               : "php maintenance/runJobs.php";
+
+                       // Make sure the runner is launched with various time/memory limits.
+                       // Nice the process so things like ssh and deployment scripts are fine.
+                       $cmd = "$script --wiki=" . escapeshellarg( $db ) .
+                               " --type=" . escapeshellarg( $type ) .
+                               " --maxtime=" . escapeshellarg( $maxTime ) .
+                               " --memory-limit=" . escapeshellarg( $maxMem );
+                       if ( substr( php_uname(), 0, 7 ) !== 'Windows' ) {
+                               $cmd = "nice -n 19 $cmd";
+                       }
+               }
+
+               $descriptors = array(
+                       0 => array( "pipe", "r" ), // stdin (child)
+                       1 => array( "pipe", "w" ), // stdout (child)
+                       2 => array( "pipe", "w" ) // stderr (child)
+               );
+
+               $this->srvc->debug( "Spawning runner in loop $loop at slot $slot ($type, $db):\n\t$cmd." );
+
+               // Start the runner in the background
+               $procSlot['handle'] = proc_open( $cmd, $descriptors, $procSlot['pipes'] );
+               if ( $procSlot['handle'] ) {
+                       // Nothing is written to the child; the stdin pipe only exists if
+                       // proc_open() succeeded, so do not touch it otherwise
+                       fclose( $procSlot['pipes'][0] );
+                       unset( $procSlot['pipes'][0] ); // unused
+               }
+               $procSlot['db'] = $db;
+               $procSlot['type'] = $type;
+               $procSlot['cmd'] = $cmd;
+               $procSlot['stime'] = time();
+               $procSlot['sigtime'] = 0;
+               // Only curl runners have their stdout checked for a JSON response when
+               // they finish, so this must reflect how the command was really built
+               $procSlot['isCurl'] = $isCurl;
+
+               if ( $procSlot['handle'] ) {
+                       return true;
+               } else {
+                       $this->srvc->error( "Could not spawn process in loop $loop: $cmd" );
+                       $this->srvc->incrStats( 'runner-status.error', 1 );
+                       return false;
+               }
+       }
+
+       /**
+        * Close the pipes and the process of a slot and reset the slot fields
+        *
+        * @param integer $loop Runner loop ID (used for the debug message)
+        * @param integer $slot Slot number (used for the debug message)
+        * @param array $procSlot Slot entry to close and reset
+        * @param integer $signal Signal to send to the process; if this is null
+        *   then the process is closed with proc_close() instead
+        */
+       protected function closeRunner( $loop, $slot, array &$procSlot, $signal = null ) {
+               if ( $procSlot['pipes'] ) {
+                       // Close the stdout (1) and stderr (2) pipes that are still open
+                       foreach ( array( 1, 2 ) as $fd ) {
+                               if ( $procSlot['pipes'][$fd] !== false ) {
+                                       fclose( $procSlot['pipes'][$fd] );
+                                       $procSlot['pipes'][$fd] = false;
+                               }
+                       }
+               }
+               if ( $procSlot['handle'] ) {
+                       $this->srvc->debug( "Closing process in loop $loop at slot $slot." );
+                       if ( $signal === null ) {
+                               // Wait for the process to finish on its own
+                               proc_close( $procSlot['handle'] );
+                       } else {
+                               // Tell the process to close with a signal
+                               proc_terminate( $procSlot['handle'], $signal );
+                       }
+               }
+               // Mark the slot as holding no process
+               $procSlot['handle'] = false;
+               $procSlot['isCurl'] = false;
+               $procSlot['db'] = $procSlot['type'] = $procSlot['cmd'] = null;
+               $procSlot['stime'] = $procSlot['sigtime'] = 0;
+       }
+
+       /**
+        * Close the output pipes of every slot that holds a process and ask that
+        * process to terminate, printing the PID of each one
+        */
+       public function terminateSlots() {
+               foreach ( $this->procMap as $slots ) {
+                       foreach ( $slots as $runner ) {
+                               if ( $runner['handle'] ) {
+                                       fclose( $runner['pipes'][1] );
+                                       fclose( $runner['pipes'][2] );
+                                       $status = proc_get_status( $runner['handle'] );
+                                       print "Sending SIGTERM to {$status['pid']}.\n";
+                                       proc_terminate( $runner['handle'] );
+                               }
+                       }
+               }
+       }
+}

-- 
To view, visit https://gerrit.wikimedia.org/r/149216
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: Id73180ab3baea9e5e5013a9ea5cefeba43f057e6
Gerrit-PatchSet: 2
Gerrit-Project: mediawiki/services/jobrunner
Gerrit-Branch: master
Gerrit-Owner: Aaron Schulz <[email protected]>
Gerrit-Reviewer: Ori.livneh <[email protected]>
Gerrit-Reviewer: jenkins-bot <>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to