Ori.livneh has submitted this change and it was merged.
Change subject: Factored out JobRunnerPipeline and added curl dispatcher support
......................................................................
Factored out JobRunnerPipeline and added curl dispatcher support
* Adds a 'dispatcher' param that lets the runner curl to that base URL to run
jobs instead of starting runJobs.php
Change-Id: Id73180ab3baea9e5e5013a9ea5cefeba43f057e6
---
M redisJobRunnerService
1 file changed, 350 insertions(+), 298 deletions(-)
Approvals:
Ori.livneh: Looks good to me, approved
jenkins-bot: Verified
diff --git a/redisJobRunnerService b/redisJobRunnerService
index cf75580..aee0e46 100755
--- a/redisJobRunnerService
+++ b/redisJobRunnerService
@@ -24,12 +24,41 @@
)->main();
class RedisJobRunnerService {
+ /** @var array List of IP:<port> entries */
+ protected $queueSrvs = array();
+ /** @var array List of IP:<port> entries */
+ protected $aggrSrvs = array();
+ /** @var string Redis password */
+ protected $password;
+ /** @var string IP address or hostname */
+ protected $statsdHost;
+ /** @var integer Port number */
+ protected $statsdPort;
+
+ /** @var bool */
+ protected $verbose;
+ /** @var array Map of (job type => integer seconds) */
+ protected $claimTTLMap = array();
+ /** @var array Map of (job type => integer) */
+ protected $attemptsMap = array();
+
+ /** @var string MediaWiki script wrapper */
+ public $wrapper;
+ /** @var array Map of (id => (job type list, count)) */
+ public $loopMap = array();
+ /** @var array Map of (job type => integer) */
+ public $maxRealMap = array();
+ /** @var array Map of (job type => integer) */
+ public $maxMemMap = array();
+ /** @var string URL of dispatcher */
+ public $dispatcher;
+
/**
* How long can low priority jobs be run until some high priority
* jobs should be checked for and run if they exist.
* @var integer
*/
- protected $hpMaxDelay = 120;
+ public $hpMaxDelay = 120;
/**
* The maxtime parameter for runJobs.php for low priority jobs.
* The lower this value is, the less jobs of one wiki can hog attention
@@ -37,13 +66,13 @@
* This should be lower than hpmaxdelay.
* @var integer
*/
- protected $lpMaxTime = 60;
+ public $lpMaxTime = 60;
/**
* How long can high priority jobs be run until some low priority
* jobs should be checked for and run if they exist.
* @var integer
*/
- protected $lpMaxDelay = 600;
+ public $lpMaxDelay = 600;
/**
* The maxtime parameter for runJobs.php for high priority jobs.
* The lower this value is, the less jobs of one wiki/type can hog
attention
@@ -51,33 +80,7 @@
* This should be lower than lpmaxdelay.
* @var integer
*/
- protected $hpMaxTime = 30;
-
- /** @var array List of IP:<port> entries */
- protected $queueSrvs = array();
- /** @var array List of IP:<port> entries */
- protected $aggrSrvs = array();
- /** @var string Redis password */
- protected $password;
-
- /** @var bool */
- protected $verbose;
- /** @var string MediaWiki script wrapper */
- protected $wrapper;
- /** @var array Map of (id => (job type list,count) */
- protected $loopMap = array();
- /** @var array Map of (job type => integer seconds) */
- protected $claimTTLMap = array();
- /** @var array Map of (job type => integer) */
- protected $attemptsMap = array();
- /** @var array Map of (job type => integer) */
- protected $maxRealMap = array();
- /** @var array Map of (job type => integer) */
- protected $maxMemMap = array();
- /** @var string IP address or hostname */
- protected $statsdHost;
- /** @var integer Port number */
- protected $statsdPort;
+ public $hpMaxTime = 30;
/** @var array Map of (server => Redis object) */
protected $conns = array();
@@ -206,6 +209,9 @@
$this->maxMemMap[$type] = $value;
}
}
+ if ( isset( $config['dispatcher'] ) ) {
+ $this->dispatcher = $config['dispatcher'];
+ }
if ( isset( $config['statsdAddr'] ) ) {
$m = array();
if ( preg_match( '/^(.+):(\d+)$/',
$config['statsdAddr'], $m ) ) {
@@ -226,38 +232,19 @@
$host = gethostname();
$prioMap = array(); // map of (id => (current priority, since))
- $procMap = array(); // map of (id => slot # => process handle,
descriptors)
- foreach ( $this->loopMap as $id => $info ) {
+ $pipeline = new JobRunnerPipeline( $this );
+ foreach ( $this->loopMap as $loop => $info ) {
for ( $i=0; $i < $info['runners']; ++$i ) {
- $procMap[$id][$i] = array(
- 'handle' => false,
- 'pipes' => array(),
- 'db' => null,
- 'type' => null,
- 'cmd' => null,
- 'stime' => 0,
- 'sigtime' => 0
- );
- $prioMap[$id] = array( 'high' => true, 'since'
=> time() );
+ $pipeline->initSlot( $loop, $i );
+ $prioMap[$loop] = array( 'high' => true,
'since' => time() );
}
- $this->notice( "Initialized loop $id with
{$info['runners']} runner(s)." );
+ $this->notice( "Initialized loop $loop with
{$info['runners']} runner(s)." );
}
// Setup signal handlers
- $handlerFunc = function( $signo ) use ( &$procMap ) {
+ $handlerFunc = function( $signo ) use ( $pipeline ) {
print "Caught signal ($signo)\n";
- foreach ( $procMap as &$procSlots ) {
- foreach ( $procSlots as &$procSlot ) {
- if ( !$procSlot['handle'] ) {
- continue;
- }
- fclose( $procSlot['pipes'][1] );
- fclose( $procSlot['pipes'][2] );
- $status = proc_get_status(
$procSlot['handle'] );
- print "Sending SIGTERM to
{$status['pid']}.\n";
- proc_terminate( $procSlot['handle'] );
- }
- }
+ $pipeline->terminateSlots();
exit( 128 + $signo );
};
$useSignals = function_exists( 'pcntl_signal' );
@@ -295,47 +282,47 @@
continue;
}
// Spawn new runners as slots become available
- foreach ( $procMap as $id => &$procSlots ) {
- $this->debug( "Checking runner loop $id..." );
+ foreach ( $prioMap as $loop => &$loopPriority ) {
+ $this->debug( "Checking runner loop $loop..." );
// Implement high/low priority via time-sharing
- if ( $prioMap[$id]['high']
- && ( time() - $prioMap[$id]['since'] )
> $this->lpMaxDelay
+ if ( $loopPriority['high']
+ && ( time() - $loopPriority['since'] )
> $this->lpMaxDelay
) {
- $prioMap[$id]['high'] = false;
- $prioMap[$id]['since'] = time();
- $this->debug( "Runner loop $id now in
low priority." );
+ $loopPriority['high'] = false;
+ $loopPriority['since'] = time();
+ $this->debug( "Runner loop $loop now in
low priority." );
++$prioSwitches;
- } elseif ( !$prioMap[$id]['high']
- && ( time() - $prioMap[$id]['since'] )
> $this->hpMaxDelay
+ } elseif ( !$loopPriority['high']
+ && ( time() - $loopPriority['since'] )
> $this->hpMaxDelay
) {
- $prioMap[$id]['high'] = true;
- $prioMap[$id]['since'] = time();
- $this->debug( "Runner loop $id now in
high priority." );
+ $loopPriority['high'] = true;
+ $loopPriority['since'] = time();
+ $this->debug( "Runner loop $loop now in
high priority." );
++$prioSwitches;
}
// Find any free slots and replace them with
new processes
- list( $free, $new ) = $this->refillSlots( $id,
$procSlots, $prioMap, $pending );
+ list( $free, $new ) = $pipeline->refillSlots(
$loop, $prioMap, $pending );
$anyFree += $free;
$anyNew += $new;
// Rotate the priority from high/low and back
if no jobs were found
if ( !$free ) {
- $this->debug( "Runner loop $id is
full." );
+ $this->debug( "Runner loop $loop is
full." );
} elseif ( !$new ) {
- if ( $prioMap[$id]['high'] ) {
- $prioMap[$id]['high'] = false;
- $this->debug( "Runner loop $id
now in low priority." );
+ if ( $loopPriority['high'] ) {
+ $loopPriority['high'] = false;
+ $this->debug( "Runner loop
$loop now in low priority." );
} else {
- $prioMap[$id]['high'] = true;
- $this->debug( "Runner loop $id
now in high priority." );
+ $loopPriority['high'] = true;
+ $this->debug( "Runner loop
$loop now in high priority." );
}
- $prioMap[$id]['since'] = time();
- $this->debug( "Runner loop $id has no
jobs." );
+ $loopPriority['since'] = time();
+ $this->debug( "Runner loop $loop has no
jobs." );
++$prioSwitches;
} else {
- $this->debug( "Done checking loop $id."
);
+ $this->debug( "Done checking loop
$loop." );
}
}
- unset( $procSlots );
+ unset( $loopPriority );
$this->incrStats( "spawn.$host", $anyNew );
$this->incrStats( "prioritychange.$host", $prioSwitches
);
// Backoff if there is nothing to do
@@ -354,221 +341,6 @@
$this->incrStats( "memory.$host", $memCurrent -
$memLast );
$memLast = $memCurrent;
}
- }
-
- /**
- * @param integer $id
- * @param array $procSlots
- * @param array $prioMap
- * @param array $pending
- * @return array
- */
- protected function refillSlots( $id, array &$procSlots, array $prioMap,
array &$pending ) {
- $free = 0;
- $new = 0;
- $cTime = time();
- foreach ( $procSlots as $slot => &$procSlot ) {
- $status = $procSlot['handle']
- ? proc_get_status( $procSlot['handle'] )
- : false;
- if ( $status && $status['running'] ) {
- $maxReal = isset(
$this->maxRealMap[$procSlot['type']] )
- ? $this->maxRealMap[$procSlot['type']]
- : $this->maxRealMap['*'];
- $age = $cTime - $procSlot['stime'];
- if ( $age >= $maxReal && !$procSlot['sigtime']
) {
- $cmd = $procSlot['cmd'];
- $this->error( "Runner loop $id process
in slot $slot timed out " .
- "[{$age}s; max:
{$maxReal}s]:\n$cmd" );
- posix_kill( $status['pid'], SIGTERM );
// non-blocking
- $procSlot['sigtime'] = time();
- $this->incrStats(
'runner-status.timeout', 1 );
- } elseif ( $age >= $maxReal && ( $cTime -
$procSlot['sigtime'] ) > 5 ) {
- $this->error( "Runner loop $id process
in slot $slot sent SIGKILL." );
- $this->closeRunner( $id, $slot,
$procSlot, SIGKILL );
- $this->incrStats( 'runner-status.kill',
1 );
- } else {
- continue; // slot is busy
- }
- } elseif ( $status && !$status['running'] ) {
- if ( $status['exitcode'] == 0 ) {
- // If this finished early, lay off of
the queue for a while
- if ( ( $cTime - $procSlot['stime'] ) <
$this->hpMaxTime/2 ) {
- unset(
$pending[$procSlot['type']][$procSlot['db']] );
- $this->debug( "Queue
'{$procSlot['db']}/{$procSlot['type']}' emptied." );
- }
- } else {
- // Mention any serious errors that may
have occured
- $cmd = $procSlot['cmd'];
- $error = stream_get_contents(
$procSlot['pipes'][2] );
- if ( $error == '' ) {
- $error = stream_get_contents(
$procSlot['pipes'][1] );
- }
- $this->error( "Runner loop $id process
in slot $slot " .
- "gave status
'{$status['exitcode']}':\n$cmd\n\t$error" );
- $this->incrStats(
'runner-status.error', 1 );
- }
- $this->closeRunner( $id, $slot, $procSlot );
- } elseif ( !$status && $procSlot['handle'] ) {
- $this->error( "Runner loop $id process in slot
$slot gave no status." );
- $this->closeRunner( $id, $slot, $procSlot );
- $this->incrStats( 'runner-status.none', 1 );
- }
- ++$free;
- $queue = $this->selectQueue( $id, $slot, $prioMap,
$pending );
- if ( !$queue ) {
- break;
- }
- // Spawn a job runner for this loop ID
- $highPrio = $prioMap[$id]['high'];
- $this->spawnRunner( $id, $slot, $highPrio, $queue,
$procSlot );
- ++$new;
- }
-
- return array( $free, $new );
- }
-
- /**
- * @param integer $id
- * @param integer $slot
- * @param array $prioMap
- * @param array $pending
- * @return array|boolean
- */
- protected function selectQueue( $id, $slot, array $prioMap, array
$pending ) {
- $include = array();
- $exclude = array();
- foreach ( $this->loopMap[$id]['types'] as $type ) {
- // "*" means "all current queues"
- if ( $type === '*' ) {
- $include = array_merge( $include, array_keys(
$pending ) );
- // "-" as a prefix means "exclude this queue" from "*"
- } elseif ( $type[0] === '-' ) {
- $exclude[] = substr( $type, 1 );
- // "~" as a prefix means "low priority"
- } elseif ( $type[0] === '~' ) {
- if ( $prioMap[$id]['high'] ) {
- // In high priority mode, exclude this
type
- $exclude[] = substr( $type, 1 );
- } else {
- // In low priority mode, allow this type
- $include[] = substr( $type, 1 );
- }
- } else {
- $include[] = $type;
- }
- }
- $candidateTypes = array_diff( $include, $exclude );
-
- $candidates = array(); // list of (type, db)
- // Flatten the tree of candidates into a flat list so that a
random
- // item can be selected, weighing each queue (type/db tuple)
equally.
- foreach ( $candidateTypes as $type ) {
- if ( isset( $pending[$type] ) ) {
- foreach ( $pending[$type] as $db => $since ) {
- $candidates[] = array( $type, $db );
- }
- }
- }
-
- if ( !count( $candidates ) ) {
- return false; // no jobs for this type
- }
-
- return $candidates[mt_rand( 0, count( $candidates ) - 1 )];
- }
-
- /**
- * @param integer $id
- * @param integer $slot
- * @param bool $highPrio
- * @param array $queue
- * @param array $procSlot
- * @return bool
- */
- protected function spawnRunner( $id, $slot, $highPrio, array $queue,
array &$procSlot ) {
- // Pick a random queue
- list( $type, $db ) = $queue;
- $encType = escapeshellarg( $type );
- $encWiki = escapeshellarg( $db );
- $encMaxTime = escapeshellarg( $highPrio ? $this->lpMaxTime :
$this->hpMaxTime );
- $encMaxMem = escapeshellarg(
- isset( $this->maxMemMap[$type] ) ?
$this->maxMemMap[$type] : $this->maxMemMap['*']
- );
-
- // Apply any HET-deploy style wrapper
- $script = $this->wrapper
- ? "{$this->wrapper} runJobs.php"
- : "php maintenance/runJobs.php";
-
- // Make sure the runner is launched with various time/memory
limits.
- // Nice the process so things like ssh and deployment scripts
are fine.
- $cmd = "$script --wiki=$encWiki --type=$encType " .
- "--maxtime=$encMaxTime --memory-limit=$encMaxMem";
- if ( substr( php_uname(), 0, 7 ) !== 'Windows' ) {
- $cmd = "nice -n 19 $cmd";
- }
-
- $descriptors = array(
- 0 => array( "pipe", "r" ), // stdin (child)
- 1 => array( "pipe", "w" ), // stdout (child)
- 2 => array( "pipe", "w" ) // stderr (child)
- );
-
- $this->debug( "Spawning runner in loop $id at slot $slot
($type, $db):\n\t$cmd." );
-
- // Start the runner in the background
- $procSlot['handle'] = proc_open( $cmd, $descriptors,
$procSlot['pipes'] );
- fclose( $procSlot['pipes'][0] );
- unset( $procSlot['pipes'][0] ); // unused
- $procSlot['db'] = $db;
- $procSlot['type'] = $type;
- $procSlot['cmd'] = $cmd;
- $procSlot['stime'] = time();
- $procSlot['sigtime'] = 0;
-
- if ( $procSlot['handle'] ) {
- return true;
- } else {
- $this->error( "Could not spawn process in loop $id:
$cmd" );
- $this->incrStats( 'runner-status.error', 1 );
- return false;
- }
- }
-
- /**
- * @param integer $id
- * @param integer $slot
- * @param array $procSlot
- * @param integer $signal
- */
- protected function closeRunner( $id, $slot, array &$procSlot, $signal =
null ) {
- if ( $procSlot['pipes'] ) {
- if ( $procSlot['pipes'][1] !== false ) {
- fclose( $procSlot['pipes'][1] );
- $procSlot['pipes'][1] = false;
- }
- if ( $procSlot['pipes'][2] !== false ) {
- fclose( $procSlot['pipes'][2] );
- $procSlot['pipes'][2] = false;
- }
- }
- if ( $procSlot['handle'] ) {
- $this->debug( "Closing process in loop $id at slot
$slot" );
- if ( $signal !== null ) {
- // Tell the process to close with a signal
- proc_terminate( $procSlot['handle'], $signal );
- } else {
- // Wait for the process to finish on its own
- proc_close( $procSlot['handle'] );
- }
- }
- $procSlot['handle'] = false;
- $procSlot['db'] = null;
- $procSlot['type'] = null;
- $procSlot['stime'] = 0;
- $procSlot['sigtime'] = 0;
- $procSlot['cmd'] = null;
}
/**
@@ -927,7 +699,7 @@
* @param integer $delta
* @return void
*/
- protected function incrStats( $event, $delta = 1 ) {
+ public function incrStats( $event, $delta = 1 ) {
if ( !$this->statsdHost || $delta == 0 ) {
return; // nothing to do
}
@@ -958,7 +730,7 @@
/**
* @param string $s
*/
- protected function debug( $s ) {
+ public function debug( $s ) {
if ( $this->verbose ) {
print "$s\n";
}
@@ -967,14 +739,294 @@
/**
* @param string $s
*/
- protected function notice( $s ) {
+ public function notice( $s ) {
print date( DATE_ISO8601 ) . ": $s\n";
}
/**
* @param string $s
*/
- protected function error( $s ) {
+ public function error( $s ) {
fwrite( STDERR, date( DATE_ISO8601 ) . ": $s\n" );
}
}
+
+/**
+ * Tracks the runner slots of every loop and keeps them filled, either by
+ * spawning runJobs.php processes or, when the service has a 'dispatcher'
+ * URL configured, by spawning curl requests against that URL.
+ */
+class JobRunnerPipeline {
+	/** @var RedisJobRunnerService */
+	protected $srvc;
+	/** @var array (loop ID => slot ID => slot status array) */
+	protected $procMap = array();
+
+	/**
+	 * @param RedisJobRunnerService $service Service whose configuration
+	 *   (loopMap, maxRealMap, maxMemMap, wrapper, dispatcher, *MaxTime) and
+	 *   logging/stats methods are used by this pipeline
+	 */
+	public function __construct( RedisJobRunnerService $service ) {
+		$this->srvc = $service;
+	}
+
+	/**
+	 * Register an empty (free) slot for a loop.
+	 *
+	 * @param integer|string $loop Loop ID (a key of the service's loopMap)
+	 * @param integer $slot Slot index within the loop
+	 */
+	public function initSlot( $loop, $slot ) {
+		$this->procMap[$loop][$slot] = array(
+			'handle'  => false, // proc_open() resource, or false when the slot is free
+			'pipes'   => array(), // stdout/stderr pipes of the child
+			'db'      => null,
+			'type'    => null,
+			'cmd'     => null,
+			'stime'   => 0, // UNIX timestamp the runner was started
+			'sigtime' => 0, // UNIX timestamp SIGTERM was sent, or 0
+			'isCurl'  => false // whether the runner is a curl dispatcher request
+		);
+	}
+
+	/**
+	 * Reap finished/timed-out runners of a loop and start new runners in
+	 * the slots that are free.
+	 *
+	 * Scanning stops at the first free slot for which no queue could be
+	 * selected, so later slots are neither checked nor counted in that pass.
+	 *
+	 * @param integer|string $loop Loop ID
+	 * @param array $prioMap Map of (loop ID => (current priority, since))
+	 * @param array $pending Map of (job type => (db => ...)); entries are
+	 *   removed when a runner finishes early enough to suggest an empty queue
+	 * @return array (number of free slots seen, number of runners spawned)
+	 */
+	public function refillSlots( $loop, array $prioMap, array &$pending ) {
+		$free = 0;
+		$new = 0;
+		$cTime = time();
+		foreach ( $this->procMap[$loop] as $slot => &$procSlot ) {
+			$status = $procSlot['handle']
+				? proc_get_status( $procSlot['handle'] )
+				: false;
+			if ( $status && $status['running'] ) {
+				$maxReal = isset( $this->srvc->maxRealMap[$procSlot['type']] )
+					? $this->srvc->maxRealMap[$procSlot['type']]
+					: $this->srvc->maxRealMap['*'];
+				$age = $cTime - $procSlot['stime'];
+				if ( $age >= $maxReal && !$procSlot['sigtime'] ) {
+					$cmd = $procSlot['cmd'];
+					$this->srvc->error( "Runner loop $loop process in slot $slot timed out " .
+						"[{$age}s; max: {$maxReal}s]:\n$cmd" );
+					posix_kill( $status['pid'], SIGTERM ); // non-blocking
+					$procSlot['sigtime'] = time();
+					$this->srvc->incrStats( 'runner-status.timeout', 1 );
+					// The process is still running; keep the slot (and its handle)
+					// until it exits or the SIGKILL grace period below elapses.
+					// Refilling now would overwrite the handle of a live process.
+					continue;
+				} elseif ( $age >= $maxReal && ( $cTime - $procSlot['sigtime'] ) > 5 ) {
+					$this->srvc->error( "Runner loop $loop process in slot $slot sent SIGKILL." );
+					$this->closeRunner( $loop, $slot, $procSlot, SIGKILL );
+					$this->srvc->incrStats( 'runner-status.kill', 1 );
+				} else {
+					continue; // slot is busy
+				}
+			} elseif ( $status && !$status['running'] ) {
+				$error = null;
+				if ( $procSlot['isCurl'] && $status['exitcode'] == 0 ) {
+					// curl exits with 0 even when the dispatcher response is an
+					// error page, so require the body to decode as (truthy) JSON
+					$error = stream_get_contents( $procSlot['pipes'][1] );
+					if ( !json_decode( $error ) ) {
+						$status['exitcode'] = 1; // probably PHP exception
+					}
+				}
+				if ( $status['exitcode'] == 0 ) {
+					// If this finished early, lay off of the queue for a while
+					if ( ( $cTime - $procSlot['stime'] ) < $this->srvc->hpMaxTime / 2 ) {
+						unset( $pending[$procSlot['type']][$procSlot['db']] );
+						$this->srvc->debug( "Queue '{$procSlot['db']}/{$procSlot['type']}' emptied." );
+					}
+				} else {
+					// Mention any serious errors that may have occurred
+					$cmd = $procSlot['cmd'];
+					if ( $error === null ) {
+						// Prefer stderr; fall back to stdout when stderr is empty
+						$error = stream_get_contents( $procSlot['pipes'][2] )
+							?: stream_get_contents( $procSlot['pipes'][1] );
+					}
+					$this->srvc->error( "Runner loop $loop process in slot $slot " .
+						"gave status '{$status['exitcode']}':\n$cmd\n\t$error" );
+					$this->srvc->incrStats( 'runner-status.error', 1 );
+				}
+				$this->closeRunner( $loop, $slot, $procSlot );
+			} elseif ( !$status && $procSlot['handle'] ) {
+				$this->srvc->error( "Runner loop $loop process in slot $slot gave no status." );
+				$this->closeRunner( $loop, $slot, $procSlot );
+				$this->srvc->incrStats( 'runner-status.none', 1 );
+			}
+			++$free;
+			$queue = $this->selectQueue( $loop, $prioMap, $pending );
+			if ( !$queue ) {
+				break;
+			}
+			// Spawn a job runner for this loop ID
+			$highPrio = $prioMap[$loop]['high'];
+			$this->spawnRunner( $loop, $slot, $highPrio, $queue, $procSlot );
+			++$new;
+		}
+		unset( $procSlot );
+
+		return array( $free, $new );
+	}
+
+	/**
+	 * Pick a random pending queue that the given loop is allowed to run in
+	 * its current priority mode.
+	 *
+	 * @param integer|string $loop Loop ID
+	 * @param array $prioMap Map of (loop ID => (current priority, since))
+	 * @param array $pending Map of (job type => (db => ...))
+	 * @return array|boolean (job type, db) or false if there are no candidates
+	 */
+	protected function selectQueue( $loop, array $prioMap, array $pending ) {
+		$include = array();
+		$exclude = array();
+		foreach ( $this->srvc->loopMap[$loop]['types'] as $type ) {
+			// "*" means "all current queues"
+			if ( $type === '*' ) {
+				$include = array_merge( $include, array_keys( $pending ) );
+			// "-" as a prefix means "exclude this queue" from "*"
+			} elseif ( $type[0] === '-' ) {
+				$exclude[] = substr( $type, 1 );
+			// "~" as a prefix means "low priority"
+			} elseif ( $type[0] === '~' ) {
+				if ( $prioMap[$loop]['high'] ) {
+					// In high priority mode, exclude this type
+					$exclude[] = substr( $type, 1 );
+				} else {
+					// In low priority mode, allow this type
+					$include[] = substr( $type, 1 );
+				}
+			} else {
+				$include[] = $type;
+			}
+		}
+		$candidateTypes = array_diff( $include, $exclude );
+
+		$candidates = array(); // list of (type, db)
+		// Flatten the tree of candidates into a flat list so that a random
+		// item can be selected, weighing each queue (type/db tuple) equally.
+		foreach ( $candidateTypes as $type ) {
+			if ( isset( $pending[$type] ) ) {
+				foreach ( $pending[$type] as $db => $since ) {
+					$candidates[] = array( $type, $db );
+				}
+			}
+		}
+
+		if ( !count( $candidates ) ) {
+			return false; // no jobs for this type
+		}
+
+		return $candidates[mt_rand( 0, count( $candidates ) - 1 )];
+	}
+
+	/**
+	 * Start a runner for the given queue in the given slot.
+	 *
+	 * If the service has a dispatcher URL, this spawns a curl POST to it;
+	 * otherwise it spawns runJobs.php (optionally through the wrapper).
+	 *
+	 * @param integer|string $loop Loop ID
+	 * @param integer $slot Slot index within the loop
+	 * @param bool $highPrio Whether the loop is in high priority mode
+	 * @param array $queue (job type, db)
+	 * @param array $procSlot Slot status array, updated in place
+	 * @return bool Whether the process could be started
+	 */
+	protected function spawnRunner( $loop, $slot, $highPrio, array $queue, array &$procSlot ) {
+		// Pick a random queue
+		list( $type, $db ) = $queue;
+		// NOTE(review): high priority mode picks lpMaxTime and low priority mode
+		// picks hpMaxTime, which looks inverted relative to the property docs.
+		// Kept as-is since it matches the pre-refactoring code; confirm intent.
+		$maxTime = $highPrio ? $this->srvc->lpMaxTime : $this->srvc->hpMaxTime;
+		$maxMem = isset( $this->srvc->maxMemMap[$type] )
+			? $this->srvc->maxMemMap[$type]
+			: $this->srvc->maxMemMap['*'];
+
+		$isCurl = (bool)$this->srvc->dispatcher;
+		if ( $isCurl ) {
+			// NOTE(review): the memory limit is not forwarded in dispatcher mode
+			$url = $this->srvc->dispatcher .
+				"?wiki=" . rawurlencode( $db ) .
+				"&type=" . rawurlencode( $type ) .
+				"&maxtime=" . rawurlencode( $maxTime );
+			// NOTE(review): "-a" is curl's --append (upload) option; confirm it is wanted
+			$cmd = "curl -s -XPOST -a " . escapeshellarg( $url );
+		} else {
+			// Apply any HET-deploy style wrapper
+			$script = $this->srvc->wrapper
+				? "{$this->srvc->wrapper} runJobs.php"
+				: "php maintenance/runJobs.php";
+
+			// Make sure the runner is launched with various time/memory limits.
+			// Nice the process so things like ssh and deployment scripts are fine.
+			$cmd = "$script --wiki=" . escapeshellarg( $db ) .
+				" --type=" . escapeshellarg( $type ) .
+				" --maxtime=" . escapeshellarg( $maxTime ) .
+				" --memory-limit=" . escapeshellarg( $maxMem );
+			if ( substr( php_uname(), 0, 7 ) !== 'Windows' ) {
+				$cmd = "nice -n 19 $cmd";
+			}
+		}
+
+		$descriptors = array(
+			0 => array( "pipe", "r" ), // stdin (child)
+			1 => array( "pipe", "w" ), // stdout (child)
+			2 => array( "pipe", "w" ) // stderr (child)
+		);
+
+		$this->srvc->debug( "Spawning runner in loop $loop at slot $slot ($type, $db):\n\t$cmd." );
+
+		// Start the runner in the background
+		$procSlot['handle'] = proc_open( $cmd, $descriptors, $procSlot['pipes'] );
+		if ( $procSlot['handle'] ) {
+			// Only touch the pipes when the process actually started
+			fclose( $procSlot['pipes'][0] );
+			unset( $procSlot['pipes'][0] ); // unused
+		}
+		$procSlot['db'] = $db;
+		$procSlot['type'] = $type;
+		$procSlot['cmd'] = $cmd;
+		$procSlot['stime'] = time();
+		$procSlot['sigtime'] = 0;
+		// Record the actual mode: only curl runners get their stdout validated as JSON
+		$procSlot['isCurl'] = $isCurl;
+
+		if ( $procSlot['handle'] ) {
+			return true;
+		} else {
+			$this->srvc->error( "Could not spawn process in loop $loop: $cmd" );
+			$this->srvc->incrStats( 'runner-status.error', 1 );
+			return false;
+		}
+	}
+
+	/**
+	 * Close the pipes and process of a slot and mark the slot as free.
+	 *
+	 * @param integer|string $loop Loop ID
+	 * @param integer $slot Slot index within the loop
+	 * @param array $procSlot Slot status array, reset in place
+	 * @param integer $signal Signal to send via proc_terminate(); when null,
+	 *   proc_close() is used instead, which waits for the process to finish
+	 */
+	protected function closeRunner( $loop, $slot, array &$procSlot, $signal = null ) {
+		if ( $procSlot['pipes'] ) {
+			if ( $procSlot['pipes'][1] !== false ) {
+				fclose( $procSlot['pipes'][1] );
+				$procSlot['pipes'][1] = false;
+			}
+			if ( $procSlot['pipes'][2] !== false ) {
+				fclose( $procSlot['pipes'][2] );
+				$procSlot['pipes'][2] = false;
+			}
+		}
+		if ( $procSlot['handle'] ) {
+			$this->srvc->debug( "Closing process in loop $loop at slot $slot." );
+			if ( $signal !== null ) {
+				// Tell the process to close with a signal
+				proc_terminate( $procSlot['handle'], $signal );
+			} else {
+				// Wait for the process to finish on its own
+				proc_close( $procSlot['handle'] );
+			}
+		}
+		$procSlot['handle'] = false;
+		$procSlot['db'] = null;
+		$procSlot['type'] = null;
+		$procSlot['stime'] = 0;
+		$procSlot['sigtime'] = 0;
+		$procSlot['cmd'] = null;
+		$procSlot['isCurl'] = false;
+	}
+
+	/**
+	 * Close the output pipes of every active runner and ask it to terminate.
+	 *
+	 * Slot state is not reset here; RedisJobRunnerService calls this from its
+	 * signal handler immediately before exiting.
+	 */
+	public function terminateSlots() {
+		foreach ( $this->procMap as $procSlots ) {
+			foreach ( $procSlots as $procSlot ) {
+				if ( !$procSlot['handle'] ) {
+					continue;
+				}
+				fclose( $procSlot['pipes'][1] );
+				fclose( $procSlot['pipes'][2] );
+				$status = proc_get_status( $procSlot['handle'] );
+				print "Sending SIGTERM to {$status['pid']}.\n";
+				proc_terminate( $procSlot['handle'] );
+			}
+		}
+	}
+}
--
To view, visit https://gerrit.wikimedia.org/r/149216
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: merged
Gerrit-Change-Id: Id73180ab3baea9e5e5013a9ea5cefeba43f057e6
Gerrit-PatchSet: 2
Gerrit-Project: mediawiki/services/jobrunner
Gerrit-Branch: master
Gerrit-Owner: Aaron Schulz <[email protected]>
Gerrit-Reviewer: Ori.livneh <[email protected]>
Gerrit-Reviewer: jenkins-bot <>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits