Ori.livneh has submitted this change and it was merged.

Change subject: Die if no metrics received in one minute
......................................................................


Die if no metrics received in one minute

If no message is received in TIMEOUT_SECONDS (hard-coded to 60), die
(allowing init to restart the process). This should be made
configurable (or adaptive) in the future.

Change-Id: I3ad8bb49ceae76f2ae68ab0d67cf002b8db24a9e
---
M statsv.py
1 file changed, 12 insertions(+), 4 deletions(-)

Approvals:
  Ori.livneh: Verified; Looks good to me, approved



diff --git a/statsv.py b/statsv.py
index ab51059..1917bfa 100644
--- a/statsv.py
+++ b/statsv.py
@@ -34,6 +34,7 @@
 from pykafka.common import OffsetType
 
 
+TIMEOUT_SECONDS = 60
 logging.basicConfig(stream=sys.stderr, level=logging.INFO,
                     format='%(asctime)s %(message)s')
 supported_metric_types = ('c', 'g', 'ms')
@@ -60,14 +61,15 @@
         try:
             query_string = data['uri_query'].lstrip('?')
             for metric_name, value in urlparse.parse_qsl(query_string):
-                metric_value, metric_type = re.search('^(\d+)([a-z]+)$', value).groups()
+                metric_value, metric_type = re.search(
+                        '^(\d+)([a-z]+)$', value).groups()
                 assert metric_type in supported_metric_types
-                statsd_message = '%s:%s|%s' % (metric_name, metric_value, metric_type)
+                statsd_message = '%s:%s|%s' % (
+                        metric_name, metric_value, metric_type)
                 sock.sendto(statsd_message.encode('utf-8'), statsd_addr)
                 logging.debug(statsd_message)
         except (AssertionError, AttributeError, KeyError):
             pass
-
 
 
 queue = multiprocessing.Queue()
@@ -82,7 +84,13 @@
     worker.start()
 
 topic = kafka.topics['statsv']
-consumer = topic.get_simple_consumer(auto_offset_reset=OffsetType.LATEST)
+consumer = topic.get_simple_consumer(
+        auto_offset_reset=OffsetType.LATEST,
+        consumer_timeout_ms=TIMEOUT_SECONDS * 1000)
 for message in consumer:
     if message is not None:
         queue.put(message.value)
+
+# If we reach this line, TIMEOUT_SECONDS elapsed with no events received.
+queue.close()
+raise RuntimeError('No messages received in %d seconds.' % TIMEOUT_SECONDS)
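
The die-on-idle behaviour introduced by this change can be sketched without a Kafka broker. The snippet below is only an illustration, not the code in statsv.py: it assumes Python 3, substitutes the standard-library queue.Queue for the pykafka consumer, and the names consume_until_idle and run are hypothetical. As with consumer_timeout_ms in the patch, the iterator simply ends once nothing has arrived within the timeout, and the caller then raises so that init can restart the process.

```python
import queue

# Assumption: mirrors the 60-second value named in the commit message.
TIMEOUT_SECONDS = 60


def consume_until_idle(source, timeout_seconds):
    """Yield items from `source` until none arrives within `timeout_seconds`.

    Hypothetical stand-in for iterating over a consumer that was created
    with a timeout: the loop ends instead of blocking forever.
    """
    while True:
        try:
            yield source.get(timeout=timeout_seconds)
        except queue.Empty:
            # Nothing arrived in time, so stop iterating.
            return


def run(source, sink, timeout_seconds=TIMEOUT_SECONDS):
    """Forward every item to `sink`, then die once the source goes quiet."""
    for message in consume_until_idle(source, timeout_seconds):
        sink.append(message)
    # Reaching this line means the timeout elapsed with no items received.
    raise RuntimeError(
        'No messages received in %d seconds.' % timeout_seconds)
```

Raising after the loop, rather than inside it, keeps the normal forwarding path free of timeout checks; the process supervisor is what turns the exception into a restart.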

-- 
To view, visit https://gerrit.wikimedia.org/r/317847
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I3ad8bb49ceae76f2ae68ab0d67cf002b8db24a9e
Gerrit-PatchSet: 3
Gerrit-Project: analytics/statsv
Gerrit-Branch: master
Gerrit-Owner: Ori.livneh <[email protected]>
Gerrit-Reviewer: Ori.livneh <[email protected]>
