Hi, I have a source Kafka and a sink Kafka. As the amount of data being
processed grows, I need to increase the number of partitions of the Kafka
topics, but I don't want to restart the job for the change to take effect.
For the source Kafka, I use flink.partition-discovery.interval-millis, and the
job consumes the new partitions after I increase the topic's partition
count.
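For reference, this is roughly how I set it. It is only a minimal sketch: the broker address, group id and interval are placeholder values, and SourceDiscoveryConfig is just an illustrative helper name. The resulting Properties object is what gets passed to the FlinkKafkaConsumer constructor.

```java
import java.util.Properties;

class SourceDiscoveryConfig {
    // Property key quoted above; the Flink Kafka consumer reads it from its Properties.
    static final String DISCOVERY_KEY = "flink.partition-discovery.interval-millis";

    // Builds the consumer properties. Broker address and group id are placeholders.
    static Properties consumerProperties(long discoveryIntervalMillis) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        props.setProperty("group.id", "my-group");                // placeholder
        props.setProperty(DISCOVERY_KEY, Long.toString(discoveryIntervalMillis));
        return props;
    }

    public static void main(String[] args) {
        // Check for new partitions every 30 seconds (example value).
        Properties props = consumerProperties(30_000L);
        System.out.println(DISCOVERY_KEY + "=" + props.getProperty(DISCOVERY_KEY));
    }
}
```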
But the sink Kafka doesn't work like this.
The Flink Kafka producer fetches the topic's partition list and caches it in
topicPartitionsMap, as shown in class FlinkKafkaProducer<IN>:
    // The partition list is fetched once per topic and then always served from the cache.
    int[] partitions = (int[]) this.topicPartitionsMap.get(targetTopic);
    if (null == partitions) {
        partitions = getPartitionsByTopic(targetTopic, transaction.producer);
        this.topicPartitionsMap.put(targetTopic, partitions);
    }
When the Kafka topic is expanded, the new partitions are not discovered. For
example, after expanding from 1 partition to 2 partitions, the cached partition
list is never updated until the job restarts.
Are there plans to improve this, or is there any other way to achieve it?
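
To make clear what I am asking for, here is a minimal sketch of the behavior I have in mind: the cached list is looked up again once a configurable interval has passed. This is not Flink's implementation. RefreshingPartitionCache, the lookup function and refreshIntervalMillis are hypothetical names used only for illustration; in the real producer the lookup would be something like getPartitionsByTopic.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical helper, not part of Flink: a per-topic partition cache that expires.
class RefreshingPartitionCache {
    private final Map<String, int[]> partitionsByTopic = new HashMap<>();
    private final Map<String, Long> lastRefreshMillis = new HashMap<>();
    private final Function<String, int[]> lookup;   // assumed metadata lookup
    private final long refreshIntervalMillis;       // assumed setting

    RefreshingPartitionCache(Function<String, int[]> lookup, long refreshIntervalMillis) {
        this.lookup = lookup;
        this.refreshIntervalMillis = refreshIntervalMillis;
    }

    // Returns the cached partitions, looking them up again when the entry
    // is missing or older than the refresh interval.
    int[] partitionsFor(String topic, long nowMillis) {
        Long last = lastRefreshMillis.get(topic);
        if (last == null || nowMillis - last >= refreshIntervalMillis) {
            partitionsByTopic.put(topic, lookup.apply(topic));
            lastRefreshMillis.put(topic, nowMillis);
        }
        return partitionsByTopic.get(topic);
    }
}
```

With a cache like this, a topic expanded from 1 to 2 partitions would be seen with 2 partitions on the first call made after the interval has elapsed, without restarting the job.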