Ppchelko has uploaded a new change for review. ( https://gerrit.wikimedia.org/r/326169 )

Change subject: Set up foundation for integration testing with Kafka.
......................................................................

Set up foundation for integration testing with Kafka.

Pending CI support.
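
For local runs, a rough sketch of the workflow based on the npm scripts this
change adds to package.json (assuming kafka-test-tools can install and start a
local Zookeeper/Kafka pair under node_modules):

    npm run install-kafka   # fetch a local Kafka via kafka-test-tools
    npm run start-kafka     # start Zookeeper (port 2181) and Kafka (port 9092)
    npm test                # recreate the test_dc.* topics, then run mocha and nsp check
    npm run stop-kafka      # shut the local brokers down again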

Change-Id: I1d7f28415f56e81436f522084b82d3cec3be52e6
---
M app.js
A config.test.yaml
M lib/edit-stream.js
M package.json
M test/features/app/app.js
M test/features/app/spec.js
M test/features/v1/trending.js
A test/utils/clean_kafka.sh
M test/utils/server.js
9 files changed, 227 insertions(+), 99 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/mediawiki/services/trending-edits refs/changes/69/326169/1

diff --git a/app.js b/app.js
index 28b7746..e1cce87 100644
--- a/app.js
+++ b/app.js
@@ -207,7 +207,7 @@
         app.logger.log('error/edit_stream', 'Error in edit stream: ' + e);
     });
     process.on('exit', () => editStream.close());
-    return app;
+    return editStream.setup().thenReturn(app);
 }
 
 /**
diff --git a/config.test.yaml b/config.test.yaml
new file mode 100644
index 0000000..fb78809
--- /dev/null
+++ b/config.test.yaml
@@ -0,0 +1,41 @@
+num_workers: 0
+worker_heap_limit_mb: 250
+
+# Logger info
+logging:
+  name: trending-edits
+  level: info
+
+services:
+  - name: service-trending-edits
+    module: ./app.js
+    conf:
+      port: 6927
+      cors: '*'
+      user_agent: WMF Trending Edits
+      mwapi_req:
+        method: post
+        uri: https://{{domain}}/w/api.php
+        headers:
+          user-agent: '{{user-agent}}'
+        body: '{{ default(request.query, {}) }}'
+      restbase_req:
+        method: '{{request.method}}'
+        uri: https://{{domain}}/api/rest_v1/{+path}
+        query: '{{ default(request.query, {}) }}'
+        headers: '{{request.headers}}'
+        body: '{{request.body}}'
+      consume_dc: [ test_dc ]
+      purge_period: 100 # purge contents of store after this number of events
+      max_results: 20
+      purge_strategy:
+        max_pages: 5000 # maximum number of pages that can be stored at any given time
+        max_inactivity: 40 # maximum time in minutes a page can go without edits
+        max_age: 1 # maximum age allowed in minutes
+        min_speed: 0 # minimum speed in edits per minute that a page is kept around
+      min_edits: 6
+      broker_list: localhost:9092
+      consumer:
+        fetch.wait.max.ms: "1"
+        fetch.min.bytes: "1"
+        queue.buffering.max.ms: "1"
diff --git a/lib/edit-stream.js b/lib/edit-stream.js
index be695ff..29a71d8 100644
--- a/lib/edit-stream.js
+++ b/lib/edit-stream.js
@@ -4,6 +4,7 @@
 const EventEmitter = require('events').EventEmitter;
 const os = require('os');
 const TimeUUID = require('cassandra-uuid').TimeUuid;
+const P = require('bluebird');
 
 /**
  * Defines how often to commit the offset.
@@ -79,37 +80,13 @@
     constructor(options) {
         super();
         this._options = checkOptions(options || {});
-        this._consumer = new kafka.KafkaConsumer({
+        this._consumer = new kafka.KafkaConsumer(Object.assign({
             'group.id': getConsumerGroup(this._options.worker_id),
             'metadata.broker.list': this._options.broker_list,
             'enable.auto.commit': 'false',
             'client.id': TimeUUID.now()
-        }, {
+        }, this._options.consumer || {}), {
             'auto.offset.reset': 'largest'
-        });
-        this._consumer.connect();
-        this._consumer
-        .on('ready', () => {
-            this._consumer.consume(this._consumeTopics, (e, kafkaMessage) => {
-                if (e) {
-                    return this.emit('error', e);
-                }
-
-                const pureTopic = kafkaMessage.topic.substr(kafkaMessage.topic.indexOf('.') + 1);
-                const skip = COMMIT_PUSH_SKIP[pureTopic];
-                if (!skip || kafkaMessage.offset % skip === 0) {
-                    this._commitQueue.push({
-                        message: kafkaMessage,
-                        timestamp: Date.now()
-                    });
-                }
-
-                try {
-                    this.emit('edit', JSON.parse(kafkaMessage.value.toString()));
-                } catch (e) {
-                    this.emit('error', e);
-                }
-            });
         });
         this._commitQueue = [];
         this._commitInterval = setInterval(() => {
@@ -122,6 +99,37 @@
         }, COMMIT_INTERVAL);
     }
 
+    setup() {
+        return new P((resolve, reject) => {
+            this._consumer.connect(undefined, (err) => {
+                if (err) {
+                    return reject(err);
+                }
+                this._consumer.consume(this._consumeTopics, (e, kafkaMessage) => {
+                    if (e) {
+                        return this.emit('error', e);
+                    }
+
+                    const pureTopic = kafkaMessage.topic.substr(kafkaMessage.topic.indexOf('.') + 1);
+                    const skip = COMMIT_PUSH_SKIP[pureTopic];
+                    if (!skip || kafkaMessage.offset % skip === 0) {
+                        this._commitQueue.push({
+                            message: kafkaMessage,
+                            timestamp: Date.now()
+                        });
+                    }
+
+                    try {
+                        this.emit('edit', JSON.parse(kafkaMessage.value.toString()));
+                    } catch (e) {
+                        this.emit('error', e);
+                    }
+                });
+                resolve(this);
+            });
+        });
+    }
+
     /**
      * Returns an array of topic names to consume the `mediawiki.revision-create`,
      * `mediawiki.page-move` and `mediawiki.page-delete` events from
diff --git a/package.json b/package.json
index 8cc6087..51221d9 100644
--- a/package.json
+++ b/package.json
@@ -6,10 +6,14 @@
   "main": "./app.js",
   "scripts": {
     "start": "service-runner",
-    "test": "mocha && nsp check",
+    "cleanup": "sh test/utils/clean_kafka.sh",
+    "install-kafka": "sh node_modules/kafka-test-tools/install_kafka.sh",
+    "start-kafka": "sh node_modules/kafka-test-tools/start_kafka.sh start",
+    "stop-kafka": "sh node_modules/kafka-test-tools/start_kafka.sh stop",
+    "test": "npm run cleanup && mocha && nsp check",
     "docker-start": "service-runner docker-start",
     "docker-test": "service-runner docker-test",
-    "coverage": "istanbul cover _mocha -- -R spec"
+    "coverage": "npm run cleanup && istanbul cover _mocha -- -R spec"
   },
   "repository": {
     "type": "git",
@@ -43,7 +47,8 @@
     "mocha-jscs": "^5.0.1",
     "mocha-jshint": "^2.3.1",
     "mocha-lcov-reporter": "^1.2.0",
-    "nsp": "^2.6.1"
+    "nsp": "^2.6.1",
+    "kafka-test-tools": "^0.1.0"
   },
   "deploy": {
     "node": "4.6.0",
diff --git a/test/features/app/app.js b/test/features/app/app.js
index d30b94e..9d36e53 100644
--- a/test/features/app/app.js
+++ b/test/features/app/app.js
@@ -3,12 +3,14 @@
 
 var preq   = require('preq');
 var assert = require('../../utils/assert.js');
-var server = require('../../utils/server.js');
+var Server = require('../../utils/server.js');
 
 
 describe('express app', function() {
 
     this.timeout(20000);
+
+    const server = new Server('config.dev.yaml');
 
     before(function () { return server.start(); });
 
@@ -76,5 +78,7 @@
         });
     });
 
+    after(() => server.stop());
+
 });
 
diff --git a/test/features/app/spec.js b/test/features/app/spec.js
index 570fb40..e9f481e 100644
--- a/test/features/app/spec.js
+++ b/test/features/app/spec.js
@@ -3,11 +3,13 @@
 
 var preq   = require('preq');
 var assert = require('../../utils/assert.js');
-var server = require('../../utils/server.js');
+var Server = require('../../utils/server.js');
 var URI    = require('swagger-router').URI;
 var yaml   = require('js-yaml');
 var fs     = require('fs');
 
+
+const server = new Server('config.dev.yaml');
 
 function staticSpecLoad() {
 
@@ -281,5 +283,7 @@
 
     });
 
+    after(() => server.stop());
+
 });
 
diff --git a/test/features/v1/trending.js b/test/features/v1/trending.js
index abadc86..96b48ea 100644
--- a/test/features/v1/trending.js
+++ b/test/features/v1/trending.js
@@ -3,20 +3,69 @@
 
 const preq = require('preq');
 const assert = require('../../utils/assert.js');
-const server = require('../../utils/server.js');
+const Server = require('../../utils/server.js');
 const processor = require('../../../lib/processor.js');
 const fixtures = require('../../utils/fixtures');
+const Producer = require('node-rdkafka').Producer;
+const P = require('bluebird');
 
 describe('trending', function() {
 
     this.timeout(20000);
 
+    const server = new Server('config.test.yaml');
+    let producer;
+
     before(function () {
-        processor.reset();
-        return server.start();
+        return server.start()
+        .then(() => {
+            return new P((resolve, reject) => {
+                const producer = new Producer(
+                    {
+                        'metadata.broker.list': 'localhost:9092'
+                    },
+                    {},
+                    (() => {})
+                );
+                producer.once('error', reject);
+                producer.connect(undefined, (err) => {
+                    if (err) {
+                        return reject(err);
+                    }
+                    return resolve(producer);
+                });
+            });
+        })
+        .then((p) => {
+            producer = p;
+        });
+    });
+
+    it('Should consume from kafka and create trendings', () => {
+        const msg = Buffer.from(JSON.stringify(fixtures.edit));
+        for (let i = 0; i < 100; i++) {
+            producer.produce('test_dc.mediawiki.revision-create', 0, msg);
+        }
+
+        return P.delay(500)
+        .then(() => preq.get({
+            uri: server.config.uri + 'en.wikipedia.org/v1/feed/trending-edits/'
+        }))
+        .then((res) => {
+            assert.deepEqual(res.status, 200);
+            assert.deepEqual(res.body.pages.length, 1);
+            assert.deepEqual(res.body.pages[0], {
+                totalEdits: 100,
+                trendiness: -1,
+                isNew: false,
+                updated: '2016-12-08T18:48:56.000Z',
+                $merge: [ 'https://en.wikipedia.org/api/rest_v1/page/summary/Exploding_whale' ]
+            })
+        });
     });
 
     it('check response format', function() {
+        processor.reset();
         for (let i = 0; i < 1000; i++) {
             processor.process(fixtures.edit);
         }
@@ -53,5 +102,11 @@
             assert.ok(err.body.detail !== undefined, 'Error present in result');
         });
     });
+
+
+    after(() => {
+        server.stop();
+        producer.disconnect();
+    });
 });
 
diff --git a/test/utils/clean_kafka.sh b/test/utils/clean_kafka.sh
new file mode 100755
index 0000000..1405d25
--- /dev/null
+++ b/test/utils/clean_kafka.sh
@@ -0,0 +1,20 @@
+#!/bin/bash
+
+. $(cd $(dirname $0) && pwd)/../../node_modules/kafka-test-tools/clean_kafka.sh
+
+check 2181 "Zookeeper"
+check 9092 "Kafka"
+
+# Don't need to clean anything in Jenkins or Travis
+if [ "x$JENKINS_URL" = "x"  ] || [ "$CI" = "true" ]; then
+  dropTopics "test_dc"
+  wait
+  sleep 5
+fi
+
+createTopic "test_dc.mediawiki.revision-create"
+createTopic "test_dc.mediawiki.page-move"
+createTopic "test_dc.mediawiki.page-delete"
+
+wait
+sleep 5
diff --git a/test/utils/server.js b/test/utils/server.js
index ba0d718..906d852 100644
--- a/test/utils/server.js
+++ b/test/utils/server.js
@@ -1,73 +1,64 @@
 'use strict';
 
+const ServiceRunner = require('service-runner');
+const fs            = require('fs');
+const yaml          = require('js-yaml');
+const P             = require('bluebird');
+const preq          = require('preq');
 
-// mocha defines to avoid JSHint breakage
-/* global describe, it, before, beforeEach, after, afterEach */
+const SERVICE_STOP_DELAY = 500;
 
+let startupRetryLimit = 3;
 
-var BBPromise = require('bluebird');
-var ServiceRunner = require('service-runner');
-var logStream = require('./logStream');
-var fs = require('fs');
-var assert = require('./assert');
-var yaml = require('js-yaml');
-var extend = require('extend');
-
-
-// set up the configuration
-var config = {
-    conf: yaml.safeLoad(fs.readFileSync(__dirname + '/../../config.yaml'))
-};
-// build the API endpoint URI by supposing the actual service
-// is the last one in the 'services' list in the config file
-var myServiceIdx = config.conf.services.length - 1;
-var myService = config.conf.services[myServiceIdx];
-config.uri = 'http://localhost:' + myService.conf.port + '/';
-config.service = myService;
-// no forking, run just one process when testing
-config.conf.num_workers = 0;
-// have a separate, in-memory logger only
-config.conf.logging = {
-    name: 'test-log',
-    level: 'trace',
-    stream: logStream()
-};
-// make a deep copy of it for later reference
-var origConfig = extend(true, {}, config);
-
-var stop = function() { return BBPromise.resolve(); };
-var options = null;
-var runner = new ServiceRunner();
-
-
-function start(_options) {
-
-    _options = _options || {};
-
-    if (!assert.isDeepEqual(options, _options)) {
-        console.log('server options changed; restarting');
-        return stop().then(function() {
-            options = _options;
-            // set up the config
-            config = extend(true, {}, origConfig);
-            extend(true, config.conf.services[myServiceIdx].conf, options);
-            return runner.start(config.conf)
-            .then(function() {
-                stop = function () {
-                    console.log('stopping test server');
-                    return runner.stop().then(function() {
-                        stop = function() { return BBPromise.resolve(); };
-                    });
-                };
-                return true;
-            });
-        });
-    } else {
-        return BBPromise.resolve();
+class TrendingService {
+    constructor(configPath) {
+        this._configPath = configPath;
+        this.config = {
+            conf: this._loadConfig()
+        };
+        this.config.conf.num_workers = 0;
+        this.config.conf.logging = {
+            name: 'trending-edits',
+            level: 'fatal',
+            streams: [{type: 'stdout'}]
+        };
+        this.config.service = this.config.conf.services[0];
+        this.config.uri = `http://localhost:${this.config.service.conf.port}/`;
+        this._runner = new ServiceRunner();
+        this._running = false;
     }
 
+    _loadConfig() {
+        return yaml.safeLoad(fs.readFileSync(this._configPath).toString());
+    }
+
+    start() {
+        if (this._running) {
+            console.log('The test server is already running. Skipping start.')
+            return P.resolve();
+        }
+
+        return this._runner.start(this.config.conf)
+        .tap(() => this._running = true)
+        .delay(5000)
+        .catch((e) => {
+            if (startupRetryLimit > 0 && /EADDRINUSE/.test(e.message)) {
+                console.log('Execution of the previous test might not have finished yet. Retrying startup.');
+                startupRetryLimit--;
+                return P.delay(1000).then(() => this.start());
+            }
+            throw e;
+        });
+    }
+
+    stop() {
+        if (this._running) {
+            return this._runner.stop()
+            .tap(() => this._running = false)
+            .delay(SERVICE_STOP_DELAY);
+        }
+        return P.resolve();
+    }
 }
 
-module.exports.config = config;
-module.exports.start  = start;
-
+module.exports = TrendingService;

-- 
To view, visit https://gerrit.wikimedia.org/r/326169
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I1d7f28415f56e81436f522084b82d3cec3be52e6
Gerrit-PatchSet: 1
Gerrit-Project: mediawiki/services/trending-edits
Gerrit-Branch: master
Gerrit-Owner: Ppchelko <ppche...@wikimedia.org>
