Hi,
I'm wondering how RocketMQSource guarantees exactly-once semantics between
Flink operators.


According to
https://github.com/apache/rocketmq-externals/blob/master/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java,
a thread pulls messages in batches, and only after all messages in a batch
have been sent downstream is RocketMQSource.offsetTable updated with
pullResult.getNextBeginOffset(). On checkpoint, it is
RocketMQSource.offsetTable that gets saved.
If a checkpoint is triggered while a batch is still being emitted, could some
messages be sent downstream twice? For example, assume the current offset is
10 and the pulling thread fetches a batch of 32 messages. After 5 messages
(offsets 10 to 14) have been sent downstream, a checkpoint is triggered. At
that moment RocketMQSource.offsetTable still holds offset 10 rather than 15.
When the job recovers from this checkpoint, it resumes consuming from offset
10, so the messages at offsets 10 to 14 are sent downstream again.
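
To make the scenario concrete, here is a minimal simulation under simplifying
assumptions: a single queue, a checkpoint taken after a given number of records
of the batch, and a failure right after that checkpoint. The class and method
names (BatchOffsetReplay, emitBatch, restore) are hypothetical and are not the
actual RocketMQSource code. In this model, offsets 10 to 14 were emitted before
the checkpoint, so downstream state would already reflect them, yet the
snapshotted offset is still 10.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, single-queue model of a source that advances its checkpointed
 * offset only after an entire pulled batch has been emitted downstream.
 */
public class BatchOffsetReplay {

    // Stand-in for RocketMQSource.offsetTable (one queue, so one long).
    private long offsetTable;
    // Every offset handed to the downstream operator, including replays.
    public final List<Long> delivered = new ArrayList<>();

    public BatchOffsetReplay(long startOffset) {
        this.offsetTable = startOffset;
    }

    /** Restores the offset from a checkpoint, as on job recovery. */
    public void restore(long checkpointedOffset) {
        this.offsetTable = checkpointedOffset;
    }

    /**
     * Emits one batch of batchSize records. If checkpointAt is in [0, batchSize),
     * a checkpoint is taken once that many records have been emitted, the job is
     * assumed to fail right after, and the snapshotted offset is returned.
     * Otherwise the whole batch is emitted and -1 is returned.
     */
    public long emitBatch(int batchSize, int checkpointAt) {
        long begin = offsetTable;
        for (int i = 0; i < batchSize; i++) {
            if (i == checkpointAt) {
                return offsetTable; // still 'begin': the batch is not finished yet
            }
            delivered.add(begin + i);
        }
        // Only now is the offset advanced, like offsetTable <- getNextBeginOffset().
        offsetTable = begin + batchSize;
        return -1;
    }

    public static void main(String[] args) {
        BatchOffsetReplay source = new BatchOffsetReplay(10);
        long checkpointed = source.emitBatch(32, 5); // offsets 10..14 already sent
        source.restore(checkpointed);                 // recover from that checkpoint
        source.emitBatch(32, -1);                     // the batch is pulled again
        System.out.println("checkpointed offset: " + checkpointed);
        System.out.println("first deliveries: " + source.delivered.subList(0, 7));
    }
}
```

Running main prints a checkpointed offset of 10 and deliveries starting with
[10, 11, 12, 13, 14, 10, 11], i.e. offsets 10 to 14 reach downstream twice.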


Looking forward to your comments! Thank you very much.


Dan
