Hello again,

Thank you for all the suggestions.
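
For context, this is roughly how the write side and the runner options are
wired after your suggestions; the stand-in source, the shard count, the sink
group id and the broker address below are placeholders for illustration, not
my exact configuration:

import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.KV;
import org.apache.kafka.common.serialization.StringSerializer;

public class EosShardingSketch {
  public static void main(String[] args) {
    FlinkPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
    options.setRunner(FlinkRunner.class);
    options.setCheckpointingInterval(60_000L); // checkpoint every minute
    options.setMaxParallelism(32768);          // so Flink can spread EOS shards more evenly

    Pipeline pipeline = Pipeline.create(options);

    pipeline
        .apply(Create.of(KV.of("key", "value"))   // stand-in for the real source
            .withCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())))
        .apply(KafkaIO.<String, String>write()
            .withBootstrapServers("broker:9092")  // placeholder
            .withTopic("behavioral-signals-log-stream")
            .withKeySerializer(StringSerializer.class)
            .withValueSerializer(StringSerializer.class)
            // shard count above the partition count, e.g. 8 partitions * 10
            .withEOS(80, "my-eos-sink-group"));   // placeholder values

    pipeline.run().waitUntilFinish();
  }
}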

Unfortunately, if I put more shards than partitions it throws me this exception:

"message": "PipelineBuilder-debug-output/KafkaIO.Write/KafkaIO.WriteRecords/KafkaExactlyOnceSink/Persist ids -> ToGBKResult -> PipelineBuilder-debug-output/KafkaIO.Write/KafkaIO.WriteRecords/KafkaExactlyOnceSink/Write to Kafka topic 'behavioral-signals-log-stream'/ParMultiDo(ExactlyOnceWriter) (4/8)#0 (76ed5be34c202de19384b829f09d6346) switched from RUNNING to FAILED with failure cause: org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException: java.lang.reflect.InvocationTargetException\n\tat .. .. .. org.apache.flink.runtime.taskmanager.Task.run(Task.java:568)\n\tat java.base/java.lang.Thread.run(Thread.java:829)\nCaused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired after 60000ms while awaiting AddOffsetsToTxn\n"

Any other alternative?

Thank you very much!

Regards

On Wed, Jun 19, 2024 at 1:00 AM Jan Lukavský <[email protected]> wrote:
>
> Hi,
>
> regarding aligned vs unaligned checkpoints I recommend reading [1], it
> explains it quite well. Generally, I would prefer unaligned checkpoints
> in this case.
>
> Another thing to consider is the number of shards of the EOS sink.
> Because of how the shards are distributed among workers, it might be a
> good idea to actually increase that to some number higher than the
> number of target partitions (e.g. targetPartitions * 10 or so). An
> additional thing to consider is increasing the maxParallelism of the
> pipeline (e.g. the max value is 32768), as it also affects how 'evenly'
> Flink assigns shards to workers. You can check whether the assignment
> is even using the counters in the sink operator(s).
>
> Jan
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpointing_under_backpressure/
>
> On 6/19/24 05:15, Ruben Vargas wrote:
> > Hello guys
> >
> > Now I was able to get past that error.
> >
> > I had to set the consumer factory function:
> > .withConsumerFactoryFn(new KafkaConsumerFactory<Void,V>(config))
> >
> > This is because my cluster uses the SASL authentication mechanism, and
> > the small consumer created to fetch the topics metadata was throwing
> > that error.
> >
> > There are a couple of other things I noticed:
> >
> > - Now I have a lot of backpressure. I assigned 3x resources to the
> > cluster and even with that the backpressure is high. Any advice on
> > this? I already increased the shards to equal the number of partitions
> > of the destination topic.
> >
> > - I have an error where
> > "State exists for shard mytopic-0, but there is no state stored with
> > Kafka topic 'mytopic' group id 'myconsumergroup'"
> >
> > The only way I found to recover from this error is to change the group
> > name. Any other advice on how to recover from this error?
> >
> >
> > Thank you very much for following this up!
> >
> > On Tue, Jun 18, 2024 at 8:44 AM Ruben Vargas <[email protected]>
> > wrote:
> >> Hello Jan
> >>
> >> Thanks for the suggestions
> >>
> >> Is there any benefit to using aligned vs unaligned?
> >>
> >>
> >> In the end I found one problem that was preventing Flink from doing
> >> the checkpointing. It was a DoFn that had some non-serializable
> >> objects, so I made those transient and initialized them in the setup.
> >>
> >> Weird, because I was usually able to detect these kinds of errors just
> >> by running in the direct runner, or even in Flink before enabling EOS.
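> >>
> >> For reference, the pattern I ended up with looks roughly like this (the
> >> client class here is just a made-up stand-in for the real
> >> non-serializable dependency):
> >>
> >> import org.apache.beam.sdk.transforms.DoFn;
> >>
> >> public class EnrichFn extends DoFn<String, String> {
> >>
> >>   // Hypothetical stand-in for the real non-serializable object.
> >>   static class NonSerializableClient {
> >>     String enrich(String in) {
> >>       return in.toUpperCase();
> >>     }
> >>   }
> >>
> >>   // transient: the field is skipped when the DoFn instance is serialized
> >>   // and shipped to the workers...
> >>   private transient NonSerializableClient client;
> >>
> >>   @Setup
> >>   public void setup() {
> >>     // ...and rebuilt on each worker after the DoFn is deserialized.
> >>     client = new NonSerializableClient();
> >>   }
> >>
> >>   @ProcessElement
> >>   public void processElement(@Element String element, OutputReceiver<String> out) {
> >>     out.output(client.enrich(element));
> >>   }
> >> }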
> >>
> >> Now I'm facing another weird issue:
> >>
> >> org.apache.beam.sdk.util.UserCodeException:
> >> org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms
> >> expired before the last committed offset for partitions
> >> [behavioral-signals-6] could be determined. Try tuning
> >> default.api.timeout.ms larger to relax the threshold.
> >>     at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
> >>     at org.apache.beam.sdk.io.kafka.KafkaExactlyOnceSink$ExactlyOnceWriter$DoFnInvoker.invokeProcessElement(Unknown Source)
> >>     at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:212)
> >>
> >> I tried to extend the timeout and it didn't work; my shards are equal
> >> to my number of partitions.
> >>
> >> I appreciate any kind of guidance
> >>
> >> Thanks.
> >>
> >> On Tue, Jun 18, 2024 at 5:56 AM Jan Lukavský <[email protected]> wrote:
> >>> I'd suggest:
> >>> a) use unaligned checkpoints, if possible
> >>>
> >>> b) verify the number of buckets you use for the EOS sink, as this
> >>> limits parallelism [1].
> >>>
> >>> Best,
> >>>
> >>> Jan
> >>>
> >>> [1]
> >>> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/kafka/KafkaIO.WriteRecords.html#withEOS-int-java.lang.String-
> >>>
> >>> On 6/18/24 09:32, Ruben Vargas wrote:
> >>>
> >>> Hello Lukavský
> >>>
> >>> Thanks for your reply!
> >>>
> >>> I thought it was due to backpressure, but I increased the resources of
> >>> the cluster and the problem still persists. More than that, data stops
> >>> flowing and the checkpoints still fail.
> >>>
> >>> I have configured the checkpoint to run every minute. The timeout is 1h.
> >>> It is an aligned checkpoint.
> >>>
> >>> On Tue, Jun 18, 2024 at 1:14 AM Jan Lukavský
> >>> <[email protected]> wrote:
> >>>> Hi Ruben,
> >>>>
> >>>> from the provided screenshot it seems to me that the pipeline is
> >>>> backpressured by the sink. Can you please share your checkpoint
> >>>> configuration? Are you using unaligned checkpoints? What is the
> >>>> checkpointing interval and the volume of data coming in from the
> >>>> source? With EOS, data is committed after the checkpoint; before that,
> >>>> the data is buffered in state, which makes the sink more resource
> >>>> intensive.
> >>>>
> >>>> Jan
> >>>>
> >>>> On 6/18/24 05:30, Ruben Vargas wrote:
> >>>>> Attached a better image of the console.
> >>>>>
> >>>>> Thanks!
> >>>>>
> >>>>> On Mon, Jun 17, 2024 at 9:28 PM Ruben Vargas <[email protected]>
> >>>>> wrote:
> >>>>>> Hello guys
> >>>>>>
> >>>>>> Wondering if some of you have experience enabling Exactly Once in
> >>>>>> KafkaIO with the Flink runner? I enabled it and now I'm facing an
> >>>>>> issue where all the checkpoints are failing. I cannot see any
> >>>>>> exception in the logs.
> >>>>>>
> >>>>>> The Flink console only mentions this: "Asynchronous task checkpoint
> >>>>>> failed." I also noticed that some operators don't acknowledge the
> >>>>>> checkpointing (attached a screenshot).
> >>>>>>
> >>>>>> I did this:
> >>>>>>
> >>>>>> 1) KafkaIO.Read:
> >>>>>>
> >>>>>> update consumer properties with enable.auto.commit = false
> >>>>>> .withReadCommitted()
> >>>>>> .commitOffsetsInFinalize()
> >>>>>>
> >>>>>> 2) KafkaIO#write:
> >>>>>>
> >>>>>> .withEOS(numShards, sinkGroupId)
> >>>>>>
> >>>>>> But my application is not able to deliver messages to the output
> >>>>>> topic because of the checkpoints failing.
> >>>>>> I also reviewed the timeout and other time-sensitive parameters;
> >>>>>> those are high right now.
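> >>>>>>
> >>>>>> For reference, the read side of 1) wired together looks roughly like
> >>>>>> this (the topic, group id and types are placeholders, not my exact
> >>>>>> values):
> >>>>>>
> >>>>>> import java.util.Map;
> >>>>>> import org.apache.beam.sdk.Pipeline;
> >>>>>> import org.apache.beam.sdk.io.kafka.KafkaIO;
> >>>>>> import org.apache.beam.sdk.values.KV;
> >>>>>> import org.apache.beam.sdk.values.PCollection;
> >>>>>> import org.apache.kafka.clients.consumer.ConsumerConfig;
> >>>>>> import org.apache.kafka.common.serialization.StringDeserializer;
> >>>>>>
> >>>>>> public class EosReadSideSketch {
> >>>>>>   public static void main(String[] args) {
> >>>>>>     Pipeline pipeline = Pipeline.create();
> >>>>>>
> >>>>>>     PCollection<KV<String, String>> records =
> >>>>>>         pipeline.apply("ReadFromKafka",
> >>>>>>             KafkaIO.<String, String>read()
> >>>>>>                 .withBootstrapServers("broker:9092")  // placeholder
> >>>>>>                 .withTopic("input-topic")             // placeholder
> >>>>>>                 .withKeyDeserializer(StringDeserializer.class)
> >>>>>>                 .withValueDeserializer(StringDeserializer.class)
> >>>>>>                 // 1) consumer properties: no auto commit, explicit group id
> >>>>>>                 .withConsumerConfigUpdates(Map.of(
> >>>>>>                     ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false,
> >>>>>>                     ConsumerConfig.GROUP_ID_CONFIG, "myconsumergroup"))
> >>>>>>                 .withReadCommitted()        // only read committed records
> >>>>>>                 .commitOffsetsInFinalize()  // commit offsets when checkpoints finalize
> >>>>>>                 .withoutMetadata());
> >>>>>>
> >>>>>>     // 2) 'records' then flows through the transforms and into
> >>>>>>     //    KafkaIO.<K, V>write()... .withEOS(numShards, sinkGroupId)
> >>>>>>
> >>>>>>     pipeline.run().waitUntilFinish();
> >>>>>>   }
> >>>>>> }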
> >>>>>>
> >>>>>> I really appreciate your guidance on this. Thank you
