Ottomata has submitted this change and it was merged.

Change subject: node-rdkafka event.stats callback that reports stats to statsd
......................................................................


node-rdkafka event.stats callback that reports stats to statsd

Node module that parses an object structure, flattens it
and sends the whitelisted metrics to statsd

Depends implicitly on statsd client

Bug: T145099

Change-Id: I15d8a2f93b34bfafa5c80b191b416e780f33c515
---
A .gitignore
A .jshintignore
A .jshintrc
A .travis.yml
A LICENSE
A README.md
A lib/rdkafka-statsd.js
A package.json
A test/index.js
A test/test.js
10 files changed, 1,137 insertions(+), 0 deletions(-)

Approvals:
  Ottomata: Verified; Looks good to me, approved



diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..392ef11
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,22 @@
+# Logs
+logs
+*.log
+npm-debug.log*
+
+# Dependency directories
+node_modules
+
+# Optional npm cache directory
+.npm
+
+# Optional REPL history
+.node_repl_history
+
+# IntelliJ IDEA
+.idea/*
+
+# Sublime
+*.sublime*
+
+# JSDoc results
+out/
diff --git a/.jshintignore b/.jshintignore
new file mode 100644
index 0000000..1c69eee
--- /dev/null
+++ b/.jshintignore
@@ -0,0 +1,3 @@
+coverage
+node_modules
+test
diff --git a/.jshintrc b/.jshintrc
new file mode 100644
index 0000000..283025c
--- /dev/null
+++ b/.jshintrc
@@ -0,0 +1,33 @@
+{
+  "predef": [
+    "setImmediate",
+    "Map",
+    "Set",
+    "describe",
+    "it"
+  ],
+
+  "bitwise": true,
+  "laxbreak": true,
+  "curly": true,
+  "eqeqeq": true,
+  "immed": true,
+  "latedef": "nofunc",
+  "newcap": true,
+  "noarg": true,
+  "noempty": true,
+  "nonew": true,
+  "regexp": false,
+  "undef": true,
+  "strict": true,
+  "trailing": true,
+
+  "smarttabs": true,
+  "multistr": true,
+
+  "node": true,
+
+  "nomen": false,
+  "loopfunc": true,
+  "esnext": true
+}
diff --git a/.travis.yml b/.travis.yml
new file mode 100644
index 0000000..7ee6233
--- /dev/null
+++ b/.travis.yml
@@ -0,0 +1,18 @@
+language: node_js
+node_js:
+    - "0.10"
+    - "0.12"
+    - "4.2"
+    - "5"
+
+sudo: false
+
+notifications:
+  irc:
+    channels:
+      - "irc.freenode.org#wikimedia-analytics"
+    on_success: change
+    on_failure: always
+
+script: npm test
+
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..e06d208
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,202 @@
+Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "{}"
+      replaced with your own identifying information. (Don't include
+      the brackets!)  The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright {yyyy} {name of copyright owner}
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..a016d69
--- /dev/null
+++ b/README.md
@@ -0,0 +1,2 @@
+Utility module that flattens an object, filters its keys and sends the wanted metrics to statsd.
+By default, metrics whose name includes '-1' or 'toppars' are not sent.
diff --git a/lib/rdkafka-statsd.js b/lib/rdkafka-statsd.js
new file mode 100644
index 0000000..c29a3ed
--- /dev/null
+++ b/lib/rdkafka-statsd.js
@@ -0,0 +1,266 @@
+/**
+ * Parses an object structure, flattens it
+ * and sends the whitelisted metrics to statsd.
+ * Depends implicitly on a statsd client.
+ * Use like:
+
+    var StatsD = require('node-statsd');
+    var client = new StatsD({ host: 'statsd.eqiad.wmnet' });  // or metrics-reporter
+
+    function myFilter(key) {...}
+
+    var rdkafkaStatsdCb = require('node-rdkafka-statsd')(client, {
+        'filterFn': myFilter
+    });
+
+    var kafka = require('node-rdkafka');
+    var consumer = new kafka.KafkaConsumer({
+        ...
+        'statistics.interval.ms': 30000,
+    });
+    // Flowing mode
+    consumer.connect();
+    consumer
+        .on('ready', function() {
+            consumer.consume('some-topic');
+        })
+        .on('event.stats', rdkafkaStatsdCb);
+ *
+ *
+ * If filterFn is set to false no filtering at all is applied.
+ * Otherwise (default or custom filter function) metrics that include
+ * 'toppars' or '-1' are always filtered out.
+ **/
+
+'use strict';
+
+
+
+const DOT = '.'; // separator between nested key names in a flattened metric name
+
+/**
+ * Default filter function, can be overriden when instantiating module
+ **/
+function defaultFilterFn(key) {
+    const defaultWhitelist = ['hi_offset',
+        'lo_offset',
+        'eof_offset',
+        'committed_offset',
+        'query_offset',
+        'next_offset',
+        'app_offset',
+        'stored_offset',
+
+    ];
+
+
+    return defaultWhitelist.indexOf(key) >= 0;
+}
+
+
+function isNil(item) {
+    // catching both null and undefined, jshint doesn't let us do ==
+    return item === null || item === undefined;
+}
+
+function isArray(obj) {
+    return !isNil(obj) && Array.isArray(obj);
+}
+
+
+function isObject(obj) {
+    return !isNil(obj) && obj.constructor === Object;
+}
+
+function isFunction(obj) {
+    return !isNil(obj) && obj.constructor === Function;
+}
+
+// we assume that if it is not another object or array
+// it is a primitive type
+// obviously not true in javascript at large but true for this module usage
+function isPrimitive(item) {
+    return !isObject(item) && !isArray(item);
+}
+
+function isString(item) {
+    // gotcha string objects need to be converted to string
+    // primitive types, valueof does that
+    return typeof item.valueOf() === "string";
+
+}
+
+function isNumber(item) {
+    var amount = Number(item);
+    return !Number.isNaN(amount) && Number.isFinite(amount);
+
+}
+
+function cleanUnwantedChars(str) {
+    const UNDERSCORE = '_';
+    return str.replace(/:|\.|;|\//g, UNDERSCORE);
+}
+
+/**
+ * Recurses through objects and flattens them
+ * into a single level dict of key: value pairs.  Each
+ * key consists of all of the recursed keys joined by
+ * separator that is hardcoded to be a dot.
+ *
+ **/
+function flatten(obj) {
+
+    var dict = {};
+
+    // return empty object, raising an error seems like
+    // it would polute the logs
+    if (!isObject(obj) && !isArray(obj)) {
+        return dict;
+    }
+
+    // looks like metric obj is really sent as a string
+    if ('message' in obj) {
+        obj = JSON.parse(obj.message);
+    }
+
+
+    function _flatten(obj, dict, keyPrefix) {
+
+        // object has -ad minimum - a key, value pair
+        // {'some-key': 'some-value-maybe-another-object'}
+        // decide if object or array
+
+        Object.keys(obj).forEach(function (name) {
+            var keyName;
+
+            var cleanName = cleanUnwantedChars(name);
+
+            if (keyPrefix) {
+                keyName = keyPrefix + DOT + cleanName;
+            } else {
+                keyName = cleanName;
+            }
+
+            if (isPrimitive(obj[name])) {
+                dict[keyName] = obj[name];
+            } else {
+                // continue recursing
+                _flatten(obj[name], dict, keyName);
+
+            }
+
+        });
+
+
+        return dict;
+    }
+    return _flatten(obj, dict, null);
+}
+
+/**
+ * Removes unwanted keys from data object
+ * Object must be fully built as the keys you are interested on might be deep
+ * in the structure.
+ *
+ */
+function filterKeys(data, filterFn) {
+
+    var filteredData = {};
+
+    // these values are nonsensical and always filtered regardless
+    // of passed in filter function
+    const blacklist = ['toppars', '-1'];
+
+
+    Object.keys(data).forEach(function (name) {
+        // split property by '.'
+        var items = name.split(DOT);
+        //check blacklist, takes precedence
+        var blacklisted = false;
+        for (var j = 0; j < items.length; j++) {
+            if (blacklist.indexOf(items[j]) >= 0) {
+                blacklisted = true;
+                break;
+            }
+        }
+        // if any of the items appears in filterFn keep property on object
+        // unless this key is alredy blacklisted
+        for (var i = 0; !blacklisted && i < items.length; i++) {
+
+            if (filterFn(items[i])) {
+                filteredData[name] = data[name];
+                break;
+            }
+        }
+
+    });
+
+    return filteredData;
+}
+
+/**
+ * Reports metrics to statsd using reporter
+ * returns data for convenience, but not really needed
+ * Please take a look at 
https://blog.pkhamre.com/understanding-statsd-and-graphite/
+ * to learn about graphite metrics.
+ *
+ * Gauge metrics represent a 'an arbitrary (numeric) value at a point in time'
+ * and this is what kafka metrics will be most often.
+ *
+ **/
+function report(data, reporter) {
+
+    Object.keys(data).forEach(function (metric) {
+        if (isNumber(data[metric])) {
+
+            try {
+                reporter.gauge(metric, data[metric]);
+            } catch (e) {
+                //nothing you can do
+            }
+        }
+
+    });
+    return data;
+}
+
+/**
+ * Returns a function that parses an object and sents its key/value pairs as 
metric/value
+ * to graphite.
+ *
+ * Given a raw object it will be traversed, and each leaf node of the object 
will
+ * be keyed by a concatenated key made up of all parent keys.
+ * Keys are the name of metrics that are sent to graphite
+ * Clients have the option of filtering metrics and only sent the ones they 
care about
+ * by supplying a filtering function that given a string (metric name) returns 
true or false
+ * A default whitelist is provided
+ *
+ * If filterFn === false no whitelist is applied
+ *
+ **/
+
+function createRdKafkaStatsCb(reporter, options) {
+    var optionalWhitelistFn;
+
+    options = options || {};
+
+
+    // use default filtering if none is passed in
+    if (!('filterFn' in options)) {
+        options.filterFn = defaultFilterFn;
+    } else if (options.filterFn === false) {
+        // no filtering
+        return function (rawObject) {
+            return report(flatten(rawObject), reporter);
+
+        };
+    } else if (!isFunction(options.filterFn)) {
+        throw new Error('Must provide a function for options.filterFn');
+    }
+
+
+    return function (rawObject) {
+        return report(filterKeys(flatten(rawObject), options.filterFn), 
reporter);
+    };
+
+}
+
+/************** Module exports ***********************/
+module.exports = createRdKafkaStatsCb;
diff --git a/package.json b/package.json
new file mode 100644
index 0000000..aa68c97
--- /dev/null
+++ b/package.json
@@ -0,0 +1,39 @@
+{
+  "name": "node-rdkafka-statsd",
+  "version": "0.1.0",
+  "description": "node-rdkafka event.stats callback that sends rdkafka statistics to statsd.",
+  "main": "lib/rdkafka-statsd.js",
+  "scripts": {
+    "test": "mocha"
+  },
+  "repository": {
+    "type": "git",
+    "url": "git://gerrit.wikimedia.org/r//node-rdkafka-statsd.git"
+  },
+  "keywords": [
+    "analytics",
+    "kafka",
+    "statsd"
+  ],
+  "author": "Wikimedia Analytics Team <analyt...@wikimedia.org>",
+  "license": "Apache-2.0",
+  "bugs": {
+    "url": "https://phabricator.wikimedia.org/tag/analytics/"
+  },
+  "dependencies": {},
+  "devDependencies": {
+    "js-yaml": "^3.5.2",
+    "bunyan": "^1.5.1",
+    "coveralls": "^2.11.6",
+    "istanbul": "^0.4.2",
+    "mocha": "^2.5.3",
+    "mocha-jshint": "^2.3.1",
+    "mocha-lcov-reporter": "^1.0.0"
+  },
+  "deploy": {
+    "node": "4.4.6",
+    "dependencies": {
+      "_all": []
+    }
+  }
+}
diff --git a/test/index.js b/test/index.js
new file mode 100644
index 0000000..deb30a0
--- /dev/null
+++ b/test/index.js
@@ -0,0 +1,3 @@
+'use strict';
+// Run jshint as part of normal testing
+require('mocha-jshint')();
diff --git a/test/test.js b/test/test.js
new file mode 100644
index 0000000..f180e76
--- /dev/null
+++ b/test/test.js
@@ -0,0 +1,549 @@
+"use strict";
+
+const assert = require('assert');
+
+
+describe('Testing object flattening. No filter function (Blackbox, through 
exposed function)', function () {
+
+    const fakeStatsdReporter = {
+        set: function () {},
+        gauge: function () {}
+    };
+    const cb = require('../lib/rdkafka-statsd')(fakeStatsdReporter, {
+        'filterFn': false
+    });
+
+    it('Testing object flattening happy case', function () {
+
+        var a = {
+            'pepe': {
+                'pepe': 45,
+                'ana': 23,
+                'juanito': 45,
+                'bad:key': 'bad!',
+            },
+            'ana': 63,
+            'juanito': 45,
+            'bad:ana': 'hola!'
+
+
+        };
+
+        var rawObject = {
+            "message": JSON.stringify(a) // payload travels as a JSON string under 'message'
+        };
+
+        var flat_a = cb(rawObject); // filterFn is false for this suite, so nothing is filtered out
+        assert.deepEqual(flat_a.ana, 63);
+        assert.deepEqual(flat_a['pepe.ana'], 23); // nested keys are joined with '.'
+        assert.deepEqual(flat_a['juanito'], 45);
+        //':' substitution happened (replaced by '_')
+        assert.deepEqual(flat_a['bad_ana'], 'hola!');
+        assert.deepEqual(flat_a['pepe.bad_key'], 'bad!');
+
+
+    });
+
+});
+
+describe('Testing object flattening and filtering with passed-in filter 
function (Blackbox, through exposed function)', function () {
+
+    const fakeFilterFn = function (key) {
+        var l = ['ana', 'simple_cnt', 'rx_ver_drops', 'lo_offset'];
+        return l.indexOf(key) >= 0;
+
+    };
+
+    const fakeStatsdReporter = {
+        gauge: function () {}
+    };
+    const cb = require('../lib/rdkafka-statsd')(fakeStatsdReporter, {
+        'filterFn': fakeFilterFn
+    });
+
+    it('Testing object flattening happy case', function () {
+
+        var a = {
+            'pepe': {
+                'pepe': 45,
+                'ana': 23,
+                'juanito': 45
+            },
+            'ana': 63,
+            'juanito': 45
+        };
+
+
+        var rawObject = {
+            "message": JSON.stringify(a) // payload travels as a JSON string under 'message'
+        };
+
+        var flat_a = cb(rawObject);
+        assert.deepEqual(flat_a.ana, 63);
+        assert.deepEqual(flat_a['pepe.ana'], 23); // kept: the 'ana' segment is accepted by fakeFilterFn
+        assert(flat_a.juanito === undefined); // 'juanito' is not accepted by fakeFilterFn
+
+    });
+
+    it('Testing object flattening empty object', function () {
+
+        var a = {}; // no 'message' property and no keys at all
+        var flat_a = cb(a);
+        assert.deepEqual(flat_a, {});
+
+    });
+
+    it('Testing object flattening no object', function () {
+
+        var a = "hola"; // neither a plain object nor an array
+        var flat_a = cb(a); // expected to yield an empty result rather than throw
+        assert.deepEqual(flat_a, {});
+
+    });
+
+
+    it('Testing object flattening null object', function () {
+
+        var a = null;
+        var flat_a = cb(a); // expected to yield an empty result rather than throw
+        assert.deepEqual(flat_a, {});
+
+    });
+
+
+    it('Test flattening, real kafka object', function () {
+        var metrics = {
+            "topics": {
+                "test4": {
+                    "partitions": {
+                        "-1": { // '-1' is blacklisted by the library: nothing below should be reported
+                            "rx_ver_drops": 0,
+                            "msgs": 0,
+                            "txbytes": 0,
+                            "txmsgs": 0,
+                            "consumer_lag": -1,
+                            "hi_offset": -1001,
+                            "lo_offset": -1001,
+                            "eof_offset": -1001,
+                            "committed_offset": -1001,
+                            "xmit_msgq_bytes": 0,
+                            "xmit_msgq_cnt": 0,
+                            "msgq_bytes": 0,
+                            "msgq_cnt": 0,
+                            "unknown": false,
+                            "desired": false,
+                            "leader": -1,
+                            "partition": -1,
+                            "fetchq_cnt": 0,
+                            "fetchq_size": 0,
+                            "fetch_state": "none",
+                            "query_offset": 0,
+                            "next_offset": 0,
+                            "app_offset": -1001,
+                            "stored_offset": -1001,
+                            "commited_offset": -1001
+                        },
+                        "0": { // regular partition: keys accepted by fakeFilterFn should survive
+                            "rx_ver_drops": 0,
+                            "msgs": 0,
+                            "txbytes": 0,
+                            "txmsgs": 0,
+                            "consumer_lag": 0,
+                            "hi_offset": 34784,
+                            "lo_offset": -1001,
+                            "eof_offset": 34784,
+                            "committed_offset": -1001,
+                            "xmit_msgq_bytes": 0,
+                            "xmit_msgq_cnt": 0,
+                            "msgq_bytes": 0,
+                            "msgq_cnt": 0,
+                            "unknown": false,
+                            "desired": true,
+                            "leader": 0,
+                            "partition": 0,
+                            "fetchq_cnt": 30582,
+                            "fetchq_size": 6224678,
+                            "fetch_state": "active",
+                            "query_offset": 0,
+                            "next_offset": 34784,
+                            "app_offset": 6143,
+                            "stored_offset": 6143,
+                            "commited_offset": -1001
+                        }
+                    },
+                    "metadata_age": 5995,
+                    "topic": "test4"
+                }
+            },
+
+
+            "brokers": {
+                "mediawiki-vagrant.dev:9092/0": { // '.', ':' and '/' in this key become '_' when flattened
+                    "toppars": { // 'toppars' is blacklisted by the library as well
+                        "test4": {
+                            "partition": 0,
+                            "topic": "test4"
+                        }
+                    },
+                    "throttle": {
+                        "cnt": 0,
+                        "sum": 0,
+                        "avg": 0,
+                        "max": 0,
+                        "min": 0
+                    },
+                    "rtt": {
+                        "cnt": 0,
+                        "sum": 0,
+                        "avg": 0,
+                        "max": 0,
+                        "min": 0
+                    },
+                    "rxpartial": 3,
+                    "rxcorriderrs": 0,
+                    "waitresp_msg_cnt": 0,
+                    "waitresp_cnt": 1,
+                    "outbuf_msg_cnt": 0,
+                    "outbuf_cnt": 0,
+                    "stateage": 7023265,
+                    "state": "UP",
+                    "nodeid": 0,
+                    "name": "mediawiki-vagrant.dev:9092/0",
+                    "tx": 57,
+                    "txbytes": 3609,
+                    "txerrs": 0,
+                    "txretries": 0,
+                    "req_timeouts": 0,
+                    "rx": 56,
+                    "rxbytes": 3406226,
+                    "rxerrs": 0
+                },
+                "localhost:9092/bootstrap": {
+                    "toppars": {},
+                    "throttle": {
+                        "cnt": 0,
+                        "sum": 0,
+                        "avg": 0,
+                        "max": 0,
+                        "min": 0
+                    },
+                    "rtt": {
+                        "cnt": 0,
+                        "sum": 0,
+                        "avg": 0,
+                        "max": 0,
+                        "min": 0
+                    },
+                    "rxpartial": 0,
+                    "rxcorriderrs": 0,
+                    "waitresp_msg_cnt": 0,
+                    "waitresp_cnt": 0,
+                    "outbuf_msg_cnt": 0,
+                    "outbuf_cnt": 0,
+                    "stateage": 7058566,
+                    "state": "UP",
+                    "nodeid": -1,
+                    "name": "localhost:9092/bootstrap",
+                    "tx": 5,
+                    "txbytes": 172,
+                    "txerrs": 0,
+                    "txretries": 0,
+                    "req_timeouts": 0,
+                    "rx": 5,
+                    "rxbytes": 8146,
+                    "rxerrs": 0
+                }
+            },
+
+            "name": "rdkafka#consumer-1",
+            "type": "consumer",
+            "ts": 69158506047,
+            "time": 1472147268,
+            "replyq": 30583,
+            "msg_cnt": 0,
+            "msg_max": 100000,
+            "simple_cnt": 0
+        };
+
+
+        var rawObject = {
+            "message": JSON.stringify(metrics) // payload travels as a JSON string under 'message'
+        };
+
+        var flat_metrics = cb(rawObject);
+        assert.deepEqual(flat_metrics['simple_cnt'], 0);
+        
assert.deepEqual(flat_metrics['topics.test4.partitions.0.rx_ver_drops'], 0);
+        // '-1' is filtered, even though 'lo_offset' itself is accepted by fakeFilterFn
+        assert.deepEqual(flat_metrics['topics.test4.partitions.-1.lo_offset'], 
undefined);
+        assert.deepEqual(flat_metrics['topics.test4.partitions.0.lo_offset'], 
-1001);
+
+        assert(flat_metrics.msg_max === undefined); // 'msg_max' is not accepted by fakeFilterFn
+
+    });
+
+});
+
+
+describe('Testing object flattening and filtering with default filter function 
(Blackbox, through exposed function)', function () {
+
+    const fakeStatsdReporter = {
+        gauge: function () {}
+    };
+
+    const cb = require('../lib/rdkafka-statsd')(fakeStatsdReporter);
+
+
+    it('Test flattening and filtering default function, real kafka object', 
function () {
+        var metrics = {
+            "topics": {
+                "test4": {
+                    "partitions": {
+                        "-1": {
+                            "rx_ver_drops": 0,
+                            "msgs": 0,
+                            "txbytes": 0,
+                            "txmsgs": 0,
+                            "consumer_lag": -1,
+                            "hi_offset": -1001,
+                            "lo_offset": -1001,
+                            "eof_offset": -1001,
+                            "committed_offset": -1001,
+                            "xmit_msgq_bytes": 0,
+                            "xmit_msgq_cnt": 0,
+                            "msgq_bytes": 0,
+                            "msgq_cnt": 0,
+                            "unknown": false,
+                            "desired": false,
+                            "leader": -1,
+                            "partition": -1,
+                            "fetchq_cnt": 0,
+                            "fetchq_size": 0,
+                            "fetch_state": "none",
+                            "query_offset": 0,
+                            "next_offset": 0,
+                            "app_offset": -1001,
+                            "stored_offset": -1001,
+                            "commited_offset": -1001
+                        },
+                        "0": {
+                            "rx_ver_drops": 0,
+                            "msgs": 0,
+                            "txbytes": 0,
+                            "txmsgs": 0,
+                            "consumer_lag": 0,
+                            "hi_offset": 34784,
+                            "lo_offset": -1001,
+                            "eof_offset": 34784,
+                            "committed_offset": -1001,
+                            "xmit_msgq_bytes": 0,
+                            "xmit_msgq_cnt": 0,
+                            "msgq_bytes": 0,
+                            "msgq_cnt": 0,
+                            "unknown": false,
+                            "desired": true,
+                            "leader": 0,
+                            "partition": 0,
+                            "fetchq_cnt": 30582,
+                            "fetchq_size": 6224678,
+                            "fetch_state": "active",
+                            "query_offset": 0,
+                            "next_offset": 34784,
+                            "app_offset": 6143,
+                            "stored_offset": 6143,
+                            "commited_offset": -1001
+                        }
+                    },
+                    "metadata_age": 5995,
+                    "topic": "test4"
+                }
+            },
+
+
+            "brokers": {
+                "mediawiki-vagrant.dev:9092/0": {
+                    "toppars": {
+                        "test4": {
+                            "partition": 0,
+                            "topic": "test4"
+                        }
+                    },
+                    "throttle": {
+                        "cnt": 0,
+                        "sum": 0,
+                        "avg": 0,
+                        "max": 0,
+                        "min": 0
+                    },
+                    "rtt": {
+                        "cnt": 0,
+                        "sum": 0,
+                        "avg": 0,
+                        "max": 0,
+                        "min": 0
+                    },
+                    "rxpartial": 3,
+                    "rxcorriderrs": 0,
+                    "waitresp_msg_cnt": 0,
+                    "waitresp_cnt": 1,
+                    "outbuf_msg_cnt": 0,
+                    "outbuf_cnt": 0,
+                    "stateage": 7023265,
+                    "state": "UP",
+                    "nodeid": 0,
+                    "name": "mediawiki-vagrant.dev:9092/0",
+                    "tx": 57,
+                    "txbytes": 3609,
+                    "txerrs": 0,
+                    "txretries": 0,
+                    "req_timeouts": 0,
+                    "rx": 56,
+                    "rxbytes": 3406226,
+                    "rxerrs": 0
+                },
+                "localhost:9092/bootstrap": {
+                    "toppars": {},
+                    "throttle": {
+                        "cnt": 0,
+                        "sum": 0,
+                        "avg": 0,
+                        "max": 0,
+                        "min": 0
+                    },
+                    "rtt": {
+                        "cnt": 0,
+                        "sum": 0,
+                        "avg": 0,
+                        "max": 0,
+                        "min": 0
+                    },
+                    "rxpartial": 0,
+                    "rxcorriderrs": 0,
+                    "waitresp_msg_cnt": 0,
+                    "waitresp_cnt": 0,
+                    "outbuf_msg_cnt": 0,
+                    "outbuf_cnt": 0,
+                    "stateage": 7058566,
+                    "state": "UP",
+                    "nodeid": -1,
+                    "name": "localhost:9092/bootstrap",
+                    "tx": 5,
+                    "txbytes": 172,
+                    "txerrs": 0,
+                    "txretries": 0,
+                    "req_timeouts": 0,
+                    "rx": 5,
+                    "rxbytes": 8146,
+                    "rxerrs": 0
+                }
+            },
+
+            "name": "rdkafka#consumer-1",
+            "type": "consumer",
+            "ts": 69158506047,
+            "time": 1472147268,
+            "replyq": 30583,
+            "msg_cnt": 0,
+            "msg_max": 100000,
+            "simple_cnt": 0
+        };
+
+        var rawObject = {
+            "message": JSON.stringify(metrics)
+        };
+
+        var flat_metrics = cb(rawObject);
+
+
+        assert.deepEqual(flat_metrics['simple_cnt'], undefined);
+        
assert.deepEqual(flat_metrics['topics.test4.partitions.0.rx_ver_drops'], 
undefined);
+        assert.deepEqual(flat_metrics['topics.test4.partitions.-1.lo_offset'], 
undefined);
+        
assert.deepEqual(flat_metrics['topics.test4.partitions.0.stored_offset'], 6143);
+
+    });
+
+});
+describe('Testing object reporting', function () {
+
+    it('Testing object reporting happy case', function (done) {
+        // given that code execution is synchronous these assertions are 
guranteed
+        // to run
+        const fakeStatsdReporter = {
+            gauge: function (item, itemData) {
+                assert.equal(itemData, 1001);
+            }
+        };
+        const cb = require('../lib/rdkafka-statsd')(fakeStatsdReporter, {
+            'filterFn': false
+        });
+
+        var a = {
+            "topics": {
+                "rx_ver_drops": [],
+                "msgs": 1001,
+                "txmsgs": "this is a string"
+
+            }
+        };
+
+        var rawObject = {
+            "message": JSON.stringify(a)
+        };
+        var flat_a = cb(rawObject);
+        done();
+    });
+
+    it('Testing object reporting bad metrics, should not report', function 
(done) {
+        // given that code execution is synchronous these assertions are 
guranteed
+        // to run if the reporter is called
+        // it should not be as the only metric we have is neither a
+        // string nor a number
+        const fakeStatsdReporter = {
+
+            gauge: function (item, itemData) {
+                assert.equal(true, false);
+            }
+        };
+        const cb = require('../lib/rdkafka-statsd')(fakeStatsdReporter, {
+            'filterFn': false
+        });
+
+        var a = {
+            "topics": {
+                "rx_ver_drops": [],
+
+            }
+        };
+        var rawObject = {
+            "message": JSON.stringify(a)
+        };
+        var flat_a = cb(rawObject);
+        done();
+    });
+
+    it('Testing object reporting, removing chars ', function (done) {
+        // given that code execution is synchronous these assertions are 
guranteed
+
+        const fakeStatsdReporter = {
+
+            gauge: function (item, itemData) {
+                assert.equal(item, 'topics_rx_ver_drops');
+            }
+        };
+        const cb = require('../lib/rdkafka-statsd')(fakeStatsdReporter, {
+            'filterFn': false
+        });
+
+        var a = {
+            "topics": {
+                "rx.ver/drops": 43,
+
+            }
+        };
+        var rawObject = {
+            "message": JSON.stringify(a)
+        };
+        var flat_a = cb(rawObject);
+        done();
+    });
+
+});

-- 
To view, visit https://gerrit.wikimedia.org/r/319671
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I15d8a2f93b34bfafa5c80b191b416e780f33c515
Gerrit-PatchSet: 22
Gerrit-Project: node-rdkafka-statsd
Gerrit-Branch: master
Gerrit-Owner: Nuria <nu...@wikimedia.org>
Gerrit-Reviewer: Nuria <nu...@wikimedia.org>
Gerrit-Reviewer: Ottomata <o...@wikimedia.org>
Gerrit-Reviewer: Ppchelko <ppche...@wikimedia.org>

_______________________________________________
MediaWiki-commits mailing list
MediaWiki-commits@lists.wikimedia.org
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to