[MediaWiki-commits] [Gerrit] mediawiki...trending-edits[master]: Kafka consumer: Switch back to flowing mode.
Mobrovac has submitted this change and it was merged. ( https://gerrit.wikimedia.org/r/327859 ) Change subject: Kafka consumer: Switch back to flowing mode. .. Kafka consumer: Switch back to flowing mode. The driver bug has been fied and the new version has been published, so switch back to the flowing mode since it's more efficient. Bug: T153122 Bug: T145571 Change-Id: Ic7ae50d31557afb2dbe002fa0f78b5155ee19d60 --- M lib/edit-stream.js M package.json 2 files changed, 18 insertions(+), 28 deletions(-) Approvals: Mobrovac: Verified; Looks good to me, approved jenkins-bot: Verified diff --git a/lib/edit-stream.js b/lib/edit-stream.js index 0cc1603..ea5d0a5 100644 --- a/lib/edit-stream.js +++ b/lib/edit-stream.js @@ -90,35 +90,25 @@ this._consumer.connect(); this._consumer .on('ready', () => { -this._consumer.subscribe(this._consumeTopics); -const consume = () => { -this._consumer.consume((e, kafkaMessage) => { -if (e) { -if (e.code !== kafka.CODES.ERRORS.ERR__PARTITION_EOF -&& e.code !== kafka.CODES.ERRORS.ERR__TIMED_OUT) { -this.emit('error', e); -} -return consume(); -} +this._consumer.consume(this._consumeTopics, (e, kafkaMessage) => { +if (e) { +return this.emit('error', e); +} -try { -let msg = JSON.parse(kafkaMessage.value.toString()); -const skip = COMMIT_PUSH_SKIP[msg.meta.topic]; -if (!skip || kafkaMessage.offset % skip === 0) { -this._commitQueue.push({ -message: kafkaMessage, -timestamp: msg.meta.dt -}); -} -this.emit('edit', msg); -} catch (e) { -this.emit('error', e); -} finally { -consume(); +try { +const msg = JSON.parse(kafkaMessage.value.toString()); +const skip = COMMIT_PUSH_SKIP[msg.meta.topic]; +if (!skip || kafkaMessage.offset % skip === 0) { +this._commitQueue.push({ +message: kafkaMessage, +timestamp: Date.parse(msg.meta.dt) +}); } -}); -}; -consume(); +this.emit('edit', msg); +} catch (e) { +this.emit('error', e); +} +}); }); this._commitQueue = []; this._commitInterval = setInterval(() => { diff --git a/package.json b/package.json index e12c7e9..cf25766 100644 --- a/package.json +++ b/package.json @@ -33,7 +33,7 @@ "preq": "^0.4.10", "service-runner": "^2.1.10", "swagger-router": "^0.4.6", -"node-rdkafka": "^0.6.0", +"node-rdkafka": "^0.6.2", "wikipedia-edits-scorer": "^1.4.0" }, "devDependencies": { -- To view, visit https://gerrit.wikimedia.org/r/327859 To unsubscribe, visit https://gerrit.wikimedia.org/r/settings Gerrit-MessageType: merged Gerrit-Change-Id: Ic7ae50d31557afb2dbe002fa0f78b5155ee19d60 Gerrit-PatchSet: 3 Gerrit-Project: mediawiki/services/trending-edits Gerrit-Branch: master Gerrit-Owner: Ppchelko Gerrit-Reviewer: Jdlrobson Gerrit-Reviewer: Mobrovac Gerrit-Reviewer: Ppchelko Gerrit-Reviewer: jenkins-bot <> ___ MediaWiki-commits mailing list MediaWiki-commits@lists.wikimedia.org https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits
[MediaWiki-commits] [Gerrit] mediawiki...trending-edits[master]: Kafka consumer: Switch back to flowing mode.
Ppchelko has uploaded a new change for review. ( https://gerrit.wikimedia.org/r/327859 ) Change subject: Kafka consumer: Switch back to flowing mode. .. Kafka consumer: Switch back to flowing mode. The driver bug has been fied and the new version has been published, so switch back to the flowing mode since it's more efficient. Bug: T153122 Change-Id: Ic7ae50d31557afb2dbe002fa0f78b5155ee19d60 --- M lib/edit-stream.js M package.json 2 files changed, 19 insertions(+), 28 deletions(-) git pull ssh://gerrit.wikimedia.org:29418/mediawiki/services/trending-edits refs/changes/59/327859/1 diff --git a/lib/edit-stream.js b/lib/edit-stream.js index 0cc1603..318e7a4 100644 --- a/lib/edit-stream.js +++ b/lib/edit-stream.js @@ -90,35 +90,26 @@ this._consumer.connect(); this._consumer .on('ready', () => { -this._consumer.subscribe(this._consumeTopics); -const consume = () => { -this._consumer.consume((e, kafkaMessage) => { -if (e) { -if (e.code !== kafka.CODES.ERRORS.ERR__PARTITION_EOF -&& e.code !== kafka.CODES.ERRORS.ERR__TIMED_OUT) { -this.emit('error', e); -} -return consume(); -} +this._consumer.consume(this._consumeTopics, (e, kafkaMessage) => { +if (e) { +return this.emit('error', e); +} -try { -let msg = JSON.parse(kafkaMessage.value.toString()); -const skip = COMMIT_PUSH_SKIP[msg.meta.topic]; -if (!skip || kafkaMessage.offset % skip === 0) { -this._commitQueue.push({ -message: kafkaMessage, -timestamp: msg.meta.dt -}); -} -this.emit('edit', msg); -} catch (e) { -this.emit('error', e); -} finally { -consume(); +try { +const msg = JSON.parse(kafkaMessage.value.toString()); +const pureTopic = kafkaMessage.topic.substr(kafkaMessage.topic.indexOf('.') + 1); +const skip = COMMIT_PUSH_SKIP[msg.meta.topic]; +if (!skip || kafkaMessage.offset % skip === 0) { +this._commitQueue.push({ +message: kafkaMessage, +timestamp: msg.meta.dt +}); } -}); -}; -consume(); +this.emit('edit', msg); +} catch (e) { +this.emit('error', e); +} +}); }); this._commitQueue = []; this._commitInterval = setInterval(() => { diff --git a/package.json b/package.json index e12c7e9..cf25766 100644 --- a/package.json +++ b/package.json @@ -33,7 +33,7 @@ "preq": "^0.4.10", "service-runner": "^2.1.10", "swagger-router": "^0.4.6", -"node-rdkafka": "^0.6.0", +"node-rdkafka": "^0.6.2", "wikipedia-edits-scorer": "^1.4.0" }, "devDependencies": { -- To view, visit https://gerrit.wikimedia.org/r/327859 To unsubscribe, visit https://gerrit.wikimedia.org/r/settings Gerrit-MessageType: newchange Gerrit-Change-Id: Ic7ae50d31557afb2dbe002fa0f78b5155ee19d60 Gerrit-PatchSet: 1 Gerrit-Project: mediawiki/services/trending-edits Gerrit-Branch: master Gerrit-Owner: Ppchelko ___ MediaWiki-commits mailing list MediaWiki-commits@lists.wikimedia.org https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits