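Kafka source supports the `flink.partition-discovery.interval-millis` setting for discovering newly added partitions, but the Kafka sink caches each topic's partitions in a map and only fetches them when the topic is not yet in that map. (The original non-ASCII text was lost to an encoding error; this sentence is reconstructed from the surviving terms and the snippet below.)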
int[] partitions = (int[])this.topicPartitionsMap.get(targetTopic);
if (null == partitions) {
partitions = getPartitionsByTopic(targetTopic, transaction.producer);
this.topicPartitionsMap.put(targetTopic, partitions);
}
[The remainder of this note is unrecoverable: its non-ASCII text was replaced with '?' by an encoding error.]
