Hello again

Thank you for all the suggestions.

Unfortunately, if I configure more shards than partitions, it throws this exception:

"message": 
"PipelineBuilder-debug-output/KafkaIO.Write/KafkaIO.WriteRecords/KafkaExactlyOnceSink/Persist
ids -> ToGBKResult ->
PipelineBuilder-debug-output/KafkaIO.Write/KafkaIO.WriteRecords/KafkaExactlyOnceSink/Write
to Kafka topic 'behavioral-signals-log-stream'/ParMultiDo(ExactlyOnceWriter)
(4/8)#0 (76ed5be34c202de19384b829f09d6346) switched from RUNNING to
FAILED with failure cause: org.apache.beam.sdk.util.UserCodeException:
java.lang.RuntimeException:
java.lang.reflect.InvocationTargetException\n\tat
..
..
..
org.apache.flink.runtime.taskmanager.Task.run(Task.java:568)\n\tat
java.base/java.lang.Thread.run(Thread.java:829)\nCaused by:
org.apache.kafka.common.errors.TimeoutException: Timeout expired after
60000ms while awaiting AddOffsetsToTxn\n",


Any other alternative? Thank you very much!

Regards

On Wed, Jun 19, 2024 at 1:00 AM Jan Lukavský <[email protected]> wrote:
>
> Hi,
>
> regarding aligned vs. unaligned checkpoints, I recommend reading [1], which
> explains it quite well. Generally, I would prefer unaligned checkpoints
> in this case.
>
> Another thing to consider is the number of shards of the EOS sink.
> Because of how the shards are distributed among workers, it might be a
> good idea to increase that to a number higher than the number of
> target partitions (e.g. targetPartitions * 10 or so). An additional thing
> to consider is increasing the maxParallelism of the pipeline (the maximum
> value is 32768), as it also affects how 'evenly' Flink assigns shards to
> workers. You can check whether the assignment is even using counters in
> the sink operator(s).
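
To illustrate the point about shard counts and maxParallelism, here is a minimal, self-contained sketch of how keyed work can end up unevenly spread across subtasks. It assumes the assignment scheme described in Flink's documentation for keyed state: a key is hashed into one of maxParallelism key groups, and key group g runs on subtask g * parallelism / maxParallelism. The mix function is a stand-in hash, not Flink's own, and the shard ids 0..numShards-1 are hypothetical keys, so the printed counts will not match a real job. Only the shape of the effect is meant to carry over.

```java
import java.util.Arrays;

public class ShardAssignmentSketch {

    // Stand-in integer mixer (an assumption, NOT Flink's hash function);
    // it is only used to scatter the hypothetical shard ids.
    static int mix(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h;
    }

    // key -> key group -> subtask index, following the key-group scheme
    // described in the lead-in.
    static int subtaskFor(int shardId, int maxParallelism, int parallelism) {
        int keyGroup = Math.floorMod(mix(shardId), maxParallelism);
        return (int) ((long) keyGroup * parallelism / maxParallelism);
    }

    // Counts how many shards land on each subtask.
    static int[] shardsPerSubtask(int numShards, int maxParallelism, int parallelism) {
        int[] counts = new int[parallelism];
        for (int shard = 0; shard < numShards; shard++) {
            counts[subtaskFor(shard, maxParallelism, parallelism)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        int parallelism = 8; // hypothetical number of sink subtasks
        // Shards equal to the number of subtasks, small maxParallelism.
        System.out.println(Arrays.toString(shardsPerSubtask(8, 128, parallelism)));
        // Ten times as many shards, maximum maxParallelism.
        System.out.println(Arrays.toString(shardsPerSubtask(80, 32768, parallelism)));
    }
}
```

With few shards, some subtasks may receive no shard at all while others receive more than one; with more shards the counts tend to even out. In a real pipeline the counters on the sink operators remain the authoritative check.
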
>
>   Jan
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpointing_under_backpressure/
>
> On 6/19/24 05:15, Ruben Vargas wrote:
> > Hello guys
> >
> > I was able to get past that error.
> >
> > I had to set the consumer factory function
> > .withConsumerFactoryFn(new KafkaConsumerFactory<Void,V>(config))
> >
> > This is because my cluster uses the SASL authentication mechanism, and the
> > small consumer created to fetch the topic metadata was throwing that
> > error.
> >
> > There are a couple of other things I noticed:
> >
> > - Now I have a lot of backpressure. I assigned 3x the resources to the
> > cluster, and even with that the backpressure is high. Any advice on
> > this? I already increased the shards to equal the number of partitions
> > of the destination topic.
> >
> > - I have an error where
> > "State exists for shard mytopic-0, but there is no state stored with
> > Kafka topic mytopic' group id myconsumergroup'
> >
> > The only way I found to recover from this error is to change the group
> > name. Any other advice on how to recover from this error?
> >
> >
> > Thank you very much for following this up!
> >
> > On Tue, Jun 18, 2024 at 8:44 AM Ruben Vargas <[email protected]> 
> > wrote:
> >> Hello Jan
> >>
> >> Thanks for the suggestions
> >>
> >> Any benefit of using aligned vs unaligned?
> >>
> >>
> >> In the end I found one problem that was preventing Flink from
> >> checkpointing. A DoFn held some non-serializable objects, so I made
> >> those fields transient and initialized them in the setup method.
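
The fix described above (marking non-serializable members transient and creating them in setup) can be sketched with plain Java serialization. This is an illustration under assumptions, not the actual DoFn from the pipeline: Worker stands in for a DoFn, Client is a hypothetical non-serializable dependency, and setup() plays the role of the method Beam would call via @Setup.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TransientSetupSketch {

    // Hypothetical dependency that is deliberately NOT Serializable.
    static class Client {
        String send(String msg) {
            return "sent:" + msg;
        }
    }

    // Stand-in for a DoFn: the instance is serialized, the client is not.
    static class Worker implements Serializable {
        private static final long serialVersionUID = 1L;

        private final String endpoint;    // serializable configuration
        private transient Client client;  // skipped during serialization

        Worker(String endpoint) {
            this.endpoint = endpoint;
        }

        // In Beam this would be the method annotated with @Setup.
        void setup() {
            client = new Client();
        }

        String process(String element) {
            return client.send(endpoint + "/" + element);
        }
    }

    // Serializes and deserializes the worker, roughly as a runner would
    // when shipping it to a remote task.
    static Worker roundTrip(Worker w) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(w);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (Worker) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Worker copy = roundTrip(new Worker("svc"));
        copy.setup(); // the transient field is null until setup runs
        System.out.println(copy.process("a")); // prints "sent:svc/a"
    }
}
```

Without the transient keyword, writeObject would fail with a NotSerializableException for Client, which is the kind of failure that a non-serializable member can cause.
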
> >>
> >> Weird, because I usually was able to detect these kinds of errors just
> >> running in the direct runner, or even in flink before enabling EOS.
> >>
> >>
> >> Now I'm facing another weird issue
> >>
> >> org.apache.beam.sdk.util.UserCodeException:
> >> org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms
> >> expired before the last committed offset for partitions
> >> [behavioral-signals-6] could be determined. Try tuning
> >> default.api.timeout.ms larger to relax the threshold.
> >> at 
> >> org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
> >> at 
> >> org.apache.beam.sdk.io.kafka.KafkaExactlyOnceSink$ExactlyOnceWriter$DoFnInvoker.invokeProcessElement(Unknown
> >> Source)
> >> at 
> >> org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:212)
> >>
> >> I tried extending the timeout and it didn't help. My number of shards
> >> equals my number of partitions.
> >>
> >> I appreciate any kind of guidance
> >>
> >> Thanks.
> >>
> >> On Tue, Jun 18, 2024 at 5:56 AM Jan Lukavský <[email protected]> wrote:
> >>> I'd suggest:
> >>>   a) use unaligned checkpoints, if possible
> >>>
> >>>   b) verify the number of buckets you use for EOS sink, this limits 
> >>> parallelism [1].
> >>>
> >>> Best,
> >>>
> >>>   Jan
> >>>
> >>> [1] 
> >>> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/kafka/KafkaIO.WriteRecords.html#withEOS-int-java.lang.String-
> >>>
> >>> On 6/18/24 09:32, Ruben Vargas wrote:
> >>>
> >>> Hello Lukavsky
> >>>
> >>> Thanks for your reply !
> >>>
> >>> I thought it was due to backpressure, but I increased the resources of the 
> >>> cluster and the problem still persists. More than that, data stops flowing and 
> >>> the checkpoints still fail.
> >>>
> >>> I have configured checkpoints to run every minute, with a timeout of 1h. 
> >>> Checkpoints are aligned.
> >>>
> >>> On Tue, Jun 18, 2024 at 1:14 AM, Jan Lukavský 
> >>> <[email protected]> wrote:
> >>>> Hi Ruben,
> >>>>
> >>>> from the provided screenshot it seems to me that the pipeline is
> >>>> backpressured by the sink. Can you please share your checkpoint
> >>>> configuration? Are you using unaligned checkpoints? What is the
> >>>> checkpointing interval and the volume of data coming in from the source?
> >>>> With EOS, data is committed only after a checkpoint; before that, the data is
> >>>> buffered in state, which makes the sink more resource intensive.
> >>>>
> >>>>    Jan
> >>>>
> >>>> On 6/18/24 05:30, Ruben Vargas wrote:
> >>>>> Attached a better image of the console.
> >>>>>
> >>>>> Thanks!
> >>>>>
> >>>>> On Mon, Jun 17, 2024 at 9:28 PM Ruben Vargas <[email protected]> 
> >>>>> wrote:
> >>>>>> Hello guys
> >>>>>>
> >>>>>> I'm wondering whether any of you have experience enabling exactly-once in
> >>>>>> KafkaIO with the Flink runner. I enabled it and now I'm facing an issue
> >>>>>> where all the checkpoints are failing. I cannot see any exception in
> >>>>>> the logs.
> >>>>>>
> >>>>>> The Flink console only says "Asynchronous task checkpoint
> >>>>>> failed." I also noticed that some operators don't acknowledge the
> >>>>>> checkpoint (screenshot attached).
> >>>>>>
> >>>>>> I did this:
> >>>>>>
> >>>>>> 1) KafkaIO.Read:
> >>>>>>
> >>>>>> update consumer properties with enable.auto.commit = false
> >>>>>> .withReadCommitted()
> >>>>>> .commitOffsetsInFinalize()
> >>>>>>
> >>>>>> 2) KafkaIO#write:
> >>>>>>
> >>>>>> .withEOS(numShards, sinkGroupId)
> >>>>>>
> >>>>>> But my application is not able to deliver messages to the output topic
> >>>>>> because the checkpoints are failing.
> >>>>>> I also reviewed the timeouts and other time-sensitive parameters; they
> >>>>>> are all set to high values right now.
> >>>>>>
> >>>>>> I really appreciate your guidance on this. Thank you
