Hi,

We are currently using a single FlinkKafkaConsumer to consume multiple Kafka 
topics, however, we find that if one of the Kafka topics goes down at run 
time(like rebooting one of the topics), the FlinkKafkaConsumer will keep 
throwing warning message of the dead Kafka topic, and will also continue 
consume other live Kafka topics.
However, what we want is that, if one of the topics goes down, the 
FlinkKafkaConsumer will wait and stop consuming other live topics until the 
dead topic goes live.

Code example:

List<String> kafkaTopicsList = new ArrayList<>( Arrays.asList( “KafkaTopic1”,  
“KafkaTopic2” ) );
FlinkKafkaConsumer flinkKafkaConsumer = new FlinkKafkaConsumer<>( 
kafkaTopicsList, new SimpleStringSchema(), properties);

As shown in the code example, kafkaTopicsList contains two Kafka topics, and 
flinkKafkaConsumer consumes both two topics. We hope that if KafkaTopic1 goes 
down at run-time(we may reboot KafkaTopic1 at run time), the flinkKafkaConsumer 
will wait and stop consuming KafkaTopic2, until KafkaTopic1 goes live again.

May I ask is it possible to achieve this purpose using current Flink API? Do we 
need to edit configuration somewhere? Or we have to overwrite 
FlinkKafkaConsumer Class to achieve this? Thank you very much!

Thanks, Yan

Reply via email to