Addshore has uploaded a new change for review.
https://gerrit.wikimedia.org/r/259660
Change subject: WIP DNM add method to send multiple stats at once
......................................................................
WIP DNM add method to send multiple stats at once
After writing this, I think it would be better to
instead buffer the stats and send them at the end of
each loop.
Change-Id: I216eadc7eeb0d64a5ee782463c69017a61ab72ec
---
M redisJobChronService
M redisJobRunnerService
M src/JobRunnerPipeline.php
M src/RedisJobService.php
4 files changed, 75 insertions(+), 29 deletions(-)
git pull ssh://gerrit.wikimedia.org:29418/mediawiki/services/jobrunner
refs/changes/60/259660/1
diff --git a/redisJobChronService b/redisJobChronService
index 3836f5a..cf01be9 100755
--- a/redisJobChronService
+++ b/redisJobChronService
@@ -36,7 +36,7 @@
usleep( mt_rand( 0, self::PERIOD_WAIT_US ) ); // run out of
phase immediately
$memLast = memory_get_usage();
- $this->incrStats( "start-chron.$host", 1 );
+ $this->incrStat( "start-chron.$host", 1 );
while ( true ) {
pcntl_signal_dispatch();
@@ -48,7 +48,7 @@
// Track memory usage
$memCurrent = memory_get_usage();
$this->debug( "Memory usage: $memCurrent bytes." );
- $this->incrStats( "memory.$host", $memCurrent -
$memLast );
+ $this->incrStat( "memory.$host", $memCurrent - $memLast
);
$memLast = $memCurrent;
}
}
@@ -70,12 +70,12 @@
// Only let a limited number of services do this at once
$lockKey = $this->poolLock( __METHOD__, count(
$this->queueSrvs ), 300 );
if ( $lockKey === false ) {
- $this->incrStats( "periodictasks.raced.$host",
1 );
+ $this->incrStat( "periodictasks.raced.$host", 1
);
$this->notice( "Raced out of periodic tasks." );
return $jobs;
}
- $this->incrStats( "periodictasks.claimed.$host", 1 );
+ $this->incrStat( "periodictasks.claimed.$host", 1 );
// Job queue partition servers
$qServers = $this->queueSrvs;
@@ -87,7 +87,7 @@
// Run the chron script on each job queue partition
server...
foreach ( $qServers as $qServer ) {
if ( !$this->updatePartitionQueueServer(
$qServer, $aggrMap, $jobs, $lockKey ) ) {
- $this->incrStats(
"periodictasks.partition-failed.$qServer", 1 );
+ $this->incrStat(
"periodictasks.partition-failed.$qServer", 1 );
$ok = false;
}
}
@@ -111,9 +111,9 @@
}
if ( $ok ) {
- $this->incrStats( "periodictasks.done.$host", 1 );
+ $this->incrStat( "periodictasks.done.$host", 1 );
} else {
- $this->incrStats( "periodictasks.failed.$host", 1 );
+ $this->incrStat( "periodictasks.failed.$host", 1 );
$this->error( "Failed to do periodic tasks for some
queues." );
}
@@ -210,10 +210,10 @@
$aggrMap[$this->encQueueName( $qType,
$qDomain )] = time();
}
$affectedJobs = ( array_sum( $result ) - $ready
);
- $this->incrStats( "job-recycle.$qType",
$released );
- $this->incrStats( "job-abandon.$qType",
$abandoned );
- $this->incrStats( "job-undelay.$qType",
$undelayed );
- $this->incrStats( "job-prune.$qType", $pruned );
+ $this->incrStat( "job-recycle.$qType",
$released );
+ $this->incrStat( "job-abandon.$qType",
$abandoned );
+ $this->incrStat( "job-undelay.$qType",
$undelayed );
+ $this->incrStat( "job-prune.$qType", $pruned );
} else {
$affectedJobs = false;
}
diff --git a/redisJobRunnerService b/redisJobRunnerService
index 433bdee..c7be2f8 100755
--- a/redisJobRunnerService
+++ b/redisJobRunnerService
@@ -41,7 +41,7 @@
}
$memLast = memory_get_usage();
- $this->incrStats( "start-runner.$host", 1 );
+ $this->incrStat( "start-runner.$host", 1 );
while ( true ) {
pcntl_signal_dispatch();
@@ -52,7 +52,7 @@
$pending =& $this->getReadyQueueMap();
if ( !count( $pending ) ) {
$this->debug( "No jobs available..." );
- $this->incrStats( "idle.$host", 1 );
+ $this->incrStat( "idle.$host", 1 );
usleep( 100000 ); // no jobs
continue;
}
@@ -98,22 +98,22 @@
}
}
unset( $loopPriority );
- $this->incrStats( "spawn.$host", $anyNew );
- $this->incrStats( "prioritychange.$host", $prioSwitches
);
+ $this->incrStat( "spawn.$host", $anyNew );
+ $this->incrStat( "prioritychange.$host", $prioSwitches
);
// Backoff if there is nothing to do
if ( !$anyFree ) {
$this->debug( "All runner loops full." );
- $this->incrStats( "all-full.$host", 1 );
+ $this->incrStat( "all-full.$host", 1 );
usleep( 100000 );
} elseif ( !$anyNew ) {
$this->debug( "Loops have free slots, but there
are no appropriate jobs." );
- $this->incrStats( "some-full.$host", 1 );
+ $this->incrStat( "some-full.$host", 1 );
usleep( 100000 );
}
// Track memory usage
$memCurrent = memory_get_usage();
$this->debug( "Memory usage: $memCurrent bytes." );
- $this->incrStats( "memory.$host", $memCurrent -
$memLast );
+ $this->incrStat( "memory.$host", $memCurrent - $memLast
);
$memLast = $memCurrent;
}
}
diff --git a/src/JobRunnerPipeline.php b/src/JobRunnerPipeline.php
index a9405f1..ddb9eaf 100755
--- a/src/JobRunnerPipeline.php
+++ b/src/JobRunnerPipeline.php
@@ -60,11 +60,11 @@
"[{$age}s; max:
{$maxReal}s]:\n$cmd" );
posix_kill( $status['pid'], SIGTERM );
// non-blocking
$procSlot['sigtime'] = time();
- $this->srvc->incrStats(
'runner-status.timeout', 1 );
+ $this->srvc->incrStat(
'runner-status.timeout', 1 );
} elseif ( $age >= $maxReal && ( $cTime -
$procSlot['sigtime'] ) > 5 ) {
$this->srvc->error( "Runner loop $loop
process in slot $slot sent SIGKILL." );
$this->closeRunner( $loop, $slot,
$procSlot, SIGKILL );
- $this->srvc->incrStats(
'runner-status.kill', 1 );
+ $this->srvc->incrStat(
'runner-status.kill', 1 );
} else {
continue; // slot is busy
}
@@ -82,8 +82,8 @@
$ok += ( $status['status'] ===
'ok' ) ? 1 : 0;
}
$failed = count( $result['jobs'] ) -
$ok;
- $this->srvc->incrStats(
"pop.{$procSlot['type']}.ok.{$host}", $ok );
- $this->srvc->incrStats(
"pop.{$procSlot['type']}.failed.{$host}", $failed );
+ $this->srvc->incrStat(
"pop.{$procSlot['type']}.ok.{$host}", $ok );
+ $this->srvc->incrStat(
"pop.{$procSlot['type']}.failed.{$host}", $failed );
} else {
// Mention any serious errors that may
have occured
$cmd = $procSlot['cmd'];
@@ -93,13 +93,13 @@
}
$this->srvc->error( "Runner loop $loop
process in slot $slot " .
"gave status
'{$status['exitcode']}':\n$cmd\n\t$error" );
- $this->srvc->incrStats(
'runner-status.error', 1 );
+ $this->srvc->incrStat(
'runner-status.error', 1 );
}
$this->closeRunner( $loop, $slot, $procSlot );
} elseif ( !$status && $procSlot['handle'] ) {
$this->srvc->error( "Runner loop $loop process
in slot $slot gave no status." );
$this->closeRunner( $loop, $slot, $procSlot );
- $this->srvc->incrStats( 'runner-status.none', 1
);
+ $this->srvc->incrStat( 'runner-status.none', 1
);
}
++$free;
$queue = $this->selectQueue( $loop, $prioMap, $pending
);
@@ -216,7 +216,7 @@
return true;
} else {
$this->srvc->error( "Could not spawn process in loop
$loop: $cmd" );
- $this->srvc->incrStats( 'runner-status.error', 1 );
+ $this->srvc->incrStat( 'runner-status.error', 1 );
return false;
}
}
diff --git a/src/RedisJobService.php b/src/RedisJobService.php
index ff43d85..caabf03 100755
--- a/src/RedisJobService.php
+++ b/src/RedisJobService.php
@@ -4,6 +4,8 @@
* Base class for job services with main() implemented by subclasses
*/
abstract class RedisJobService {
+ const MAX_UDP_SIZE_STR = 512;
+
/** @var array List of IP:<port> entries */
protected $queueSrvs = array();
/** @var array List of IP:<port> entries */
@@ -287,7 +289,7 @@
public function handleRedisError( RedisException $e, $server ) {
unset( $this->conns[$server] );
$this->error( "Redis error: " . $e->getMessage() );
- $this->incrStats( "redis-error." . gethostname(), 1 );
+ $this->incrStat( "redis-error." . gethostname(), 1 );
}
/**
@@ -371,14 +373,42 @@
* @param integer $delta
* @return void
*/
- public function incrStats( $event, $delta = 1 ) {
+ public function incrStat( $event, $delta = 1 ) {
if ( !$this->statsdHost || $delta == 0 ) {
return; // nothing to do
}
+ $this->sendStatsPacket( $this->getStatPacket( $event, $delta )
);
+ }
- static $format = "%s:%s|c\n";
- $packet = sprintf( $format, "jobrunner.$event", $delta );
+	/**
+	 * Increment several statsd counters at once, batching the resulting lines
+	 * into as few UDP packets as MAX_UDP_SIZE_STR allows.
+	 *
+	 * Behaves like incrStat(): nothing is sent when no statsd host is
+	 * configured, and events whose delta is zero are skipped.
+	 *
+	 * For backwards compatibility with the former incrStats( $event, $delta )
+	 * signature, a single event name string is also accepted.
+	 *
+	 * @param int[]|string $eventToDeltaMap Map of event name => counter delta,
+	 *  or a single event name (legacy form)
+	 * @param int $delta Delta used only when $eventToDeltaMap is a single event name
+	 * @return void
+	 */
+	public function incrStats( $eventToDeltaMap, $delta = 1 ) {
+		if ( !$this->statsdHost ) {
+			return; // nothing to do
+		}
+		if ( !is_array( $eventToDeltaMap ) ) {
+			// Legacy call: incrStats( $event, $delta )
+			$eventToDeltaMap = array( $eventToDeltaMap => $delta );
+		}
+
+		$lines = array();
+		foreach ( $eventToDeltaMap as $event => $eventDelta ) {
+			if ( $eventDelta != 0 ) {
+				$lines[] = $this->getStatPacket( $event, $eventDelta );
+			}
+		}
+		// Array callable rather than "self::..." string (deprecated in PHP 8.2);
+		// the private method is still reachable because we call from class scope.
+		$packets = array_reduce( $lines, array( __CLASS__, 'doReduceStatPackets' ), array() );
+		foreach ( $packets as $packet ) {
+			$this->sendStatsPacket( $packet );
+		}
+	}
+	/**
+	 * Build one newline-terminated statsd counter line under the
+	 * "jobrunner." namespace.
+	 *
+	 * @param string $event Counter name, without the "jobrunner." prefix
+	 * @param int $delta Amount to add to the counter
+	 * @return string Line of the form "jobrunner.<event>:<delta>|c\n"
+	 */
+	private function getStatPacket( $event, $delta ) {
+		$metricName = 'jobrunner.' . $event;
+		return $metricName . ':' . $delta . "|c\n";
+	}
+
+ /**
+ * @param string $packet
+ * @return void
+ */
+ private function sendStatsPacket( $packet ) {
if ( !function_exists( 'socket_create' ) ) {
$this->debug( 'No "socket_create" method available.' );
return;
@@ -399,6 +429,22 @@
);
}
+	/**
+	 * array_reduce() callback that packs stat lines into as few UDP payloads
+	 * as possible, each no longer than MAX_UDP_SIZE_STR bytes.
+	 *
+	 * A single stat line that is already over the limit becomes a payload of
+	 * its own rather than being truncated or dropped.
+	 *
+	 * @param string[] $reducedMetrics Payloads built so far (a list)
+	 * @param string $metric Next newline-terminated stat line
+	 * @return string[] Updated list of payloads
+	 */
+	private static function doReduceStatPackets( $reducedMetrics, $metric ) {
+		$lastIndex = count( $reducedMetrics ) - 1;
+		// Stat lines already end in "\n", so plain concatenation produces the
+		// newline-delimited multi-metric format without blank lines in between.
+		if ( $lastIndex >= 0
+			&& strlen( $reducedMetrics[$lastIndex] ) + strlen( $metric ) <= self::MAX_UDP_SIZE_STR
+		) {
+			$reducedMetrics[$lastIndex] .= $metric;
+		} else {
+			$reducedMetrics[] = $metric;
+		}
+		return $reducedMetrics;
+	}
+
/**
* @param string $s
*/
--
To view, visit https://gerrit.wikimedia.org/r/259660
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I216eadc7eeb0d64a5ee782463c69017a61ab72ec
Gerrit-PatchSet: 1
Gerrit-Project: mediawiki/services/jobrunner
Gerrit-Branch: master
Gerrit-Owner: Addshore <[email protected]>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits