Ori.livneh has uploaded a new change for review.

  https://gerrit.wikimedia.org/r/240290

Change subject: Dead-simple parallelization
......................................................................

Dead-simple parallelization

The main process reads items from Kafka and puts them in a
multiprocessing.Queue(). The child processes (cpu_count / 2 of them, with a
minimum of one) do the work of decoding each message into a metric and
sending it to statsd.

Change-Id: I60bd15503d24b58ca39903eb28262cdfa7dc7402
---
M statsv.py
1 file changed, 33 insertions(+), 12 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/analytics/statsv refs/changes/90/240290/1

diff --git a/statsv.py b/statsv.py
index 97527fb..e98a5f0 100644
--- a/statsv.py
+++ b/statsv.py
@@ -25,6 +25,7 @@
 
 import json
 import logging
+import multiprocessing
 import re
 import socket
 import urlparse
@@ -36,7 +37,6 @@
                     format='%(asctime)s %(message)s')
 supported_metric_types = ('c', 'g', 'ms')
 statsd_addr = ('statsd.eqiad.wmnet', 8125)
-statsd_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
 kafka = KafkaClient((
     'kafka1012.eqiad.wmnet',
     'kafka1013.eqiad.wmnet',
@@ -46,16 +46,37 @@
     'kafka1022.eqiad.wmnet',
 ))
 
+
+def worker(q):
+    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+    while 1:
+        raw_data = q.get()
+        try:
+            data = json.loads(raw_data)
+        except:
+            logging.exception(raw_data)
+        try:
+            query_string = data['uri_query'].lstrip('?')
+            for metric_name, value in urlparse.parse_qsl(query_string):
+                metric_value, metric_type = re.search('(\d+)(\D+)', value).groups()
+                assert metric_type in supported_metric_types
+                statsd_message = '%s:%s|%s' % (metric_name, metric_value, metric_type)
+                sock.sendto(statsd_message.encode('utf-8'), statsd_addr)
+                logging.debug(statsd_message)
+        except (AssertionError, AttributeError, KeyError):
+            pass
+
+
+
+queue = multiprocessing.Queue()
+
+# Spawn half as many workers as there are CPU cores.
+# On single-core machines, spawn a single worker.
+worker_count = max(1, multiprocessing.cpu_count() // 2)
+
+for _ in range(worker_count):
+    multiprocessing.Process(target=worker, args=(queue,)).start()
+
 consumer = SimpleConsumer(kafka, 'statsv', 'statsv')
 for message in consumer:
-    data = json.loads(message.message.value)
-    try:
-        query_string = data['uri_query'].lstrip('?')
-        for metric_name, value in urlparse.parse_qsl(query_string):
-            metric_value, metric_type = re.search('(\d+)(\D+)', value).groups()
-            assert metric_type in supported_metric_types
-            statsd_message = '%s:%s|%s' % (metric_name, metric_value, metric_type)
-            statsd_sock.sendto(statsd_message.encode('utf-8'), statsd_addr)
-            logging.debug(statsd_message)
-    except (AssertionError, AttributeError, KeyError):
-        pass
+    queue.put(message.message.value)

-- 
To view, visit https://gerrit.wikimedia.org/r/240290
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I60bd15503d24b58ca39903eb28262cdfa7dc7402
Gerrit-PatchSet: 1
Gerrit-Project: analytics/statsv
Gerrit-Branch: master
Gerrit-Owner: Ori.livneh <[email protected]>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to