Hi Yan,

Afaik this is not directly supported and would be surprising to other users
since it's a rather specific requirement.
In fact, Flink delegates reading the topics to the Kafka consumer API, and I
suspect that the warning you received also comes from the Kafka consumer (I
have not found a respective warning in Flink's code base, but you could also
share the exact log statement so I can recheck).

What you could do is try to configure the Kafka consumer to fail hard, with a
small timeout, when topic metadata cannot be retrieved.
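A minimal sketch of what that could look like, using standard Kafka consumer
timeout properties (the broker address, group id, and the concrete timeout
values are placeholders, not recommendations):

```java
import java.util.Properties;

public class KafkaTimeoutConfig {
    public static void main(String[] args) {
        Properties properties = new Properties();
        // Placeholders: adjust to your environment.
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "my-consumer-group");

        // Upper bound for client-side consumer API calls
        // (Kafka default is 60000 ms) - lowering it surfaces
        // metadata failures quickly instead of retrying for a minute.
        properties.setProperty("default.api.timeout.ms", "5000");

        // Maximum time to wait for a single broker response
        // (Kafka default is 30000 ms).
        properties.setProperty("request.timeout.ms", "5000");

        // Refresh topic metadata more often so changes are
        // noticed sooner (Kafka default is 300000 ms).
        properties.setProperty("metadata.max.age.ms", "10000");

        System.out.println(properties.getProperty("default.api.timeout.ms"));
        System.out.println(properties.getProperty("metadata.max.age.ms"));
    }
}
```

These same properties can then be passed into the FlinkKafkaConsumer
constructor. Note this only makes failures visible faster; it does not by
itself pause consumption of the other topics.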

Note that I'm a bit confused by the terms "dead" topic and "rebooted"
topic. Afaik you can only have dead brokers, rebooted brokers, and perhaps
deleted topics. But I have yet to see a use case where you would delete a
topic while the consumer is running.

On Thu, Sep 2, 2021 at 4:58 AM Yan Wang <y.yan.w.w...@oracle.com> wrote:

> Hi,
>
>
>
> We are currently using a single FlinkKafkaConsumer to consume multiple
> Kafka topics. However, we find that if one of the Kafka topics goes down at
> run time (like rebooting one of the topics), the FlinkKafkaConsumer will
> keep throwing warning messages about the dead Kafka topic, and will also
> continue consuming the other live Kafka topics.
>
> However, what we want is that, if one of the topics goes down, the
> FlinkKafkaConsumer will wait and stop consuming other live topics until the
> dead topic goes live.
>
>
> Code example:
>
> List<String> kafkaTopicsList = new ArrayList<>(Arrays.asList(
> "KafkaTopic1", "KafkaTopic2"));
>
> FlinkKafkaConsumer<String> flinkKafkaConsumer = new FlinkKafkaConsumer<>(
> kafkaTopicsList, new SimpleStringSchema(), properties);
>
>
>
> As shown in the code example, kafkaTopicsList contains two Kafka
> topics, and flinkKafkaConsumer consumes both of them. We hope that if
> KafkaTopic1 goes down at run time (we may reboot KafkaTopic1 at run
> time), the flinkKafkaConsumer will wait and stop consuming KafkaTopic2
> until KafkaTopic1 goes live again.
>
>
>
> May I ask whether it is possible to achieve this with the current Flink
> API? Do we need to edit configuration somewhere, or do we have to
> override the FlinkKafkaConsumer class to achieve this? Thank you very
> much!
>
>
>
> Thanks, Yan
>
