Addshore has uploaded a new change for review.

  https://gerrit.wikimedia.org/r/259660

Change subject: WIP DNM add method to send multiple stats at once
......................................................................

WIP DNM add method to send multiple stats at once

Having written this, I think it would be better to
instead buffer the stats and send them at the end of
each loop.

Change-Id: I216eadc7eeb0d64a5ee782463c69017a61ab72ec
---
M redisJobChronService
M redisJobRunnerService
M src/JobRunnerPipeline.php
M src/RedisJobService.php
4 files changed, 75 insertions(+), 29 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/mediawiki/services/jobrunner refs/changes/60/259660/1

diff --git a/redisJobChronService b/redisJobChronService
index 3836f5a..cf01be9 100755
--- a/redisJobChronService
+++ b/redisJobChronService
@@ -36,7 +36,7 @@
                usleep( mt_rand( 0, self::PERIOD_WAIT_US ) ); // run out of 
phase immediately
 
                $memLast = memory_get_usage();
-               $this->incrStats( "start-chron.$host", 1 );
+               $this->incrStat( "start-chron.$host", 1 );
                while ( true ) {
                        pcntl_signal_dispatch();
 
@@ -48,7 +48,7 @@
                        // Track memory usage
                        $memCurrent = memory_get_usage();
                        $this->debug( "Memory usage: $memCurrent bytes." );
-                       $this->incrStats( "memory.$host", $memCurrent - 
$memLast );
+                       $this->incrStat( "memory.$host", $memCurrent - $memLast 
);
                        $memLast = $memCurrent;
                }
        }
@@ -70,12 +70,12 @@
                        // Only let a limited number of services do this at once
                        $lockKey = $this->poolLock( __METHOD__, count( 
$this->queueSrvs ), 300 );
                        if ( $lockKey === false ) {
-                               $this->incrStats( "periodictasks.raced.$host", 
1 );
+                               $this->incrStat( "periodictasks.raced.$host", 1 
);
                                $this->notice( "Raced out of periodic tasks." );
                                return $jobs;
                        }
 
-                       $this->incrStats( "periodictasks.claimed.$host", 1 );
+                       $this->incrStat( "periodictasks.claimed.$host", 1 );
 
                        // Job queue partition servers
                        $qServers = $this->queueSrvs;
@@ -87,7 +87,7 @@
                        // Run the chron script on each job queue partition 
server...
                        foreach ( $qServers as $qServer ) {
                                if ( !$this->updatePartitionQueueServer( 
$qServer, $aggrMap, $jobs, $lockKey ) ) {
-                                       $this->incrStats( 
"periodictasks.partition-failed.$qServer", 1 );
+                                       $this->incrStat( 
"periodictasks.partition-failed.$qServer", 1 );
                                        $ok = false;
                                }
                        }
@@ -111,9 +111,9 @@
                }
 
                if ( $ok ) {
-                       $this->incrStats( "periodictasks.done.$host", 1 );
+                       $this->incrStat( "periodictasks.done.$host", 1 );
                } else {
-                       $this->incrStats( "periodictasks.failed.$host", 1 );
+                       $this->incrStat( "periodictasks.failed.$host", 1 );
                        $this->error( "Failed to do periodic tasks for some 
queues." );
                }
 
@@ -210,10 +210,10 @@
                                        $aggrMap[$this->encQueueName( $qType, 
$qDomain )] = time();
                                }
                                $affectedJobs = ( array_sum( $result ) - $ready 
);
-                               $this->incrStats( "job-recycle.$qType", 
$released );
-                               $this->incrStats( "job-abandon.$qType", 
$abandoned );
-                               $this->incrStats( "job-undelay.$qType", 
$undelayed );
-                               $this->incrStats( "job-prune.$qType", $pruned );
+                               $this->incrStat( "job-recycle.$qType", 
$released );
+                               $this->incrStat( "job-abandon.$qType", 
$abandoned );
+                               $this->incrStat( "job-undelay.$qType", 
$undelayed );
+                               $this->incrStat( "job-prune.$qType", $pruned );
                        } else {
                                $affectedJobs = false;
                        }
diff --git a/redisJobRunnerService b/redisJobRunnerService
index 433bdee..c7be2f8 100755
--- a/redisJobRunnerService
+++ b/redisJobRunnerService
@@ -41,7 +41,7 @@
                }
 
                $memLast = memory_get_usage();
-               $this->incrStats( "start-runner.$host", 1 );
+               $this->incrStat( "start-runner.$host", 1 );
                while ( true ) {
                        pcntl_signal_dispatch();
 
@@ -52,7 +52,7 @@
                        $pending =& $this->getReadyQueueMap();
                        if ( !count( $pending ) ) {
                                $this->debug( "No jobs available..." );
-                               $this->incrStats( "idle.$host", 1 );
+                               $this->incrStat( "idle.$host", 1 );
                                usleep( 100000 ); // no jobs
                                continue;
                        }
@@ -98,22 +98,22 @@
                                }
                        }
                        unset( $loopPriority );
-                       $this->incrStats( "spawn.$host", $anyNew );
-                       $this->incrStats( "prioritychange.$host", $prioSwitches 
);
+                       $this->incrStat( "spawn.$host", $anyNew );
+                       $this->incrStat( "prioritychange.$host", $prioSwitches 
);
                        // Backoff if there is nothing to do
                        if ( !$anyFree ) {
                                $this->debug( "All runner loops full." );
-                               $this->incrStats( "all-full.$host", 1 );
+                               $this->incrStat( "all-full.$host", 1 );
                                usleep( 100000 );
                        } elseif ( !$anyNew ) {
                                $this->debug( "Loops have free slots, but there 
are no appropriate jobs." );
-                               $this->incrStats( "some-full.$host", 1 );
+                               $this->incrStat( "some-full.$host", 1 );
                                usleep( 100000 );
                        }
                        // Track memory usage
                        $memCurrent = memory_get_usage();
                        $this->debug( "Memory usage: $memCurrent bytes." );
-                       $this->incrStats( "memory.$host", $memCurrent - 
$memLast );
+                       $this->incrStat( "memory.$host", $memCurrent - $memLast 
);
                        $memLast = $memCurrent;
                }
        }
diff --git a/src/JobRunnerPipeline.php b/src/JobRunnerPipeline.php
index a9405f1..ddb9eaf 100755
--- a/src/JobRunnerPipeline.php
+++ b/src/JobRunnerPipeline.php
@@ -60,11 +60,11 @@
                                                "[{$age}s; max: 
{$maxReal}s]:\n$cmd" );
                                        posix_kill( $status['pid'], SIGTERM ); 
// non-blocking
                                        $procSlot['sigtime'] = time();
-                                       $this->srvc->incrStats( 
'runner-status.timeout', 1 );
+                                       $this->srvc->incrStat( 
'runner-status.timeout', 1 );
                                } elseif ( $age >= $maxReal && ( $cTime - 
$procSlot['sigtime'] ) > 5 ) {
                                        $this->srvc->error( "Runner loop $loop 
process in slot $slot sent SIGKILL." );
                                        $this->closeRunner( $loop, $slot, 
$procSlot, SIGKILL );
-                                       $this->srvc->incrStats( 
'runner-status.kill', 1 );
+                                       $this->srvc->incrStat( 
'runner-status.kill', 1 );
                                } else {
                                        continue; // slot is busy
                                }
@@ -82,8 +82,8 @@
                                                $ok += ( $status['status'] === 
'ok' ) ? 1 : 0;
                                        }
                                        $failed = count( $result['jobs'] ) - 
$ok;
-                                       $this->srvc->incrStats( 
"pop.{$procSlot['type']}.ok.{$host}", $ok );
-                                       $this->srvc->incrStats( 
"pop.{$procSlot['type']}.failed.{$host}", $failed );
+                                       $this->srvc->incrStat( 
"pop.{$procSlot['type']}.ok.{$host}", $ok );
+                                       $this->srvc->incrStat( 
"pop.{$procSlot['type']}.failed.{$host}", $failed );
                                } else {
                                        // Mention any serious errors that may 
have occured
                                        $cmd = $procSlot['cmd'];
@@ -93,13 +93,13 @@
                                        }
                                        $this->srvc->error( "Runner loop $loop 
process in slot $slot " .
                                                "gave status 
'{$status['exitcode']}':\n$cmd\n\t$error" );
-                                       $this->srvc->incrStats( 
'runner-status.error', 1 );
+                                       $this->srvc->incrStat( 
'runner-status.error', 1 );
                                }
                                $this->closeRunner( $loop, $slot, $procSlot );
                        } elseif ( !$status && $procSlot['handle'] ) {
                                $this->srvc->error( "Runner loop $loop process 
in slot $slot gave no status." );
                                $this->closeRunner( $loop, $slot, $procSlot );
-                               $this->srvc->incrStats( 'runner-status.none', 1 
);
+                               $this->srvc->incrStat( 'runner-status.none', 1 
);
                        }
                        ++$free;
                        $queue = $this->selectQueue( $loop, $prioMap, $pending 
);
@@ -216,7 +216,7 @@
                        return true;
                } else {
                        $this->srvc->error( "Could not spawn process in loop 
$loop: $cmd" );
-                       $this->srvc->incrStats( 'runner-status.error', 1 );
+                       $this->srvc->incrStat( 'runner-status.error', 1 );
                        return false;
                }
        }
diff --git a/src/RedisJobService.php b/src/RedisJobService.php
index ff43d85..caabf03 100755
--- a/src/RedisJobService.php
+++ b/src/RedisJobService.php
@@ -4,6 +4,8 @@
  * Base class for job services with main() implemented by subclasses
  */
 abstract class RedisJobService {
+       const MAX_UDP_SIZE_STR = 512;
+
        /** @var array List of IP:<port> entries */
        protected $queueSrvs = array();
        /** @var array List of IP:<port> entries */
@@ -287,7 +289,7 @@
        public function handleRedisError( RedisException $e, $server ) {
                unset( $this->conns[$server] );
                $this->error( "Redis error: " . $e->getMessage() );
-               $this->incrStats( "redis-error." . gethostname(), 1 );
+               $this->incrStat( "redis-error." . gethostname(), 1 );
        }
 
        /**
@@ -371,14 +373,42 @@
         * @param integer $delta
         * @return void
         */
-       public function incrStats( $event, $delta = 1 ) {
+       public function incrStat( $event, $delta = 1 ) {
                if ( !$this->statsdHost || $delta == 0 ) {
                        return; // nothing to do
                }
+               $this->sendStatsPacket( $this->getStatPacket( $event, $delta ) 
);
+       }
 
-               static $format = "%s:%s|c\n";
-               $packet = sprintf( $format, "jobrunner.$event", $delta );
+       /**
+        * @param int[] $eventToDeltaMap string event name keys to int stats 
deltas
+        */
+       public function incrStats( $eventToDeltaMap ) {
+               $packets = array();
+               foreach( $eventToDeltaMap as $event => $delta ) {
+                       $packets[] = $this->getStatPacket( $event, $delta );
+               }
+               $packets = array_reduce( $packets, "self::doReduceStatPackets", 
array() );
+               foreach( $packets as $packet ) {
+                       $this->sendStatsPacket( $packet );
+               }
+       }
 
+       /**
+        * @param string $event
+        * @param integer $delta
+        *
+        * @return string
+        */
+       private function getStatPacket( $event, $delta ) {
+               return sprintf( "%s:%s|c\n", "jobrunner.$event", $delta );
+       }
+
+       /**
+        * @param string $packet
+        * @return void
+        */
+       private function sendStatsPacket( $packet ) {
                if ( !function_exists( 'socket_create' ) ) {
                        $this->debug( 'No "socket_create" method available.' );
                        return;
@@ -399,6 +429,22 @@
                );
        }
 
+       private static function doReduceStatPackets($reducedMetrics, $metric) // array_reduce callback: greedily packs stat lines into datagrams of <= MAX_UDP_SIZE_STR bytes
+       {
+               $metricLength = strlen($metric);
+               $lastReducedMetric = count($reducedMetrics) > 0 ? end($reducedMetrics) : null; // datagram currently being filled, if any
+               if ($metricLength >= self::MAX_UDP_SIZE_STR
+                       || null === $lastReducedMetric
+                       || strlen($newMetric = $lastReducedMetric . $metric) > self::MAX_UDP_SIZE_STR // getStatPacket() lines already end in "\n"; no extra separator
+               ) {
+                       $reducedMetrics[] = $metric; // start a new datagram
+               } else {
+                       array_pop($reducedMetrics);
+                       $reducedMetrics[] = $newMetric; // append to the current datagram
+               }
+               return $reducedMetrics;
+       }
+
        /**
         * @param string $s
         */

-- 
To view, visit https://gerrit.wikimedia.org/r/259660
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I216eadc7eeb0d64a5ee782463c69017a61ab72ec
Gerrit-PatchSet: 1
Gerrit-Project: mediawiki/services/jobrunner
Gerrit-Branch: master
Gerrit-Owner: Addshore <[email protected]>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to