Ottomata has submitted this change and it was merged.

Change subject: Use kafka-python instead of pykafka
......................................................................


Use kafka-python instead of pykafka

Change-Id: I29c1df8d367d0cd2e57d972abc9d2d5669d61bcd
---
M statsv.py
1 file changed, 34 insertions(+), 20 deletions(-)

Approvals:
  Ori.livneh: Looks good to me, approved
  Ottomata: Verified



diff --git a/statsv.py b/statsv.py
index 4f409b0..c9f56c9 100644
--- a/statsv.py
+++ b/statsv.py
@@ -31,27 +31,32 @@
 import socket
 import urlparse
 
-from pykafka import KafkaClient
-from pykafka.common import OffsetType
+from kafka import KafkaConsumer
 
-
-TIMEOUT_SECONDS = 60 * 1000
 logging.basicConfig(stream=sys.stderr, level=logging.INFO,
                     format='%(asctime)s %(message)s')
+
+# Pin the kafka library's logger to INFO explicitly rather than inheriting it.
+logging.getLogger("kafka").setLevel(logging.INFO)
+
+
 supported_metric_types = ('c', 'g', 'ms')
 statsd_addr = ('statsd.eqiad.wmnet', 8125)
 
-kafka = KafkaClient(','.join((
+# TODO: make these configurable.
+TIMEOUT_SECONDS = 60  # abort after this many seconds without a message
+kafka_topic = 'statsv'
+kafka_consumer_group = 'statsv'
+kafka_bootstrap_servers = (
     'kafka1012.eqiad.wmnet:9092',
     'kafka1013.eqiad.wmnet:9092',
     'kafka1014.eqiad.wmnet:9092',
     'kafka1018.eqiad.wmnet:9092',
     'kafka1020.eqiad.wmnet:9092',
     'kafka1022.eqiad.wmnet:9092',
-)))
+)
 
 SOCK_CLOEXEC = getattr(socket, 'SOCK_CLOEXEC', 0x80000)
-
 
 class Watchdog:
     """
@@ -119,7 +124,6 @@
         except (AssertionError, AttributeError, KeyError):
             pass
 
-
 queue = multiprocessing.Queue()
 
 # Spawn either half as many workers as there are CPU cores.
@@ -131,18 +135,28 @@
     worker.daemon = True
     worker.start()
 
-topic = kafka.topics['statsv']
-consumer = topic.get_simple_consumer(
-        auto_offset_reset=OffsetType.LATEST,
-        consumer_timeout_ms=TIMEOUT_SECONDS * 1000)
+consumer = KafkaConsumer(
+    kafka_topic,
+    bootstrap_servers=kafka_bootstrap_servers,
+    group_id=kafka_consumer_group,
+    auto_offset_reset='latest',
+    # statsd metrics don't make sense if they lag,
+    # so disable commits to avoid resuming from a stale committed offset.
+    enable_auto_commit=False,
+    consumer_timeout_ms=TIMEOUT_SECONDS * 1000,
+)
 
 watchdog = Watchdog()
 
-for message in consumer:
-    if message is not None:
-        queue.put(message.value)
-        watchdog.notify()
-
-# If we reach this line, TIMEOUT_SECONDS elapsed with no events received.
-queue.close()
-raise RuntimeError('No messages received in %d seconds.' % TIMEOUT_SECONDS)
+try:
+    for message in consumer:  # yields records only, never None
+        queue.put(message.value)
+        watchdog.notify()
+    # If we reach this line, TIMEOUT_SECONDS elapsed with no events received.
+    raise RuntimeError('No messages received in %d seconds.' % TIMEOUT_SECONDS)
+except Exception:
+    logging.exception("Caught exception, aborting.")
+    sys.exit(1)  # exit non-zero instead of swallowing the error
+finally:
+    queue.close()
+    consumer.close()

-- 
To view, visit https://gerrit.wikimedia.org/r/321550
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I29c1df8d367d0cd2e57d972abc9d2d5669d61bcd
Gerrit-PatchSet: 2
Gerrit-Project: analytics/statsv
Gerrit-Branch: master
Gerrit-Owner: Ottomata <[email protected]>
Gerrit-Reviewer: Elukey <[email protected]>
Gerrit-Reviewer: Gilles <[email protected]>
Gerrit-Reviewer: Ori.livneh <[email protected]>
Gerrit-Reviewer: Ottomata <[email protected]>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to