Krinkle has uploaded a new change for review. ( https://gerrit.wikimedia.org/r/403560 )

Change subject: [WIP] coal: Consume EventLogging from Kafka instead of ZMQ
......................................................................

[WIP] coal: Consume EventLogging from Kafka instead of ZMQ

Bug: T110903
Change-Id: I3d258f84cc4221a51750f79b5ba2dc4db329e570
---
M modules/coal/files/coal
M modules/coal/manifests/init.pp
M modules/coal/templates/initscripts/coal.systemd.erb
3 files changed, 40 insertions(+), 10 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/operations/puppet refs/changes/60/403560/1

diff --git a/modules/coal/files/coal b/modules/coal/files/coal
index 2ec68da..09aef4f 100755
--- a/modules/coal/files/coal
+++ b/modules/coal/files/coal
@@ -25,6 +25,7 @@
 sys.setdefaultencoding('utf-8')
 
 import argparse
+import json
 import collections
 import dateutil.parser
 import errno
@@ -37,7 +38,7 @@
 import time
 
 import whisper
-import zmq
+from kafka import KafkaConsumer
 
 
 WINDOW_SPAN = 60 * 5  # Size of sliding window, in seconds.
@@ -105,10 +106,10 @@
         default=os.getcwd(),
         help='Path for Whisper files. Defaults to working directory.'
     )
-    arg_parser.add_argument(
-        'endpoint',
-        help='EventLogging endpoint URL.'
-    )
+    ap.add_argument('--brokers', required=True,
+        help='Comma-separated list of kafka brokers')
+    ap.add_argument('--consumer-group', required=True,
+        help='Consumer group to register with Kafka')
 
     def __init__(self):
         self.args = self.arg_parser.parse_args()
@@ -128,7 +129,20 @@
     def run(self):
         self.create_whisper_files()
 
-        self.log.info('Connecting to %s.', self.args.endpoint)
+        # Based on webperf/navtiming.py
+        kafka_bootstrap_servers = tuple(self.args.brokers.split(','))
+        kafka_topics = ('eventlogging_NavigationTiming', 'eventlogging_SaveTiming')
+        kafka_consumer_timeout_seconds = 60
+        consumer = KafkaConsumer(
+            *kafka_topics,
+            bootstrap_servers=kafka_bootstrap_servers,
+            group_id=self.args.consumer_group,
+            auto_offset_reset='latest',
+            enable_auto_commit=False,
+            consumer_timeout_ms=kafka_consumer_timeout_seconds * 1000
+        )
+
+        self.log.info('Starting Kafka consumer')
         socket = zmq.Context().socket(zmq.SUB)
         socket.connect(self.args.endpoint)
         socket.subscribe = b''
diff --git a/modules/coal/manifests/init.pp b/modules/coal/manifests/init.pp
index 10aa0c1..17f8b6f 100644
--- a/modules/coal/manifests/init.pp
+++ b/modules/coal/manifests/init.pp
@@ -1,6 +1,16 @@
 # == Class: coal
 #
-# Store a basic set of Navigation Timing metrics in Whisper files.
+# Captures NavigationTiming and SaveTiming events from Kafka and writes
+# a subset of their metrics directly to Whisper files.
+#
+# This complements webperf::navtiming, which uses StatsD and writes
+# to Graphite's default backend via Carbon. StatsD produces derived metrics,
+# like 'p99' and 'sample_rate'. Graphite aggregates Carbon's Whisper files
+# at varying resolutions as data gets older.
+#
+# Coal, on the other hand, simply retains data for 1 year at a constant
+# resolution of 1 minute.
+#
 # See https://meta.wikimedia.org/wiki/Schema:NavigationTiming &
 # http://www.mediawiki.org/wiki/Extension:NavigationTiming
 #
@@ -10,11 +20,14 @@
 #   URI of EventLogging event publisher to subscribe to.
 #   For example, 'tcp://eventlogging.eqiad.wmnet:8600'.
 #
+# [*kafka_brokers*]
+#   Comma-separated list of Kafka bootstrap brokers, passed to coal as --brokers.
+#
 class coal( $endpoint ) {
     require_package('python-flask')
     require_package('python-numpy')
     require_package('python-whisper')
-    require_package('python-zmq')
+    require_package('python-kafka')
 
     group { 'coal':
         ensure => present,
diff --git a/modules/coal/templates/initscripts/coal.systemd.erb b/modules/coal/templates/initscripts/coal.systemd.erb
index d53916b..405d83b 100644
--- a/modules/coal/templates/initscripts/coal.systemd.erb
+++ b/modules/coal/templates/initscripts/coal.systemd.erb
@@ -1,11 +1,14 @@
 [Unit]
-Description=Navigation Timing Whisper logger
+Description=Navigation Timing Whisper writer
 
 [Service]
 User=coal
 Group=coal
 Restart=always
-ExecStart=/usr/local/bin/coal --whisper-dir "/var/lib/coal" "<%= @endpoint %>"
+ExecStart=/usr/local/bin/coal \
+    --whisper-dir "/var/lib/coal" \
+    --brokers <%= @kafka_brokers %> \
+    --consumer-group coal
 
 [Install]
 WantedBy=multi-user.target

-- 
To view, visit https://gerrit.wikimedia.org/r/403560
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I3d258f84cc4221a51750f79b5ba2dc4db329e570
Gerrit-PatchSet: 1
Gerrit-Project: operations/puppet
Gerrit-Branch: production
Gerrit-Owner: Krinkle <krinklem...@gmail.com>

_______________________________________________
MediaWiki-commits mailing list
MediaWiki-commits@lists.wikimedia.org
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to