Hello Jan,

Thanks for the suggestions.

Any benefit of using aligned vs unaligned?

In the end I found one problem that was preventing Flink from doing the checkpointing: a DoFn that held some non-serializable objects. I made those fields transient and initialized them in setup(). Weird, because I was usually able to catch these kinds of errors just by running on the direct runner, or even on Flink before enabling EOS.
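For reference, the fix looked roughly like this (just a sketch; ApiClient is a made-up stand-in for the real non-serializable dependency):

import org.apache.beam.sdk.transforms.DoFn;

public class EnrichFn extends DoFn<String, String> {

  // Stand-in for the real dependency; the important part is that it is
  // NOT Serializable, so it must not be a regular (serialized) field.
  static class ApiClient {
    String enrich(String in) { return in + "-enriched"; }
  }

  // transient: skipped when Beam serializes the DoFn for distribution.
  private transient ApiClient client;

  @Setup
  public void setup() {
    // Re-created on each worker after the DoFn is deserialized.
    client = new ApiClient();
  }

  @ProcessElement
  public void processElement(@Element String element, OutputReceiver<String> out) {
    out.output(client.enrich(element));
  }
}

The point is only that the non-serializable field is transient and rebuilt per worker in @Setup instead of being shipped with the DoFn.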
Now I'm facing another weird issue:

org.apache.beam.sdk.util.UserCodeException: org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the last committed offset for partitions [behavioral-signals-6] could be determined. Try tuning default.api.timeout.ms larger to relax the threshold.
        at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
        at org.apache.beam.sdk.io.kafka.KafkaExactlyOnceSink$ExactlyOnceWriter$DoFnInvoker.invokeProcessElement(Unknown Source)
        at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:212)

I tried to extend the timeout and it didn't work; my shards are equal to my number of partitions.
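For context, this is roughly my KafkaIO setup and the places where I tried raising that timeout (a sketch with placeholder broker/topic/group names; whether the EOS sink's internal offset lookup actually honors the producer-side override is an assumption on my part):

import java.util.Map;

import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class KafkaEosConfigSketch {

  public static KafkaIO.Read<String, String> read() {
    return KafkaIO.<String, String>read()
        .withBootstrapServers("broker:9092")                  // placeholder
        .withTopic("input-topic")                             // placeholder
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        .withConsumerConfigUpdates(Map.<String, Object>of(
            ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false",
            // 3 minutes instead of the 60 s default from the exception.
            ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "180000"))
        .withReadCommitted()
        .commitOffsetsInFinalize();
  }

  public static KafkaIO.Write<String, String> write(int numShards) {
    return KafkaIO.<String, String>write()
        .withBootstrapServers("broker:9092")                  // placeholder
        .withTopic("output-topic")                            // placeholder
        .withKeySerializer(StringSerializer.class)
        .withValueSerializer(StringSerializer.class)
        .withEOS(numShards, "my-sink-group-id")               // placeholder group id
        // Assumption: the EOS sink picks this up for its offset lookup too.
        .withProducerConfigUpdates(
            Map.<String, Object>of("default.api.timeout.ms", "180000"));
  }
}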
I appreciate any kind of guidance. Thanks.

On Tue, Jun 18, 2024 at 5:56 AM Jan Lukavský <[email protected]> wrote:
>
> I'd suggest:
>
> a) use unaligned checkpoints, if possible
>
> b) verify the number of buckets you use for the EOS sink, this limits
> parallelism [1].
>
> Best,
>
> Jan
>
> [1]
> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/kafka/KafkaIO.WriteRecords.html#withEOS-int-java.lang.String-
>
> On 6/18/24 09:32, Ruben Vargas wrote:
> > Hello Lukavsky
> >
> > Thanks for your reply!
> >
> > I thought it was due to backpressure, but I increased the resources of the
> > cluster and the problem still persists. More than that, data stops flowing
> > and the checkpoints still fail.
> >
> > I have configured checkpointing to run every minute. The timeout is 1h.
> > It is an aligned checkpoint.
> >
> > On Tue, Jun 18, 2024 at 1:14 a.m., Jan Lukavský <[email protected]> wrote:
>>
>> Hi Ruben,
>>
>> from the provided screenshot it seems to me that the pipeline is
>> backpressured by the sink. Can you please share your checkpoint
>> configuration? Are you using unaligned checkpoints? What is the
>> checkpointing interval and the volume of data coming in from the source?
>> With EOS, data is committed after the checkpoint; before that, the data is
>> buffered in state, which makes the sink more resource-intensive.
>>
>> Jan
>>
>> On 6/18/24 05:30, Ruben Vargas wrote:
>> > Attached a better image of the console.
>> >
>> > Thanks!
>> >
>> > On Mon, Jun 17, 2024 at 9:28 PM Ruben Vargas <[email protected]> wrote:
>> >>
>> >> Hello guys
>> >>
>> >> Wondering if any of you have experience enabling exactly-once in
>> >> KafkaIO with the Flink runner? I enabled it and now I'm facing an issue
>> >> where all the checkpoints are failing. I cannot see any exception in
>> >> the logs.
>> >>
>> >> The Flink console only mentions "Asynchronous task checkpoint
>> >> failed." I also noticed that some operators don't acknowledge the
>> >> checkpointing (attached a screenshot).
>> >>
>> >> I did this:
>> >>
>> >> 1) KafkaIO.Read:
>> >>
>> >> update consumer properties with enable.auto.commit = false
>> >> .withReadCommitted()
>> >> .commitOffsetsInFinalize()
>> >>
>> >> 2) KafkaIO#write:
>> >>
>> >> .withEOS(numShards, sinkGroupId)
>> >>
>> >> But my application is not able to deliver messages to the output topic
>> >> due to the checkpoint failing.
>> >>
>> >> I also reviewed the timeout and other time-sensitive parameters; those
>> >> are high right now.
>> >>
>> >> I really appreciate your guidance on this. Thank you
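PS: for completeness, this is roughly how the checkpointing is configured on my side with the Flink runner (a sketch; the one-minute interval matches what I mentioned earlier in the thread, and the Flink config keys in the comments are where the 1h timeout and the unaligned-checkpoint suggestion would go):

import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class CheckpointConfigSketch {
  public static void main(String[] args) {
    FlinkPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(FlinkPipelineOptions.class);
    options.setRunner(FlinkRunner.class);
    options.setStreaming(true);

    // Checkpoint every minute, as described above.
    options.setCheckpointingInterval(60_000L);

    // The 1h checkpoint timeout and unaligned checkpoints (suggestion (a))
    // can be set on the Flink side, e.g. in flink-conf.yaml:
    //   execution.checkpointing.timeout: 1h
    //   execution.checkpointing.unaligned: true
  }
}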
