Ottomata has submitted this change and it was merged. (https://gerrit.wikimedia.org/r/375106)

Change subject: webperf: Convert ve.py from ZMQ to KafkaConsumer
......................................................................


webperf: Convert ve.py from ZMQ to KafkaConsumer

Follows-up 06e9b65a7cc.

Bug: T110903
Change-Id: I0d8211876e74fda77a50d25e07451357828fc8ba
---
M modules/role/manifests/webperf.pp
M modules/webperf/files/ve.py
M modules/webperf/manifests/navtiming.pp
M modules/webperf/manifests/ve.pp
M modules/webperf/templates/ve.systemd.erb
5 files changed, 49 insertions(+), 36 deletions(-)

Approvals:
  Ottomata: Verified; Looks good to me, approved



diff --git a/modules/role/manifests/webperf.pp b/modules/role/manifests/webperf.pp
index 700ae45..ab0d806 100644
--- a/modules/role/manifests/webperf.pp
+++ b/modules/role/manifests/webperf.pp
@@ -32,17 +32,11 @@
         statsd_port   => $statsd_port,
     }
 
-    # TODO: Remove eventlogging specific things once ve uses Kafka: T110903
-    include ::eventlogging
-    $eventlogging_host = 'eventlog1001.eqiad.wmnet'
-    # Installed by eventlogging class using trebuchet
-    $eventlogging_path = '/srv/deployment/eventlogging/eventlogging'
     # Report VisualEditor performance measurements to Graphite.
-    # See <https://meta.wikimedia.org/wiki/Schema:TimingData>
+    # See <https://meta.wikimedia.org/wiki/Schema:Edit>
     class { '::webperf::ve':
-        endpoint          => "tcp://${eventlogging_host}:8600",
-        eventlogging_path => $eventlogging_path,
-        statsd_host       => $statsd_host,
-        statsd_port       => $statsd_port,
+        kafka_brokers => $kafka_brokers,
+        statsd_host   => $statsd_host,
+        statsd_port   => $statsd_port,
     }
 }
diff --git a/modules/webperf/files/ve.py b/modules/webperf/files/ve.py
index 7546d87..c8318e5 100644
--- a/modules/webperf/files/ve.py
+++ b/modules/webperf/files/ve.py
@@ -6,11 +6,12 @@
 sys.setdefaultencoding("utf-8")
 
 import argparse
+import json
 import socket
 import unittest
 import yaml
 
-import eventlogging
+from kafka import KafkaConsumer
 
 
 def handle_edit(meta):
@@ -31,8 +32,11 @@
 
 
 if __name__ == '__main__':
-    ap = argparse.ArgumentParser(description='PerfData StatsD module')
-    ap.add_argument('endpoint', help='URI of EventLogging endpoint')
+    ap = argparse.ArgumentParser(description='Send VisualEditor PerfData to 
StatsD')
+    ap.add_argument('--brokers', required=True,
+                    help='Comma-separated list of kafka brokers')
+    ap.add_argument('--consumer-group', required=True,
+                    help='Consumer group to register with Kafka')
     ap.add_argument('--statsd-host', default='localhost',
                     type=socket.gethostbyname)
     ap.add_argument('--statsd-port', default=8125, type=int)
@@ -41,15 +45,31 @@
     addr = args.statsd_host, args.statsd_port
     sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
 
-    events = eventlogging.connect(args.endpoint)
+    kafka_bootstrap_servers = tuple(args.brokers.split(','))
+    kafka_topic = 'eventlogging_Edit'
+    kafka_consumer_timeout_seconds = 600
+    consumer = KafkaConsumer(
+        kafka_topic,
+        bootstrap_servers=kafka_bootstrap_servers,
+        group_id=args.consumer_group,
+        auto_offset_reset='latest',
+        enable_auto_commit=False,
+        consumer_timeout_ms=kafka_consumer_timeout_seconds * 1000
+    )
 
-    for meta in events.filter(schema='Edit'):
-        try:
-            stat = handle_edit(meta)
-            if stat is not None:
-                sock.sendto(stat.encode('utf-8'), addr)
-        except (ValueError, KeyError):
-            continue
+    try:
+        for message in consumer:
+            meta = json.loads(message.value)
+            try:
+                stat = handle_edit(meta)
+                if stat is not None:
+                    sock.sendto(stat.encode('utf-8'), addr)
+            except (ValueError, KeyError):
+                continue
+        # If we reach this line, consumer_timeout_ms elapsed without events
+        raise RuntimeError('No messages received in %d seconds.' % 
kafka_consumer_timeout_seconds)
+    finally:
+        consumer.close()
 
 
 # ##### Tests ######
diff --git a/modules/webperf/manifests/navtiming.pp b/modules/webperf/manifests/navtiming.pp
index 5923cb4..fda321b 100644
--- a/modules/webperf/manifests/navtiming.pp
+++ b/modules/webperf/manifests/navtiming.pp
@@ -2,12 +2,12 @@
 #
 # Captures NavigationTiming events from Kafka and send them to StatsD / Graphite.
 # See https://meta.wikimedia.org/wiki/Schema:NavigationTiming &
-# http://www.mediawiki.org/wiki/Extension:NavigationTiming
+# https://www.mediawiki.org/wiki/Extension:NavigationTiming
 #
 # === Parameters
 #
 # [*kafka_brokers*]
-#   string of comma separated Kafka bootstrap brokers
+#   String of comma separated Kafka bootstrap brokers.
 #
 # [*statsd_host*]
 #   Write stats to this StatsD instance. Default: '127.0.0.1'.
@@ -17,8 +17,8 @@
 #
 class webperf::navtiming(
     $kafka_brokers,
-    $statsd_host   = '127.0.0.1',
-    $statsd_port   = 8125,
+    $statsd_host = '127.0.0.1',
+    $statsd_port = 8125,
 ) {
     include ::webperf
 
diff --git a/modules/webperf/manifests/ve.pp b/modules/webperf/manifests/ve.pp
index 5c5499a..d5f5c7c 100644
--- a/modules/webperf/manifests/ve.pp
+++ b/modules/webperf/manifests/ve.pp
@@ -1,16 +1,12 @@
 # == Class: webperf::ve
 #
 # Captures VisualEditor timing data and sends it to StatsD.
+# See <https://meta.wikimedia.org/wiki/Schema:Edit>.
 #
 # === Parameters
 #
-# [*endpoint*]
-#   URI of EventLogging event publisher to subscribe to.
-#   Example: 'tcp://eventlogging.corp.org:8600'.
-#
-# [*eventlogging_path*]
-#   Path where the EventLogging python library is installed.
-#   Example: '/srv/deployment/eventlogging'.
+# [*kafka_brokers*]
+#   String of comma separated Kafka bootstrap brokers.
 #
 # [*statsd_host*]
 #   Write stats to this StatsD instance. Default: '127.0.0.1'.
@@ -19,12 +15,14 @@
 #   Write stats to this StatsD instance. Default: 8125.
 #
 class webperf::ve(
-    $endpoint,
-    $eventlogging_path,
+    $kafka_brokers,
     $statsd_host = '127.0.0.1',
     $statsd_port = 8125,
 ) {
     include ::webperf
+
+    require_package('python-kafka')
+    require_package('python-yaml')
 
     file { '/srv/webperf/ve.py':
         source => 'puppet:///modules/webperf/ve.py',
@@ -35,6 +33,7 @@
     }
 
     file { '/lib/systemd/system/ve.service':
+        # uses $statsd_host, $statsd_port, $kafka_brokers
         content => template('webperf/ve.systemd.erb'),
         notify  => Service['ve'],
     }
diff --git a/modules/webperf/templates/ve.systemd.erb b/modules/webperf/templates/ve.systemd.erb
index fa7839e..980b477 100644
--- a/modules/webperf/templates/ve.systemd.erb
+++ b/modules/webperf/templates/ve.systemd.erb
@@ -4,9 +4,9 @@
 
 [Service]
 WorkingDirectory=/srv/webperf
-Environment="PYTHONPATH=<%= @eventlogging_path %>"
 ExecStart=/usr/bin/python /srv/webperf/ve.py \
-    <%= @endpoint %> \
+    --brokers <%= @kafka_brokers %> \
+    --consumer-group webperf_ve \
     --statsd-host <%= @statsd_host %> \
     --statsd-port <%= @statsd_port %>
 User=nobody

-- 
To view, visit https://gerrit.wikimedia.org/r/375106
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I0d8211876e74fda77a50d25e07451357828fc8ba
Gerrit-PatchSet: 4
Gerrit-Project: operations/puppet
Gerrit-Branch: production
Gerrit-Owner: Krinkle <[email protected]>
Gerrit-Reviewer: Gilles <[email protected]>
Gerrit-Reviewer: Giuseppe Lavagetto <[email protected]>
Gerrit-Reviewer: Krinkle <[email protected]>
Gerrit-Reviewer: Ottomata <[email protected]>
Gerrit-Reviewer: Phedenskog <[email protected]>
Gerrit-Reviewer: Volans <[email protected]>
Gerrit-Reviewer: jenkins-bot <>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to