Krinkle has uploaded a new change for review. ( https://gerrit.wikimedia.org/r/403560 )
Change subject: [WIP] coal: Consume EventLogging from Kafka instead of ZMQ ...................................................................... [WIP] coal: Consume EventLogging from Kafka instead of ZMQ Bug: T110903 Change-Id: I3d258f84cc4221a51750f79b5ba2dc4db329e570 --- M modules/coal/files/coal M modules/coal/manifests/init.pp M modules/coal/templates/initscripts/coal.systemd.erb 3 files changed, 40 insertions(+), 10 deletions(-) git pull ssh://gerrit.wikimedia.org:29418/operations/puppet refs/changes/60/403560/1 diff --git a/modules/coal/files/coal b/modules/coal/files/coal index 2ec68da..09aef4f 100755 --- a/modules/coal/files/coal +++ b/modules/coal/files/coal @@ -25,6 +25,7 @@ sys.setdefaultencoding('utf-8') import argparse +import json import collections import dateutil.parser import errno @@ -37,7 +38,7 @@ import time import whisper -import zmq +from kafka import KafkaConsumer WINDOW_SPAN = 60 * 5 # Size of sliding window, in seconds. @@ -105,10 +106,10 @@ default=os.getcwd(), help='Path for Whisper files. Defaults to working directory.' ) - arg_parser.add_argument( - 'endpoint', - help='EventLogging endpoint URL.' 
- ) + ap.add_argument('--brokers', required=True, + help='Comma-separated list of kafka brokers') + ap.add_argument('--consumer-group', required=True, + help='Consumer group to register with Kafka') def __init__(self): self.args = self.arg_parser.parse_args() @@ -128,7 +129,20 @@ def run(self): self.create_whisper_files() - self.log.info('Connecting to %s.', self.args.endpoint) + # Based on webperf/navtiming.py + kafka_bootstrap_servers = tuple(self.args.brokers.split(',')) + kafka_topics = ('eventlogging_NavigationTiming', 'eventlogging_SaveTiming') + kafka_consumer_timeout_seconds = 60 + consumer = KafkaConsumer( + *kafka_topics, + bootstrap_servers=kafka_bootstrap_servers, + group_id=self.args.consumer_group, + auto_offset_reset='latest', + enable_auto_commit=False, + consumer_timeout_ms=kafka_consumer_timeout_seconds * 1000 + ) + + self.log.info('Starting Kafka consumer') socket = zmq.Context().socket(zmq.SUB) socket.connect(self.args.endpoint) socket.subscribe = b'' diff --git a/modules/coal/manifests/init.pp b/modules/coal/manifests/init.pp index 10aa0c1..17f8b6f 100644 --- a/modules/coal/manifests/init.pp +++ b/modules/coal/manifests/init.pp @@ -1,6 +1,16 @@ # == Class: coal # -# Store a basic set of Navigation Timing metrics in Whisper files. +# Captures NavigationTiming events from Kafka and writes +# a subset of metrics directly to Whisper files. +# +# This complements webperf::navtiming, which uses StatsD and writes +# to Graphite's default backend via carbon. StatsD produces derived metrics, +# like 'p99' and 'sample_rate'. Graphite aggregates Carbon's Whisper files +# at varying resolutions as data gets older. +# +# Coal, on the other hand, simply retains data for 1 year at a constant +# 1-minute resolution. +# # See https://meta.wikimedia.org/wiki/Schema:NavigationTiming & # http://www.mediawiki.org/wiki/Extension:NavigationTiming # @@ -10,11 +20,14 @@ # URI of EventLogging event publisher to subscribe to.
# For example, 'tcp://eventlogging.eqiad.wmnet:8600'. # +# [*kafka_brokers*] +# String of comma separated Kafka bootstrap brokers. +# class coal( $endpoint ) { require_package('python-flask') require_package('python-numpy') require_package('python-whisper') - require_package('python-zmq') + require_package('python-kafka') group { 'coal': ensure => present, diff --git a/modules/coal/templates/initscripts/coal.systemd.erb b/modules/coal/templates/initscripts/coal.systemd.erb index d53916b..405d83b 100644 --- a/modules/coal/templates/initscripts/coal.systemd.erb +++ b/modules/coal/templates/initscripts/coal.systemd.erb @@ -1,11 +1,14 @@ [Unit] -Description=Navigation Timing Whisper logger +Description=Navigation Timing Whisper writer [Service] User=coal Group=coal Restart=always -ExecStart=/usr/local/bin/coal --whisper-dir "/var/lib/coal" "<%= @endpoint %>" +ExecStart=/usr/local/bin/coal \ + --whisper-dir "/var/lib/coal" \ + --brokers <%= @kafka_brokers %> \ + --consumer-group coal [Install] WantedBy=multi-user.target -- To view, visit https://gerrit.wikimedia.org/r/403560 To unsubscribe, visit https://gerrit.wikimedia.org/r/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I3d258f84cc4221a51750f79b5ba2dc4db329e570 Gerrit-PatchSet: 1 Gerrit-Project: operations/puppet Gerrit-Branch: production Gerrit-Owner: Krinkle <krinklem...@gmail.com> _______________________________________________ MediaWiki-commits mailing list MediaWiki-commits@lists.wikimedia.org https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits