Well, I think that, in general, it comes down to a trade-off between latency and performance in the case of the EOS sink (since EOS can’t come "for free").
I can’t recommend specific numbers for Flink (maybe Maximilian Michels or others with more Flink knowledge can), but I’d just try different values to see how they affect the results.

> On 23 Jun 2020, at 23:27, Eleanore Jin wrote:
>
> Hi Alexey,
>
> Thanks a lot for the information! I will give it a try.
>
> Regarding the checkpoint intervals, I think the Flink community suggested
> something between 3-5 minutes. I am not sure yet whether the checkpoint
> interval can be in milliseconds. Currently, our Beam pipeline is stateless;
> there is no other operator state or user-defined state.
>
> Thanks a lot!
> Eleanore
>
> On Tue, Jun 23, 2020 at 9:58 AM Alexey Romanenko wrote:
>
>> On 23 Jun 2020, at 07:49, Eleanore Jin wrote:
>>
>> Is the KafkaIO EOS-sink you referred to KafkaExactlyOnceSink? If so, the way
>> I understand to use it is KafkaIO.write().withEOS(numPartitionsForSinkTopic,
>> "mySlinkGroupId"). Reading your response, do I additionally need to
>> configure the KafkaProducer property enable.idempotence=true, or do I only
>> need to configure this property?
>
> No, you don’t need to do that. A new KafkaProducer will be created with this
> option set in KafkaExactlyOnceSink [1].
>
>> So can you please correct me if the above settings are not optimal, and if
>> there is any way to reduce the latency by introducing checkpointing for EOS?
>
> Your settings look fine to me. You could probably play with the checkpoint
> interval (why is it 10 secs?) to reduce latency.
>
> [1]
> https://github.com/apache/beam/blob/05f676ea255587ccf52ad56d84191838bb981ebb/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java#L711
>
>> On Mon, Jun 22, 2020 at 10:17 AM Alexey Romanenko wrote:
>> I think you don’t need to enable EOS in this case, since KafkaIO has a
>> dedicated EOS-sink implementation for the Beam part (btw, it’s not supported
>> by all runners) and it relies on setting “enable.idempotence=true” for
>> KafkaProducer.
>> I’m not sure that you can achieve “at least once” semantics with the current
>> KafkaIO implementation.
>>
>>> On 16 Jun 2020, at 17:16, Eleanore Jin wrote:
>>>
>>> Hi All,
>>>
>>> I previously asked a few questions regarding enabling EOS (exactly-once
>>> semantics); please see below.
>>>
>>> Our Beam pipeline uses KafkaIO to read from the source topic, and then uses
>>> KafkaIO to publish to the sink topic.
>>>
>>> According to Max's answer to my previous questions, enabling EOS with
>>> KafkaIO will introduce latency, because the KafkaIO.ExactlyOnceWriter
>>> processElement method is only called after all messages within the
>>> checkpoint interval have been checkpointed. So the latency depends on the
>>> checkpoint interval.
>>>
>>> I just wonder: if I relax to at-least-once, do I still need to enable EOS
>>> on KafkaIO, or is it not required?
>>> If not, can you please provide some instructions on how it should be done?
>>>
>>> Thanks a lot!
>>> Eleanore
>>>
>>> > Thanks for the response! The reason to set up the state backend is to
>>> > experiment with Kafka EOS with Beam running on Flink.
>>> > Reading through the code and this PR
>>> > <https://github.com/apache/beam/pull/7991/files>, can
>>> > you please help me clarify my understanding?
>>> >
>>> > 1. Beam uses KafkaExactlyOnceSink to publish messages to achieve
>>> > EOS. The ExactlyOnceWriter processElement method is annotated
>>> > with @RequiresStableInput, so all the messages will be cached
>>> > by KeyedBufferingElementsHandler, and only after the checkpoint succeeds
>>> > will those messages be processed by ExactlyOnceWriter?
>>>
>>> That's correct.
>>>
>>> >
>>> > 2. Upon checkpoint, will those messages cached by
>>> > KeyedBufferingElementsHandler also be checkpointed?
>>>
>>> Yes, the buffered elements will be checkpointed.
>>>
>>> > 3. It seems the way Beam provides Kafka EOS will introduce delays in the
>>> > stream processing, and the delay is based on the checkpoint interval? How
>>> > can I reduce the latency while still having the EOS guarantee?
>>>
>>> Indeed, the checkpoint interval and the checkpoint duration limit the
>>> latency. Given the current design and the guarantees, there is no other
>>> way to influence the latency.
>>>
>>> > 4. commitOffsetsInFinalize is also enabled. Does this mean that, upon a
>>> > successful checkpoint, the checkpointed offset will be committed back
>>> > to Kafka, but if this operation does not finish successfully, and then
>>> > the job gets cancelled/stopped and is re-submitted (with the
>>> > same consumer group for source topics, but a different jobID), then
>>> > duplicate processing is still possible, because the consumed offset
>>> > is not committed back to Kafka?
>>>
>>> This option is for the Kafka consumer. AFAIK this is just a convenience
>>> method to commit the latest checkpointed offset to Kafka. This offset is
>>> not used when restoring from a checkpoint.
>>> However, if you don't restore
>>> from a checkpoint, you can resume from that offset, which might be
>>> convenient or not, depending on your use case.
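
To make the latency point from the thread concrete (buffered elements are only handed to the writer after a checkpoint completes, so the checkpoint interval and the checkpoint duration bound the delay), here is a rough toy model in plain Java. It is not Beam or Flink code. The class and method names (CheckpointLatencyModel, releaseTimeMs, worstCaseLatencyMs) are made up for illustration, and it assumes that checkpoints are triggered at fixed multiples of the interval, that each one takes a constant duration, and that an element arriving exactly at a trigger time is included in that checkpoint. Real checkpoint timing will vary.

```java
// Toy model only: hypothetical names, not part of Beam or Flink.
class CheckpointLatencyModel {

    /**
     * Time (ms) at which an element arriving at arrivalMs is released to the
     * writer: the completion time of the first checkpoint triggered at or
     * after its arrival. Checkpoints are assumed to trigger at intervalMs,
     * 2 * intervalMs, ... and to take durationMs each.
     */
    static long releaseTimeMs(long arrivalMs, long intervalMs, long durationMs) {
        // Index of the first checkpoint at or after the arrival (ceiling division),
        // never smaller than 1 because no checkpoint is triggered at time 0.
        long k = Math.max(1, (arrivalMs + intervalMs - 1) / intervalMs);
        return k * intervalMs + durationMs;
    }

    /** Upper bound on the added delay under the same assumptions. */
    static long worstCaseLatencyMs(long intervalMs, long durationMs) {
        // An element that just missed a checkpoint waits up to a full interval,
        // then waits for the next checkpoint to complete.
        return intervalMs + durationMs;
    }

    public static void main(String[] args) {
        long durationMs = 500; // assumed constant checkpoint duration
        long[] intervalsMs = {1_000, 10_000, 180_000}; // 1 s, 10 s, 3 min
        for (long intervalMs : intervalsMs) {
            System.out.println("interval=" + intervalMs + " ms -> worst-case added latency="
                    + worstCaseLatencyMs(intervalMs, durationMs) + " ms");
        }
    }
}
```

Under these assumptions, shortening the interval lowers the bound roughly linearly, but each checkpoint still costs its own duration, which is the trade-off mentioned at the top of this message.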
