Ottomata has submitted this change and it was merged.
Change subject: Add script to ensure configured Kafka topics exist
......................................................................
Add script to ensure configured Kafka topics exist
Change-Id: I1e3a4e20ab90e7d946abe6317c68de3d9c62d305
TODO: This script has no EventLogging dependencies. Should it live in puppet?
Elsewhere?
---
A bin/ensure-kafka-topics-exist
1 file changed, 64 insertions(+), 0 deletions(-)
Approvals:
Ottomata: Looks good to me, approved
jenkins-bot: Verified
diff --git a/bin/ensure-kafka-topics-exist b/bin/ensure-kafka-topics-exist
new file mode 100755
index 0000000..942468f
--- /dev/null
+++ b/bin/ensure-kafka-topics-exist
@@ -0,0 +1,64 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import argparse
+import logging
+import sys
+import yaml
+
+from pykafka import KafkaClient
+
+# CLI definition; arguments may also be read from a file via '@path'.
+ap = argparse.ArgumentParser(
+    fromfile_prefix_chars='@',
+    description='Ensures all topics in a topic config file exist in Kafka.',
+)
+ap.add_argument(
+    '--topic-config',
+    help='Topic -> schema config file',
+    default='./config/topics.yaml',
+)
+# The broker list is handed to KafkaClient(hosts=...) unchanged.
+ap.add_argument(
+    '--brokers',
+    help='Comma-separated list of kafka broker hosts',
+    default='localhost:9092',
+)
+
+
+def ensure_topics_exist(kafka_client, topics):
+    """Look up every name in topics on kafka_client and log its partitions."""
+    for name in topics:
+        # The lookup itself is what matters: per the original author,
+        # indexing a missing topic makes the client create it.
+        partition_ids = kafka_client.topics[name].partitions.keys()
+        logging.info(
+            'Kafka topic %s exists with %d partition(s).',
+            name, len(partition_ids)
+        )
+
+
+if __name__ == "__main__":
+    # Entry point: set up logging, read the topic config, touch every topic.
+    logging.basicConfig(
+        stream=sys.stderr, level=logging.INFO,
+        format='%(asctime)s %(message)s'
+    )
+    # PyKafka can be a little noisy.
+    logging.getLogger("pykafka").setLevel(logging.WARN)
+
+    args = ap.parse_args()
+
+    logging.info(
+        'Ensuring all topics configured in %s exist in Kafka '
+        'cluster including brokers %s',
+        args.topic_config, args.brokers
+    )
+
+    # Parse with safe_load(): it only builds plain YAML types, and newer
+    # PyYAML rejects load() without a Loader.  An empty file yields None.
+    with open(args.topic_config) as f:
+        topic_config = yaml.safe_load(f) or {}
+
+    kafka_client = KafkaClient(hosts=args.brokers)
+    ensure_topics_exist(kafka_client, topic_config.keys())
--
To view, visit https://gerrit.wikimedia.org/r/257626
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: merged
Gerrit-Change-Id: I1e3a4e20ab90e7d946abe6317c68de3d9c62d305
Gerrit-PatchSet: 4
Gerrit-Project: eventlogging
Gerrit-Branch: master
Gerrit-Owner: Ottomata <[email protected]>
Gerrit-Reviewer: Mobrovac <[email protected]>
Gerrit-Reviewer: Nuria <[email protected]>
Gerrit-Reviewer: Ottomata <[email protected]>
Gerrit-Reviewer: jenkins-bot <>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits