Ori.livneh has uploaded a new change for review. https://gerrit.wikimedia.org/r/130292
Change subject: Fixes for Kafka consumer ...................................................................... Fixes for Kafka consumer * Don't pass driver's kwargs (which includes path) to KeyedProducer constructor. * Encode topic & key to bytes. Change-Id: Idec52a421e82660c9f62f97dec29ace5e6dcb09c --- M server/eventlogging/handlers.py 1 file changed, 5 insertions(+), 3 deletions(-) git pull ssh://gerrit.wikimedia.org:29418/mediawiki/extensions/EventLogging refs/changes/92/130292/1 diff --git a/server/eventlogging/handlers.py b/server/eventlogging/handlers.py index 9806d46..c8b4459 100644 --- a/server/eventlogging/handlers.py +++ b/server/eventlogging/handlers.py @@ -65,18 +65,20 @@ @writes('kafka') -def kafka_writer(brokers, topic='eventlogging', **kwargs): +def kafka_writer(brokers, topic='eventlogging'): """Write events to Kafka, keyed by SCID.""" from kafka.client import KafkaClient from kafka.producer import KeyedProducer kafka = KafkaClient(brokers) - producer = KeyedProducer(kafka, topic, **kwargs) + producer = KeyedProducer(kafka) + topic = topic.encode('utf-8') while 1: event = (yield) key = '%(schema)s_%(revision)s' % event # e.g. 'EchoMail_5467650' - producer.send(key, json.dumps(event, sort_keys=True)) + key = key.encode('utf-8') + producer.send(topic, key, json.dumps(event, sort_keys=True)) @writes('mysql', 'sqlite') -- To view, visit https://gerrit.wikimedia.org/r/130292 To unsubscribe, visit https://gerrit.wikimedia.org/r/settings Gerrit-MessageType: newchange Gerrit-Change-Id: Idec52a421e82660c9f62f97dec29ace5e6dcb09c Gerrit-PatchSet: 1 Gerrit-Project: mediawiki/extensions/EventLogging Gerrit-Branch: master Gerrit-Owner: Ori.livneh <o...@wikimedia.org> _______________________________________________ MediaWiki-commits mailing list MediaWiki-commits@lists.wikimedia.org https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits