Hi Alexey,

Thanks a lot for the information! I will give it a try.

Regarding the checkpoint intervals, I think the Flink community suggests
something between 3 and 5 minutes. I am not sure yet whether the checkpoint
interval can be set in milliseconds? Currently, our Beam pipeline is stateless;
there is no other operator state or user-defined state.
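
For what it's worth, the FlinkRunner's pipeline option does take the interval
in milliseconds — a minimal configuration sketch, assuming Beam's
FlinkPipelineOptions (the value here is illustrative, not a recommendation):

```java
// Sketch: Beam's FlinkPipelineOptions expresses the checkpoint interval
// in milliseconds, so sub-second values are at least expressible.
// Whether such short intervals are advisable is a separate question.
FlinkPipelineOptions options =
    PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
options.setCheckpointingInterval(10_000L); // 10 seconds, given in ms
Pipeline pipeline = Pipeline.create(options);
```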

Thanks a lot!
Eleanore

On Tue, Jun 23, 2020 at 9:58 AM Alexey Romanenko <[email protected]>
wrote:

>
> On 23 Jun 2020, at 07:49, Eleanore Jin <[email protected]> wrote:
>
> the KafkaIO EOS-sink you referred to is KafkaExactlyOnceSink? If so, the
> way I understand it is used is KafkaIO.write().withEOS(numPartitionsForSinkTopic,
> "mySinkGroupId"). Reading from your response, do I need to additionally
> configure the KafkaProducer property enable.idempotence=true, or do I only need to
> configure this property?
>
>
> No, you don’t need to do that. A new KafkaProducer will be created with this
> option already set in KafkaExactlyOnceSink [1].
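
For readers following along, the EOS sink setup under discussion looks roughly
like this — a configuration sketch assuming the Java SDK's KafkaIO; the topic,
bootstrap servers and sink group id are illustrative:

```java
// Sketch of the EOS write discussed in this thread (names illustrative).
// Note there is no manual enable.idempotence=true here:
// KafkaExactlyOnceSink configures the KafkaProducer it creates internally.
input.apply(KafkaIO.<String, String>write()
    .withBootstrapServers("broker:9092")
    .withTopic("sink-topic")
    .withKeySerializer(StringSerializer.class)
    .withValueSerializer(StringSerializer.class)
    .withEOS(numPartitionsForSinkTopic, "my-sink-group"));
```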
>
> So can you please correct me if the above settings are not optimal, and
> let me know if there is any way to reduce the latency introduced by
> checkpointing for EOS?
>
>
> Your settings look fine to me. You could probably play with the checkpoint
> interval (why is it 10 secs?) to reduce latency.
>
>
> [1]
> https://github.com/apache/beam/blob/05f676ea255587ccf52ad56d84191838bb981ebb/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java#L711
>
>
>
>
> On Mon, Jun 22, 2020 at 10:17 AM Alexey Romanenko <
> [email protected]> wrote:
>
>> I think you don’t need to enable EOS in this case, since KafkaIO has a
>> dedicated EOS-sink implementation for the Beam part (btw, it’s not supported by
>> all runners), and it relies on setting “enable.idempotence=true” for the
>> KafkaProducer.
>> I’m not sure that you can achieve “at least once” semantics with the current
>> KafkaIO implementation.
>>
>> On 16 Jun 2020, at 17:16, Eleanore Jin <[email protected]> wrote:
>>
>> Hi All,
>>
>> I previously asked a few questions regarding enabling EOS (exactly-once
>> semantics); please see below.
>>
>> Our Beam pipeline uses KafkaIO to read from the source topic, and then uses
>> KafkaIO to publish to the sink topic.
>>
>> According to Max's answer to my previous questions, enabling EOS with
>> KafkaIO will introduce latency, as the KafkaIO.ExactlyOnceWriter
>> processElement method is only called after all messages within the
>> checkpoint interval have been checkpointed. So the latency depends on
>> the checkpoint interval.
>>
>> I just wonder: if I relax the guarantee to at-least-once, do I still need
>> to enable EOS on KafkaIO, or is it not required?
>> If not, can you please provide some instructions on how it should be done?
>>
>> Thanks a lot!
>> Eleanore
>>
>> > Thanks for the response! The reason to set up the state backend is to
>> > experiment with Kafka EOS with Beam running on Flink. Reading through the
>> > code and this PR <https://github.com/apache/beam/pull/7991/files>, can
>> > you please help me clarify my understanding?
>> >
>> > 1. Beam uses KafkaExactlyOnceSink to publish messages to achieve
>> > EOS. The ExactlyOnceWriter processElement method is annotated
>> > with @RequiresStableInput, so all the messages will be cached
>> > by KeyedBufferingElementsHandler, and only after a checkpoint succeeds
>> > will those messages be processed by ExactlyOnceWriter?
>>
>> That's correct.
>>
>> >
>> > 2. Upon checkpoint, will those messages cached by
>> > KeyedBufferingElementsHandler also be checkpointed?
>>
>> Yes, the buffered elements will be checkpointed.
>>
>> > 3. It seems the way Beam provides Kafka EOS will introduce delays in the
>> > stream processing, and the delay is based on the checkpoint interval? How
>> > can I reduce the latency while still having the EOS guarantee?
>>
>> Indeed, the checkpoint interval and the checkpoint duration limit the
>> latency. Given the current design and the guarantees, there is no other
>> way to influence the latency.
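
The buffering behavior behind this latency can be sketched as a toy model in
plain Java — this is not Beam code, and the class and method names here are
made up for illustration. Elements seen by processElement are only handed to
the writer once a checkpoint completes, so no element can be written sooner
than the next checkpoint:

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the @RequiresStableInput buffering described above
// (illustrative only; these names are not Beam's). Elements are cached on
// arrival and released to the writer only when a checkpoint completes, so
// write latency is bounded below by the time until the next checkpoint.
class BufferingEosSink {
    private final List<String> buffer = new ArrayList<>();   // pre-checkpoint cache
    private final List<String> written = new ArrayList<>();  // what the "writer" saw

    void processElement(String element) {
        buffer.add(element); // buffered, not yet written
    }

    void notifyCheckpointComplete() {
        written.addAll(buffer); // only now does the exactly-once writer run
        buffer.clear();
    }

    List<String> written() {
        return written;
    }
}
```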
>>
>> > 4. commitOffsetsInFinalize is also enabled. Does this mean that upon a
>> > successful checkpoint, the checkpointed offset will be committed back
>> > to Kafka? But if this operation does not finish successfully, and the
>> > job then gets cancelled/stopped and is re-submitted (with the same
>> > consumer group for the source topics, but a different jobID), is it
>> > possible that duplicate processing still occurs, because the consumed
>> > offset was not committed back to Kafka?
>>
>> This option is for the Kafka consumer. AFAIK this is just a convenience
>> method to commit the latest checkpointed offset to Kafka. This offset is
>> not used when restoring from a checkpoint. However, if you don't restore
>> from a checkpoint, you can resume from that offset, which might be
>> convenient or not, depending on your use case.
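
As a configuration sketch of the consumer-side flag being discussed — assuming
the Java SDK's KafkaIO, with illustrative topic and group names:

```java
// Sketch: commitOffsetsInFinalize() asks KafkaIO to commit the latest
// checkpointed offsets back to Kafka. As noted above, these committed
// offsets are a convenience for restarts *without* a checkpoint;
// restoring from a checkpoint ignores them.
pipeline.apply(KafkaIO.<String, String>read()
    .withBootstrapServers("broker:9092")
    .withTopic("source-topic")
    .withKeyDeserializer(StringDeserializer.class)
    .withValueDeserializer(StringDeserializer.class)
    .withConsumerConfigUpdates(
        ImmutableMap.of("group.id", "my-consumer-group"))
    .commitOffsetsInFinalize());
```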
>>
>>
>>
>
