Ottomata has submitted this change and it was merged.

Change subject: Add script to ensure configured Kafka topics exist
......................................................................


Add script to ensure configured Kafka topics exist

Change-Id: I1e3a4e20ab90e7d946abe6317c68de3d9c62d305
TODO: This script has no EventLogging dependencies. Should it live in
puppet? Elsewhere?
---
A bin/ensure-kafka-topics-exist
1 file changed, 64 insertions(+), 0 deletions(-)

Approvals:
  Ottomata: Looks good to me, approved
  jenkins-bot: Verified



diff --git a/bin/ensure-kafka-topics-exist b/bin/ensure-kafka-topics-exist
new file mode 100755
index 0000000..942468f
--- /dev/null
+++ b/bin/ensure-kafka-topics-exist
@@ -0,0 +1,64 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import argparse
+import logging
+import sys
+import yaml
+
+from pykafka import KafkaClient
+
+ap = argparse.ArgumentParser(
+    description='Ensures all topics in a topic config file exist in Kafka.',
+    fromfile_prefix_chars='@'
+)
+
+ap.add_argument(
+    '--topic-config',
+    default='./config/topics.yaml',
+    help='Topic -> schema config file',
+)
+
+ap.add_argument(
+    '--brokers',
+    default='localhost:9092',
+    help='Comma-separated list of Kafka broker host:port pairs',
+)
+
+
+def ensure_topics_exist(kafka_client, topics):
+    for topic in topics:
+        # Accessing the topic here will create it if it doesn't already
+        # exist (assuming the brokers allow topic auto-creation).
+        t = kafka_client.topics[topic]
+        logging.info(
+            'Kafka topic %s exists with %d partition(s).',
+            topic,
+            len(t.partitions)
+        )
+
+
+if __name__ == "__main__":
+    logging.basicConfig(
+        stream=sys.stderr,
+        level=logging.INFO,
+        format='%(asctime)s %(message)s'
+    )
+    # PyKafka can be a little noisy.
+    logging.getLogger("pykafka").setLevel(logging.WARN)
+
+    args = ap.parse_args()
+
+    logging.info(
+        'Ensuring all topics configured in %s exist in Kafka '
+        'cluster including brokers %s',
+        args.topic_config, args.brokers
+    )
+
+    topic_config = None
+    # Load the topic_config from the config file.
+    with open(args.topic_config) as f:
+        topic_config = yaml.safe_load(f) or {}
+
+    kafka_client = KafkaClient(hosts=args.brokers)
+    ensure_topics_exist(kafka_client, topic_config.keys())
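
The script relies on topic lookup having a side effect: indexing `kafka_client.topics` with a missing name is expected to create the topic. A minimal sketch of that pattern, runnable without a broker, might look like the following. `FakeTopic`, `FakeTopicDict` and `FakeKafkaClient` are hypothetical stand-ins written for illustration, not pykafka classes, and the topic names are made up; the sketch assumes the real client behaves as the comment in the diff describes.

```python
import logging


class FakeTopic(object):
    """Hypothetical stand-in for a topic object; only `partitions` is modelled."""

    def __init__(self, num_partitions=1):
        self.partitions = dict((i, None) for i in range(num_partitions))


class FakeTopicDict(dict):
    """Hypothetical stand-in for `kafka_client.topics`.

    Looking up a missing key creates the entry, mimicking the
    create-on-access behaviour the script depends on.
    """

    def __missing__(self, name):
        self[name] = FakeTopic()
        return self[name]


class FakeKafkaClient(object):
    """Hypothetical client exposing just a `topics` mapping."""

    def __init__(self, existing=()):
        self.topics = FakeTopicDict((name, FakeTopic()) for name in existing)


def ensure_topics_exist(kafka_client, topics):
    # Same logic as the script in this change: the lookup itself
    # is what brings a missing topic into existence.
    for topic in topics:
        t = kafka_client.topics[topic]
        logging.info(
            'Kafka topic %s exists with %d partition(s).',
            topic,
            len(t.partitions)
        )


client = FakeKafkaClient(existing=['topic_a'])
ensure_topics_exist(client, ['topic_a', 'topic_b'])
print(sorted(client.topics))
```

Because the function only needs an object with a dict-like `topics` attribute, a fake of this shape could also serve as a unit-test double if the script is later moved into a module.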

-- 
To view, visit https://gerrit.wikimedia.org/r/257626
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I1e3a4e20ab90e7d946abe6317c68de3d9c62d305
Gerrit-PatchSet: 4
Gerrit-Project: eventlogging
Gerrit-Branch: master
Gerrit-Owner: Ottomata <[email protected]>
Gerrit-Reviewer: Mobrovac <[email protected]>
Gerrit-Reviewer: Nuria <[email protected]>
Gerrit-Reviewer: Ottomata <[email protected]>
Gerrit-Reviewer: jenkins-bot <>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits
