On Mon, Jun 24, 2024 at 2:02 AM Jan Lukavský <[email protected]> wrote:
>
> Hi,
>
> the distribution of keys to workers might not be uniform, when the
> number of keys is comparable to total parallelism. General advise would be:
>
>   a) try to increase number of keys (EOS parallelism in this case) to be
> at least several times higher than parallelism

Make sense, unfortunately I faced an error when I tried to put the
shards > partitions. :(

"message": 
"PipelineBuilder-debug-output/KafkaIO.Write/KafkaIO.WriteRecords/KafkaExactlyOnceSink/Persist
ids -> ToGBKResult ->
PipelineBuilder-debug-output/KafkaIO.Write/KafkaIO.WriteRecords/KafkaExactlyOnceSink/Write
to Kafka topic 'behavioral-signals-log-stream'/ParMultiDo(ExactlyOnceWriter)
(4/8)#0 (76ed5be34c202de19384b829f09d6346) switched from RUNNING to
FAILED with failure cause: org.apache.beam.sdk.util.UserCodeException:
java.lang.RuntimeException:
java.lang.reflect.InvocationTargetException\n\tat

Do I need to move any configuration to do that?

Thanks

>
>   b) increase maxParallelism (default 128, maximum 32768), as it might
> influence the assignment of keys to downstream workers
>
> Best,
>
>   Jan
>
> On 6/21/24 05:25, Ruben Vargas wrote:
> > Image as not correctly attached. sending it again. Sorry
> >
> > Thanks
> >
> > On Thu, Jun 20, 2024 at 9:25 PM Ruben Vargas <[email protected]> 
> > wrote:
> >> Hello guys, me again
> >>
> >> I was trying to debug the issue with the  backpressure and I noticed
> >> that even if I set the shards = 16, not all tasks are receiving
> >> messages (attaching screenshot). You know potential causes and
> >> solutions?
> >>
> >> I really appreciate any help you can provide
> >>
> >>
> >> Thank you very much!
> >>
> >> Regards.
> >>
> >>
> >> On Wed, Jun 19, 2024 at 11:09 PM Ruben Vargas <[email protected]> 
> >> wrote:
> >>> Hello again
> >>>
> >>> Thank you for all the suggestions.
> >>>
> >>> Unfortunately if I put more shards than partitions it throws me this 
> >>> exception
> >>>
> >>> "message": 
> >>> "PipelineBuilder-debug-output/KafkaIO.Write/KafkaIO.WriteRecords/KafkaExactlyOnceSink/Persist
> >>> ids -> ToGBKResult ->
> >>> PipelineBuilder-debug-output/KafkaIO.Write/KafkaIO.WriteRecords/KafkaExactlyOnceSink/Write
> >>> to Kafka topic 
> >>> 'behavioral-signals-log-stream'/ParMultiDo(ExactlyOnceWriter)
> >>> (4/8)#0 (76ed5be34c202de19384b829f09d6346) switched from RUNNING to
> >>> FAILED with failure cause: org.apache.beam.sdk.util.UserCodeException:
> >>> java.lang.RuntimeException:
> >>> java.lang.reflect.InvocationTargetException\n\tat
> >>> ..
> >>> ..
> >>> ..
> >>> org.apache.flink.runtime.taskmanager.Task.run(Task.java:568)\n\tat
> >>> java.base/java.lang.Thread.run(Thread.java:829)\nCaused by:
> >>> org.apache.kafka.common.errors.TimeoutException: Timeout expired after
> >>> 60000ms while awaiting AddOffsetsToTxn\n",
> >>>
> >>>
> >>> Any other alternative? Thank you very much!
> >>>
> >>> Regards
> >>>
> >>> On Wed, Jun 19, 2024 at 1:00 AM Jan Lukavský <[email protected]> wrote:
> >>>> Hi,
> >>>>
> >>>> regarding aligned vs unaligned checkpoints I recommend reading [1], it
> >>>> explains it quite well. Generally, I would prefer unaligned checkpoints
> >>>> in this case.
> >>>>
> >>>> Another thing to consider is the number of shards of the EOS sink.
> >>>> Because how the shards are distributed among workers, it might be good
> >>>> idea to actually increase that to some number higher than number of
> >>>> target partitions (e.g. targetPartitions * 10 or so). Additional thing
> >>>> to consider is increasing maxParallelism of the pipeline (e.g. max value
> >>>> is 32768), as it also affects how 'evenly' Flink assigns shards to
> >>>> workers. You can check if the assignment is even using counters in the
> >>>> sink operator(s).
> >>>>
> >>>>    Jan
> >>>>
> >>>> [1]
> >>>> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpointing_under_backpressure/
> >>>>
> >>>> On 6/19/24 05:15, Ruben Vargas wrote:
> >>>>> Hello guys
> >>>>>
> >>>>> Now I was able to pass that error.
> >>>>>
> >>>>> I had to set the consumer factory function
> >>>>> .withConsumerFactoryFn(new KafkaConsumerFactory<Void,V>(config))
> >>>>>
> >>>>> This is because my cluster uses SASL authentication mechanism, and the
> >>>>> small consumer created to fetch the topics metadata was throwing that
> >>>>> error.
> >>>>>
> >>>>> There are other couple things I noticed:
> >>>>>
> >>>>>    - Now I have a lot of backpressure, I assigned x3 resources to the
> >>>>> cluster and even with that the back pressure is high . Any advice on
> >>>>> this? I already increased the shards to equal the number of partitions
> >>>>> of the destination topic.
> >>>>>
> >>>>> - I have an error where
> >>>>> "State exists for shard mytopic-0, but there is no state stored with
> >>>>> Kafka topic mytopic' group id myconsumergroup'
> >>>>>
> >>>>> The only way I found to recover from this error is to change the group
> >>>>> name. Any other advice on how to recover from this error?
> >>>>>
> >>>>>
> >>>>> Thank you very much for following this up!
> >>>>>
> >>>>> On Tue, Jun 18, 2024 at 8:44 AM Ruben Vargas <[email protected]> 
> >>>>> wrote:
> >>>>>> Hello Jan
> >>>>>>
> >>>>>> Thanks for the suggestions
> >>>>>>
> >>>>>> Any benefit of using aligned vs unaligned?
> >>>>>>
> >>>>>>
> >>>>>> At the end I found one problem that was preventing  flink from doing
> >>>>>> the checkpointing. It was a DoFn function that has some "non
> >>>>>> serializable" objects, so I made those transient and initialized those
> >>>>>> on the setup.
> >>>>>>
> >>>>>> Weird, because I usually was able to detect these kinds of errors just
> >>>>>> running in the direct runner, or even in flink before enabling EOS.
> >>>>>>
> >>>>>>
> >>>>>> Now I'm facing another weird issue
> >>>>>>
> >>>>>> org.apache.beam.sdk.util.UserCodeException:
> >>>>>> org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms
> >>>>>> expired before the last committed offset for partitions
> >>>>>> [behavioral-signals-6] could be determined. Try tuning
> >>>>>> default.api.timeout.ms larger to relax the threshold.
> >>>>>> at 
> >>>>>> org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
> >>>>>> at 
> >>>>>> org.apache.beam.sdk.io.kafka.KafkaExactlyOnceSink$ExactlyOnceWriter$DoFnInvoker.invokeProcessElement(Unknown
> >>>>>> Source)
> >>>>>> at 
> >>>>>> org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:212)
> >>>>>>
> >>>>>> I tried to extend the timeout and it didn't work, my shards are equal
> >>>>>> to my number of partitions.
> >>>>>>
> >>>>>> I appreciate any kind of guidance
> >>>>>>
> >>>>>> Thanks.
> >>>>>>
> >>>>>> On Tue, Jun 18, 2024 at 5:56 AM Jan Lukavský <[email protected]> wrote:
> >>>>>>> I'd suggest:
> >>>>>>>    a) use unaligned checkpoints, if possible
> >>>>>>>
> >>>>>>>    b) verify the number of buckets you use for EOS sink, this limits 
> >>>>>>> parallelism [1].
> >>>>>>>
> >>>>>>> Best,
> >>>>>>>
> >>>>>>>    Jan
> >>>>>>>
> >>>>>>> [1] 
> >>>>>>> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/kafka/KafkaIO.WriteRecords.html#withEOS-int-java.lang.String-
> >>>>>>>
> >>>>>>> On 6/18/24 09:32, Ruben Vargas wrote:
> >>>>>>>
> >>>>>>> Hello Lukavsky
> >>>>>>>
> >>>>>>> Thanks for your reply !
> >>>>>>>
> >>>>>>> I thought was due backpreassure but i increased the resources of the 
> >>>>>>> cluster and problem still presist. More that that, data stop flowing 
> >>>>>>> and the checkpoint still fail.
> >>>>>>>
> >>>>>>> I have configured the checkpoint to do it per minute. The timeout is 
> >>>>>>> 1h. Is aligned checkpoint.
> >>>>>>>
> >>>>>>> El El mar, 18 de jun de 2024 a la(s) 1:14 a.m., Jan Lukavský 
> >>>>>>> <[email protected]> escribió:
> >>>>>>>> H Ruben,
> >>>>>>>>
> >>>>>>>> from the provided screenshot it seems to me, that the pipeline in
> >>>>>>>> backpressured by the sink. Can you please share your checkpoint
> >>>>>>>> configuration? Are you using unaligned checkpoints? What is the
> >>>>>>>> checkpointing interval and the volume of data coming in from the 
> >>>>>>>> source?
> >>>>>>>> With EOS data is committed after checkpoint, before that, the data is
> >>>>>>>> buffered in state, which makes the sink more resource intensive.
> >>>>>>>>
> >>>>>>>>     Jan
> >>>>>>>>
> >>>>>>>> On 6/18/24 05:30, Ruben Vargas wrote:
> >>>>>>>>> Attached a better image of the console.
> >>>>>>>>>
> >>>>>>>>> Thanks!
> >>>>>>>>>
> >>>>>>>>> On Mon, Jun 17, 2024 at 9:28 PM Ruben Vargas 
> >>>>>>>>> <[email protected]> wrote:
> >>>>>>>>>> Hello guys
> >>>>>>>>>>
> >>>>>>>>>> Wondering if some of you have experiences enabling Exactly Once in
> >>>>>>>>>> KafkaIO with Flink runner? I enabled it and now I'm facing an issue
> >>>>>>>>>> where all the checkpoints are failing. I cannot see any exception 
> >>>>>>>>>> on
> >>>>>>>>>> the logs.
> >>>>>>>>>>
> >>>>>>>>>> Flink console only mentions this "Asynchronous task checkpoint
> >>>>>>>>>> failed." I also noticed that some operators don't acknowledge the
> >>>>>>>>>> checkpointing  (Attached a screenshot).
> >>>>>>>>>>
> >>>>>>>>>> I did this:
> >>>>>>>>>>
> >>>>>>>>>> 1) KafkaIO.Read:
> >>>>>>>>>>
> >>>>>>>>>> update consumer properties with enable.auto.commit = false
> >>>>>>>>>> .withReadCommitted()
> >>>>>>>>>> .commitOffsetsInFinalize()
> >>>>>>>>>>
> >>>>>>>>>> 2) KafkaIO#write:
> >>>>>>>>>>
> >>>>>>>>>> .withEOS(numShards, sinkGroupId)
> >>>>>>>>>>
> >>>>>>>>>> But my application is not able to deliver messages to the output 
> >>>>>>>>>> topic
> >>>>>>>>>> due the checkpoint failing.
> >>>>>>>>>> I also reviewed the timeout and other time sensitive parameters, 
> >>>>>>>>>> those
> >>>>>>>>>> are high right now.
> >>>>>>>>>>
> >>>>>>>>>> I really appreciate your guidance on this. Thank you

Reply via email to