Aaron Schulz has uploaded a new change for review.
https://gerrit.wikimedia.org/r/149923
Change subject: Merged --dispatcher and --wrapper params
......................................................................
Merged --dispatcher and --wrapper params
* Always assume that a JSON response is given (whether using curl or not). This
means
that runJobs.php dispatchers need --result=json.
* Use the JSON response to increment job run/fail counters.
Change-Id: I0b9d13ffe116d906dd51c928054d2939433308e7
---
M redisJobRunnerService
1 file changed, 26 insertions(+), 47 deletions(-)
git pull ssh://gerrit.wikimedia.org:29418/mediawiki/services/jobrunner
refs/changes/23/149923/1
diff --git a/redisJobRunnerService b/redisJobRunnerService
index aee0e46..6d988a9 100755
--- a/redisJobRunnerService
+++ b/redisJobRunnerService
@@ -42,15 +42,13 @@
/** @var array Map of (job type => integer) */
protected $attemptsMap = array();
- /** @var string MediaWiki script wrapper */
- public $wrapper;
/** @var array Map of (id => (job type list,count) */
public $loopMap = array();
/** @var array Map of (job type => integer) */
public $maxRealMap = array();
/** @var array Map of (job type => integer) */
public $maxMemMap = array();
- /** @var array String URL of dispatcher */
+ /** @var array String command to run jobs and return the status JSON
blob */
public $dispatcher;
/**
@@ -109,14 +107,14 @@
}
}
- foreach ( array( 'aggrSrvs', 'queueSrvs', 'runners' ) as $par )
{
+ foreach ( array( 'aggrSrvs', 'queueSrvs', 'runners',
'dispatcher' ) as $par ) {
if ( !isset( $config[$par] ) || isset( $config['help']
) ) {
die( "Usage: php RedisJobRunnerService.php\n" .
"--config-file=<path>\n" .
"--aggrSrvs=<address>[,address]...\n" .
"--queueSrvs=<address>[,address]...>\n"
.
"--runners=<count>:<type>[,type]...[|<count>:<type>[,type]...]...\n" .
- "--wrapper=<script wrapper>" .
+ "--dispatcher=<command or http(s) URL>"
.
"--claimTTL=<type>:<seconds>[,<type>:<seconds>]...\n" .
"--attempts=<type>:<integer>[,<type>:<integer>]...\n" .
"--maxReal=<type>:<integer>[,<type>:<integer>]...\n" .
@@ -162,9 +160,6 @@
if ( isset( $config['timeout'] ) ) {
$this->maxProcReal = (int)$config['timeout'];
- }
- if ( isset( $config['wrapper'] ) ) {
- $this->wrapper = $config['wrapper'];
}
if ( isset( $config['password'] ) ) {
$this->password = $config['password'];
@@ -777,7 +772,6 @@
'cmd' => null,
'stime' => 0,
'sigtime' => 0,
- 'isCurl' => false
);
}
@@ -790,6 +784,7 @@
public function refillSlots( $loop, array $prioMap, array &$pending ) {
$free = 0;
$new = 0;
+ $host = gethostname();
$cTime = time();
foreach ( $this->procMap[$loop] as $slot => &$procSlot ) {
$status = $procSlot['handle']
@@ -815,26 +810,25 @@
continue; // slot is busy
}
} elseif ( $status && !$status['running'] ) {
- $error = null;
- if ( $procSlot['isCurl'] && $status['exitcode']
== 0 ) {
- $error = stream_get_contents(
$procSlot['pipes'][1] );
- if ( !json_decode( $error ) ) {
- $status['exitcode'] = 1; //
probably PHP exception
- }
- }
- if ( $status['exitcode'] == 0 ) {
+ $response = stream_get_contents(
$procSlot['pipes'][1] );
+ $result = json_decode( $response ); // an array
if no exceptions happened
+ if ( $status['exitcode'] == 0 && is_array(
$result ) ) {
// If this finished early, lay off of
the queue for a while
if ( ( $cTime - $procSlot['stime'] ) <
$this->hpMaxTime/2 ) {
unset(
$pending[$procSlot['type']][$procSlot['db']] );
$this->srvc->debug( "Queue
'{$procSlot['db']}/{$procSlot['type']}' emptied." );
}
+ $ok = 0; // jobs that ran OK
+ foreach ( $result['jobs'] as $status ) {
+ $ok += ( $status['status'] ===
'ok' ) ? 1 : 0;
+ }
+ $failed = count( $result['jobs'] ) -
$ok;
+ $this->srvc->incrStats(
"pop.{$procSlot['type']}.ok.{$host}", $ok );
+ $this->srvc->incrStats(
"pop.{$procSlot['type']}.failed.{$host}", $ok );
} else {
// Mention any serious errors that may
have occured
$cmd = $procSlot['cmd'];
- if ( $error === null ) {
- $error = stream_get_contents(
$procSlot['pipes'][2] )
- ?: stream_get_contents(
$procSlot['pipes'][1] );
- }
+ $error = stream_get_contents(
$procSlot['pipes'][2] ) ?: $response;
$this->srvc->error( "Runner loop $loop
process in slot $slot " .
"gave status
'{$status['exitcode']}':\n$cmd\n\t$error" );
$this->srvc->incrStats(
'runner-status.error', 1 );
@@ -920,33 +914,20 @@
protected function spawnRunner( $loop, $slot, $highPrio, array $queue,
array &$procSlot ) {
// Pick a random queue
list( $type, $db ) = $queue;
- $maxTime = $highPrio ? $this->srvc->lpMaxTime :
$this->srvc->hpMaxTime;
- $maxMem = isset( $this->srvc->maxMemMap[$type] )
+ $maxtime = $highPrio ? $this->srvc->lpMaxTime :
$this->srvc->hpMaxTime;
+ $maxmem = isset( $this->srvc->maxMemMap[$type] )
? $this->srvc->maxMemMap[$type]
: $this->srvc->maxMemMap['*'];
- $isCurl = (bool)$this->srvc->dispatcher;
- if ( $isCurl ) {
- $url = $this->srvc->dispatcher .
- "?wiki=" . rawurlencode( $db ) .
- "&type=" . rawurlencode( $type ) .
- "&maxtime=" . rawurlencode( $maxTime );
- $cmd = "curl -s -XPOST -a " . escapeshellarg( $url );
- } else {
- // Apply any HET-deploy style wrapper
- $script = $this->srvc->wrapper
- ? "{$this->srvc->wrapper} runJobs.php"
- : "php maintenance/runJobs.php";
-
- // Make sure the runner is launched with various
time/memory limits.
- // Nice the process so things like ssh and deployment
scripts are fine.
- $cmd = "$script --wiki=" . escapeshellarg( $db ) .
- " --type=" . escapeshellarg( $type ) .
- " --maxtime=" . escapeshellarg( $maxTime ) .
- " --memory-limit=" . escapeshellarg( $maxMem );
- if ( substr( php_uname(), 0, 7 ) !== 'Windows' ) {
- $cmd = "nice -n 19 $cmd";
- }
+ // The dispatcher might be runJobs.php, curl, or wget
+ $dispatcher = $this->srvc->dispatcher;
+ $encodeFunc = preg_match( '#^(curl|wget) #', $dispatcher )
+ ? function( $s ) { return rawurlencode( $s ); }
+ : function( $s ) { return escapeshellarg( $s ); };
+ // Make sure the runner is launched with various time/memory
limits.
+ // Nice the process so things like ssh and deployment scripts
are fine.
+ foreach ( compact( 'db', 'type', 'maxtime', 'maxmem' ) as $k =>
$v ) {
+ $dispatcher = str_replace( "{{$k}}", $v, $encodeFunc(
$dispatcher ) );
}
$descriptors = array(
@@ -966,7 +947,6 @@
$procSlot['cmd'] = $cmd;
$procSlot['stime'] = time();
$procSlot['sigtime'] = 0;
- $procSlot['isCurl'] = true;
if ( $procSlot['handle'] ) {
return true;
@@ -1010,7 +990,6 @@
$procSlot['stime'] = 0;
$procSlot['sigtime'] = 0;
$procSlot['cmd'] = null;
- $procSlot['isCurl'] = false;
}
public function terminateSlots() {
--
To view, visit https://gerrit.wikimedia.org/r/149923
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I0b9d13ffe116d906dd51c928054d2939433308e7
Gerrit-PatchSet: 1
Gerrit-Project: mediawiki/services/jobrunner
Gerrit-Branch: master
Gerrit-Owner: Aaron Schulz <[email protected]>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits