Ottomata has submitted this change and it was merged.

Change subject: Use argparse to make statsv configurable
......................................................................


Use argparse to make statsv configurable

This change will have to be deployed along with puppet changes to properly
configure statsv in production

Bug: T150765
Change-Id: If374505624a2dca267e3867e81e441c3c2a1162b
---
M statsv.py
1 file changed, 92 insertions(+), 26 deletions(-)

Approvals:
  Ottomata: Verified; Looks good to me, approved



diff --git a/statsv.py b/statsv.py
index c9f56c9..cd16af7 100644
--- a/statsv.py
+++ b/statsv.py
@@ -30,31 +30,90 @@
 import re
 import socket
 import urlparse
+import argparse
+
 
 from kafka import KafkaConsumer
 
-logging.basicConfig(stream=sys.stderr, level=logging.INFO,
-                    format='%(asctime)s %(message)s')
+ap = argparse.ArgumentParser(
+    description='statsv - consumes from varnishkafka Kafka topic and writes metrics to statsd'
+)
+ap.add_argument(
+    '--topic',
+    help='Kafka topic from which to consume.  Default: statsv',
+    default='statsv'
+)
+ap.add_argument(
+    '--brokers',
+    help='Comma separated string of kafka brokers: Default: localhost:9092',
+    default='localhost:9092'
+)
+ap.add_argument(
+    '--consumer-group',
+    help='Consumer group to register with Kafka. Default: statsv',
+    default='statsv'
+)
+ap.add_argument(
+    '--statsd',
+    help='statsd host:port. Default: statsd:8125',
+    default='statsd:8125'
+)
+ap.add_argument(
+    '--verbose',
+    help='If true, statsd metrics will be logged at INFO level. Default: False',
+    action='store_true',
+    default=False
+)
+ap.add_argument(
+    '--dry-run',
+    help='If true, metrics will not be sent to statsd. Default: False',
+    action='store_true',
+    default=False
+)
+ap.add_argument(
+    '--log-level',
+    help='Logging level. Default: INFO',
+    default='INFO'
+)
+ap.add_argument(
+    '--consumer-timeout-seconds',
+    help='If the Kafka consumer does not receive a message in this amount of time, '
+    'it will timeout and this process will exit. Default: 60',
+    type=int,
+    default=60
+)
+ap.add_argument(
+    '--workers',
+    help='Number of processes to spawn that will process the consumed messages '
+    'and send to statsd.  Default: half the number of CPUs, or 1.',
+    type=int,
+    default=max(1, multiprocessing.cpu_count() // 2)
+)
 
+args = ap.parse_args()
+
+#  Setup logging
+logging.basicConfig(stream=sys.stderr, level=args.log_level,
+                    format='%(asctime)s %(message)s')
 # Set kafka module logging level to INFO
 logging.getLogger("kafka").setLevel(logging.INFO)
 
+verbose = args.verbose
+dry_run = args.dry_run
 
-supported_metric_types = ('c', 'g', 'ms')
-statsd_addr = ('statsd.eqiad.wmnet', 8125)
+# parse args for configuration
+statsd_addr = tuple(args.statsd.split(':'))
 
-# TODO: make these configurable.
-TIMEOUT_SECONDS = 60
-kafka_topic = 'statsv'
-kafka_consumer_group = 'statsv'
-kafka_bootstrap_servers = (
-    'kafka1012.eqiad.wmnet:9092',
-    'kafka1013.eqiad.wmnet:9092',
-    'kafka1014.eqiad.wmnet:9092',
-    'kafka1018.eqiad.wmnet:9092',
-    'kafka1020.eqiad.wmnet:9092',
-    'kafka1022.eqiad.wmnet:9092',
-)
+worker_count = args.workers
+
+kafka_bootstrap_servers = tuple(args.brokers.split(','))
+kafka_topic = args.topic
+kafka_consumer_group = args.consumer_group
+kafka_consumer_timeout_seconds = args.consumer_timeout_seconds
+
+SUPPORTED_METRIC_TYPES = ('c', 'g', 'ms')
+
+
 
 SOCK_CLOEXEC = getattr(socket, 'SOCK_CLOEXEC', 0x80000)
 
@@ -116,25 +175,30 @@
             for metric_name, value in urlparse.parse_qsl(query_string):
                 metric_value, metric_type = re.search(
                         '^(\d+)([a-z]+)$', value).groups()
-                assert metric_type in supported_metric_types
+                assert metric_type in SUPPORTED_METRIC_TYPES
                 statsd_message = '%s:%s|%s' % (
                         metric_name, metric_value, metric_type)
-                sock.sendto(statsd_message.encode('utf-8'), statsd_addr)
-                logging.debug(statsd_message)
+
+                if (verbose):
+                    logging.info(statsd_message)
+
+                if (not dry_run):
+                    sock.sendto(statsd_message.encode('utf-8'), statsd_addr)
+
         except (AssertionError, AttributeError, KeyError):
             pass
 
+
+# Spawn worker_count workers to process incoming varnishkafka statsv messages.
 queue = multiprocessing.Queue()
 
-# Spawn either half as many workers as there are CPU cores.
-# On single-core machines, spawn a single worker.
-worker_count = max(1, multiprocessing.cpu_count() // 2)
-
+logging.info('Spawning %d workers to process statsv messages' % worker_count)
 for _ in range(worker_count):
     worker = multiprocessing.Process(target=process_queue, args=(queue,))
     worker.daemon = True
     worker.start()
 
+# Create our Kafka Consumer instance.
 consumer = KafkaConsumer(
     kafka_topic,
     bootstrap_servers=kafka_bootstrap_servers,
@@ -143,18 +207,20 @@
     # statsd metrics don't make sense if they lag,
     # so disable commits to avoid resuming at historical committed offset.
     enable_auto_commit=False,
-    consumer_timeout_ms=TIMEOUT_SECONDS * 1000,
+    consumer_timeout_ms=kafka_consumer_timeout_seconds * 1000
 )
 
 watchdog = Watchdog()
 
+logging.info('Starting statsv Kafka consumer.')
+# Consume messages from Kafka and put them onto the queue.
 try:
     for message in consumer:
         if message is not None:
             queue.put(message.value)
             watchdog.notify()
-    # If we reach this line, TIMEOUT_SECONDS elapsed with no events received.
-    raise RuntimeError('No messages received in %d seconds.' % TIMEOUT_SECONDS)
+    # If we reach this line, kafka_consumer_timeout_seconds elapsed with no events received.
+    raise RuntimeError('No messages received in %d seconds.' % kafka_consumer_timeout_seconds)
 except Exception as e:
     logging.exception("Caught exception, aborting.")
 finally:

-- 
To view, visit https://gerrit.wikimedia.org/r/321911
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: If374505624a2dca267e3867e81e441c3c2a1162b
Gerrit-PatchSet: 2
Gerrit-Project: analytics/statsv
Gerrit-Branch: master
Gerrit-Owner: Ottomata <[email protected]>
Gerrit-Reviewer: Elukey <[email protected]>
Gerrit-Reviewer: Ori.livneh <[email protected]>
Gerrit-Reviewer: Ottomata <[email protected]>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to