Ottomata has uploaded a new change for review. ( https://gerrit.wikimedia.org/r/325836 )

Change subject: Use node-rdkafka-statsd to report rdkafka metrics
......................................................................

Use node-rdkafka-statsd to report rdkafka metrics

Change-Id: Ie2ecc2394b35870dec55106a9b5859403251e86d
---
M config.dev.yaml
M config.prod.yaml
M lib/eventstreams-util.js
M package.json
M routes/v1.js
5 files changed, 77 insertions(+), 7 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/mediawiki/services/eventstreams refs/changes/36/325836/1

diff --git a/config.dev.yaml b/config.dev.yaml
index 99f24ba..006a266 100644
--- a/config.dev.yaml
+++ b/config.dev.yaml
@@ -20,9 +20,9 @@
 
 # Statsd metrics reporter
 metrics:
-  #type: log
-  #host: localhost
-  #port: 8125
+  type: log
+  host: localhost
+  port: 8125
 
 services:
   - name: eventstreams
@@ -53,3 +53,4 @@
       # kafka configs go here.
       kafka:
         metadata.broker.list: 'localhost:9092'
+        statistics.interval.ms: 5000
diff --git a/config.prod.yaml b/config.prod.yaml
index 8262b0e..fd5ee0e 100644
--- a/config.prod.yaml
+++ b/config.prod.yaml
@@ -51,3 +51,5 @@
       # kafka configs go here.
       kafka:
         metadata.broker.list: 'localhost:9092'
+        statistics.interval.ms: 60000
+
diff --git a/lib/eventstreams-util.js b/lib/eventstreams-util.js
index 3340bb7..54a43f6 100644
--- a/lib/eventstreams-util.js
+++ b/lib/eventstreams-util.js
@@ -56,7 +56,61 @@
 }
 
 
+
+/**
+ * Filter function that will be passed as an option to the
+ * event.stats cb function that node-rdkafka-statsd will create
+ * to give each new node-rdkafka client instance.
+ *
+ * We implement a custom filter because we don't care to report
+ * some of these rdkafka metrics.  Specifically, we remove
+ * metrics about committed offsets, since kafka-sse does not commit.
+ */
+function rdkafkaStatsFilter(key) {
+    return _.includes([
+            // Broker stats
+            'outbuf_cnt',
+            'outbuf_msg_cnt',
+            'waitresp_cnt',
+            'waitresp_msg_cnt',
+            'tx',
+            'txbytes',
+            'txerrs',
+            'txretries',
+            'req_timeouts',
+            'rx',
+            'rxbytes',
+            'rxerrs',
+            'rxcorriderrs',
+            'rxpartial',
+            'rtt',
+            'throttle',
+
+            // Topic partition stats
+            'msgq_cnt',
+            'msgq_bytes',
+            'xmit_msgq_cnt',
+            'xmit_msgq_bytes',
+            'fetchq_cnt',
+            'fetchq_size',
+            'next_offset',
+            'eof_offset',
+            'lo_offset',
+            'hi_offset',
+            'consumer_lag',
+            'txmsgs',
+            'txbytes',
+            'msgs',
+            'rx_ver_drops'
+        ],
+        key
+    );
+}
+
+
+
 module.exports = {
-    deserializer: deserializer,
-    objectFactory: objectFactory,
+    deserializer:       deserializer,
+    objectFactory:      objectFactory,
+    rdkafkaStatsFilter: rdkafkaStatsFilter,
 };
diff --git a/package.json b/package.json
index 15ef778..16328ec 100644
--- a/package.json
+++ b/package.json
@@ -42,6 +42,7 @@
     "service-runner": "^2.0.4",
     "swagger-router": "^0.4.6",
     "lodash": "^4.15.0",
+    "node-rdkafka-statsd": "^0.1.0",
     "kafka-sse": "git+https://phabricator.wikimedia.org/diffusion/WKSE/kafkasse.git#v0.0.5"
   },
   "devDependencies": {
diff --git a/routes/v1.js b/routes/v1.js
index 29f5745..6c9d721 100644
--- a/routes/v1.js
+++ b/routes/v1.js
@@ -1,6 +1,7 @@
 'use strict';
 
-const kafkaSse  = require('kafka-sse');
+const kafkaSse = require('kafka-sse');
+const rdkafkaStatsd = require('node-rdkafka-statsd')
 
 const sUtil = require('../lib/util');
 const eUtil = require('../lib/eventstreams-util');
@@ -14,6 +15,7 @@
  * The main application object reported when this module is require()d
  */
 let app;
+
 
 
 /**
@@ -35,6 +37,16 @@
         // kafka message meta data in the deserialized message.meta object
         // that will be sent to the client as an event.
         deserializer:           eUtil.deserializer,
+        kafkaEventHandlers: {
+            // Create a child of this app's metrics object to use
+            // for consumer specific rdkafka metric reporting via node-rdkafka-statsd.
+            // This will emit consumer specific metrics prefixed like:
+            // eventstreams.rdkafka.2807ee91-bcc0-11e6-9e1f-a1c84e764327.
+            'event.stats': rdkafkaStatsd(
+                app.metrics.makeChild(`rdkafka.${req.headers['x-request-id']}`),
+                { filterFn: eUtil.rdkafkaStatsFilter }
+            )
+        }
     });
 }
 
@@ -45,7 +57,7 @@
 
     const stream_names = Object.keys(app.conf.streams);
 
-    // Create a new /v1/stream/${stream} route for each stream name.
+    // Create a new /stream/${stream} route for each stream name.
     stream_names.forEach(stream => {
         router.get(`/stream/${stream}`, (req, res) => {
             return eventStream(req, res, app.conf.streams[stream].topics);

-- 
To view, visit https://gerrit.wikimedia.org/r/325836
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Ie2ecc2394b35870dec55106a9b5859403251e86d
Gerrit-PatchSet: 1
Gerrit-Project: mediawiki/services/eventstreams
Gerrit-Branch: master
Gerrit-Owner: Ottomata <[email protected]>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to