[MediaWiki-commits] [Gerrit] mediawiki...trending-edits[master]: Kafka consumer: Switch back to flowing mode.

2017-01-17 Thread Mobrovac (Code Review)
Mobrovac has submitted this change and it was merged. ( 
https://gerrit.wikimedia.org/r/327859 )

Change subject: Kafka consumer: Switch back to flowing mode.
..


Kafka consumer: Switch back to flowing mode.

The driver bug has been fixed and the new version
has been published, so switch back to the flowing
mode since it's more efficient.

Bug: T153122
Bug: T145571
Change-Id: Ic7ae50d31557afb2dbe002fa0f78b5155ee19d60
---
M lib/edit-stream.js
M package.json
2 files changed, 18 insertions(+), 28 deletions(-)

Approvals:
  Mobrovac: Verified; Looks good to me, approved
  jenkins-bot: Verified



diff --git a/lib/edit-stream.js b/lib/edit-stream.js
index 0cc1603..ea5d0a5 100644
--- a/lib/edit-stream.js
+++ b/lib/edit-stream.js
@@ -90,35 +90,25 @@
 this._consumer.connect();
 this._consumer
 .on('ready', () => {
-this._consumer.subscribe(this._consumeTopics);
-const consume = () => {
-this._consumer.consume((e, kafkaMessage) => {
-if (e) {
-if (e.code !== kafka.CODES.ERRORS.ERR__PARTITION_EOF
-&& e.code !== kafka.CODES.ERRORS.ERR__TIMED_OUT) {
-this.emit('error', e);
-}
-return consume();
-}
+this._consumer.consume(this._consumeTopics, (e, kafkaMessage) => {
+if (e) {
+return this.emit('error', e);
+}
 
-try {
-let msg = JSON.parse(kafkaMessage.value.toString());
-const skip = COMMIT_PUSH_SKIP[msg.meta.topic];
-if (!skip || kafkaMessage.offset % skip === 0) {
-this._commitQueue.push({
-message: kafkaMessage,
-timestamp: msg.meta.dt
-});
-}
-this.emit('edit', msg);
-} catch (e) {
-this.emit('error', e);
-} finally {
-consume();
+try {
+const msg = JSON.parse(kafkaMessage.value.toString());
+const skip = COMMIT_PUSH_SKIP[msg.meta.topic];
+if (!skip || kafkaMessage.offset % skip === 0) {
+this._commitQueue.push({
+message: kafkaMessage,
+timestamp: Date.parse(msg.meta.dt)
+});
 }
-});
-};
-consume();
+this.emit('edit', msg);
+} catch (e) {
+this.emit('error', e);
+}
+});
 });
 this._commitQueue = [];
 this._commitInterval = setInterval(() => {
diff --git a/package.json b/package.json
index e12c7e9..cf25766 100644
--- a/package.json
+++ b/package.json
@@ -33,7 +33,7 @@
 "preq": "^0.4.10",
 "service-runner": "^2.1.10",
 "swagger-router": "^0.4.6",
-"node-rdkafka": "^0.6.0",
+"node-rdkafka": "^0.6.2",
 "wikipedia-edits-scorer": "^1.4.0"
   },
   "devDependencies": {

-- 
To view, visit https://gerrit.wikimedia.org/r/327859
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: Ic7ae50d31557afb2dbe002fa0f78b5155ee19d60
Gerrit-PatchSet: 3
Gerrit-Project: mediawiki/services/trending-edits
Gerrit-Branch: master
Gerrit-Owner: Ppchelko 
Gerrit-Reviewer: Jdlrobson 
Gerrit-Reviewer: Mobrovac 
Gerrit-Reviewer: Ppchelko 
Gerrit-Reviewer: jenkins-bot <>

___
MediaWiki-commits mailing list
MediaWiki-commits@lists.wikimedia.org
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits


[MediaWiki-commits] [Gerrit] mediawiki...trending-edits[master]: Kafka consumer: Switch back to flowing mode.

2016-12-16 Thread Ppchelko (Code Review)
Ppchelko has uploaded a new change for review. ( 
https://gerrit.wikimedia.org/r/327859 )

Change subject: Kafka consumer: Switch back to flowing mode.
..

Kafka consumer: Switch back to flowing mode.

The driver bug has been fixed and the new version
has been published, so switch back to the flowing
mode since it's more efficient.

Bug: T153122
Change-Id: Ic7ae50d31557afb2dbe002fa0f78b5155ee19d60
---
M lib/edit-stream.js
M package.json
2 files changed, 19 insertions(+), 28 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/mediawiki/services/trending-edits refs/changes/59/327859/1

diff --git a/lib/edit-stream.js b/lib/edit-stream.js
index 0cc1603..318e7a4 100644
--- a/lib/edit-stream.js
+++ b/lib/edit-stream.js
@@ -90,35 +90,26 @@
 this._consumer.connect();
 this._consumer
 .on('ready', () => {
-this._consumer.subscribe(this._consumeTopics);
-const consume = () => {
-this._consumer.consume((e, kafkaMessage) => {
-if (e) {
-if (e.code !== kafka.CODES.ERRORS.ERR__PARTITION_EOF
-&& e.code !== kafka.CODES.ERRORS.ERR__TIMED_OUT) {
-this.emit('error', e);
-}
-return consume();
-}
+this._consumer.consume(this._consumeTopics, (e, kafkaMessage) => {
+if (e) {
+return this.emit('error', e);
+}
 
-try {
-let msg = JSON.parse(kafkaMessage.value.toString());
-const skip = COMMIT_PUSH_SKIP[msg.meta.topic];
-if (!skip || kafkaMessage.offset % skip === 0) {
-this._commitQueue.push({
-message: kafkaMessage,
-timestamp: msg.meta.dt
-});
-}
-this.emit('edit', msg);
-} catch (e) {
-this.emit('error', e);
-} finally {
-consume();
+try {
+const msg = JSON.parse(kafkaMessage.value.toString());
+const pureTopic = kafkaMessage.topic.substr(kafkaMessage.topic.indexOf('.') + 1);
+const skip = COMMIT_PUSH_SKIP[msg.meta.topic];
+if (!skip || kafkaMessage.offset % skip === 0) {
+this._commitQueue.push({
+message: kafkaMessage,
+timestamp: msg.meta.dt
+});
 }
-});
-};
-consume();
+this.emit('edit', msg);
+} catch (e) {
+this.emit('error', e);
+}
+});
 });
 this._commitQueue = [];
 this._commitInterval = setInterval(() => {
diff --git a/package.json b/package.json
index e12c7e9..cf25766 100644
--- a/package.json
+++ b/package.json
@@ -33,7 +33,7 @@
 "preq": "^0.4.10",
 "service-runner": "^2.1.10",
 "swagger-router": "^0.4.6",
-"node-rdkafka": "^0.6.0",
+"node-rdkafka": "^0.6.2",
 "wikipedia-edits-scorer": "^1.4.0"
   },
   "devDependencies": {

-- 
To view, visit https://gerrit.wikimedia.org/r/327859
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Ic7ae50d31557afb2dbe002fa0f78b5155ee19d60
Gerrit-PatchSet: 1
Gerrit-Project: mediawiki/services/trending-edits
Gerrit-Branch: master
Gerrit-Owner: Ppchelko 

___
MediaWiki-commits mailing list
MediaWiki-commits@lists.wikimedia.org
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits