Ori.livneh has uploaded a new change for review.
https://gerrit.wikimedia.org/r/240290
Change subject: Dead-simple parallelization
......................................................................
Dead-simple parallelization
The main process reads items from Kafka and puts them in a
multiprocessing.Queue(). The child processes (which number processor_count / 2
in all) do the work of decoding each message into a metric and sending it to
statsd.
Change-Id: I60bd15503d24b58ca39903eb28262cdfa7dc7402
---
M statsv.py
1 file changed, 33 insertions(+), 12 deletions(-)
git pull ssh://gerrit.wikimedia.org:29418/analytics/statsv
refs/changes/90/240290/1
diff --git a/statsv.py b/statsv.py
index 97527fb..e98a5f0 100644
--- a/statsv.py
+++ b/statsv.py
@@ -25,6 +25,7 @@
import json
import logging
+import multiprocessing
import re
import socket
import urlparse
@@ -36,7 +37,6 @@
format='%(asctime)s %(message)s')
supported_metric_types = ('c', 'g', 'ms')
statsd_addr = ('statsd.eqiad.wmnet', 8125)
-statsd_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
kafka = KafkaClient((
'kafka1012.eqiad.wmnet',
'kafka1013.eqiad.wmnet',
@@ -46,16 +46,37 @@
'kafka1022.eqiad.wmnet',
))
+
+def worker(q):
+ sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+ while 1:
+ raw_data = q.get()
+ try:
+ data = json.loads(raw_data)
+ except:
+ logging.exception(raw_data)
+ try:
+ query_string = data['uri_query'].lstrip('?')
+ for metric_name, value in urlparse.parse_qsl(query_string):
+ metric_value, metric_type = re.search('(\d+)(\D+)',
value).groups()
+ assert metric_type in supported_metric_types
+ statsd_message = '%s:%s|%s' % (metric_name, metric_value,
metric_type)
+ sock.sendto(statsd_message.encode('utf-8'), statsd_addr)
+ logging.debug(statsd_message)
+ except (AssertionError, AttributeError, KeyError):
+ pass
+
+
+
+queue = multiprocessing.Queue()
+
+# Spawn half as many workers as there are CPU cores.
+# On single-core machines, spawn a single worker.
+worker_count = max(1, multiprocessing.cpu_count() // 2)
+
+for _ in range(worker_count):
+ multiprocessing.Process(target=worker, args=(queue,)).start()
+
consumer = SimpleConsumer(kafka, 'statsv', 'statsv')
for message in consumer:
- data = json.loads(message.message.value)
- try:
- query_string = data['uri_query'].lstrip('?')
- for metric_name, value in urlparse.parse_qsl(query_string):
- metric_value, metric_type = re.search('(\d+)(\D+)', value).groups()
- assert metric_type in supported_metric_types
- statsd_message = '%s:%s|%s' % (metric_name, metric_value,
metric_type)
- statsd_sock.sendto(statsd_message.encode('utf-8'), statsd_addr)
- logging.debug(statsd_message)
- except (AssertionError, AttributeError, KeyError):
- pass
+ queue.put(message.message.value)
--
To view, visit https://gerrit.wikimedia.org/r/240290
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I60bd15503d24b58ca39903eb28262cdfa7dc7402
Gerrit-PatchSet: 1
Gerrit-Project: analytics/statsv
Gerrit-Branch: master
Gerrit-Owner: Ori.livneh <[email protected]>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits