Hello Jan

Thanks for the suggestions.

Is there any benefit to using aligned vs. unaligned checkpoints?


In the end I found one problem that was preventing Flink from
checkpointing: a DoFn that held some non-serializable objects. I made
those fields transient and now initialize them in the setup method.
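
For reference, the pattern looks roughly like the sketch below. It is plain Java with no Beam dependency, so EnrichFn and HeavyClient are hypothetical stand-ins; in a real DoFn the setup() method would carry Beam's @Setup annotation, and the round trip only approximates what a runner does when it ships the function to workers.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for a client object that is not Serializable.
class HeavyClient {
    String lookup(String key) {
        return "value-for-" + key;
    }
}

// Plain-Java stand-in for a DoFn: the function object itself must be Serializable.
class EnrichFn implements Serializable {
    private static final long serialVersionUID = 1L;

    // transient: skipped by Java serialization, so it is null after deserialization.
    private transient HeavyClient client;

    // In a real Beam DoFn this method would be annotated with @Setup.
    void setup() {
        client = new HeavyClient();
    }

    String process(String element) {
        return client.lookup(element);
    }
}

class TransientFieldDemo {
    // Serializes and deserializes the function, roughly what a runner does before execution.
    static EnrichFn roundTrip(EnrichFn fn) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(fn);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (EnrichFn) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("function is not serializable", e);
        }
    }

    public static void main(String[] args) {
        EnrichFn copy = roundTrip(new EnrichFn());
        copy.setup(); // rebuild the transient field on the deserialized instance
        System.out.println(copy.process("k1"));
    }
}
```

If the field were not transient, the writeObject call would fail with a NotSerializableException, which is the kind of error that can hide behind a failed checkpoint.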

Weird, because I was usually able to catch these kinds of errors just
by running on the direct runner, or even on Flink before enabling EOS.


Now I'm facing another weird issue:

org.apache.beam.sdk.util.UserCodeException:
org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms
expired before the last committed offset for partitions
[behavioral-signals-6] could be determined. Try tuning
default.api.timeout.ms larger to relax the threshold.
at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
at org.apache.beam.sdk.io.kafka.KafkaExactlyOnceSink$ExactlyOnceWriter$DoFnInvoker.invokeProcessElement(Unknown Source)
at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:212)

I tried increasing the timeout, but it didn't help. My number of shards
is equal to my number of partitions.
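
For context, the message about the "last committed offset" is raised on the consumer side, so a larger default.api.timeout.ms only takes effect if it reaches the consumer that the EOS sink creates, not just the producer. The sketch below shows only the config override, with no Beam or Kafka dependency. EosConsumerConfig and withApiTimeout are hypothetical names, and wiring the result into the sink (presumably through a consumer factory function on the write transform, such as withConsumerFactoryFn) is an assumption to verify against the KafkaIO Javadoc.

```java
import java.util.HashMap;
import java.util.Map;

class EosConsumerConfig {
    // Kafka consumer setting named in the exception message.
    static final String DEFAULT_API_TIMEOUT_MS = "default.api.timeout.ms";

    // Returns a copy of the base consumer config with the API timeout overridden.
    static Map<String, Object> withApiTimeout(Map<String, Object> base, int timeoutMs) {
        Map<String, Object> updated = new HashMap<>(base);
        updated.put(DEFAULT_API_TIMEOUT_MS, timeoutMs);
        return updated;
    }

    public static void main(String[] args) {
        Map<String, Object> base = new HashMap<>();
        base.put("bootstrap.servers", "localhost:9092"); // placeholder address
        System.out.println(withApiTimeout(base, 180_000));
    }
}
```

Copying the map instead of mutating it keeps the original config reusable for other clients.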

I'd appreciate any kind of guidance.

Thanks.

On Tue, Jun 18, 2024 at 5:56 AM Jan Lukavský <[email protected]> wrote:
>
> I'd suggest:
>  a) use unaligned checkpoints, if possible
>
>  b) verify the number of buckets you use for EOS sink, this limits 
> parallelism [1].
>
> Best,
>
>  Jan
>
> [1] 
> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/kafka/KafkaIO.WriteRecords.html#withEOS-int-java.lang.String-
>
> On 6/18/24 09:32, Ruben Vargas wrote:
>
> Hello Lukavsky
>
> Thanks for your reply!
>
> I thought it was due to backpressure, but I increased the resources of the 
> cluster and the problem still persists. More than that, data stops flowing 
> and the checkpoints still fail.
>
> I have configured checkpointing to run every minute. The timeout is 1h. The 
> checkpoints are aligned.
>
> On Tue, Jun 18, 2024 at 1:14 AM, Jan Lukavský 
> <[email protected]> wrote:
>>
>> Hi Ruben,
>>
>> from the provided screenshot it seems to me that the pipeline is
>> backpressured by the sink. Can you please share your checkpoint
>> configuration? Are you using unaligned checkpoints? What is the
>> checkpointing interval and the volume of data coming in from the source?
>> With EOS, data is committed after the checkpoint; before that, the data
>> is buffered in state, which makes the sink more resource intensive.
>>
>>   Jan
>>
>> On 6/18/24 05:30, Ruben Vargas wrote:
>> > Attached a better image of the console.
>> >
>> > Thanks!
>> >
>> > On Mon, Jun 17, 2024 at 9:28 PM Ruben Vargas <[email protected]> 
>> > wrote:
>> >> Hello guys
>> >>
>> >> I'm wondering if any of you have experience enabling exactly-once in
>> >> KafkaIO with the Flink runner. I enabled it and now I'm facing an
>> >> issue where all the checkpoints are failing. I cannot see any
>> >> exception in the logs.
>> >>
>> >> The Flink console only mentions this: "Asynchronous task checkpoint
>> >> failed." I also noticed that some operators don't acknowledge the
>> >> checkpoint (screenshot attached).
>> >>
>> >> I did this:
>> >>
>> >> 1) KafkaIO.Read:
>> >>
>> >> update consumer properties with enable.auto.commit = false
>> >> .withReadCommitted()
>> >> .commitOffsetsInFinalize()
>> >>
>> >> 2) KafkaIO#write:
>> >>
>> >> .withEOS(numShards, sinkGroupId)
>> >>
>> >> But my application is not able to deliver messages to the output topic
>> >> because the checkpoints are failing.
>> >> I also reviewed the timeout and other time-sensitive parameters; those
>> >> are set high right now.
>> >>
>> >> I really appreciate your guidance on this. Thank you
