Ottomata has submitted this change and it was merged.
Change subject: Use kafka-python instead of pykafka
......................................................................
Use kafka-python instead of pykafka
Change-Id: I29c1df8d367d0cd2e57d972abc9d2d5669d61bcd
---
M statsv.py
1 file changed, 34 insertions(+), 20 deletions(-)
Approvals:
Ori.livneh: Looks good to me, approved
Ottomata: Verified
diff --git a/statsv.py b/statsv.py
index 4f409b0..c9f56c9 100644
--- a/statsv.py
+++ b/statsv.py
@@ -31,27 +31,32 @@
import socket
import urlparse
-from pykafka import KafkaClient
-from pykafka.common import OffsetType
+from kafka import KafkaConsumer
-
-TIMEOUT_SECONDS = 60 * 1000
logging.basicConfig(stream=sys.stderr, level=logging.INFO,
format='%(asctime)s %(message)s')
+
+# Set kafka module logging level to INFO
+logging.getLogger("kafka").setLevel(logging.INFO)
+
+
supported_metric_types = ('c', 'g', 'ms')
statsd_addr = ('statsd.eqiad.wmnet', 8125)
-kafka = KafkaClient(','.join((
+# TODO: make these configurable.
+TIMEOUT_SECONDS = 60
+kafka_topic = 'statsv'
+kafka_consumer_group = 'statsv'
+kafka_bootstrap_servers = (
'kafka1012.eqiad.wmnet:9092',
'kafka1013.eqiad.wmnet:9092',
'kafka1014.eqiad.wmnet:9092',
'kafka1018.eqiad.wmnet:9092',
'kafka1020.eqiad.wmnet:9092',
'kafka1022.eqiad.wmnet:9092',
-)))
+)
SOCK_CLOEXEC = getattr(socket, 'SOCK_CLOEXEC', 0x80000)
-
class Watchdog:
"""
@@ -119,7 +124,6 @@
except (AssertionError, AttributeError, KeyError):
pass
-
queue = multiprocessing.Queue()
# Spawn either half as many workers as there are CPU cores.
@@ -131,18 +135,28 @@
worker.daemon = True
worker.start()
-topic = kafka.topics['statsv']
-consumer = topic.get_simple_consumer(
- auto_offset_reset=OffsetType.LATEST,
- consumer_timeout_ms=TIMEOUT_SECONDS * 1000)
+consumer = KafkaConsumer(
+ kafka_topic,
+ bootstrap_servers=kafka_bootstrap_servers,
+ group_id=kafka_consumer_group,
+ auto_offset_reset='latest',
+ # statsd metrics don't make sense if they lag,
+ # so disable commits to avoid resuming at historical committed offset.
+ enable_auto_commit=False,
+ consumer_timeout_ms=TIMEOUT_SECONDS * 1000,
+)
watchdog = Watchdog()
-for message in consumer:
- if message is not None:
- queue.put(message.value)
- watchdog.notify()
-
-# If we reach this line, TIMEOUT_SECONDS elapsed with no events received.
-queue.close()
-raise RuntimeError('No messages received in %d seconds.' % TIMEOUT_SECONDS)
+try:
+ for message in consumer:
+ if message is not None:
+ queue.put(message.value)
+ watchdog.notify()
+ # If we reach this line, TIMEOUT_SECONDS elapsed with no events received.
+ raise RuntimeError('No messages received in %d seconds.' % TIMEOUT_SECONDS)
+except Exception as e:
+ logging.exception("Caught exception, aborting.")
+finally:
+ queue.close()
+ consumer.close()
--
To view, visit https://gerrit.wikimedia.org/r/321550
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: merged
Gerrit-Change-Id: I29c1df8d367d0cd2e57d972abc9d2d5669d61bcd
Gerrit-PatchSet: 2
Gerrit-Project: analytics/statsv
Gerrit-Branch: master
Gerrit-Owner: Ottomata <[email protected]>
Gerrit-Reviewer: Elukey <[email protected]>
Gerrit-Reviewer: Gilles <[email protected]>
Gerrit-Reviewer: Ori.livneh <[email protected]>
Gerrit-Reviewer: Ottomata <[email protected]>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits