Hi Max,

No, I did not introduce RocksDB at this point, since the pipeline is
stateless apart from the Kafka offsets.

So what we do is ensure there is always a dummy message in the side input
topic to avoid this situation.
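To illustrate the workaround, here is a minimal, hedged sketch of the "dummy message" idea: publish one sentinel record to the side-input topic so the side input is never empty, and have the filtering logic ignore it. The sentinel key and helper names below are made up for illustration; the actual Kafka publishing and Beam wiring are omitted.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

// Hypothetical sketch of the dummy-message workaround. SENTINEL_KEY is an
// assumed convention, not anything from the real pipeline.
class DummySideInput {
    static final String SENTINEL_KEY = "__side_input_init__";

    // The placeholder record. In the real setup, this pair would be
    // published once to the side-input Kafka topic by a producer so the
    // side input always has at least one element.
    static Map.Entry<String, String> dummyRecord() {
        return new SimpleEntry<>(SENTINEL_KEY, "");
    }

    // Downstream filtering should skip the sentinel so it never affects
    // actual results.
    static boolean isRealSideInputValue(Map.Entry<String, String> e) {
        return !SENTINEL_KEY.equals(e.getKey());
    }
}
```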

Thanks!
Eleanore

On Mon, May 11, 2020 at 2:57 AM Maximilian Michels <[email protected]> wrote:

> Generally, it is to be expected that the main input is buffered until
> the side input is available. We really have no other option to correctly
> process the data.
>
> Have you tried using RocksDB as the state backend to prevent too much GC
> churn?
>
> -Max
>
> On 07.05.20 06:27, Eleanore Jin wrote:
> > Please see: https://issues.apache.org/jira/browse/BEAM-9914
> >
> > Thanks a lot!
> > Eleanore
> >
> > On Wed, May 6, 2020 at 9:17 PM Ankur Goenka <[email protected]
> > <mailto:[email protected]>> wrote:
> >
> >     Thanks for sharing the response. It makes sense to me.
> >     Please file a jira in Beam so that we can prioritize it.
> >
> >     Thanks,
> >     Ankur
> >
> >     On Wed, May 6, 2020 at 9:08 PM Eleanore Jin <[email protected]
> >     <mailto:[email protected]>> wrote:
> >
> >         Hi Ankur,
> >
> >         Thanks for your response.
> >
> >         I also checked with the Flink community; here is their response.
> >         In short, Flink does not cache the main input data if there is
> >         no data available in the side input (Flink broadcast stream).
> >
> >         - quote from the Flink community:
> >
> >         Coming back to your question, Flink's Broadcast stream does
> >         *not* block or collect events from the non-broadcasted side if
> >         the broadcast side doesn't serve events.
> >         However, the user-implemented operators (Beam or your code in
> >         this case) often put non-broadcasted events into state to wait
> >         for input from the other side.
> >         Since the error is not about lack of memory, the buffering in
> >         Flink state might not be the problem here.
> >
> >         Thanks a lot for the help!
> >         Eleanore
> >
> >         On Wed, May 6, 2020 at 8:59 PM Ankur Goenka <[email protected]
> >         <mailto:[email protected]>> wrote:
> >
> >             The relevant code should be here:
> https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L595
>
> >
> >             Given that the problem goes away after publishing to the
> >             side input, this might be a problem with synchronizing 2
> >             streams of data on Flink using Beam.
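The pushback buffering discussed in this thread can be caricatured with a plain in-memory queue. The sketch below is a deliberately simplified, hypothetical model (stdlib only, not the actual DoFnOperator code): main-input elements are held in memory until the side input becomes available, which is exactly why an always-empty side input lets the buffer, and heap usage, grow without bound.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical caricature of runner-side pushback buffering. Real runners
// may spill this state to a state backend (e.g. RocksDB) instead of heap.
class PushbackBuffer<T> {
    private final Deque<T> pushedBack = new ArrayDeque<>();
    private final List<T> emitted = new ArrayList<>();
    private boolean sideInputReady = false;

    void processMainInput(T element) {
        if (!sideInputReady) {
            // Side input not yet available: buffer the element.
            // This queue grows unboundedly if the side input never fires.
            pushedBack.add(element);
        } else {
            emitted.add(element);
        }
    }

    void onSideInputReady() {
        sideInputReady = true;
        // Flush everything buffered while we were waiting.
        while (!pushedBack.isEmpty()) {
            emitted.add(pushedBack.poll());
        }
    }

    List<T> emitted() {
        return emitted;
    }
}
```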
> >
> >             I am not sure if the Flink optimizer waits for the side
> >             input to be available before processing the main input. We
> >             might potentially handle this on the Beam side as well, or
> >             use a different set of Flink APIs to let us do better
> >             optimization if possible. In any case this would require a
> >             new SDK release if we decide to fix it.
> >
> >             On Wed, May 6, 2020 at 7:54 PM Eleanore Jin
> >             <[email protected] <mailto:[email protected]>> wrote:
> >
> >                 Hi Ankur,
> >
> >                 Thanks for the answer! Can you please point me to the
> >                 source code where the buffering happens? I would like to
> >                 learn how Beam works, thanks!
> >
> >                 To your question: in my case, the side input does not
> >                 have any data, meaning no one is publishing to the side
> >                 input topic.
> >
> >                 After publishing some data into the side input topic,
> >                 the OOM goes away.
> >
> >                 Thanks!
> >                 Eleanore
> >
> >                 On Wed, May 6, 2020 at 6:37 PM Ankur Goenka
> >                 <[email protected] <mailto:[email protected]>> wrote:
> >
> >                     Hi Eleanore,
> >
> >                     The operation requires buffering the main-input data
> >                     until the data from the side input is available,
> >                     which might be causing the OOM issue.
> >                     You mention that OOM happens when there is no data
> >                     in the side input. Does it mean that the side input
> >                     is not yet ready, or does the side input have no
> >                     data at all?
> >
> >                     Thanks,
> >                     Ankur
> >
> >                     On Tue, May 5, 2020 at 5:15 PM Pablo Estrada
> >                     <[email protected] <mailto:[email protected]>> wrote:
> >
> >                         +Ankur Goenka <mailto:[email protected]> by any
> >                         chance do you know what could be causing this?
> >
> >                         Thanks Eleanore for the detailed debugging : )
> >
> >                         On Tue, May 5, 2020 at 9:34 AM Eleanore Jin
> >                         <[email protected]
> >                         <mailto:[email protected]>> wrote:
> >
> >                             Hi Community,
> >
> >                             Just wondering: does the side input feature
> >                             buffer the messages from the main source if
> >                             there is no data available from the side
> >                             input?
> >
> >                             Thanks!
> >                             Eleanore
> >
> >                             On Sat, May 2, 2020 at 6:28 PM Eleanore Jin
> >                             <[email protected]
> >                             <mailto:[email protected]>> wrote:
> >
> >                                 After some more experiments, I observed
> >                                 the following:
> >                                 1. run pipeline without side input: no
> >                                 OOM issue
> >                                 2. run pipeline with side input (kafka
> >                                 topic with 1 partition) with data
> >                                 available from this side input: no OOM
> >                                 issue
> >                                 3. run pipeline with side input (kafka
> >                                 topic with 1 partition) without any data
> >                                 from the side input: OOM issue
> >
> >                                 So I just wonder: what is the behaviour
> >                                 if there is no data from the side input?
> >                                 Does it cache the data from the main
> >                                 stream? If so, is there a way to allow
> >                                 processing main-stream data without
> >                                 waiting for data from the side input?
> >
> >                                 Thanks a lot!
> >                                 Eleanore
> >
> >                                 On Sat, May 2, 2020 at 1:04 PM Eleanore
> >                                 Jin <[email protected]
> >                                 <mailto:[email protected]>> wrote:
> >
> >                                     Hi Community,
> >
> >                                     I am running Beam (2.16) with Flink
> >                                     (1.8.2). In my pipeline there is a
> >                                     side input which reads from a
> >                                     compacted Kafka topic from the
> >                                     earliest offset, and the side-input
> >                                     value is used for filtering. I keep
> >                                     getting OOM: GC overhead limit
> >                                     exceeded.
> >
> >                                     The side input method looks like
> >                                     below:
> >
> >     private <T> PCollection<KV<String, T>> createSideCollection(
> >         String topicName, Class<? extends Deserializer<T>> deserializerClass) {
> >
> >       Map<String, Object> consumerProperties = ImmutableMap.<String, Object>builder()
> >           .put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
> >           .put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString())
> >           .build();
> >
> >       PCollection<KV<String, T>> kafkaValues = pipeline.apply(
> >           "collectionFromTopic-" + topicName,
> >           KafkaIO.<String, T>read()
> >               .withBootstrapServers(kafkaSettings.getBootstrapServers())
> >               .withTopics(Collections.singletonList(topicName))
> >               .withKeyDeserializer(KeyDeserializer.class)
> >               .withValueDeserializer(deserializerClass)
> >               .withConsumerConfigUpdates(consumerProperties)
> >               .withoutMetadata());
> >
> >       Trigger trigger = Repeatedly.forever(
> >           AfterFirst.of(
> >               AfterPane.elementCountAtLeast(1),
> >               AfterProcessingTime.pastFirstElementInPane()));
> >
> >       return kafkaValues.apply(Window.<KV<String, T>>into(new GlobalWindows())
> >           .triggering(trigger)
> >           .accumulatingFiredPanes()
> >           .withOnTimeBehavior(OnTimeBehavior.FIRE_ALWAYS));
> >     }
> >
> >                                     We run 2 Flink task managers, each
> >                                     with 4 slots. After looking into the
> >                                     heap dump, I think the problem lies
> >                                     in the side input. I also confirmed
> >                                     that with the side input disabled,
> >                                     the pipeline runs fine.
> >
> >                                     Can you please provide some help?
> >
> >                                     Thanks a lot!
> >
> >                                     Eleanore
> >
> >
>
