Ottomata has submitted this change and it was merged. ( 
https://gerrit.wikimedia.org/r/325836 )

Change subject: Use node-rdkafka-statsd to report rdkafka metrics
......................................................................


Use node-rdkafka-statsd to report rdkafka metrics

Change-Id: Ie2ecc2394b35870dec55106a9b5859403251e86d
---
M config.dev.yaml
M config.prod.yaml
M lib/eventstreams-util.js
M package.json
M routes/v2.js
5 files changed, 76 insertions(+), 6 deletions(-)

Approvals:
  Ottomata: Verified; Looks good to me, approved



diff --git a/config.dev.yaml b/config.dev.yaml
index 99f24ba..006a266 100644
--- a/config.dev.yaml
+++ b/config.dev.yaml
@@ -20,9 +20,9 @@
 
 # Statsd metrics reporter
 metrics:
-  #type: log
-  #host: localhost
-  #port: 8125
+  type: log
+  host: localhost
+  port: 8125
 
 services:
   - name: eventstreams
@@ -53,3 +53,4 @@
       # kafka configs go here.
       kafka:
         metadata.broker.list: 'localhost:9092'
+        statistics.interval.ms: 5000
diff --git a/config.prod.yaml b/config.prod.yaml
index 8262b0e..fd5ee0e 100644
--- a/config.prod.yaml
+++ b/config.prod.yaml
@@ -51,3 +51,5 @@
       # kafka configs go here.
       kafka:
         metadata.broker.list: 'localhost:9092'
+        statistics.interval.ms: 60000
+
diff --git a/lib/eventstreams-util.js b/lib/eventstreams-util.js
index 3340bb7..2c5eb7c 100644
--- a/lib/eventstreams-util.js
+++ b/lib/eventstreams-util.js
@@ -56,7 +56,61 @@
 }
 
 
+
+/**
+ * Filter function that will be passed as an option to the
+ * event.stats cb function that node-rdkafka-statsd will create
+ * to give each new node-rdkafka client instance.
+ *
+ * We implement a custom filter because we don't care to report
+ * some of these rdkafka metrics.  Specifically, we remove
+ * metrics about committed offsets, since kafka-sse does not commit.
+ */
+const rdkafkaStatsWhitelist = [
+    // Broker stats
+    'outbuf_cnt',
+    'outbuf_msg_cnt',
+    'waitresp_cnt',
+    'waitresp_msg_cnt',
+    'tx',
+    'txbytes',
+    'txerrs',
+    'txretries',
+    'req_timeouts',
+    'rx',
+    'rxbytes',
+    'rxerrs',
+    'rxcorriderrs',
+    'rxpartial',
+    'rtt',
+    'throttle',
+
+    // Topic partition stats
+    'msgq_cnt',
+    'msgq_bytes',
+    'xmit_msgq_cnt',
+    'xmit_msgq_bytes',
+    'fetchq_cnt',
+    'fetchq_size',
+    'next_offset',
+    'eof_offset',
+    'lo_offset',
+    'hi_offset',
+    'consumer_lag',
+    'txmsgs',
+    'txbytes',
+    'msgs',
+    'rx_ver_drops'
+];
+
+function rdkafkaStatsFilter(key) {
+    return _.includes(rdkafkaStatsWhitelist, key);
+}
+
+
+
 module.exports = {
-    deserializer: deserializer,
-    objectFactory: objectFactory,
+    deserializer:       deserializer,
+    objectFactory:      objectFactory,
+    rdkafkaStatsFilter: rdkafkaStatsFilter,
 };
diff --git a/package.json b/package.json
index 15ef778..16328ec 100644
--- a/package.json
+++ b/package.json
@@ -42,6 +42,7 @@
     "service-runner": "^2.0.4",
     "swagger-router": "^0.4.6",
     "lodash": "^4.15.0",
+    "node-rdkafka-statsd": "^0.1.0",
     "kafka-sse": "git+https://phabricator.wikimedia.org/diffusion/WKSE/kafkasse.git#v0.0.5"
   },
   "devDependencies": {
diff --git a/routes/v2.js b/routes/v2.js
index 3668748..e66b2d8 100644
--- a/routes/v2.js
+++ b/routes/v2.js
@@ -1,6 +1,7 @@
 'use strict';
 
-const kafkaSse  = require('kafka-sse');
+const kafkaSse = require('kafka-sse');
+const rdkafkaStatsd = require('node-rdkafka-statsd')
 
 const sUtil = require('../lib/util');
 const eUtil = require('../lib/eventstreams-util');
@@ -14,6 +15,7 @@
  * The main application object reported when this module is require()d
  */
 let app;
+
 
 
 /**
@@ -35,6 +37,16 @@
         // kafka message meta data in the deserialized message.meta object
         // that will be sent to the client as an event.
         deserializer:           eUtil.deserializer,
+        kafkaEventHandlers: {
+            // Create a child of this app's metrics object to use
+            // for consumer specific rdkafka metric reporting via node-rdkafka-statsd.
+            // This will emit consumer specific metrics prefixed like:
+            // eventstreams.rdkafka.2807ee91-bcc0-11e6-9e1f-a1c84e764327.
+            'event.stats': rdkafkaStatsd(
+                app.metrics.makeChild(`rdkafka.${req.headers['x-request-id']}`),
+                { filterFn: eUtil.rdkafkaStatsFilter }
+            )
+        }
     });
 }
 

-- 
To view, visit https://gerrit.wikimedia.org/r/325836
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: Ie2ecc2394b35870dec55106a9b5859403251e86d
Gerrit-PatchSet: 4
Gerrit-Project: mediawiki/services/eventstreams
Gerrit-Branch: master
Gerrit-Owner: Ottomata <[email protected]>
Gerrit-Reviewer: Mforns <[email protected]>
Gerrit-Reviewer: Nuria <[email protected]>
Gerrit-Reviewer: Ottomata <[email protected]>
Gerrit-Reviewer: Ppchelko <[email protected]>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to