Ottomata has submitted this change and it was merged. Change subject: node-rdkafka event.stats callback that reports stats to statsd ......................................................................
node-rdkafka event.stats callback that reports stats to statsd Node module that parses an object structure, flattens it and sends the whitelisted metrics to statsd Depends implicitily on statsd client Bug: T145099 Change-Id: I15d8a2f93b34bfafa5c80b191b416e780f33c515 --- A .gitignore A .jshintignore A .jshintrc A .travis.yml A LICENSE A README.md A lib/rdkafka-statsd.js A package.json A test/index.js A test/test.js 10 files changed, 1,137 insertions(+), 0 deletions(-) Approvals: Ottomata: Verified; Looks good to me, approved diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..392ef11 --- /dev/null +++ b/.gitignore @@ -0,0 +1,22 @@ +# Logs +logs +*.log +npm-debug.log* + +# Dependency directories +node_modules + +# Optional npm cache directory +.npm + +# Optional REPL history +.node_repl_history + +# IntelliJ IDEA +.idea/* + +# Sublime +*.sublime* + +# JSDoc results +out/ diff --git a/.jshintignore b/.jshintignore new file mode 100644 index 0000000..1c69eee --- /dev/null +++ b/.jshintignore @@ -0,0 +1,3 @@ +coverage +node_modules +test diff --git a/.jshintrc b/.jshintrc new file mode 100644 index 0000000..283025c --- /dev/null +++ b/.jshintrc @@ -0,0 +1,33 @@ +{ + "predef": [ + "setImmediate", + "Map", + "Set", + "describe", + "it" + ], + + "bitwise": true, + "laxbreak": true, + "curly": true, + "eqeqeq": true, + "immed": true, + "latedef": "nofunc", + "newcap": true, + "noarg": true, + "noempty": true, + "nonew": true, + "regexp": false, + "undef": true, + "strict": true, + "trailing": true, + + "smarttabs": true, + "multistr": true, + + "node": true, + + "nomen": false, + "loopfunc": true, + "esnext": true +} diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..7ee6233 --- /dev/null +++ b/.travis.yml @@ -0,0 +1,18 @@ +language: node_js +node_js: + - "0.10" + - "0.12" + - "4.2" + - "5" + +sudo: false + +notifications: + irc: + channels: + - "irc.freenode.org#wikimedia-analytics" + on_success: change + on_failure: always + +script: npm test + diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..e06d208 --- /dev/null +++ b/LICENSE @@ -0,0 +1,202 @@ +Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "{}" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright {yyyy} {name of copyright owner} + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + diff --git a/README.md b/README.md new file mode 100644 index 0000000..a016d69 --- /dev/null +++ b/README.md @@ -0,0 +1,2 @@ +Utility module to flatten an object, filter keys and send wanted metrics to statsd. +By default metrics that include '-1' or 'toppars' are not sent. diff --git a/lib/rdkafka-statsd.js b/lib/rdkafka-statsd.js new file mode 100644 index 0000000..c29a3ed --- /dev/null +++ b/lib/rdkafka-statsd.js @@ -0,0 +1,266 @@ +/** + * Parses an object structure, flattens it + * and sends the whitelisted metrics to statsd + * Depends implicitily on statsd client + * Use like: + + var StatsD require('node-statsd'), + client new StatsD({ host: 'statsd.eqiad.wmnet'}); // or metrics-reporter + + function myFilter(key) = {...} + + var rdkafkaStatsdCb = require('node-rdkafka-statsd')(client,{'filterFn': myFilter}); + + var kafka = require('node-rdkafka'); + var consumer = new kafka.KafkaConsumer({ + ... + 'statistics.interval.ms': 30000, + }); + // Flowing mode + consumer.connect(); + consumer + .on('ready', function() { + consumer.consume('some-topic'); + }) + .on('event.stats', rdkafka_statsd_cb); + * + * + * If filterFn is set to false no whitelist is applied + * metrics that include 'toppars' or '-1' are filtered by default + **/ + +'use strict'; + + + +const DOT = '.'; + +/** + * Default filter function, can be overriden when instantiating module + **/ +function defaultFilterFn(key) { + const defaultWhitelist = ['hi_offset', + 'lo_offset', + 'eof_offset', + 'committed_offset', + 'query_offset', + 'next_offset', + 'app_offset', + 'stored_offset', + + ]; + + + return defaultWhitelist.indexOf(key) >= 0; +} + + +function isNil(item) { + // catching both null and undefined, jshint doesn't let us do == + return item === null || item === undefined; +} + +function isArray(obj) { + return !isNil(obj) && Array.isArray(obj); +} + + +function isObject(obj) { + return !isNil(obj) && obj.constructor === Object; +} + +function isFunction(obj) { + return !isNil(obj) && obj.constructor === Function; +} + +// we assume that if it is not another object or array +// it is a primitive type +// obviously not true in javascript at large but true for this module usage +function isPrimitive(item) { + return !isObject(item) && !isArray(item); +} + +function isString(item) { + // gotcha string objects need to be converted to string + // primitive types, valueof does that + return typeof item.valueOf() === "string"; + +} + +function isNumber(item) { + var amount = Number(item); + return !Number.isNaN(amount) && Number.isFinite(amount); + +} + +function cleanUnwantedChars(str) { + const UNDERSCORE = '_'; + return str.replace(/:|\.|;|\//g, UNDERSCORE); +} + +/** + * Recurses through objects and flattens them + * into a single level dict of key: value pairs. Each + * key consists of all of the recursed keys joined by + * separator that is hardcoded to be a dot. + * + **/ +function flatten(obj) { + + var dict = {}; + + // return empty object, raising an error seems like + // it would polute the logs + if (!isObject(obj) && !isArray(obj)) { + return dict; + } + + // looks like metric obj is really sent as a string + if ('message' in obj) { + obj = JSON.parse(obj.message); + } + + + function _flatten(obj, dict, keyPrefix) { + + // object has -ad minimum - a key, value pair + // {'some-key': 'some-value-maybe-another-object'} + // decide if object or array + + Object.keys(obj).forEach(function (name) { + var keyName; + + var cleanName = cleanUnwantedChars(name); + + if (keyPrefix) { + keyName = keyPrefix + DOT + cleanName; + } else { + keyName = cleanName; + } + + if (isPrimitive(obj[name])) { + dict[keyName] = obj[name]; + } else { + // continue recursing + _flatten(obj[name], dict, keyName); + + } + + }); + + + return dict; + } + return _flatten(obj, dict, null); +} + +/** + * Removes unwanted keys from data object + * Object must be fully built as the keys you are interested on might be deep + * in the structure. + * + */ +function filterKeys(data, filterFn) { + + var filteredData = {}; + + // these values are nonsensical and always filtered regardless + // of passed in filter function + const blacklist = ['toppars', '-1']; + + + Object.keys(data).forEach(function (name) { + // split property by '.' + var items = name.split(DOT); + //check blacklist, takes precedence + var blacklisted = false; + for (var j = 0; j < items.length; j++) { + if (blacklist.indexOf(items[j]) >= 0) { + blacklisted = true; + break; + } + } + // if any of the items appears in filterFn keep property on object + // unless this key is alredy blacklisted + for (var i = 0; !blacklisted && i < items.length; i++) { + + if (filterFn(items[i])) { + filteredData[name] = data[name]; + break; + } + } + + }); + + return filteredData; +} + +/** + * Reports metrics to statsd using reporter + * returns data for convenience, but not really needed + * Please take a look at https://blog.pkhamre.com/understanding-statsd-and-graphite/ + * to learn about graphite metrics. + * + * Gauge metrics represent a 'an arbitrary (numeric) value at a point in time' + * and this is what kafka metrics will be most often. + * + **/ +function report(data, reporter) { + + Object.keys(data).forEach(function (metric) { + if (isNumber(data[metric])) { + + try { + reporter.gauge(metric, data[metric]); + } catch (e) { + //nothing you can do + } + } + + }); + return data; +} + +/** + * Returns a function that parses an object and sents its key/value pairs as metric/value + * to graphite. + * + * Given a raw object it will be traversed, and each leaf node of the object will + * be keyed by a concatenated key made up of all parent keys. + * Keys are the name of metrics that are sent to graphite + * Clients have the option of filtering metrics and only sent the ones they care about + * by supplying a filtering function that given a string (metric name) returns true or false + * A default whitelist is provided + * + * If filterFn === false no whitelist is applied + * + **/ + +function createRdKafkaStatsCb(reporter, options) { + var optionalWhitelistFn; + + options = options || {}; + + + // use default filtering if none is passed in + if (!('filterFn' in options)) { + options.filterFn = defaultFilterFn; + } else if (options.filterFn === false) { + // no filtering + return function (rawObject) { + return report(flatten(rawObject), reporter); + + }; + } else if (!isFunction(options.filterFn)) { + throw new Error('Must provide a function for options.filterFn'); + } + + + return function (rawObject) { + return report(filterKeys(flatten(rawObject), options.filterFn), reporter); + }; + +} + +/************** Module exports ***********************/ +module.exports = createRdKafkaStatsCb; diff --git a/package.json b/package.json new file mode 100644 index 0000000..aa68c97 --- /dev/null +++ b/package.json @@ -0,0 +1,39 @@ +{ + "name": "node-rdkafka-statsd", + "version": "0.1.0", + "description": "node-rdkafka event.stats callback that sends rdkafka statistics to statsd.", + "main": "lib/rdkafka-statsd.js", + "scripts": { + "test": "mocha" + }, + "repository": { + "type": "git", + "url": "git://gerrit.wikimedia.org/r//node-rdkafka-statsd.git" + }, + "keywords": [ + "analytics", + "kafka", + "statsd" + ], + "author": "Wikimedia Analytics Team <analyt...@wikimedia.org>", + "license": "Apache-2.0", + "bugs": { + "url": "https://phabricator.wikimedia.org/tag/analytics/" + }, + "dependencies": {}, + "devDependencies": { + "js-yaml": "^3.5.2", + "bunyan": "^1.5.1", + "coveralls": "^2.11.6", + "istanbul": "^0.4.2", + "mocha": "^2.5.3", + "mocha-jshint": "^2.3.1", + "mocha-lcov-reporter": "^1.0.0" + }, + "deploy": { + "node": "4.4.6", + "dependencies": { + "_all": [] + } + } +} diff --git a/test/index.js b/test/index.js new file mode 100644 index 0000000..deb30a0 --- /dev/null +++ b/test/index.js @@ -0,0 +1,3 @@ +'use strict'; + + // Run jshint as part of normal testing +require('mocha-jshint')(); diff --git a/test/test.js b/test/test.js new file mode 100644 index 0000000..f180e76 --- /dev/null +++ b/test/test.js @@ -0,0 +1,549 @@ +"use strict"; + +const assert = require('assert'); + + +describe('Testing object flattening. No filter function (Blackbox, through exposed function)', function () { + + const fakeStatsdReporter = { + set: function () {}, + gauge: function () {} + }; + const cb = require('../lib/rdkafka-statsd')(fakeStatsdReporter, { + 'filterFn': false + }); + + it('Testing object flattening happy case', function () { + + var a = { + 'pepe': { + 'pepe': 45, + 'ana': 23, + 'juanito': 45, + 'bad:key': 'bad!', + }, + 'ana': 63, + 'juanito': 45, + 'bad:ana': 'hola!' + + + }; + + var rawObject = { + "message": JSON.stringify(a) + }; + + var flat_a = cb(rawObject); + assert.deepEqual(flat_a.ana, 63); + assert.deepEqual(flat_a['pepe.ana'], 23); + assert.deepEqual(flat_a['juanito'], 45); + //':' substitution happened + assert.deepEqual(flat_a['bad_ana'], 'hola!'); + assert.deepEqual(flat_a['pepe.bad_key'], 'bad!'); + + + }); + +}); + +describe('Testing object flattening and filtering with passed-in filter function (Blackbox, through exposed function)', function () { + + const fakeFilterFn = function (key) { + var l = ['ana', 'simple_cnt', 'rx_ver_drops', 'lo_offset']; + return l.indexOf(key) >= 0; + + }; + + const fakeStatsdReporter = { + gauge: function () {} + }; + const cb = require('../lib/rdkafka-statsd')(fakeStatsdReporter, { + 'filterFn': fakeFilterFn + }); + + it('Testing object flattening happy case', function () { + + var a = { + 'pepe': { + 'pepe': 45, + 'ana': 23, + 'juanito': 45 + }, + 'ana': 63, + 'juanito': 45 + }; + + + var rawObject = { + "message": JSON.stringify(a) + }; + + var flat_a = cb(rawObject); + assert.deepEqual(flat_a.ana, 63); + assert.deepEqual(flat_a['pepe.ana'], 23); + assert(flat_a.juanito === undefined); + + }); + + it('Testing object flattening empty object', function () { + + var a = {}; + var flat_a = cb(a); + assert.deepEqual(flat_a, {}); + + }); + + it('Testing object flattening no object', function () { + + var a = "hola"; + var flat_a = cb(a); + assert.deepEqual(flat_a, {}); + + }); + + + it('Testing object flattening null object', function () { + + var a = null; + var flat_a = cb(a); + assert.deepEqual(flat_a, {}); + + }); + + + it('Test flattening, real kafka object', function () { + var metrics = { + "topics": { + "test4": { + "partitions": { + "-1": { + "rx_ver_drops": 0, + "msgs": 0, + "txbytes": 0, + "txmsgs": 0, + "consumer_lag": -1, + "hi_offset": -1001, + "lo_offset": -1001, + "eof_offset": -1001, + "committed_offset": -1001, + "xmit_msgq_bytes": 0, + "xmit_msgq_cnt": 0, + "msgq_bytes": 0, + "msgq_cnt": 0, + "unknown": false, + "desired": false, + "leader": -1, + "partition": -1, + "fetchq_cnt": 0, + "fetchq_size": 0, + "fetch_state": "none", + "query_offset": 0, + "next_offset": 0, + "app_offset": -1001, + "stored_offset": -1001, + "commited_offset": -1001 + }, + "0": { + "rx_ver_drops": 0, + "msgs": 0, + "txbytes": 0, + "txmsgs": 0, + "consumer_lag": 0, + "hi_offset": 34784, + "lo_offset": -1001, + "eof_offset": 34784, + "committed_offset": -1001, + "xmit_msgq_bytes": 0, + "xmit_msgq_cnt": 0, + "msgq_bytes": 0, + "msgq_cnt": 0, + "unknown": false, + "desired": true, + "leader": 0, + "partition": 0, + "fetchq_cnt": 30582, + "fetchq_size": 6224678, + "fetch_state": "active", + "query_offset": 0, + "next_offset": 34784, + "app_offset": 6143, + "stored_offset": 6143, + "commited_offset": -1001 + } + }, + "metadata_age": 5995, + "topic": "test4" + } + }, + + + "brokers": { + "mediawiki-vagrant.dev:9092/0": { + "toppars": { + "test4": { + "partition": 0, + "topic": "test4" + } + }, + "throttle": { + "cnt": 0, + "sum": 0, + "avg": 0, + "max": 0, + "min": 0 + }, + "rtt": { + "cnt": 0, + "sum": 0, + "avg": 0, + "max": 0, + "min": 0 + }, + "rxpartial": 3, + "rxcorriderrs": 0, + "waitresp_msg_cnt": 0, + "waitresp_cnt": 1, + "outbuf_msg_cnt": 0, + "outbuf_cnt": 0, + "stateage": 7023265, + "state": "UP", + "nodeid": 0, + "name": "mediawiki-vagrant.dev:9092/0", + "tx": 57, + "txbytes": 3609, + "txerrs": 0, + "txretries": 0, + "req_timeouts": 0, + "rx": 56, + "rxbytes": 3406226, + "rxerrs": 0 + }, + "localhost:9092/bootstrap": { + "toppars": {}, + "throttle": { + "cnt": 0, + "sum": 0, + "avg": 0, + "max": 0, + "min": 0 + }, + "rtt": { + "cnt": 0, + "sum": 0, + "avg": 0, + "max": 0, + "min": 0 + }, + "rxpartial": 0, + "rxcorriderrs": 0, + "waitresp_msg_cnt": 0, + "waitresp_cnt": 0, + "outbuf_msg_cnt": 0, + "outbuf_cnt": 0, + "stateage": 7058566, + "state": "UP", + "nodeid": -1, + "name": "localhost:9092/bootstrap", + "tx": 5, + "txbytes": 172, + "txerrs": 0, + "txretries": 0, + "req_timeouts": 0, + "rx": 5, + "rxbytes": 8146, + "rxerrs": 0 + } + }, + + "name": "rdkafka#consumer-1", + "type": "consumer", + "ts": 69158506047, + "time": 1472147268, + "replyq": 30583, + "msg_cnt": 0, + "msg_max": 100000, + "simple_cnt": 0 + }; + + + var rawObject = { + "message": JSON.stringify(metrics) + }; + + var flat_metrics = cb(rawObject); + assert.deepEqual(flat_metrics['simple_cnt'], 0); + assert.deepEqual(flat_metrics['topics.test4.partitions.0.rx_ver_drops'], 0); + // '-1' is filtered + assert.deepEqual(flat_metrics['topics.test4.partitions.-1.lo_offset'], undefined); + assert.deepEqual(flat_metrics['topics.test4.partitions.0.lo_offset'], -1001); + + assert(flat_metrics.msg_max === undefined); + + }); + +}); + + +describe('Testing object flattening and filtering with default filter function (Blackbox, through exposed function)', function () { + + const fakeStatsdReporter = { + gauge: function () {} + }; + + const cb = require('../lib/rdkafka-statsd')(fakeStatsdReporter); + + + it('Test flattening and filtering default function, real kafka object', function () { + var metrics = { + "topics": { + "test4": { + "partitions": { + "-1": { + "rx_ver_drops": 0, + "msgs": 0, + "txbytes": 0, + "txmsgs": 0, + "consumer_lag": -1, + "hi_offset": -1001, + "lo_offset": -1001, + "eof_offset": -1001, + "committed_offset": -1001, + "xmit_msgq_bytes": 0, + "xmit_msgq_cnt": 0, + "msgq_bytes": 0, + "msgq_cnt": 0, + "unknown": false, + "desired": false, + "leader": -1, + "partition": -1, + "fetchq_cnt": 0, + "fetchq_size": 0, + "fetch_state": "none", + "query_offset": 0, + "next_offset": 0, + "app_offset": -1001, + "stored_offset": -1001, + "commited_offset": -1001 + }, + "0": { + "rx_ver_drops": 0, + "msgs": 0, + "txbytes": 0, + "txmsgs": 0, + "consumer_lag": 0, + "hi_offset": 34784, + "lo_offset": -1001, + "eof_offset": 34784, + "committed_offset": -1001, + "xmit_msgq_bytes": 0, + "xmit_msgq_cnt": 0, + "msgq_bytes": 0, + "msgq_cnt": 0, + "unknown": false, + "desired": true, + "leader": 0, + "partition": 0, + "fetchq_cnt": 30582, + "fetchq_size": 6224678, + "fetch_state": "active", + "query_offset": 0, + "next_offset": 34784, + "app_offset": 6143, + "stored_offset": 6143, + "commited_offset": -1001 + } + }, + "metadata_age": 5995, + "topic": "test4" + } + }, + + + "brokers": { + "mediawiki-vagrant.dev:9092/0": { + "toppars": { + "test4": { + "partition": 0, + "topic": "test4" + } + }, + "throttle": { + "cnt": 0, + "sum": 0, + "avg": 0, + "max": 0, + "min": 0 + }, + "rtt": { + "cnt": 0, + "sum": 0, + "avg": 0, + "max": 0, + "min": 0 + }, + "rxpartial": 3, + "rxcorriderrs": 0, + "waitresp_msg_cnt": 0, + "waitresp_cnt": 1, + "outbuf_msg_cnt": 0, + "outbuf_cnt": 0, + "stateage": 7023265, + "state": "UP", + "nodeid": 0, + "name": "mediawiki-vagrant.dev:9092/0", + "tx": 57, + "txbytes": 3609, + "txerrs": 0, + "txretries": 0, + "req_timeouts": 0, + "rx": 56, + "rxbytes": 3406226, + "rxerrs": 0 + }, + "localhost:9092/bootstrap": { + "toppars": {}, + "throttle": { + "cnt": 0, + "sum": 0, + "avg": 0, + "max": 0, + "min": 0 + }, + "rtt": { + "cnt": 0, + "sum": 0, + "avg": 0, + "max": 0, + "min": 0 + }, + "rxpartial": 0, + "rxcorriderrs": 0, + "waitresp_msg_cnt": 0, + "waitresp_cnt": 0, + "outbuf_msg_cnt": 0, + "outbuf_cnt": 0, + "stateage": 7058566, + "state": "UP", + "nodeid": -1, + "name": "localhost:9092/bootstrap", + "tx": 5, + "txbytes": 172, + "txerrs": 0, + "txretries": 0, + "req_timeouts": 0, + "rx": 5, + "rxbytes": 8146, + "rxerrs": 0 + } + }, + + "name": "rdkafka#consumer-1", + "type": "consumer", + "ts": 69158506047, + "time": 1472147268, + "replyq": 30583, + "msg_cnt": 0, + "msg_max": 100000, + "simple_cnt": 0 + }; + + var rawObject = { + "message": JSON.stringify(metrics) + }; + + var flat_metrics = cb(rawObject); + + + assert.deepEqual(flat_metrics['simple_cnt'], undefined); + assert.deepEqual(flat_metrics['topics.test4.partitions.0.rx_ver_drops'], undefined); + assert.deepEqual(flat_metrics['topics.test4.partitions.-1.lo_offset'], undefined); + assert.deepEqual(flat_metrics['topics.test4.partitions.0.stored_offset'], 6143); + + }); + +}); +describe('Testing object reporting', function () { + + it('Testing object reporting happy case', function (done) { + // given that code execution is synchronous these assertions are guranteed + // to run + const fakeStatsdReporter = { + gauge: function (item, itemData) { + assert.equal(itemData, 1001); + } + }; + const cb = require('../lib/rdkafka-statsd')(fakeStatsdReporter, { + 'filterFn': false + }); + + var a = { + "topics": { + "rx_ver_drops": [], + "msgs": 1001, + "txmsgs": "this is a string" + + } + }; + + var rawObject = { + "message": JSON.stringify(a) + }; + var flat_a = cb(rawObject); + done(); + }); + + it('Testing object reporting bad metrics, should not report', function (done) { + // given that code execution is synchronous these assertions are guranteed + // to run if the reporter is called + // it should not be as the only metric we have is neither a + // string nor a number + const fakeStatsdReporter = { + + gauge: function (item, itemData) { + assert.equal(true, false); + } + }; + const cb = require('../lib/rdkafka-statsd')(fakeStatsdReporter, { + 'filterFn': false + }); + + var a = { + "topics": { + "rx_ver_drops": [], + + } + }; + var rawObject = { + "message": JSON.stringify(a) + }; + var flat_a = cb(rawObject); + done(); + }); + + it('Testing object reporting, removing chars ', function (done) { + // given that code execution is synchronous these assertions are guranteed + + const fakeStatsdReporter = { + + gauge: function (item, itemData) { + assert.equal(item, 'topics_rx_ver_drops'); + } + }; + const cb = require('../lib/rdkafka-statsd')(fakeStatsdReporter, { + 'filterFn': false + }); + + var a = { + "topics": { + "rx.ver/drops": 43, + + } + }; + var rawObject = { + "message": JSON.stringify(a) + }; + var flat_a = cb(rawObject); + done(); + }); + +}); -- To view, visit https://gerrit.wikimedia.org/r/319671 To unsubscribe, visit https://gerrit.wikimedia.org/r/settings Gerrit-MessageType: merged Gerrit-Change-Id: I15d8a2f93b34bfafa5c80b191b416e780f33c515 Gerrit-PatchSet: 22 Gerrit-Project: node-rdkafka-statsd Gerrit-Branch: master Gerrit-Owner: Nuria <nu...@wikimedia.org> Gerrit-Reviewer: Nuria <nu...@wikimedia.org> Gerrit-Reviewer: Ottomata <o...@wikimedia.org> Gerrit-Reviewer: Ppchelko <ppche...@wikimedia.org> _______________________________________________ MediaWiki-commits mailing list MediaWiki-commits@lists.wikimedia.org https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits