Ppchelko has uploaded a new change for review. ( https://gerrit.wikimedia.org/r/326169 )
Change subject: Set up foundation for integration testing with kafka. ...................................................................... Set up foundation for integration testing with kafka. Pending CI support. Change-Id: I1d7f28415f56e81436f522084b82d3cec3be52e6 --- M app.js A config.test.yaml M lib/edit-stream.js M package.json M test/features/app/app.js M test/features/app/spec.js M test/features/v1/trending.js A test/utils/clean_kafka.sh M test/utils/server.js 9 files changed, 227 insertions(+), 99 deletions(-) git pull ssh://gerrit.wikimedia.org:29418/mediawiki/services/trending-edits refs/changes/69/326169/1 diff --git a/app.js b/app.js index 28b7746..e1cce87 100644 --- a/app.js +++ b/app.js @@ -207,7 +207,7 @@ app.logger.log('error/edit_stream', 'Error in edit stream: ' + e); }); process.on('exit', () => editStream.close()); - return app; + return editStream.setup().thenReturn(app); } /** diff --git a/config.test.yaml b/config.test.yaml new file mode 100644 index 0000000..fb78809 --- /dev/null +++ b/config.test.yaml @@ -0,0 +1,41 @@ +num_workers: 0 +worker_heap_limit_mb: 250 + +# Logger info +logging: + name: trending-edits + level: info + +services: + - name: service-trending-edits + module: ./app.js + conf: + port: 6927 + cors: '*' + user_agent: WMF Trending Edits + mwapi_req: + method: post + uri: https://{{domain}}/w/api.php + headers: + user-agent: '{{user-agent}}' + body: '{{ default(request.query, {}) }}' + restbase_req: + method: '{{request.method}}' + uri: https://{{domain}}/api/rest_v1/{+path} + query: '{{ default(request.query, {}) }}' + headers: '{{request.headers}}' + body: '{{request.body}}' + consume_dc: [ test_dc ] + purge_period: 100 # purge contents of store after this number of events + max_results: 20 + purge_strategy: + max_pages: 5000 # maximum number of pages that can be stored at any given time + max_inactivity: 40 # maximum time in minutes a page can go without edits + max_age: 1 # maximum age allowed in minutes + min_speed: 0 # minimum speed in edits per minute that a page is kept around + min_edits: 6 + broker_list: localhost:9092 + consumer: + fetch.wait.max.ms: "1" + fetch.min.bytes: "1" + queue.buffering.max.ms: "1" diff --git a/lib/edit-stream.js b/lib/edit-stream.js index be695ff..29a71d8 100644 --- a/lib/edit-stream.js +++ b/lib/edit-stream.js @@ -4,6 +4,7 @@ const EventEmitter = require('events').EventEmitter; const os = require('os'); const TimeUUID = require('cassandra-uuid').TimeUuid; +const P = require('bluebird'); /** * Defines how often to commit the offset. @@ -79,37 +80,13 @@ constructor(options) { super(); this._options = checkOptions(options || {}); - this._consumer = new kafka.KafkaConsumer({ + this._consumer = new kafka.KafkaConsumer(Object.assign({ 'group.id': getConsumerGroup(this._options.worker_id), 'metadata.broker.list': this._options.broker_list, 'enable.auto.commit': 'false', 'client.id': TimeUUID.now() - }, { + }, this._options.consumer || {}), { 'auto.offset.reset': 'largest' - }); - this._consumer.connect(); - this._consumer - .on('ready', () => { - this._consumer.consume(this._consumeTopics, (e, kafkaMessage) => { - if (e) { - return this.emit('error', e); - } - - const pureTopic = kafkaMessage.topic.substr(kafkaMessage.topic.indexOf('.') + 1); - const skip = COMMIT_PUSH_SKIP[pureTopic]; - if (!skip || kafkaMessage.offset % skip === 0) { - this._commitQueue.push({ - message: kafkaMessage, - timestamp: Date.now() - }); - } - - try { - this.emit('edit', JSON.parse(kafkaMessage.value.toString())); - } catch (e) { - this.emit('error', e); - } - }); }); this._commitQueue = []; this._commitInterval = setInterval(() => { @@ -122,6 +99,37 @@ }, COMMIT_INTERVAL); } + setup() { + return new P((resolve, reject) => { + this._consumer.connect(undefined, (err) => { + if (err) { + return reject(err); + } + this._consumer.consume(this._consumeTopics, (e, kafkaMessage) => { + if (e) { + return this.emit('error', e); + } + + const pureTopic = kafkaMessage.topic.substr(kafkaMessage.topic.indexOf('.') + 1); + const skip = COMMIT_PUSH_SKIP[pureTopic]; + if (!skip || kafkaMessage.offset % skip === 0) { + this._commitQueue.push({ + message: kafkaMessage, + timestamp: Date.now() + }); + } + + try { + this.emit('edit', JSON.parse(kafkaMessage.value.toString())); + } catch (e) { + this.emit('error', e); + } + }); + resolve(this); + }); + }); + } + /** * Returns an array of topic names to consume the `mediawiki.revision-create`, * `mediawiki.page-move` and `mediawiki.page-delete` events from diff --git a/package.json b/package.json index 8cc6087..51221d9 100644 --- a/package.json +++ b/package.json @@ -6,10 +6,14 @@ "main": "./app.js", "scripts": { "start": "service-runner", - "test": "mocha && nsp check", + "cleanup": "sh test/utils/clean_kafka.sh", + "install-kafka": "sh node_modules/kafka-test-tools/install_kafka.sh", + "start-kafka": "sh node_modules/kafka-test-tools/start_kafka.sh start", + "stop-kafka": "sh node_modules/kafka-test-tools/start_kafka.sh stop", + "test": "npm run cleanup && mocha && nsp check", "docker-start": "service-runner docker-start", "docker-test": "service-runner docker-test", - "coverage": "istanbul cover _mocha -- -R spec" + "coverage": "npm run cleanup && istanbul cover _mocha -- -R spec" }, "repository": { "type": "git", @@ -43,7 +47,8 @@ "mocha-jscs": "^5.0.1", "mocha-jshint": "^2.3.1", "mocha-lcov-reporter": "^1.2.0", - "nsp": "^2.6.1" + "nsp": "^2.6.1", + "kafka-test-tools": "^0.1.0" }, "deploy": { "node": "4.6.0", diff --git a/test/features/app/app.js b/test/features/app/app.js index d30b94e..9d36e53 100644 --- a/test/features/app/app.js +++ b/test/features/app/app.js @@ -3,12 +3,14 @@ var preq = require('preq'); var assert = require('../../utils/assert.js'); -var server = require('../../utils/server.js'); +var Server = require('../../utils/server.js'); describe('express app', function() { this.timeout(20000); + + const server = new Server('config.dev.yaml'); before(function () { return server.start(); }); @@ -76,5 +78,7 @@ }); }); + after(() => server.stop()); + }); diff --git a/test/features/app/spec.js b/test/features/app/spec.js index 570fb40..e9f481e 100644 --- a/test/features/app/spec.js +++ b/test/features/app/spec.js @@ -3,11 +3,13 @@ var preq = require('preq'); var assert = require('../../utils/assert.js'); -var server = require('../../utils/server.js'); +var Server = require('../../utils/server.js'); var URI = require('swagger-router').URI; var yaml = require('js-yaml'); var fs = require('fs'); + +const server = new Server('config.dev.yaml'); function staticSpecLoad() { @@ -281,5 +283,7 @@ }); + after(() => server.stop()); + }); diff --git a/test/features/v1/trending.js b/test/features/v1/trending.js index abadc86..96b48ea 100644 --- a/test/features/v1/trending.js +++ b/test/features/v1/trending.js @@ -3,20 +3,69 @@ const preq = require('preq'); const assert = require('../../utils/assert.js'); -const server = require('../../utils/server.js'); +const Server = require('../../utils/server.js'); const processor = require('../../../lib/processor.js'); const fixtures = require('../../utils/fixtures'); +const Producer = require('node-rdkafka').Producer; +const P = require('bluebird'); describe('trending', function() { this.timeout(20000); + const server = new Server('config.test.yaml'); + let producer; + before(function () { - processor.reset(); - return server.start(); + return server.start() + .then(() => { + return new P((resolve, reject) => { + const producer = new Producer( + { + 'metadata.broker.list': 'localhost:9092' + }, + {}, + (() => {}) + ); + producer.once('error', reject); + producer.connect(undefined, (err) => { + if (err) { + return reject(err); + } + return resolve(producer); + }); + }); + }) + .then((p) => { + producer = p; + }); + }); + + it('Should consume from kafka and create trendings', () => { + const msg = Buffer.from(JSON.stringify(fixtures.edit)); + for (let i = 0; i < 100; i++) { + producer.produce('test_dc.mediawiki.revision-create', 0, msg); + } + + return P.delay(500) + .then(() => preq.get({ + uri: server.config.uri + 'en.wikipedia.org/v1/feed/trending-edits/' + })) + .then((res) => { + assert.deepEqual(res.status, 200); + assert.deepEqual(res.body.pages.length, 1); + assert.deepEqual(res.body.pages[0], { + totalEdits: 100, + trendiness: -1, + isNew: false, + updated: '2016-12-08T18:48:56.000Z', + $merge: [ 'https://en.wikipedia.org/api/rest_v1/page/summary/Exploding_whale' ] + }) + }); }); it('check response format', function() { + processor.reset(); for (let i = 0; i < 1000; i++) { processor.process(fixtures.edit); } @@ -53,5 +102,11 @@ assert.ok(err.body.detail !== undefined, 'Error present in result'); }); }); + + + after(() => { + server.stop(); + producer.disconnect(); + }); }); diff --git a/test/utils/clean_kafka.sh b/test/utils/clean_kafka.sh new file mode 100755 index 0000000..1405d25 --- /dev/null +++ b/test/utils/clean_kafka.sh @@ -0,0 +1,20 @@ +#!/bin/bash + +. $(cd $(dirname $0) && pwd)/../../node_modules/kafka-test-tools/clean_kafka.sh + +check 2181 "Zookeeper" +check 9092 "Kafka" + +# Don't need to clean anything in Jenkins or Travis +if [ "x$JENKINS_URL" = "x" ] || [ "$CI" = "true" ]; then + dropTopics "test_dc" + wait + sleep 5 +fi + +createTopic "test_dc.mediawiki.revision-create" +createTopic "test_dc.mediawiki.page-move" +createTopic "test_dc.mediawiki.page-delete" + +wait +sleep 5 diff --git a/test/utils/server.js b/test/utils/server.js index ba0d718..906d852 100644 --- a/test/utils/server.js +++ b/test/utils/server.js @@ -1,73 +1,64 @@ 'use strict'; +const ServiceRunner = require('service-runner'); +const fs = require('fs'); +const yaml = require('js-yaml'); +const P = require('bluebird'); +const preq = require('preq'); -// mocha defines to avoid JSHint breakage -/* global describe, it, before, beforeEach, after, afterEach */ +const SERVICE_STOP_DELAY = 500; +let startupRetryLimit = 3; -var BBPromise = require('bluebird'); -var ServiceRunner = require('service-runner'); -var logStream = require('./logStream'); -var fs = require('fs'); -var assert = require('./assert'); -var yaml = require('js-yaml'); -var extend = require('extend'); - - -// set up the configuration -var config = { - conf: yaml.safeLoad(fs.readFileSync(__dirname + '/../../config.yaml')) -}; -// build the API endpoint URI by supposing the actual service -// is the last one in the 'services' list in the config file -var myServiceIdx = config.conf.services.length - 1; -var myService = config.conf.services[myServiceIdx]; -config.uri = 'http://localhost:' + myService.conf.port + '/'; -config.service = myService; -// no forking, run just one process when testing -config.conf.num_workers = 0; -// have a separate, in-memory logger only -config.conf.logging = { - name: 'test-log', - level: 'trace', - stream: logStream() -}; -// make a deep copy of it for later reference -var origConfig = extend(true, {}, config); - -var stop = function() { return BBPromise.resolve(); }; -var options = null; -var runner = new ServiceRunner(); - - -function start(_options) { - - _options = _options || {}; - - if (!assert.isDeepEqual(options, _options)) { - console.log('server options changed; restarting'); - return stop().then(function() { - options = _options; - // set up the config - config = extend(true, {}, origConfig); - extend(true, config.conf.services[myServiceIdx].conf, options); - return runner.start(config.conf) - .then(function() { - stop = function () { - console.log('stopping test server'); - return runner.stop().then(function() { - stop = function() { return BBPromise.resolve(); }; - }); - }; - return true; - }); - }); - } else { - return BBPromise.resolve(); +class TrendingService { + constructor(configPath) { + this._configPath = configPath; + this.config = { + conf: this._loadConfig() + }; + this.config.conf.num_workers = 0; + this.config.conf.logging = { + name: 'trending-edits', + level: 'fatal', + streams: [{type: 'stdout'}] + }; + this.config.service = this.config.conf.services[0]; + this.config.uri = `http://localhost:${this.config.service.conf.port}/`; + this._runner = new ServiceRunner(); + this._running = false; } + _loadConfig() { + return yaml.safeLoad(fs.readFileSync(this._configPath).toString()); + } + + start() { + if (this._running) { + console.log('The test server is already running. Skipping start.') + return P.resolve(); + } + + return this._runner.start(this.config.conf) + .tap(() => this._running = true) + .delay(5000) + .catch((e) => { + if (startupRetryLimit > 0 && /EADDRINUSE/.test(e.message)) { + console.log('Execution of the previous test might have not finished yet. Retry startup'); + startupRetryLimit--; + return P.delay(1000).then(() => this.start()); + } + throw e; + }); + } + + stop() { + if (this._running) { + return this._runner.stop() + .tap(() => this._running = false) + .delay(SERVICE_STOP_DELAY); + } + return P.resolve(); + } } -module.exports.config = config; -module.exports.start = start; - +module.exports = TrendingService; -- To view, visit https://gerrit.wikimedia.org/r/326169 To unsubscribe, visit https://gerrit.wikimedia.org/r/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I1d7f28415f56e81436f522084b82d3cec3be52e6 Gerrit-PatchSet: 1 Gerrit-Project: mediawiki/services/trending-edits Gerrit-Branch: master Gerrit-Owner: Ppchelko <ppche...@wikimedia.org> _______________________________________________ MediaWiki-commits mailing list MediaWiki-commits@lists.wikimedia.org https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits