Does Spark cache which Kafka topics exist? A service of ours incorrectly
assumes that all the relevant topics already exist, even when they are empty,
and fails when one is missing. Fortunately the service is restarted
automatically, and with auto.create.topics.enable = true (the default), Kafka
creates the topic after the first request for it.

I'm trying to create the topic if it doesn't exist using
AdminUtils.createTopic:

      import java.util.Properties
      import kafka.admin.AdminUtils
      import kafka.utils.ZKStringSerializer
      import org.I0Itec.zkclient.ZkClient

      // 10s ZooKeeper session/connection timeouts; the topic gets
      // 1 partition and replication factor 1.
      val zkClient = new ZkClient("localhost:2181", 10000, 10000, ZKStringSerializer)
      while (!AdminUtils.topicExists(zkClient, topic)) {
        AdminUtils.createTopic(zkClient, topic, 1, 1, new Properties())
      }

But I still get an "Error getting partition metadata for 'topic-name'. Does
the topic exist?" error when I execute KafkaUtils.createDirectStream.
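
For reference, the stream is created roughly like this (Spark 1.x direct API
for Kafka 0.8; the broker address, app name, and batch interval below are
placeholders, not my exact setup):

      import kafka.serializer.StringDecoder
      import org.apache.spark.SparkConf
      import org.apache.spark.streaming.{Seconds, StreamingContext}
      import org.apache.spark.streaming.kafka.KafkaUtils

      val ssc = new StreamingContext(
        new SparkConf().setAppName("topic-check"), Seconds(5))
      val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
      // The direct API looks up partition metadata when the stream is
      // created, which seems to be where the "Does the topic exist?"
      // error surfaces.
      val stream = KafkaUtils.createDirectStream[String, String,
        StringDecoder, StringDecoder](ssc, kafkaParams, Set(topic))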

I've also tried to implement a retry with a wait, so that the retry only
happens after Kafka has auto-created the requested topic (the broker has
auto.create.topics.enable = true), but this still doesn't work consistently.
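
Concretely, the wait-and-retry looked something like this (reusing zkClient
and topic from above; the one-second interval and ten-attempt cap are
arbitrary choices, not tuned values):

      // Poll ZooKeeper until the auto-created topic is registered,
      // instead of immediately retrying createDirectStream.
      var attempts = 0
      while (!AdminUtils.topicExists(zkClient, topic) && attempts < 10) {
        Thread.sleep(1000)
        attempts += 1
      }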

This is also frustrating to debug, since the topic is created successfully
about 50% of the time; the rest of the time I get the "Does the topic exist?"
message. My thinking is that Spark may be caching the list of extant Kafka
topics and ignoring that I've added a new one. Is this the case? Am I missing
something?


Ben
