Ottomata has submitted this change and it was merged.
Change subject: Use argparse to make statsv configurable
......................................................................
Use argparse to make statsv configurable
This change must be deployed together with the corresponding Puppet changes
to configure statsv properly in production.
Bug: T150765
Change-Id: If374505624a2dca267e3867e81e441c3c2a1162b
---
M statsv.py
1 file changed, 92 insertions(+), 26 deletions(-)
Approvals:
Ottomata: Verified; Looks good to me, approved
diff --git a/statsv.py b/statsv.py
index c9f56c9..cd16af7 100644
--- a/statsv.py
+++ b/statsv.py
@@ -30,31 +30,90 @@
import re
import socket
import urlparse
+import argparse
+
from kafka import KafkaConsumer
-logging.basicConfig(stream=sys.stderr, level=logging.INFO,
- format='%(asctime)s %(message)s')
+ap = argparse.ArgumentParser(
+ description='statsv - consumes from varnishkafka Kafka topic and writes metrics to statsd'
+)
+ap.add_argument(
+ '--topic',
+ help='Kafka topic from which to consume. Default: statsv',
+ default='statsv'
+)
+ap.add_argument(
+ '--brokers',
+ help='Comma separated string of Kafka brokers. Default: localhost:9092',
+ default='localhost:9092'
+)
+ap.add_argument(
+ '--consumer-group',
+ help='Consumer group to register with Kafka. Default: statsv',
+ default='statsv'
+)
+ap.add_argument(
+ '--statsd',
+ help='statsd host:port. Default: statsd:8125',
+ default='statsd:8125'
+)
+ap.add_argument(
+ '--verbose',
+ help='If true, statsd metrics will be logged at INFO level. Default: False',
+ action='store_true',
+ default=False
+)
+ap.add_argument(
+ '--dry-run',
+ help='If true, metrics will not be sent to statsd. Default: False',
+ action='store_true',
+ default=False
+)
+ap.add_argument(
+ '--log-level',
+ help='Logging level. Default: INFO',
+ default='INFO'
+)
+ap.add_argument(
+ '--consumer-timeout-seconds',
+ help='If the Kafka consumer does not receive a message in this amount of time, '
+ 'it will timeout and this process will exit. Default: 60',
+ type=int,
+ default=60
+)
+ap.add_argument(
+ '--workers',
+ help='Number of processes to spawn that will process the consumed messages '
+ 'and send to statsd. Default: half the number of CPUs, or 1.',
+ type=int,
+ default=max(1, multiprocessing.cpu_count() // 2)
+)
+args = ap.parse_args()
+
+# Setup logging
+logging.basicConfig(stream=sys.stderr, level=args.log_level.upper(),
+ format='%(asctime)s %(message)s')
# Set kafka module logging level to INFO
logging.getLogger("kafka").setLevel(logging.INFO)
+verbose = args.verbose
+dry_run = args.dry_run
-supported_metric_types = ('c', 'g', 'ms')
-statsd_addr = ('statsd.eqiad.wmnet', 8125)
+statsd_host, statsd_port = args.statsd.rsplit(':', 1)
+statsd_addr = (statsd_host, int(statsd_port))  # sendto() needs an int port
-# TODO: make these configurable.
-TIMEOUT_SECONDS = 60
-kafka_topic = 'statsv'
-kafka_consumer_group = 'statsv'
-kafka_bootstrap_servers = (
- 'kafka1012.eqiad.wmnet:9092',
- 'kafka1013.eqiad.wmnet:9092',
- 'kafka1014.eqiad.wmnet:9092',
- 'kafka1018.eqiad.wmnet:9092',
- 'kafka1020.eqiad.wmnet:9092',
- 'kafka1022.eqiad.wmnet:9092',
-)
+worker_count = args.workers
+
+kafka_bootstrap_servers = tuple(args.brokers.split(','))
+kafka_topic = args.topic
+kafka_consumer_group = args.consumer_group
+kafka_consumer_timeout_seconds = args.consumer_timeout_seconds
+
+SUPPORTED_METRIC_TYPES = ('c', 'g', 'ms')
+
+
SOCK_CLOEXEC = getattr(socket, 'SOCK_CLOEXEC', 0x80000)
@@ -116,25 +175,30 @@
for metric_name, value in urlparse.parse_qsl(query_string):
metric_value, metric_type = re.search(
'^(\d+)([a-z]+)$', value).groups()
- assert metric_type in supported_metric_types
+ assert metric_type in SUPPORTED_METRIC_TYPES
statsd_message = '%s:%s|%s' % (
metric_name, metric_value, metric_type)
- sock.sendto(statsd_message.encode('utf-8'), statsd_addr)
- logging.debug(statsd_message)
+
+ if (verbose):
+ logging.info(statsd_message)
+
+ if (not dry_run):
+ sock.sendto(statsd_message.encode('utf-8'), statsd_addr)
+
except (AssertionError, AttributeError, KeyError):
pass
+
+# Spawn worker_count workers to process incoming varnishkafka statsv messages.
queue = multiprocessing.Queue()
-# Spawn either half as many workers as there are CPU cores.
-# On single-core machines, spawn a single worker.
-worker_count = max(1, multiprocessing.cpu_count() // 2)
-
+logging.info('Spawning %d workers to process statsv messages', worker_count)
for _ in range(worker_count):
worker = multiprocessing.Process(target=process_queue, args=(queue,))
worker.daemon = True
worker.start()
+# Create our Kafka Consumer instance.
consumer = KafkaConsumer(
kafka_topic,
bootstrap_servers=kafka_bootstrap_servers,
@@ -143,18 +207,20 @@
# statsd metrics don't make sense if they lag,
# so disable commits to avoid resuming at historical committed offset.
enable_auto_commit=False,
- consumer_timeout_ms=TIMEOUT_SECONDS * 1000,
+ consumer_timeout_ms=kafka_consumer_timeout_seconds * 1000,
)
watchdog = Watchdog()
+logging.info('Starting statsv Kafka consumer.')
+# Consume messages from Kafka and put them onto the queue.
try:
for message in consumer:
if message is not None:
queue.put(message.value)
watchdog.notify()
- # If we reach this line, TIMEOUT_SECONDS elapsed with no events received.
- raise RuntimeError('No messages received in %d seconds.' % TIMEOUT_SECONDS)
+ # If we reach this line, kafka_consumer_timeout_seconds elapsed with no
+ # events received.
+ raise RuntimeError('No messages received in %d seconds.' %
+ kafka_consumer_timeout_seconds)
except Exception as e:
logging.exception("Caught exception, aborting.")
finally:
--
To view, visit https://gerrit.wikimedia.org/r/321911
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: merged
Gerrit-Change-Id: If374505624a2dca267e3867e81e441c3c2a1162b
Gerrit-PatchSet: 2
Gerrit-Project: analytics/statsv
Gerrit-Branch: master
Gerrit-Owner: Ottomata <[email protected]>
Gerrit-Reviewer: Elukey <[email protected]>
Gerrit-Reviewer: Ori.livneh <[email protected]>
Gerrit-Reviewer: Ottomata <[email protected]>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits