I would like to manually stop a FlinkKafkaConsumer from consuming data from Kafka, but it does not shut down when I invoke its "cancel()" method. My approach is to append an EOF marker to the Kafka data to signal the end of the input; when the FlatMap operator reads that marker, it invokes "cancel()" on the FlinkKafkaConsumer. This doesn't work. With Kafka as the source, the Flink streaming job never finishes unless it is canceled or fails.
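To make the intent concrete, here is a rough plain-Java sketch of the behaviour I am after. It deliberately uses no Flink or Kafka classes, so it is not my actual job code and not Flink's API; the names `EofMarkerSketch`, `EOF_MARKER`, and `consumeUntilEof`, and the marker value "EOF", are made up for illustration. It only shows the idea that reading should stop at the first EOF marker and that nothing after it should be consumed.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration only: no Flink or Kafka classes are used.
// All names here (EofMarkerSketch, EOF_MARKER, consumeUntilEof) are hypothetical.
final class EofMarkerSketch {

    // Assumed sentinel value written to the topic after the last real record.
    static final String EOF_MARKER = "EOF";

    // Reads records in order and stops at the first EOF marker.
    // Returns the records seen before the marker; the marker itself is dropped.
    static List<String> consumeUntilEof(Iterable<String> records) {
        List<String> consumed = new ArrayList<>();
        for (String record : records) {
            if (EOF_MARKER.equals(record)) {
                break; // this is the point where I would like the source to shut down
            }
            consumed.add(record);
        }
        return consumed;
    }

    public static void main(String[] args) {
        // Stand-in for the Kafka topic contents, with one record after the marker.
        List<String> topic = List.of("a", "b", "EOF", "c");
        System.out.println(consumeUntilEof(topic)); // prints [a, b]
    }
}
```

In this sketch the stop decision is made inside the loop that reads the records, whereas in my job I am trying to trigger it from a downstream FlatMap operator, which may be part of the problem.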
If anyone knows how to do this, I would appreciate some help. Thanks!