Hi, I'm wondering how RocketMQSource guarantees exactly-once delivery between Flink operators.
According to https://github.com/apache/rocketmq-externals/blob/master/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java, a thread pulls messages in batches, and only after all messages in a batch have been sent downstream is RocketMQSource.offsetTable updated with pullResult.getNextBeginOffset(). On checkpoint, it is RocketMQSource.offsetTable that gets saved.

If a checkpoint is triggered while a batch is still being processed, is it possible that some messages are sent downstream twice?

For example, assume the current offset is 10 and the pulling thread fetches 32 messages in one batch. After 5 messages have been sent downstream, a checkpoint is triggered. At that moment RocketMQSource.offsetTable still holds offset 10 instead of 15. When the program recovers from this checkpoint, it resumes consuming from offset 10, so the messages at offsets 10 to 14 are sent downstream again.

Looking forward to your comments! Thank you very much.

Dan
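
P.S. To make the scenario concrete, here is a small standalone simulation of what I think happens. It does not use any Flink or RocketMQ classes. The class name OffsetReplaySketch and the method replayedOffsets are hypothetical names of mine, and the sketch assumes my reading of the source is right, namely that the stored offset only advances after the whole batch has been emitted and that the checkpoint can happen in the middle of a batch.

```java
import java.util.ArrayList;
import java.util.List;

public class OffsetReplaySketch {

    // Returns the offsets that are emitted once before the checkpoint
    // and then emitted again after recovering from that checkpoint.
    public static List<Long> replayedOffsets(long startOffset, int batchSize, int emittedBeforeCheckpoint) {
        // Stands in for the offsetTable entry of one queue (assumption:
        // it is only advanced after the whole batch has been emitted).
        long storedOffset = startOffset;

        // Emit only part of the batch; storedOffset is not touched here.
        List<Long> emittedBefore = new ArrayList<>();
        for (int i = 0; i < emittedBeforeCheckpoint && i < batchSize; i++) {
            emittedBefore.add(startOffset + i);
        }

        // Checkpoint taken mid-batch: it captures the stale stored offset.
        long checkpointedOffset = storedOffset;

        // Recovery: pull the batch again, starting at the checkpointed offset,
        // and collect every offset that had already been emitted.
        List<Long> replayed = new ArrayList<>();
        for (long offset = checkpointedOffset; offset < checkpointedOffset + batchSize; offset++) {
            if (emittedBefore.contains(offset)) {
                replayed.add(offset);
            }
        }
        return replayed;
    }

    public static void main(String[] args) {
        // Offset 10, batch of 32, checkpoint after 5 messages were emitted.
        System.out.println(replayedOffsets(10, 32, 5)); // prints [10, 11, 12, 13, 14]
    }
}
```

If the checkpoint could only be taken between batches, or if the stored offset were advanced per message while the checkpoint is blocked, the returned list would be empty. That is the behavior I would expect for exactly-once.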
