Ori.livneh has submitted this change and it was merged.
Change subject: Die if no metrics received in one minute
......................................................................
Die if no metrics received in one minute
If no message is received in TIMEOUT_SECONDS (hard-coded to 60), die
(allowing init to restart the process). This should be made
configurable (or adaptive) in the future.
Change-Id: I3ad8bb49ceae76f2ae68ab0d67cf002b8db24a9e
---
M statsv.py
1 file changed, 12 insertions(+), 4 deletions(-)
Approvals:
Ori.livneh: Verified; Looks good to me, approved
diff --git a/statsv.py b/statsv.py
index ab51059..1917bfa 100644
--- a/statsv.py
+++ b/statsv.py
@@ -34,6 +34,7 @@
from pykafka.common import OffsetType
+TIMEOUT_SECONDS = 60 * 1000
logging.basicConfig(stream=sys.stderr, level=logging.INFO,
format='%(asctime)s %(message)s')
supported_metric_types = ('c', 'g', 'ms')
@@ -60,14 +61,15 @@
try:
query_string = data['uri_query'].lstrip('?')
for metric_name, value in urlparse.parse_qsl(query_string):
- metric_value, metric_type = re.search('^(\d+)([a-z]+)$',
value).groups()
+ metric_value, metric_type = re.search(
+ '^(\d+)([a-z]+)$', value).groups()
assert metric_type in supported_metric_types
- statsd_message = '%s:%s|%s' % (metric_name, metric_value,
metric_type)
+ statsd_message = '%s:%s|%s' % (
+ metric_name, metric_value, metric_type)
sock.sendto(statsd_message.encode('utf-8'), statsd_addr)
logging.debug(statsd_message)
except (AssertionError, AttributeError, KeyError):
pass
-
queue = multiprocessing.Queue()
@@ -82,7 +84,13 @@
worker.start()
topic = kafka.topics['statsv']
-consumer = topic.get_simple_consumer(auto_offset_reset=OffsetType.LATEST)
+consumer = topic.get_simple_consumer(
+ auto_offset_reset=OffsetType.LATEST,
+ consumer_timeout_ms=TIMEOUT_SECONDS * 1000)
for message in consumer:
if message is not None:
queue.put(message.value)
+
+# If we reach this line, TIMEOUT_SECONDS elapsed with no events received.
+queue.close()
+raise RuntimeError('No messages received in %d seconds.' % TIMEOUT_SECONDS)
--
To view, visit https://gerrit.wikimedia.org/r/317847
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: merged
Gerrit-Change-Id: I3ad8bb49ceae76f2ae68ab0d67cf002b8db24a9e
Gerrit-PatchSet: 3
Gerrit-Project: analytics/statsv
Gerrit-Branch: master
Gerrit-Owner: Ori.livneh <[email protected]>
Gerrit-Reviewer: Ori.livneh <[email protected]>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits