Hi Alex,

You previously suggested that, for exactly-once with KafkaIO, I do not need to configure KafkaIO.write().withEOS() and only need to add the producer property enable.idempotence=true. However, from the code (https://github.com/apache/beam/blob/release-2.23.0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L1311), KafkaExactlyOnceSink is used only when withEOS() is set; otherwise KafkaWriter is used.
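The branch described above can be modeled roughly as follows. This is a self-contained sketch, not the Beam source: `SinkSelection`, `WriteConfig`, and `chooseTransform` are hypothetical names invented for illustration, and only the names KafkaExactlyOnceSink and KafkaWriter come from the message itself.

```java
// Simplified model of the sink selection described above; NOT Beam source code.
// SinkSelection, WriteConfig and chooseTransform are hypothetical names.
class SinkSelection {

    /** Minimal stand-in for the write configuration. */
    static final class WriteConfig {
        final boolean eos;                 // true once withEOS(...) has been called
        final boolean idempotentProducer;  // producer property enable.idempotence

        WriteConfig(boolean eos, boolean idempotentProducer) {
            this.eos = eos;
            this.idempotentProducer = idempotentProducer;
        }
    }

    /** Returns the name of the transform that would handle the write. */
    static String chooseTransform(WriteConfig config) {
        // As read from the linked code, only the EOS flag decides the branch;
        // the producer property alone does not select the exactly-once sink.
        return config.eos ? "KafkaExactlyOnceSink" : "KafkaWriter";
    }

    public static void main(String[] args) {
        System.out.println(chooseTransform(new WriteConfig(true, false)));
        System.out.println(chooseTransform(new WriteConfig(false, true)));
    }
}
```

In this model, setting only enable.idempotence=true still ends up in the KafkaWriter branch, which is the behavior the question is about.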
[image: image.png]

Did I misunderstand something? Or is withEOS() required for end-to-end exactly-once with KafkaIO as both source and sink?

Thanks a lot!
Eleanore

On Wed, Jun 24, 2020 at 12:48 PM Eleanore Jin <[email protected]> wrote:

> Hi Alex,
>
> Thanks a lot for the info.
>
> Eleanore
>
> On Wed, Jun 24, 2020 at 9:26 AM Alexey Romanenko <[email protected]>
> wrote:
>
>> Well, I think, in general, it will be a question of trade-off between
>> latency and performance in case of EOS sink (since EOS can’t be "for
>> free").
>>
>> I can’t recommend specific numbers for Flink (maybe Maximilian Michels or
>> others with more Flink knowledge can do), but I’d just try different
>> numbers to see how it will affect the results.
>>
>> On 23 Jun 2020, at 23:27, Eleanore Jin <[email protected]> wrote:
>>
>> Hi Alexey,
>>
>> Thanks a lot for the information! I will give it a try.
>>
>> Regarding the checkpoint intervals, I think the Flink community suggested
>> something between 3-5 minutes, I am not sure yet if the checkpoint interval
>> can be in milliseconds? Currently, our beam pipeline is stateless, there is
>> no other operator state or user defined state.
>>
>> Thanks a lot!
>> Eleanore
>>
>> On Tue, Jun 23, 2020 at 9:58 AM Alexey Romanenko <
>> [email protected]> wrote:
>>
>>>
>>> On 23 Jun 2020, at 07:49, Eleanore Jin <[email protected]> wrote:
>>>
>>> the KafkaIO EOS-sink you referred is: KafkaExactlyOnceSink? If so, the
>>> way I understand to use is
>>> KafkaIO.write().withEOS(numPartitionsForSinkTopic,
>>> "mySlinkGroupId"), reading from your response, do I need additionally
>>> configure KafkaProducer property enable.idempotence=true, or I only need to
>>> configure this property?
>>>
>>>
>>> No, you don’t need to do that. New KafkaProducer will be created with
>>> this option set in KafkaExactlyOnceSink [1].
>>>
>>> So can you please correct me if the above settings is not the optimal
>>> and if there is any way to reduce the latency by introducing checkpointing
>>> for EOS?
>>>
>>>
>>> Your settings look fine for me. You probably could play with checkpoint
>>> intervals (why it’s 10 secs?) to reduce a latency.
>>>
>>>
>>> [1]
>>> https://github.com/apache/beam/blob/05f676ea255587ccf52ad56d84191838bb981ebb/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java#L711
>>>
>>>
>>> On Mon, Jun 22, 2020 at 10:17 AM Alexey Romanenko <
>>> [email protected]> wrote:
>>>
>>>> I think you don’t need to enable EOS in this case since KafkaIO has a
>>>> dedicated EOS-sink implementation for Beam part (btw, it’s not supported by
>>>> all runners) and it relies on setting “enable.idempotence=true” for
>>>> KafkaProducer.
>>>> I’m not sure that you can achieve “at least once” semantics with
>>>> current KafkaIO implementation.
>>>>
>>>> On 16 Jun 2020, at 17:16, Eleanore Jin <[email protected]> wrote:
>>>>
>>>> Hi All,
>>>>
>>>> I previously asked a few questions regarding enable EOS (exactly once
>>>> semantics) please see below.
>>>>
>>>> Our Beam pipeline uses KafkaIO to read from source topic, and then use
>>>> KafkaIO to publish to sink topic.
>>>>
>>>> According to Max's answer to my previous questions, enable EOS with
>>>> KafkaIO will introduce latency,
>>>> as only after checkpoints of all message within the checkpoint
>>>> interval, then the KafkaIO.ExactlyOnceWriter
>>>> processElement method will be called. So the latency depends on the
>>>> checkpoint interval.
>>>>
>>>> I just wonder if I relax to At Least Once, do I still need to enable
>>>> EOS on KafkaIO? Or it is not required?
>>>> If not, can you please provide some instruction how should it be done?
>>>>
>>>> Thanks a lot!
>>>> Eleanore
>>>>
>>>> > Thanks for the response! the reason to setup the state backend is to
>>>> > experiment Kafka EOS with Beam running on Flink.
>>>> > Reading through the code and this PR
>>>> > <https://github.com/apache/beam/pull/7991/files>, can
>>>> > you please help me clarify my understanding?
>>>> >
>>>> > 1. Beam uses KafkaExactlyOnceSink to publish messages to achieve
>>>> > EOS, ExactlyOnceWriter processElement method is annotated
>>>> > with @RequiresStableInput, so all the messages will be cached
>>>> > by KeyedBufferingElementsHandler, only after checkpoint succeeds, those
>>>> > messages will be processed by ExactlyOnceWriter?
>>>>
>>>> That's correct.
>>>>
>>>> > 2. Upon checkpoint, will those messages cached by
>>>> > KeyedBufferingElementsHandler also be checkpointed?
>>>>
>>>> Yes, the buffered elements will be checkpointed.
>>>>
>>>> > 3. It seems the way Beam provides Kafka EOS will introduce delays in the
>>>> > stream processing, the delay is based on the checkpoint interval? How to
>>>> > reduce the latency while still have EOS guarantee?
>>>>
>>>> Indeed, the checkpoint interval and the checkpoint duration limit the
>>>> latency. Given the current design and the guarantees, there is no other
>>>> way to influence the latency.
>>>>
>>>> > 4. commitOffsetsInFinalize is also enabled, does this mean, upon
>>>> > checkpoint successfully, the checkpointed offset will be committed back
>>>> > to kafka, but if this operation does not finish successfully, and then
>>>> > the job gets cancelled/stopped, and re-submit the job again (with the
>>>> > same consumer group for source topics, but different jobID), then it is
>>>> > possible duplicated processing still exists? because the consumed offset
>>>> > is not committed back to kafka?
>>>>
>>>> This option is for the Kafka consumer. AFAIK this is just a convenience
>>>> method to commit the latest checkpointed offset to Kafka. This offset is
>>>> not used when restoring from a checkpoint.
>>>> However, if you don't restore from a checkpoint, you can resume from
>>>> that offset which might be convenient or not, depending on your use case.
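
The latency point made in the quoted answers (elements are buffered until a checkpoint succeeds, so the checkpoint interval and duration bound the delay) can be illustrated with a toy model. This is only a sketch under stated assumptions: it is not Beam or Flink code, `CheckpointLatencyModel` and its methods are hypothetical names, the 10 s interval is the value mentioned in the thread, and the 500 ms checkpoint duration is an invented figure.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model of "hold elements until the checkpoint completes".
// NOT Beam or Flink code; every name here is hypothetical.
class CheckpointLatencyModel {

    // Arrival times (ms) of elements waiting for the next completed checkpoint.
    private final Deque<Long> arrivals = new ArrayDeque<>();

    /** Buffers an element that arrived at nowMs instead of emitting it. */
    void onElement(long nowMs) {
        arrivals.add(nowMs);
    }

    /** Releases everything buffered so far and returns each element's delay in ms. */
    List<Long> onCheckpointComplete(long nowMs) {
        List<Long> delays = new ArrayList<>();
        while (!arrivals.isEmpty()) {
            delays.add(nowMs - arrivals.poll());
        }
        return delays;
    }

    /** Approximate worst case: arrive just after one checkpoint starts, wait for the next to finish. */
    static long worstCaseDelayMs(long intervalMs, long durationMs) {
        return intervalMs + durationMs;
    }

    public static void main(String[] args) {
        CheckpointLatencyModel model = new CheckpointLatencyModel();
        model.onElement(0);       // arrives right after a checkpoint started
        model.onElement(9_000);   // arrives shortly before the next one
        // The next checkpoint starts at 10 s and is assumed to take 500 ms.
        System.out.println(model.onCheckpointComplete(10_500)); // prints [10500, 1500]
        System.out.println(worstCaseDelayMs(10_000, 500));      // prints 10500
    }
}
```

Under these assumptions, shortening the checkpoint interval lowers the added delay roughly in proportion, at the cost of checkpointing more often, which matches the trade-off described in the thread.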
