Hi Yan, Afaik this is not directly supported and would be surprising to other users since it's a rather specific requirement. In fact, Flink delegates reading the topics to Kafka consumer API and I suspect that the warning you received is also coming from Kafka consumer (I have not found a respective warning in Flink's code base but you could also show the exact log statement so I can recheck).
What you could do is try to config Kafka consumer to fail hard when topic metadata cannot be retrieved with a small timeout. Note that I'm a bit confused by the terms "dead" topic and "rebooted" topic. Afaik you can only have dead brokers and rebooted brokers and maybe deleted topics. But I have yet to understand a use case where you would delete a topic while the consumer is running. On Thu, Sep 2, 2021 at 4:58 AM Yan Wang <y.yan.w.w...@oracle.com> wrote: > Hi, > > > > We are currently using a single FlinkKafkaConsumer to consume multiple > Kafka topics, however, we find that if one of the Kafka topics goes down at > run time(like rebooting one of the topics), the FlinkKafkaConsumer will > keep throwing warning message of the dead Kafka topic, and will also > continue consume other live Kafka topics. > > However, what we want is that, if one of the topics goes down, the > FlinkKafkaConsumer will wait and stop consuming other live topics until the > dead topic goes live. > > > Code example: > > *List<String> kafkaTopicsList = new ArrayList<>( Arrays.asList( > “KafkaTopic1”, “KafkaTopic2” ) );* > > *FlinkKafkaConsumer flinkKafkaConsumer = new FlinkKafkaConsumer<>( > kafkaTopicsList, new SimpleStringSchema(), properties);* > > > > As shown in the code example, *kafkaTopicsList* contains two Kafka > topics, and flinkKafkaConsumer consumes both two topics. We hope that if > *KafkaTopic1 > *goes down at run-time(we may reboot *KafkaTopic1 *at run time), the > flinkKafkaConsumer will wait and stop consuming *KafkaTopic2, *until* > KafkaTopic1 *goes live again. > > > > May I ask is it possible to achieve this purpose using current Flink API? > Do we need to edit configuration somewhere? Or we have to overwrite > *FlinkKafkaConsumer > *Class to achieve this? Thank you very much! > > > > Thanks, Yan >