Ottomata has uploaded a new change for review. (
https://gerrit.wikimedia.org/r/325829 )
Change subject: Use node-rdkafka-statsd to report rdkafka metrics
......................................................................
Use node-rdkafka-statsd to report rdkafka metrics
Change-Id: Ie2ecc2394b35870dec55106a9b5859403251e86d
---
M config.dev.yaml
M lib/eventstreams-util.js
M package.json
M routes/v1.js
4 files changed, 74 insertions(+), 6 deletions(-)
git pull ssh://gerrit.wikimedia.org:29418/mediawiki/services/eventstreams
refs/changes/29/325829/1
diff --git a/config.dev.yaml b/config.dev.yaml
index 99f24ba..006a266 100644
--- a/config.dev.yaml
+++ b/config.dev.yaml
@@ -20,9 +20,9 @@
# Statsd metrics reporter
metrics:
- #type: log
- #host: localhost
- #port: 8125
+ type: log
+ host: localhost
+ port: 8125
services:
- name: eventstreams
@@ -53,3 +53,4 @@
# kafka configs go here.
kafka:
metadata.broker.list: 'localhost:9092'
+ statistics.interval.ms: 5000
diff --git a/lib/eventstreams-util.js b/lib/eventstreams-util.js
index 3340bb7..54a43f6 100644
--- a/lib/eventstreams-util.js
+++ b/lib/eventstreams-util.js
@@ -56,7 +56,61 @@
}
+
+/**
+ * Filter function that will be passed as an option to the
+ * event.stats cb function that node-rdkafka-statsd will create
+ * to give each new node-rdkafka client instance.
+ *
+ * We implement a custom filter because we don't care to report
+ * some of these rdkafka metrics. Specifically, we remove
+ * metrics about committed offsets, since kafka-sse does not commit.
+ */
+function rdkafkaStatsFilter(key) {
+ return _.includes([
+ // Broker stats
+ 'outbuf_cnt',
+ 'outbuf_msg_cnt',
+ 'waitresp_cnt',
+ 'waitresp_msg_cnt',
+ 'tx',
+ 'txbytes',
+ 'txerrs',
+ 'txretries',
+ 'req_timeouts',
+ 'rx',
+ 'rxbytes',
+ 'rxerrs',
+ 'rxcorriderrs',
+ 'rxpartial',
+ 'rtt',
+ 'throttle',
+
+ // Topic partition stats
+ 'msgq_cnt',
+ 'msgq_bytes',
+ 'xmit_msgq_cnt',
+ 'xmit_msgq_bytes',
+ 'fetchq_cnt',
+ 'fetchq_size',
+ 'next_offset',
+ 'eof_offset',
+ 'lo_offset',
+ 'hi_offset',
+ 'consumer_lag',
+ 'txmsgs',
+ 'txbytes',
+ 'msgs',
+ 'rx_ver_drops'
+ ],
+ key
+ );
+}
+
+
+
module.exports = {
- deserializer: deserializer,
- objectFactory: objectFactory,
+ deserializer: deserializer,
+ objectFactory: objectFactory,
+ rdkafkaStatsFilter: rdkafkaStatsFilter,
};
diff --git a/package.json b/package.json
index 15ef778..16328ec 100644
--- a/package.json
+++ b/package.json
@@ -42,6 +42,7 @@
"service-runner": "^2.0.4",
"swagger-router": "^0.4.6",
"lodash": "^4.15.0",
+ "node-rdkafka-statsd": "^0.1.0",
"kafka-sse":
"git+https://phabricator.wikimedia.org/diffusion/WKSE/kafkasse.git#v0.0.5"
},
"devDependencies": {
diff --git a/routes/v1.js b/routes/v1.js
index 29f5745..6633faf 100644
--- a/routes/v1.js
+++ b/routes/v1.js
@@ -1,6 +1,7 @@
'use strict';
const kafkaSse = require('kafka-sse');
+const rdkafkaStatsd = require('node-rdkafka-statsd')
const sUtil = require('../lib/util');
const eUtil = require('../lib/eventstreams-util');
@@ -14,6 +15,7 @@
* The main application object reported when this module is require()d
*/
let app;
+
/**
@@ -35,6 +37,16 @@
// kafka message meta data in the deserialized message.meta object
// that will be sent to the client as an event.
deserializer: eUtil.deserializer,
+ kafkaEventHandlers: {
+ // Create a child of this app's metrics object to use
+ // for consumer specific rdkafka metric reporting via
node-rdkafka-statsd.
+ // This will emit consumer specific metrics prefixed like:
+ // eventstreams.rdkafka.2807ee91-bcc0-11e6-9e1f-a1c84e764327.
+ 'event.stats': rdkafkaStatsd(
+
app.metrics.makeChild(`rdkafka.${req.headers['x-request-id']}`),
+ { filterFn: eUtil.rdkafkaStatsFilter }
+ )
+ }
});
}
@@ -45,7 +57,7 @@
const stream_names = Object.keys(app.conf.streams);
- // Create a new /v1/stream/${stream} route for each stream name.
+ // Create a new /stream/${stream} route for each stream name.
stream_names.forEach(stream => {
router.get(`/stream/${stream}`, (req, res) => {
return eventStream(req, res, app.conf.streams[stream].topics);
--
To view, visit https://gerrit.wikimedia.org/r/325829
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Ie2ecc2394b35870dec55106a9b5859403251e86d
Gerrit-PatchSet: 1
Gerrit-Project: mediawiki/services/eventstreams
Gerrit-Branch: master
Gerrit-Owner: Ottomata <[email protected]>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits