Generally, it is to be expected that the main input is buffered until
the side input is available. We really have no other option to correctly
process the data.
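To make the buffering concrete, here is a minimal plain-Java sketch of the idea. It assumes a single window and a single side input, and it is not the Flink runner's actual `DoFnOperator` code. `PushbackBuffer` and its methods are hypothetical names. Main-input elements are held back until the first side-input value arrives. If that value never arrives, the buffer (and the heap) keeps growing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

/**
 * Hypothetical model of "hold back the main input until the side input is ready".
 * Single window, single side input; not the Beam runner's actual implementation.
 */
final class PushbackBuffer<M, S, O> {
  private final BiFunction<M, S, O> fn;
  private final List<M> pushedBack = new ArrayList<>();
  private S sideInput;
  private boolean sideInputReady = false;

  PushbackBuffer(BiFunction<M, S, O> fn) {
    this.fn = fn;
  }

  /** Processes a main-input element now, or holds it back while the side input is not ready. */
  List<O> onMainInput(M element) {
    List<O> out = new ArrayList<>();
    if (sideInputReady) {
      out.add(fn.apply(element, sideInput));
    } else {
      // Nothing bounds this list: while the side input stays empty it only grows.
      pushedBack.add(element);
    }
    return out;
  }

  /** Records a side-input value and replays everything held back (only the first call finds any). */
  List<O> onSideInput(S value) {
    sideInput = value;
    sideInputReady = true;
    List<O> out = new ArrayList<>();
    for (M element : pushedBack) {
      out.add(fn.apply(element, sideInput));
    }
    pushedBack.clear();
    return out;
  }

  int bufferedCount() {
    return pushedBack.size();
  }
}

class PushbackDemo {
  public static void main(String[] args) {
    PushbackBuffer<Integer, Integer, Integer> op = new PushbackBuffer<>((m, s) -> m + s);
    for (int i = 0; i < 1_000; i++) {
      op.onMainInput(i); // no side input yet: every element is held back
    }
    System.out.println(op.bufferedCount());        // 1000
    System.out.println(op.onSideInput(10).size()); // 1000 elements replayed
    System.out.println(op.onMainInput(5));         // [15], processed immediately
  }
}
```

The model ignores windows, watermarks and state backends; it only shows why an empty side input turns into unbounded buffering.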

Have you tried using RocksDB as the state backend to prevent too much GC
churn?

-Max

On 07.05.20 06:27, Eleanore Jin wrote:
> Please see: https://issues.apache.org/jira/browse/BEAM-9914
> 
> Thanks a lot!
> Eleanore
> 
> On Wed, May 6, 2020 at 9:17 PM Ankur Goenka <goe...@google.com> wrote:
> 
>     Thanks for sharing the response. It makes sense to me.
>     Please file a JIRA issue for Beam so that we can prioritize it.
> 
>     Thanks,
>     Ankur
> 
>     On Wed, May 6, 2020 at 9:08 PM Eleanore Jin <eleanore....@gmail.com> wrote:
> 
>         Hi Ankur, 
> 
>         Thanks for your response. 
> 
>         I also checked with the Flink community. Here is their response; in
>         short, Flink does not cache the main input data if no data is
>         available in the side input (Flink broadcast stream).
> 
>         - Quote from the Flink community:
> 
>         Coming back to your question, Flink's Broadcast stream does
>         *not* block or collect events from the non-broadcasted side if
>         the broadcast side doesn't serve events.
>         However, the user-implemented operators (Beam or your code in
>         this case) often put non-broadcasted events into state to wait
>         for input from the other side.
>         Since the error is not about lack of memory, the buffering in
>         Flink state might not be the problem here.
> 
>         Thanks a lot for the help!
>         Eleanore
> 
>         On Wed, May 6, 2020 at 8:59 PM Ankur Goenka <goe...@google.com> wrote:
> 
>             The relevant code should be here:
>             https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L595
> 
>             The fact that the problem goes away after publishing to the side
>             input suggests that this might be a problem with synchronizing two
>             streams of data on Flink using Beam.
> 
>             I am not sure whether the Flink optimizer waits for the side input
>             to be available before processing the main input. We could
>             potentially handle this on the Beam side, or use a different set
>             of Flink APIs that allow better optimization, if possible. In any
>             case, a fix would require a new SDK release.
> 
>             On Wed, May 6, 2020 at 7:54 PM Eleanore Jin <eleanore....@gmail.com> wrote:
> 
>                 Hi Ankur, 
> 
>                 Thanks for the answer! Can you please point me to the
>                 source code where the buffering happens? I would like to
>                 learn how Beam works, thanks!
> 
>                 To answer your question: in my case, the side input does not
>                 have any data, meaning no one is publishing to the side input
>                 topic.
> 
>                 After publishing some data into the side input topic,
>                 the OOM goes away. 
> 
>                 Thanks! 
>                 Eleanore 
> 
>                 On Wed, May 6, 2020 at 6:37 PM Ankur Goenka <goe...@google.com> wrote:
> 
>                     Hi Eleanore, 
> 
>                     The operation requires buffering the main input data until
>                     the data from the side input is available, which might be
>                     causing the OOM issue.
>                     You mention that the OOM happens when there is no data in the
>                     side input. Does that mean that the side input is not yet
>                     ready, or that the side input has no data at all?
> 
>                     Thanks,
>                     Ankur
> 
>                     On Tue, May 5, 2020 at 5:15 PM Pablo Estrada <pabl...@google.com> wrote:
> 
>                         +Ankur Goenka, by any chance do you know what could be
>                         causing this?
> 
>                         Thanks Eleanore for the detailed debugging : )
> 
>                         On Tue, May 5, 2020 at 9:34 AM Eleanore Jin <eleanore....@gmail.com> wrote:
> 
>                             Hi Community, 
> 
>                             I just wonder: does the side input feature buffer
>                             the messages from the main source if no data is
>                             available from the side input?
> 
>                             Thanks!
>                             Eleanore
> 
>                             On Sat, May 2, 2020 at 6:28 PM Eleanore Jin <eleanore....@gmail.com> wrote:
> 
>                                 After some more experiments, I observed the
>                                 following:
>                                 1. Run the pipeline without the side input:
>                                 no OOM issue.
>                                 2. Run the pipeline with the side input (Kafka
>                                 topic with 1 partition) with data available
>                                 from the side input: no OOM issue.
>                                 3. Run the pipeline with the side input (Kafka
>                                 topic with 1 partition) without any data
>                                 from the side input: *OOM issue*
>
>                                 So I just wonder: what is the behaviour if
>                                 there is no data from the side input? Does it
>                                 cache the data from the main stream? If so, is
>                                 there a way to allow processing main stream
>                                 data without waiting for data from the side
>                                 input?
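One workaround consistent with the observation that publishing data to the side-input topic makes the OOM go away is to make sure the side input is never empty, for example by publishing a placeholder record, so that it is ready right away. Whether a default is safe depends on what the filter means. The plain-Java sketch below assumes a blocklist-style filter (the hypothetical `SeededBlocklistFilter`), where an empty default lets everything through; with an allowlist, an empty default would drop everything instead.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/**
 * Hypothetical filter whose side input starts from an explicit default,
 * so main-input elements never wait for the first side-input value.
 */
final class SeededBlocklistFilter {
  private Set<String> blockedKeys;

  SeededBlocklistFilter(Set<String> defaultBlockedKeys) {
    // The seed plays the role of a placeholder record: the side input is "ready" from the start.
    this.blockedKeys = new HashSet<>(defaultBlockedKeys);
  }

  /** Replaces the side-input view when a newer value arrives. */
  void updateSideInput(Set<String> latestBlockedKeys) {
    this.blockedKeys = new HashSet<>(latestBlockedKeys);
  }

  /** Keeps only the main-input keys that the current side input does not block. */
  List<String> filter(List<String> mainKeys) {
    return mainKeys.stream()
        .filter(key -> !blockedKeys.contains(key))
        .collect(Collectors.toList());
  }
}

class SeededFilterDemo {
  public static void main(String[] args) {
    SeededBlocklistFilter filter = new SeededBlocklistFilter(Collections.<String>emptySet());
    List<String> batch = Arrays.asList("a", "b", "c");
    System.out.println(filter.filter(batch)); // [a, b, c]: the empty default passes everything
    filter.updateSideInput(new HashSet<>(Arrays.asList("b")));
    System.out.println(filter.filter(batch)); // [a, c]
  }
}
```

The trade-off is the one described at the top of this mail: elements processed before the real side input arrives are judged against the default, not the actual data.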
> 
>                                 Thanks a lot!
>                                 Eleanore
> 
>                                 On Sat, May 2, 2020 at 1:04 PM Eleanore Jin <eleanore....@gmail.com> wrote:
> 
>                                     Hi Community, 
> 
>                                     I am running Beam (2.16) with Flink
>                                     (1.8.2). In my pipeline there is a side
>                                     input which reads from a compacted
>                                     Kafka topic from the earliest offset,
>                                     and the side input value is used for
>                                     filtering. I keep getting the OOM
>                                     error: GC overhead limit exceeded.
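For reference, reading a compacted topic from the earliest offset amounts, logically, to replaying the log and keeping the last value per key, where a null value (a tombstone) deletes the key. Compaction runs in the background, so superseded records may still show up during the replay. Here is a minimal plain-Java sketch of that fold, using a hypothetical `CompactedTopicView` helper; it models the Kafka semantics only and ignores partitions and offsets.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: materializes a compacted topic's records into a key -> latest value table. */
final class CompactedTopicView {
  private CompactedTopicView() {}

  /** Builds one (key, value) record; the value may be null to represent a tombstone. */
  static Map.Entry<String, String> entry(String key, String value) {
    return new SimpleEntry<>(key, value); // SimpleEntry, unlike Map.entry, accepts null values
  }

  /** Replays records in offset order: the last write per key wins, a null value deletes the key. */
  static Map<String, String> materialize(List<Map.Entry<String, String>> records) {
    Map<String, String> table = new HashMap<>();
    for (Map.Entry<String, String> record : records) {
      if (record.getValue() == null) {
        table.remove(record.getKey()); // tombstone
      } else {
        table.put(record.getKey(), record.getValue());
      }
    }
    return table;
  }

  public static void main(String[] args) {
    List<Map.Entry<String, String>> log = Arrays.asList(
        entry("k1", "v1"),
        entry("k2", "v2"),
        entry("k1", "v3"),  // supersedes k1=v1
        entry("k2", null)); // tombstone for k2
    System.out.println(materialize(log)); // {k1=v3}
  }
}
```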
> 
>                                     The side input method looks like this:
> 
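>                                     import java.util.Collections;
>                                     import java.util.Map;
>                                     import java.util.UUID;
>                                     import com.google.common.collect.ImmutableMap; // assumes plain Guava on the classpath
>                                     import org.apache.beam.sdk.io.kafka.KafkaIO;
>                                     import org.apache.beam.sdk.transforms.windowing.AfterFirst;
>                                     import org.apache.beam.sdk.transforms.windowing.AfterPane;
>                                     import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
>                                     import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
>                                     import org.apache.beam.sdk.transforms.windowing.Repeatedly;
>                                     import org.apache.beam.sdk.transforms.windowing.Trigger;
>                                     import org.apache.beam.sdk.transforms.windowing.Window;
>                                     import org.apache.beam.sdk.transforms.windowing.Window.OnTimeBehavior;
>                                     import org.apache.beam.sdk.values.KV;
>                                     import org.apache.beam.sdk.values.PCollection;
>                                     import org.apache.kafka.clients.consumer.ConsumerConfig;
>                                     import org.apache.kafka.common.serialization.Deserializer;
>
>                                     // Assumed to live in the pipeline-building class, which defines
>                                     // `pipeline`, `kafkaSettings` and the custom `KeyDeserializer`.
>                                     private <T> PCollection<KV<String, T>> createSideCollection(
>                                         String topicName, Class<? extends Deserializer<T>> deserializerClass) {
>
>                                       // Fresh consumer group starting from the earliest offset,
>                                       // so the topic is read from the beginning.
>                                       Map<String, Object> consumerProperties = ImmutableMap.<String, Object>builder()
>                                           .put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
>                                           .put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString())
>                                           .build();
>
>                                       PCollection<KV<String, T>> kafkaValues = pipeline.apply(
>                                           "collectionFromTopic-" + topicName,
>                                           KafkaIO.<String, T>read()
>                                               .withBootstrapServers(kafkaSettings.getBootstrapServers())
>                                               .withTopics(Collections.singletonList(topicName))
>                                               .withKeyDeserializer(KeyDeserializer.class)
>                                               .withValueDeserializer(deserializerClass)
>                                               .withConsumerConfigUpdates(consumerProperties)
>                                               .withoutMetadata());
>
>                                       // Fire a pane for each incoming element, whichever sub-trigger is satisfied first.
>                                       Trigger trigger = Repeatedly.forever(
>                                           AfterFirst.of(
>                                               AfterPane.elementCountAtLeast(1),
>                                               AfterProcessingTime.pastFirstElementInPane()));
>
>                                       // Global window with accumulating panes: each firing contains all records seen so far.
>                                       return kafkaValues.apply(Window.<KV<String, T>>into(new GlobalWindows())
>                                           .triggering(trigger)
>                                           .accumulatingFiredPanes()
>                                           .withOnTimeBehavior(OnTimeBehavior.FIRE_ALWAYS));
>                                     }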
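Regarding the trigger above: as far as I can tell, both branches of the `AfterFirst` need at least one element in the pane, so an empty topic never produces a pane and the side input never becomes ready, which would match observation 3. In accumulating mode, each pane that does fire carries every record seen so far in the global window. The plain-Java model below (the hypothetical `AccumulatingCountTrigger`) covers only the `Repeatedly.forever(AfterPane.elementCountAtLeast(n))` branch in accumulating mode. It is a sketch of the semantics, not Beam's trigger machinery.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of Repeatedly.forever(AfterPane.elementCountAtLeast(n)) with accumulating panes. */
final class AccumulatingCountTrigger {
  private final int minElements;
  private final List<String> accumulated = new ArrayList<>();
  private final List<List<String>> firedPanes = new ArrayList<>();
  private int sinceLastFiring = 0;

  AccumulatingCountTrigger(int minElements) {
    this.minElements = minElements;
  }

  void onElement(String element) {
    accumulated.add(element);
    sinceLastFiring++;
    if (sinceLastFiring >= minElements) {
      // Accumulating mode: the pane carries everything seen so far, not only the new elements.
      firedPanes.add(new ArrayList<>(accumulated));
      sinceLastFiring = 0; // Repeatedly.forever re-arms the inner trigger after each firing
    }
  }

  /** In this model the side input becomes available only once the first pane has fired. */
  boolean sideInputReady() {
    return !firedPanes.isEmpty();
  }

  List<List<String>> firedPanes() {
    return firedPanes;
  }
}

class TriggerDemo {
  public static void main(String[] args) {
    AccumulatingCountTrigger emptyTopic = new AccumulatingCountTrigger(1);
    System.out.println(emptyTopic.sideInputReady()); // false: no record, no pane

    AccumulatingCountTrigger t = new AccumulatingCountTrigger(1);
    t.onElement("k1=v1");
    t.onElement("k2=v2");
    System.out.println(t.sideInputReady()); // true
    System.out.println(t.firedPanes());     // [[k1=v1], [k1=v1, k2=v2]]
  }
}
```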
> 
>                                     We run 2 Flink task managers, each with
>                                     4 slots. After looking into the heap
>                                     dump, I think the problem lies in the
>                                     side input. I also double-checked that
>                                     with the side input disabled, the
>                                     pipeline runs fine.
> 
>                                     Can you please provide some help?
> 
>                                     Thanks a lot!
> 
>                                     Eleanore
> 
