Hi All,

I previously asked a few questions about enabling EOS (exactly-once
semantics); please see below.

Our Beam pipeline uses KafkaIO to read from the source topic and KafkaIO
to publish to the sink topic.

According to Max's answer to my previous questions, enabling EOS with
KafkaIO will introduce latency: the KafkaIO.ExactlyOnceWriter
processElement method is only called after all messages within the
checkpoint interval have been checkpointed. So the latency depends on
the checkpoint interval.

I just wonder: if I relax the requirement to at-least-once, do I still
need to enable EOS on KafkaIO, or is it not required?
If not, can you please provide some instructions on how it should be done?

Thanks a lot!
Eleanore

> Thanks for the response! The reason for setting up the state backend
> is to experiment with Kafka EOS with Beam running on Flink. Reading
> through the code and this PR
> <https://github.com/apache/beam/pull/7991/files>, can you please help
> me clarify my understanding?
>
> 1. Beam uses KafkaExactlyOnceSink to publish messages to achieve EOS.
> The ExactlyOnceWriter processElement method is annotated
> with @RequiresStableInput, so all messages are buffered
> by KeyedBufferingElementsHandler, and only after the checkpoint
> succeeds are those messages processed by ExactlyOnceWriter?

That's correct.
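For reference, the exactly-once sink path is selected via
KafkaIO.Write#withEOS. A minimal sketch, assuming `input` is a
PCollection<KV<String, String>> and with placeholder broker, topic, and
shard-count values:

```java
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.kafka.common.serialization.StringSerializer;

// Sketch only: broker address, topic, and shard count are placeholders.
// withEOS(...) is what routes writes through KafkaExactlyOnceSink.
input.apply(KafkaIO.<String, String>write()
    .withBootstrapServers("broker:9092")
    .withTopic("sink-topic")
    .withKeySerializer(StringSerializer.class)
    .withValueSerializer(StringSerializer.class)
    // numShards controls sink parallelism; the sink group id identifies
    // this sink across job restarts
    .withEOS(1, "my-sink-group"));
```

Dropping the withEOS(...) call gives the default (at-least-once) writer,
which does not buffer elements until the checkpoint completes.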

>
> 2. Upon checkpoint, are the messages buffered by
> KeyedBufferingElementsHandler also checkpointed?

Yes, the buffered elements will be checkpointed.

> 3. It seems the way Beam provides Kafka EOS will introduce delays in
> stream processing, and the delay is based on the checkpoint interval?
> How can the latency be reduced while still keeping the EOS guarantee?

Indeed, the checkpoint interval and the checkpoint duration limit the
latency. Given the current design and its guarantees, there is no other
way to influence the latency.
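If it helps, the checkpoint interval is set on the Flink runner's
pipeline options. A sketch (the 5000 ms value is just an example;
shorter intervals trade lower EOS latency for more checkpointing
overhead):

```java
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

FlinkPipelineOptions options =
    PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
// Checkpoint every 5 seconds (example value); with EOS enabled, output
// latency is bounded below by this interval.
options.setCheckpointingInterval(5000L);
Pipeline pipeline = Pipeline.create(options);
```

The same option can also be passed on the command line as
--checkpointingInterval=5000.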

> 4. commitOffsetsInFinalize is also enabled. Does this mean that upon a
> successful checkpoint, the checkpointed offset will be committed back
> to Kafka? If this operation does not finish successfully, the job gets
> cancelled/stopped, and the job is re-submitted (with the same consumer
> group for the source topics but a different jobID), is it then
> possible that duplicate processing still occurs, because the consumed
> offset was not committed back to Kafka?

This option is for the Kafka consumer. AFAIK this is just a convenience
method to commit the latest checkpointed offset to Kafka. This offset is
not used when restoring from a checkpoint. However, if you don't restore
from a checkpoint, you can resume from that offset, which may or may not
be convenient, depending on your use case.
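To illustrate, a sketch of the reader side with this option enabled
(broker, topic, and consumer group id are placeholders, assuming a
recent Beam version):

```java
import java.util.Collections;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

pipeline.apply(KafkaIO.<String, String>read()
    .withBootstrapServers("broker:9092")
    .withTopic("source-topic")
    .withKeyDeserializer(StringDeserializer.class)
    .withValueDeserializer(StringDeserializer.class)
    // consumer group whose offsets are committed (placeholder id)
    .withConsumerConfigUpdates(Collections.<String, Object>singletonMap(
        ConsumerConfig.GROUP_ID_CONFIG, "my-group"))
    // Commit the checkpointed offsets back to Kafka once the checkpoint
    // completes; only relevant when resuming WITHOUT a Flink
    // checkpoint/savepoint.
    .commitOffsetsInFinalize());
```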
