Hi All, I previously asked a few questions regarding enabling EOS (exactly-once semantics); please see the quoted thread below.
Our Beam pipeline uses KafkaIO to read from a source topic and KafkaIO again to publish to a sink topic. According to Max's answers to my previous questions (quoted below), enabling EOS with KafkaIO introduces latency, because KafkaIO.ExactlyOnceWriter's processElement method is only called after all messages within the checkpoint interval have been checkpointed; the latency therefore depends on the checkpoint interval.

I wonder: if I relax the guarantee to at-least-once, do I still need to enable EOS on KafkaIO, or is it simply not required? If it is not required, can you please provide some instructions on how it should be configured? To make the question concrete, I have put sketches of my current understanding at the end of this mail; please correct anything that is wrong.

Thanks a lot!
Eleanore

> Thanks for the response! The reason for setting up the state backend is to
> experiment with Kafka EOS on Beam running on Flink. Reading through the
> code and this PR <https://github.com/apache/beam/pull/7991/files>, can
> you please help me clarify my understanding?
>
> 1. Beam uses KafkaExactlyOnceSink to publish messages to achieve EOS.
> The ExactlyOnceWriter processElement method is annotated with
> @RequiresStableInput, so all messages are cached by
> KeyedBufferingElementsHandler, and only after a checkpoint succeeds are
> those messages processed by ExactlyOnceWriter?

That's correct.

> 2. Upon a checkpoint, will the messages cached by
> KeyedBufferingElementsHandler also be checkpointed?

Yes, the buffered elements will be checkpointed.

> 3. It seems the way Beam provides Kafka EOS introduces delays in the
> stream processing, and the delay depends on the checkpoint interval. How
> can I reduce the latency while still having the EOS guarantee?

Indeed, the checkpoint interval and the checkpoint duration limit the
latency. Given the current design and the guarantees, there is no other
way to influence the latency.

> 4. commitOffsetsInFinalize is also enabled. Does this mean that upon a
> successful checkpoint, the checkpointed offsets will be committed back to
> Kafka? And if this operation does not finish successfully, the job gets
> cancelled/stopped, and the job is re-submitted (with the same consumer
> group for the source topics, but a different jobID), is it possible that
> duplicated processing still happens, because the consumed offsets were
> not committed back to Kafka?

This option is for the Kafka consumer. AFAIK it is just a convenience
method to commit the latest checkpointed offset to Kafka. This offset is
not used when restoring from a checkpoint. However, if you don't restore
from a checkpoint, you can resume from that offset, which might or might
not be convenient, depending on your use case.
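
Sketch 1: my understanding of the sink configuration. Without withEOS(...)
I assume KafkaIO.write() provides at-least-once delivery (records after the
last successful checkpoint may be re-published on restart); with it, the
KafkaExactlyOnceSink path discussed above is used. The broker address,
topic name, sink group id, and the PCollection "records" are placeholders
of mine, not from our real job:

    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.kafka.common.serialization.StringSerializer;

    // At-least-once: no withEOS(...), so records are published as soon as
    // they reach the sink, with no buffering until the checkpoint.
    records.apply(KafkaIO.<String, String>write()
        .withBootstrapServers("broker-1:9092")       // placeholder
        .withTopic("sink-topic")                     // placeholder
        .withKeySerializer(StringSerializer.class)
        .withValueSerializer(StringSerializer.class));

    // Exactly-once: the same write plus withEOS(numShards, sinkGroupId),
    // which buffers records until the checkpoint containing them succeeds.
    records.apply(KafkaIO.<String, String>write()
        .withBootstrapServers("broker-1:9092")       // placeholder
        .withTopic("sink-topic")                     // placeholder
        .withKeySerializer(StringSerializer.class)
        .withValueSerializer(StringSerializer.class)
        .withEOS(1, "eos-sink-group-id"));           // placeholder group id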
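
Sketch 2: for question 1, my understanding is that @RequiresStableInput is
a general DoFn annotation rather than something specific to the Kafka sink;
the Flink runner buffers the input (via KeyedBufferingElementsHandler) and
only hands it to processElement after the checkpoint succeeds. A toy
illustration, with names of my own invention:

    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.values.KV;

    // A DoFn that asks the runner for stable input. Side effects inside
    // processElement are then safe to retry, because on failure the same
    // buffered input is replayed instead of being recomputed.
    class PublishWithSideEffectsFn extends DoFn<KV<String, String>, Void> {
      @RequiresStableInput
      @ProcessElement
      public void processElement(ProcessContext ctx) {
        // e.g. write ctx.element() to an external system here
      }
    }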
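
Sketch 3: for question 3, if the only latency knob is the checkpoint
interval, I assume the place to tune it for the Flink runner is
FlinkPipelineOptions (the 1000 ms value is made up):

    import org.apache.beam.runners.flink.FlinkPipelineOptions;
    import org.apache.beam.runners.flink.FlinkRunner;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    FlinkPipelineOptions options =
        PipelineOptionsFactory.as(FlinkPipelineOptions.class);
    options.setRunner(FlinkRunner.class);
    // With EOS enabled, the sink publishes only after a checkpoint
    // completes, so end-to-end latency is at least this interval.
    options.setCheckpointingInterval(1000L);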
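
Sketch 4: the source side from question 4 looks roughly like this in our
job (broker, topic, and consumer group are placeholders; on older Beam
versions the consumer properties are passed via updateConsumerProperties
instead of withConsumerConfigUpdates):

    import com.google.common.collect.ImmutableMap;
    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.kafka.common.serialization.StringDeserializer;

    pipeline.apply(KafkaIO.<String, String>read()
        .withBootstrapServers("broker-1:9092")                // placeholder
        .withTopic("source-topic")                            // placeholder
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        .withConsumerConfigUpdates(
            ImmutableMap.of("group.id", "my-consumer-group")) // placeholder
        // Commit checkpointed offsets back to Kafka once the checkpoint is
        // finalized; as noted above, these offsets are only consulted when
        // the job is NOT restored from a checkpoint.
        .commitOffsetsInFinalize());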
