On Mon, Jun 24, 2024 at 2:02 AM Jan Lukavský <[email protected]> wrote:
>
> Hi,
>
> the distribution of keys to workers might not be uniform when the number of keys is comparable to the total parallelism. General advice would be:
>
> a) try to increase the number of keys (EOS parallelism in this case) to be at least several times higher than the parallelism
Makes sense. Unfortunately, I faced an error when I tried to set shards > partitions. :(

"message": "PipelineBuilder-debug-output/KafkaIO.Write/KafkaIO.WriteRecords/KafkaExactlyOnceSink/Persist ids -> ToGBKResult -> PipelineBuilder-debug-output/KafkaIO.Write/KafkaIO.WriteRecords/KafkaExactlyOnceSink/Write to Kafka topic 'behavioral-signals-log-stream'/ParMultiDo(ExactlyOnceWriter) (4/8)#0 (76ed5be34c202de19384b829f09d6346) switched from RUNNING to FAILED with failure cause: org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException: java.lang.reflect.InvocationTargetException\n\tat

Do I need to change any configuration to do that? Thanks

> b) increase maxParallelism (default 128, maximum 32768), as it might influence the assignment of keys to downstream workers
>
> Best,
>
> Jan
>
> On 6/21/24 05:25, Ruben Vargas wrote:
> > Image was not correctly attached. Sending it again. Sorry.
> >
> > Thanks
> >
> > On Thu, Jun 20, 2024 at 9:25 PM Ruben Vargas <[email protected]> wrote:
> >> Hello guys, me again
> >>
> >> I was trying to debug the issue with the backpressure and I noticed that even if I set the shards = 16, not all tasks are receiving messages (attaching screenshot). Do you know potential causes and solutions?
> >>
> >> I really appreciate any help you can provide
> >>
> >> Thank you very much!
> >>
> >> Regards.
> >>
> >> On Wed, Jun 19, 2024 at 11:09 PM Ruben Vargas <[email protected]> wrote:
> >>> Hello again
> >>>
> >>> Thank you for all the suggestions.
> >>>
> >>> Unfortunately, if I put more shards than partitions it throws me this exception:
> >>>
> >>> "message": "PipelineBuilder-debug-output/KafkaIO.Write/KafkaIO.WriteRecords/KafkaExactlyOnceSink/Persist ids -> ToGBKResult -> PipelineBuilder-debug-output/KafkaIO.Write/KafkaIO.WriteRecords/KafkaExactlyOnceSink/Write to Kafka topic 'behavioral-signals-log-stream'/ParMultiDo(ExactlyOnceWriter) (4/8)#0 (76ed5be34c202de19384b829f09d6346) switched from RUNNING to FAILED with failure cause: org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException: java.lang.reflect.InvocationTargetException\n\tat
> >>> ..
> >>> ..
> >>> ..
> >>> org.apache.flink.runtime.taskmanager.Task.run(Task.java:568)\n\tat java.base/java.lang.Thread.run(Thread.java:829)\nCaused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired after 60000ms while awaiting AddOffsetsToTxn\n",
> >>>
> >>> Any other alternative? Thank you very much!
> >>>
> >>> Regards
> >>>
> >>> On Wed, Jun 19, 2024 at 1:00 AM Jan Lukavský <[email protected]> wrote:
> >>>> Hi,
> >>>>
> >>>> regarding aligned vs unaligned checkpoints I recommend reading [1], it explains it quite well. Generally, I would prefer unaligned checkpoints in this case.
> >>>>
> >>>> Another thing to consider is the number of shards of the EOS sink. Because of how the shards are distributed among workers, it might be a good idea to increase that to some number higher than the number of target partitions (e.g. targetPartitions * 10 or so). An additional thing to consider is increasing the maxParallelism of the pipeline (maximum value is 32768), as it also affects how 'evenly' Flink assigns shards to workers. You can check whether the assignment is even using counters in the sink operator(s).
> >>>>
> >>>> Jan
> >>>>
> >>>> [1] https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpointing_under_backpressure/
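
For illustration, here is a minimal sketch of how those two suggestions (several times more EOS shards than target partitions, and a higher maxParallelism) might be wired up with the Beam Flink runner's FlinkPipelineOptions. The class name, parallelism, shard arithmetic, and sink group id are placeholders, not values taken from this thread:

    import org.apache.beam.runners.flink.FlinkPipelineOptions;
    import org.apache.beam.runners.flink.FlinkRunner;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class ShardAndKeyGroupTuningSketch {
      public static void main(String[] args) {
        FlinkPipelineOptions options =
            PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
        options.setRunner(FlinkRunner.class);
        // More key groups gives Flink more freedom to spread keyed state
        // (and therefore EOS sink shards) across workers; the default is 128.
        options.setMaxParallelism(32768);
        options.setParallelism(8); // placeholder worker parallelism

        Pipeline pipeline = Pipeline.create(options);

        // When building the sink, use several times more shards than the
        // output topic has partitions, e.g.:
        //   int eosShards = targetPartitions * 10;
        //   ... .withEOS(eosShards, "eos-sink-group")
      }
    }
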
> >>>> On 6/19/24 05:15, Ruben Vargas wrote:
> >>>>> Hello guys
> >>>>>
> >>>>> Now I was able to get past that error.
> >>>>>
> >>>>> I had to set the consumer factory function: .withConsumerFactoryFn(new KafkaConsumerFactory<Void,V>(config))
> >>>>>
> >>>>> This is because my cluster uses the SASL authentication mechanism, and the small consumer created to fetch the topic metadata was throwing that error.
> >>>>>
> >>>>> There are a couple of other things I noticed:
> >>>>>
> >>>>> - Now I have a lot of backpressure. I assigned 3x the resources to the cluster and even with that the backpressure is high. Any advice on this? I already increased the shards to equal the number of partitions of the destination topic.
> >>>>>
> >>>>> - I have an error where "State exists for shard mytopic-0, but there is no state stored with Kafka topic mytopic' group id myconsumergroup'"
> >>>>>
> >>>>> The only way I found to recover from this error is to change the group name. Any other advice on how to recover from this error?
> >>>>>
> >>>>> Thank you very much for following this up!
> >>>>>
> >>>>> On Tue, Jun 18, 2024 at 8:44 AM Ruben Vargas <[email protected]> wrote:
> >>>>>> Hello Jan
> >>>>>>
> >>>>>> Thanks for the suggestions
> >>>>>>
> >>>>>> Any benefit of using aligned vs unaligned?
> >>>>>>
> >>>>>> In the end I found one problem that was preventing Flink from doing the checkpointing. It was a DoFn that had some "non serializable" objects, so I made those transient and initialized them in the setup.
> >>>>>>
> >>>>>> Weird, because I was usually able to detect these kinds of errors just by running in the direct runner, or even in Flink before enabling EOS.
> >>>>>>
> >>>>>> Now I'm facing another weird issue:
> >>>>>>
> >>>>>> org.apache.beam.sdk.util.UserCodeException: org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the last committed offset for partitions [behavioral-signals-6] could be determined. Try tuning default.api.timeout.ms larger to relax the threshold.
> >>>>>> at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
> >>>>>> at org.apache.beam.sdk.io.kafka.KafkaExactlyOnceSink$ExactlyOnceWriter$DoFnInvoker.invokeProcessElement(Unknown Source)
> >>>>>> at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:212)
> >>>>>>
> >>>>>> I tried extending the timeout and it didn't work; my shards are equal to my number of partitions.
> >>>>>>
> >>>>>> I appreciate any kind of guidance
> >>>>>>
> >>>>>> Thanks.
> >>>>>>
> >>>>>> On Tue, Jun 18, 2024 at 5:56 AM Jan Lukavský <[email protected]> wrote:
> >>>>>>> I'd suggest:
> >>>>>>>
> >>>>>>> a) use unaligned checkpoints, if possible
> >>>>>>>
> >>>>>>> b) verify the number of buckets you use for the EOS sink, as this limits parallelism [1].
> >>>>>>>
> >>>>>>> Best,
> >>>>>>>
> >>>>>>> Jan
> >>>>>>>
> >>>>>>> [1] https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/kafka/KafkaIO.WriteRecords.html#withEOS-int-java.lang.String-
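
For reference, a rough sketch of the consumer-factory approach mentioned earlier in the thread. KafkaConsumerFactory is the poster's own class, so the lambda below only illustrates the general shape of KafkaIO.Read#withConsumerFactoryFn; the broker address, topic, and SASL property values are placeholders:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class SaslConsumerFactorySketch {
      public static KafkaIO.Read<String, String> read() {
        return KafkaIO.<String, String>read()
            .withBootstrapServers("broker:9092")              // placeholder
            .withTopic("input-topic")                         // placeholder
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            // The factory is also used for the small metadata consumer,
            // so the SASL settings reach it as well.
            .withConsumerFactoryFn(
                (Map<String, Object> config) -> {
                  Map<String, Object> withAuth = new HashMap<>(config);
                  withAuth.put("security.protocol", "SASL_SSL"); // placeholder
                  withAuth.put("sasl.mechanism", "PLAIN");       // placeholder
                  withAuth.put("sasl.jaas.config",               // placeholder credentials
                      "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"user\" password=\"secret\";");
                  return new KafkaConsumer<>(withAuth);
                });
      }
    }
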
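The serialization fix described a couple of messages up (transient fields re-created in setup) usually looks like the following minimal sketch; EnrichFn and MyClient are made-up names standing in for the real DoFn and its non-serializable dependency:

    import org.apache.beam.sdk.transforms.DoFn;

    /** Stand-in for any non-serializable dependency (HTTP client, DB connection, ...). */
    class MyClient {
      String enrich(String in) { return in + "-enriched"; }
      void close() {}
    }

    /** Sketch: the dependency is transient and re-created per worker in @Setup. */
    class EnrichFn extends DoFn<String, String> {
      private transient MyClient client; // not serialized with the DoFn

      @Setup
      public void setup() {
        client = new MyClient(); // built on the worker, after deserialization
      }

      @ProcessElement
      public void processElement(@Element String in, OutputReceiver<String> out) {
        out.output(client.enrich(in));
      }

      @Teardown
      public void teardown() {
        client.close();
      }
    }
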
> >>>>>>> On 6/18/24 09:32, Ruben Vargas wrote:
> >>>>>>>
> >>>>>>> Hello Lukavsky
> >>>>>>>
> >>>>>>> Thanks for your reply!
> >>>>>>>
> >>>>>>> I thought it was due to backpressure, but I increased the resources of the cluster and the problem still persists. More than that, data stops flowing and the checkpoint still fails.
> >>>>>>>
> >>>>>>> I have configured the checkpoint to run every minute. The timeout is 1h. It is an aligned checkpoint.
> >>>>>>>
> >>>>>>> On Tue, Jun 18, 2024 at 1:14 AM, Jan Lukavský <[email protected]> wrote:
> >>>>>>>> Hi Ruben,
> >>>>>>>>
> >>>>>>>> from the provided screenshot it seems to me that the pipeline is backpressured by the sink. Can you please share your checkpoint configuration? Are you using unaligned checkpoints? What is the checkpointing interval and the volume of data coming in from the source? With EOS, data is committed after the checkpoint; before that, the data is buffered in state, which makes the sink more resource intensive.
> >>>>>>>>
> >>>>>>>> Jan
> >>>>>>>>
> >>>>>>>> On 6/18/24 05:30, Ruben Vargas wrote:
> >>>>>>>>> Attached a better image of the console.
> >>>>>>>>>
> >>>>>>>>> Thanks!
> >>>>>>>>>
> >>>>>>>>> On Mon, Jun 17, 2024 at 9:28 PM Ruben Vargas <[email protected]> wrote:
> >>>>>>>>>> Hello guys
> >>>>>>>>>>
> >>>>>>>>>> Wondering if any of you have experience enabling Exactly Once in KafkaIO with the Flink runner? I enabled it and now I'm facing an issue where all the checkpoints are failing. I cannot see any exception in the logs.
> >>>>>>>>>>
> >>>>>>>>>> The Flink console only mentions "Asynchronous task checkpoint failed." I also noticed that some operators don't acknowledge the checkpointing (attached a screenshot).
> >>>>>>>>>>
> >>>>>>>>>> I did this:
> >>>>>>>>>>
> >>>>>>>>>> 1) KafkaIO.Read:
> >>>>>>>>>>
> >>>>>>>>>> update consumer properties with enable.auto.commit = false
> >>>>>>>>>> .withReadCommitted()
> >>>>>>>>>> .commitOffsetsInFinalize()
> >>>>>>>>>>
> >>>>>>>>>> 2) KafkaIO#write:
> >>>>>>>>>>
> >>>>>>>>>> .withEOS(numShards, sinkGroupId)
> >>>>>>>>>>
> >>>>>>>>>> But my application is not able to deliver messages to the output topic due to the checkpoint failing.
> >>>>>>>>>> I also reviewed the timeout and other time-sensitive parameters; those are set high right now.
> >>>>>>>>>>
> >>>>>>>>>> I really appreciate your guidance on this. Thank you
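
Putting the pieces from this last message together, a minimal end-to-end sketch of the described read/write configuration could look as follows. The bootstrap servers, input topic, shard count, and group ids are placeholders (only the output topic name appears in the thread):

    import java.util.Collections;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class EosPipelineSketch {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        p.apply("ReadInput", KafkaIO.<String, String>read()
                .withBootstrapServers("broker:9092")            // placeholder
                .withTopic("input-topic")                       // placeholder
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                // 1) KafkaIO.Read: disable auto commit, read committed records
                //    only, and commit offsets when the checkpoint is finalized.
                .withConsumerConfigUpdates(
                    Collections.<String, Object>singletonMap("enable.auto.commit", false))
                .withReadCommitted()
                .commitOffsetsInFinalize()
                .withoutMetadata())
            // 2) KafkaIO#write: exactly-once sink with an explicit shard count
            //    and sink group id.
            .apply("WriteOutput", KafkaIO.<String, String>write()
                .withBootstrapServers("broker:9092")            // placeholder
                .withTopic("behavioral-signals-log-stream")
                .withKeySerializer(StringSerializer.class)
                .withValueSerializer(StringSerializer.class)
                .withEOS(16, "eos-sink-group"));                // placeholders

        p.run().waitUntilFinish();
      }
    }

With withEOS, records only reach the output topic after a checkpoint completes, which is why a failing checkpoint blocks delivery as described above.
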
