Hi Alexey,

Thanks a lot for the information! I will give it a try.

Regarding the checkpoint interval: I think the Flink community suggested
something between 3 and 5 minutes. Also, I am not sure yet: can the
checkpoint interval be set in milliseconds? Currently, our Beam pipeline is
stateless; there is no other operator state or user-defined state.

Thanks a lot!
Eleanore

On Tue, Jun 23, 2020 at 9:58 AM Alexey Romanenko <[email protected]> wrote:

> On 23 Jun 2020, at 07:49, Eleanore Jin <[email protected]> wrote:
>
> Is the KafkaIO EOS sink you referred to KafkaExactlyOnceSink? If so, my
> understanding is that it is used via
> KafkaIO.write().withEOS(numPartitionsForSinkTopic, "mySinkGroupId").
> Reading your response, do I additionally need to configure the
> KafkaProducer property enable.idempotence=true, or is configuring only
> this property enough?
>
> No, you don’t need to do that. KafkaExactlyOnceSink creates a new
> KafkaProducer with this option already set [1].
>
> So can you please correct me if the above settings are not optimal, and
> let me know whether there is any way to reduce the latency introduced by
> checkpointing for EOS?
>
> Your settings look fine to me. You could probably experiment with the
> checkpoint interval (why is it 10 seconds?) to reduce the latency.
>
> [1]
> https://github.com/apache/beam/blob/05f676ea255587ccf52ad56d84191838bb981ebb/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java#L711
>
> On Mon, Jun 22, 2020 at 10:17 AM Alexey Romanenko <[email protected]> wrote:
>
>> I think you don’t need to enable EOS in this case, since KafkaIO has a
>> dedicated EOS sink implementation on the Beam side (btw, it’s not
>> supported by all runners), and it relies on setting
>> “enable.idempotence=true” for the KafkaProducer.
>> I’m not sure that you can achieve “at least once” semantics with the
>> current KafkaIO implementation.
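To put rough numbers on the checkpoint-interval trade-off, here is a toy model. It uses only the JDK, with no Beam or Flink classes. The model rests on the behaviour described in this thread for @RequiresStableInput: a record reaches the EOS writer only after a checkpoint covering it completes. It also assumes the following:

- Checkpoints trigger exactly every `intervalMs`.
- Each checkpoint takes `durationMs` to complete.
- Records arrive uniformly over time.

The class and method names are hypothetical, and the 2-second checkpoint duration is a made-up value. Intervals are given in milliseconds, which is the unit of the `checkpointingInterval` option of Beam's Flink runner (`FlinkPipelineOptions`).

```java
/**
 * Toy model (hypothetical, not Beam or Flink code) of the extra latency added
 * when a sink only receives records after the checkpoint covering them
 * completes. Assumes checkpoints trigger exactly every intervalMs, each takes
 * durationMs to complete, and records arrive uniformly over time.
 */
public class CheckpointLatency {

    /** A record arriving just after a barrier waits a full interval plus the next checkpoint's duration. */
    static long worstCaseMs(long intervalMs, long durationMs) {
        return intervalMs + durationMs;
    }

    /** With uniform arrivals, a record waits half an interval on average for the next barrier. */
    static long averageMs(long intervalMs, long durationMs) {
        return intervalMs / 2 + durationMs;
    }

    public static void main(String[] args) {
        long durationMs = 2_000L; // hypothetical checkpoint duration
        // 10 seconds (the current setting) versus 3 and 5 minutes, all in milliseconds
        for (long intervalMs : new long[] {10_000L, 180_000L, 300_000L}) {
            System.out.printf("interval=%d ms -> worst=%d ms, average=%d ms%n",
                    intervalMs, worstCaseMs(intervalMs, durationMs), averageMs(intervalMs, durationMs));
        }
    }
}
```

Under these assumptions, a 10-second interval adds about 7 seconds on average and 12 seconds at worst. A 3 to 5 minute interval pushes the buffering delay into minutes. With the Flink runner, the interval is passed as a millisecond value, for example `--checkpointingInterval=10000`.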
>>
>> On 16 Jun 2020, at 17:16, Eleanore Jin <[email protected]> wrote:
>>
>> Hi All,
>>
>> I previously asked a few questions about enabling EOS (exactly-once
>> semantics); please see below.
>>
>> Our Beam pipeline uses KafkaIO to read from a source topic and then uses
>> KafkaIO to publish to a sink topic.
>>
>> According to Max's answer to my previous questions, enabling EOS with
>> KafkaIO will introduce latency, because the KafkaIO.ExactlyOnceWriter
>> processElement method is only called after the checkpoint covering all
>> messages within the checkpoint interval has completed. So the latency
>> depends on the checkpoint interval.
>>
>> I wonder: if I relax the guarantee to at-least-once, do I still need to
>> enable EOS on KafkaIO, or is it not required? If not, can you please
>> provide some instructions on how it should be done?
>>
>> Thanks a lot!
>> Eleanore
>>
>> > Thanks for the response! The reason to set up the state backend is to
>> > experiment with Kafka EOS with Beam running on Flink. Reading through
>> > the code and this PR <https://github.com/apache/beam/pull/7991/files>,
>> > can you please help me clarify my understanding?
>> >
>> > 1. Beam uses KafkaExactlyOnceSink to publish messages with EOS. The
>> > ExactlyOnceWriter processElement method is annotated with
>> > @RequiresStableInput, so all messages are cached by
>> > KeyedBufferingElementsHandler, and only after a checkpoint succeeds are
>> > those messages processed by ExactlyOnceWriter?
>>
>> That's correct.
>>
>> > 2. Upon checkpoint, will the messages cached by
>> > KeyedBufferingElementsHandler also be checkpointed?
>>
>> Yes, the buffered elements will be checkpointed.
>>
>> > 3. It seems the way Beam provides Kafka EOS introduces delays in stream
>> > processing, and the delay depends on the checkpoint interval? How can
>> > the latency be reduced while still keeping the EOS guarantee?
>>
>> Indeed, the checkpoint interval and the checkpoint duration limit the
>> latency. Given the current design and the guarantees, there is no other
>> way to influence the latency.
>>
>> > 4. commitOffsetsInFinalize is also enabled. Does this mean that, upon a
>> > successful checkpoint, the checkpointed offset will be committed back
>> > to Kafka? And if this operation does not finish successfully, and the
>> > job then gets cancelled/stopped and is re-submitted (with the same
>> > consumer group for the source topics, but a different job ID), is
>> > duplicate processing still possible because the consumed offset was
>> > not committed back to Kafka?
>>
>> This option is for the Kafka consumer. AFAIK this is just a convenience
>> method to commit the latest checkpointed offset to Kafka. This offset is
>> not used when restoring from a checkpoint. However, if you don't restore
>> from a checkpoint, you can resume from that offset, which might be
>> convenient or not, depending on your use case.
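Max's answer to question 4 can be illustrated with another toy model. It uses only the JDK, and all names are hypothetical; nothing here is part of KafkaIO. Offsets follow Kafka's "next offset to read" convention. The scenario is invented and assumes the following:

- The last completed checkpoint recorded offset 100.
- The finalize-time commit of offset 100 never reached Kafka, so the consumer group still holds an older commit of 50.
- The source had read up to offset 130 when the job was cancelled.

```java
/**
 * Toy model (hypothetical, not KafkaIO's implementation) of where a Kafka
 * source resumes after a job is stopped: the offset committed via
 * commitOffsetsInFinalize is only consulted when the job is NOT restored
 * from a checkpoint.
 */
public class ResumeOffsetModel {

    /**
     * Returns the next offset the source would read after a restart.
     *
     * @param restoredFromCheckpoint whether the new job starts from a checkpoint/savepoint
     * @param checkpointedOffset     next offset recorded in the last completed checkpoint
     * @param committedOffset        next offset committed to Kafka for the consumer group
     */
    static long resumeOffset(boolean restoredFromCheckpoint, long checkpointedOffset, long committedOffset) {
        return restoredFromCheckpoint ? checkpointedOffset : committedOffset;
    }

    /** Records in [resume, readUpTo) were read before the stop and are read again after it. */
    static long reReadCount(long resume, long readUpTo) {
        return Math.max(0L, readUpTo - resume);
    }

    public static void main(String[] args) {
        long checkpointed = 100L; // last completed checkpoint: next offset to read is 100
        long committed = 50L;     // committing 100 back to Kafka did not succeed
        long readUpTo = 130L;     // offsets below 130 had been read when the job was cancelled

        for (boolean restored : new boolean[] {true, false}) {
            long resume = resumeOffset(restored, checkpointed, committed);
            System.out.printf("restoredFromCheckpoint=%b -> resume at %d, re-read %d records%n",
                    restored, resume, reReadCount(resume, readUpTo));
        }
    }
}
```

Restoring from the checkpoint re-reads only the 30 records that no completed checkpoint covered. Under the buffering confirmed in answer 1, those had not yet been handed to the EOS writer. Resubmitting without a restore falls back to the stale committed offset and re-reads 80 records. Offsets 50 to 99 may then be written to the sink topic a second time, depending on how the sink behaves on a fresh start.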
