Ottomata has submitted this change and it was merged. ( https://gerrit.wikimedia.org/r/375106 )
Change subject: webperf: Convert ve.py from ZMQ to KafkaConsumer
......................................................................
webperf: Convert ve.py from ZMQ to KafkaConsumer
Follows-up 06e9b65a7cc.
Bug: T110903
Change-Id: I0d8211876e74fda77a50d25e07451357828fc8ba
---
M modules/role/manifests/webperf.pp
M modules/webperf/files/ve.py
M modules/webperf/manifests/navtiming.pp
M modules/webperf/manifests/ve.pp
M modules/webperf/templates/ve.systemd.erb
5 files changed, 49 insertions(+), 36 deletions(-)
Approvals:
Ottomata: Verified; Looks good to me, approved
diff --git a/modules/role/manifests/webperf.pp b/modules/role/manifests/webperf.pp
index 700ae45..ab0d806 100644
--- a/modules/role/manifests/webperf.pp
+++ b/modules/role/manifests/webperf.pp
@@ -32,17 +32,11 @@
statsd_port => $statsd_port,
}
- # TODO: Remove eventlogging specific things once ve uses Kafka: T110903
- include ::eventlogging
- $eventlogging_host = 'eventlog1001.eqiad.wmnet'
- # Installed by eventlogging class using trebuchet
- $eventlogging_path = '/srv/deployment/eventlogging/eventlogging'
# Report VisualEditor performance measurements to Graphite.
- # See <https://meta.wikimedia.org/wiki/Schema:TimingData>
+ # See <https://meta.wikimedia.org/wiki/Schema:Edit>
class { '::webperf::ve':
- endpoint => "tcp://${eventlogging_host}:8600",
- eventlogging_path => $eventlogging_path,
- statsd_host => $statsd_host,
- statsd_port => $statsd_port,
+ kafka_brokers => $kafka_brokers,
+ statsd_host => $statsd_host,
+ statsd_port => $statsd_port,
}
}
diff --git a/modules/webperf/files/ve.py b/modules/webperf/files/ve.py
index 7546d87..c8318e5 100644
--- a/modules/webperf/files/ve.py
+++ b/modules/webperf/files/ve.py
@@ -6,11 +6,12 @@
sys.setdefaultencoding("utf-8")
import argparse
+import json
import socket
import unittest
import yaml
-import eventlogging
+from kafka import KafkaConsumer
def handle_edit(meta):
@@ -31,8 +32,11 @@
if __name__ == '__main__':
- ap = argparse.ArgumentParser(description='PerfData StatsD module')
- ap.add_argument('endpoint', help='URI of EventLogging endpoint')
+ ap = argparse.ArgumentParser(description='Send VisualEditor PerfData to StatsD')
+ ap.add_argument('--brokers', required=True,
+ help='Comma-separated list of kafka brokers')
+ ap.add_argument('--consumer-group', required=True,
+ help='Consumer group to register with Kafka')
ap.add_argument('--statsd-host', default='localhost',
type=socket.gethostbyname)
ap.add_argument('--statsd-port', default=8125, type=int)
@@ -41,15 +45,31 @@
addr = args.statsd_host, args.statsd_port
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
- events = eventlogging.connect(args.endpoint)
+ kafka_bootstrap_servers = tuple(args.brokers.split(','))
+ kafka_topic = 'eventlogging_Edit'
+ kafka_consumer_timeout_seconds = 600
+ consumer = KafkaConsumer(
+ kafka_topic,
+ bootstrap_servers=kafka_bootstrap_servers,
+ group_id=args.consumer_group,
+ auto_offset_reset='latest',
+ enable_auto_commit=False,
+ consumer_timeout_ms=kafka_consumer_timeout_seconds * 1000
+ )
- for meta in events.filter(schema='Edit'):
- try:
- stat = handle_edit(meta)
- if stat is not None:
- sock.sendto(stat.encode('utf-8'), addr)
- except (ValueError, KeyError):
- continue
+ try:
+ for message in consumer:
+ meta = json.loads(message.value)
+ try:
+ stat = handle_edit(meta)
+ if stat is not None:
+ sock.sendto(stat.encode('utf-8'), addr)
+ except (ValueError, KeyError):
+ continue
+ # If we reach this line, consumer_timeout_ms elapsed without events
+ raise RuntimeError('No messages received in %d seconds.' % kafka_consumer_timeout_seconds)
+ finally:
+ consumer.close()
# ##### Tests ######
diff --git a/modules/webperf/manifests/navtiming.pp b/modules/webperf/manifests/navtiming.pp
index 5923cb4..fda321b 100644
--- a/modules/webperf/manifests/navtiming.pp
+++ b/modules/webperf/manifests/navtiming.pp
@@ -2,12 +2,12 @@
#
# Captures NavigationTiming events from Kafka and send them to StatsD / Graphite.
# See https://meta.wikimedia.org/wiki/Schema:NavigationTiming &
-# http://www.mediawiki.org/wiki/Extension:NavigationTiming
+# https://www.mediawiki.org/wiki/Extension:NavigationTiming
#
# === Parameters
#
# [*kafka_brokers*]
-# string of comma separated Kafka bootstrap brokers
+# String of comma separated Kafka bootstrap brokers.
#
# [*statsd_host*]
# Write stats to this StatsD instance. Default: '127.0.0.1'.
@@ -17,8 +17,8 @@
#
class webperf::navtiming(
$kafka_brokers,
- $statsd_host = '127.0.0.1',
- $statsd_port = 8125,
+ $statsd_host = '127.0.0.1',
+ $statsd_port = 8125,
) {
include ::webperf
diff --git a/modules/webperf/manifests/ve.pp b/modules/webperf/manifests/ve.pp
index 5c5499a..d5f5c7c 100644
--- a/modules/webperf/manifests/ve.pp
+++ b/modules/webperf/manifests/ve.pp
@@ -1,16 +1,12 @@
# == Class: webperf::ve
#
# Captures VisualEditor timing data and sends it to StatsD.
+# See <https://meta.wikimedia.org/wiki/Schema:Edit>.
#
# === Parameters
#
-# [*endpoint*]
-# URI of EventLogging event publisher to subscribe to.
-# Example: 'tcp://eventlogging.corp.org:8600'.
-#
-# [*eventlogging_path*]
-# Path where the EventLogging python library is installed.
-# Example: '/srv/deployment/eventlogging'.
+# [*kafka_brokers*]
+# String of comma separated Kafka bootstrap brokers.
#
# [*statsd_host*]
# Write stats to this StatsD instance. Default: '127.0.0.1'.
@@ -19,12 +15,14 @@
# Write stats to this StatsD instance. Default: 8125.
#
class webperf::ve(
- $endpoint,
- $eventlogging_path,
+ $kafka_brokers,
$statsd_host = '127.0.0.1',
$statsd_port = 8125,
) {
include ::webperf
+
+ require_package('python-kafka')
+ require_package('python-yaml')
file { '/srv/webperf/ve.py':
source => 'puppet:///modules/webperf/ve.py',
@@ -35,6 +33,7 @@
}
file { '/lib/systemd/system/ve.service':
+ # uses $statsd_host, $statsd_port, $kafka_brokers
content => template('webperf/ve.systemd.erb'),
notify => Service['ve'],
}
diff --git a/modules/webperf/templates/ve.systemd.erb b/modules/webperf/templates/ve.systemd.erb
index fa7839e..980b477 100644
--- a/modules/webperf/templates/ve.systemd.erb
+++ b/modules/webperf/templates/ve.systemd.erb
@@ -4,9 +4,9 @@
[Service]
WorkingDirectory=/srv/webperf
-Environment="PYTHONPATH=<%= @eventlogging_path %>"
ExecStart=/usr/bin/python /srv/webperf/ve.py \
- <%= @endpoint %> \
+ --brokers <%= @kafka_brokers %> \
+ --consumer-group webperf_ve \
--statsd-host <%= @statsd_host %> \
--statsd-port <%= @statsd_port %>
User=nobody
--
To view, visit https://gerrit.wikimedia.org/r/375106
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: merged
Gerrit-Change-Id: I0d8211876e74fda77a50d25e07451357828fc8ba
Gerrit-PatchSet: 4
Gerrit-Project: operations/puppet
Gerrit-Branch: production
Gerrit-Owner: Krinkle <[email protected]>
Gerrit-Reviewer: Gilles <[email protected]>
Gerrit-Reviewer: Giuseppe Lavagetto <[email protected]>
Gerrit-Reviewer: Krinkle <[email protected]>
Gerrit-Reviewer: Ottomata <[email protected]>
Gerrit-Reviewer: Phedenskog <[email protected]>
Gerrit-Reviewer: Volans <[email protected]>
Gerrit-Reviewer: jenkins-bot <>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits