Hi Eleanore,

Thanks for sharing your findings with us. :-)

Cheers, Fabian

On Thu, May 7, 2020 at 4:56 AM Eleanore Jin <
[email protected]> wrote:

> Hi Fabian,
>
> I just got confirmation from the Apache Beam community: Beam buffers the
> main-stream data until data arrives from the broadcast stream.
>
> Thanks!
> Eleanore
>
> On Tue, May 5, 2020 at 12:31 AM Fabian Hueske <[email protected]> wrote:
>
>> Hi Eleanore,
>>
>> The "GC overhead limit exceeded" error indicates that the JVM spends far too
>> much time garbage collecting while recovering only a little memory on each run.
>> Since the program makes no progress in such a situation, it is terminated
>> with the GC overhead error. This typically happens when many temporary
>> objects are created.
>> The root cause could be Flink, Beam, or your own code.
>> It's important to understand that this error is not directly caused by a
>> shortage of memory (although more memory can help mitigate the issue a
>> bit) but rather indicates an implementation issue.
>>
>> Coming back to your question: Flink's broadcast stream does *not* block
>> or collect events from the non-broadcast side if the broadcast side
>> doesn't emit any events.
>> However, user-implemented operators (Beam or your code, in this case)
>> often put non-broadcast events into state while they wait for input from
>> the other side.
>> Since the error is not about a lack of memory, the buffering in Flink state
>> might not be the problem here.
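A minimal, framework-free sketch of the buffering pattern described above, assuming a simplified operator that holds main-stream elements in a list until the first side-input data arrives. The class `BufferingSideInputFilter` and its methods are hypothetical and are not Flink or Beam API; the sketch only shows that, while the side input stays empty, the buffer grows without bound and every buffered element stays reachable.

```python
from typing import List, Optional, Set


class BufferingSideInputFilter:
    """Hypothetical operator that filters main-stream elements by a side input.

    Illustration only, not Flink or Beam API. Elements that arrive before any
    side-input data are parked in a buffer, similar to an operator keeping
    events in state while it waits for the broadcast side.
    """

    def __init__(self) -> None:
        self.allowed: Optional[Set[str]] = None  # None: no side input seen yet
        self.buffer: List[str] = []  # main-stream elements waiting for the side input
        self.output: List[str] = []  # elements that passed the filter

    def on_main_element(self, element: str) -> None:
        if self.allowed is None:
            # No side input yet: keep the element. With an empty side input this
            # list only ever grows, and every element in it stays reachable.
            self.buffer.append(element)
        else:
            self._emit_if_allowed(element)

    def on_side_input(self, values: Set[str]) -> None:
        # Side-input data unblocks processing: store the filter set (later calls
        # replace it) and flush everything buffered so far.
        self.allowed = set(values)
        pending, self.buffer = self.buffer, []
        for element in pending:
            self._emit_if_allowed(element)

    def _emit_if_allowed(self, element: str) -> None:
        assert self.allowed is not None
        if element in self.allowed:
            self.output.append(element)


if __name__ == "__main__":
    op = BufferingSideInputFilter()
    for i in range(100_000):
        op.on_main_element(f"key-{i % 10}")
    print(len(op.buffer), len(op.output))  # 100000 0
    op.on_side_input({"key-1", "key-2"})
    print(len(op.buffer), len(op.output))  # 0 20000
```

Whether Beam's Flink runner buffers in exactly this way is not shown here; the sketch only mirrors the behaviour described in the thread (main-stream data held until broadcast data arrives).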
>>
>> Best, Fabian
>>
>> On Sun, May 3, 2020 at 3:39 AM Eleanore Jin <
>> [email protected]> wrote:
>>
>>> Hi All,
>>>
>>> I am using Apache Beam with Flink (1.8.2). In my job, I use a Beam
>>> side input (which translates into a Flink NonKeyedBroadcastStream) to
>>> filter the data from the main stream.
>>>
>>> I keep running into OOM errors: "GC overhead limit exceeded".
>>>
>>> After some experiments, I observed the following behaviour:
>>> 1. Running the job without the side input (broadcast stream): no OOM issue
>>> 2. Running the job with the side input (Kafka topic with 1 partition) and
>>> data available from this side input: no OOM issue
>>> 3. Running the job with the side input (Kafka topic with 1 partition) but
>>> without any data from the side input: *OOM issue*
>>> 4. The heap dump shows that the messages (of type ObjectNode) cannot be
>>> GC'd, apparently because of references held by the broadcast stream
>>> [image: image.png]
>>>
>>> My question is: how does the broadcast stream behave if no data is
>>> available? Does it cache the data from the main stream and wait until
>>> data becomes available from the broadcast stream before processing it?
>>>
>>> Thanks a lot!
>>> Eleanore
>>>
>>
