Hi Martin,
I've seen similar things. The Direct Runner is intended for testing with
small datasets, and is expected to retain the entire dataset in memory. It
sounds like you have a pipeline that requires storing data for a GroupByKey
operation. There is no mechanism to page intermediates to disk in the
Direct Runner.
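To make the memory behaviour concrete, here is a toy Java model of a grouping step that buffers values per key on the heap with no spill-to-disk path. The class `InMemoryGroupByKey` and its methods are hypothetical and are not the Direct Runner's actual code. In this model, memory is only released when the buffer is flushed (for example, when a window closes). If that never happens, the buffer keeps growing.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy model of an in-memory GroupByKey buffer.
// NOT the Direct Runner's implementation: it only illustrates that,
// without paging to disk, every buffered element stays on the heap.
class InMemoryGroupByKey {
    // Buffered values per key; nothing is ever written to disk.
    private final Map<String, List<Integer>> buffer = new HashMap<>();

    // Adds one element; heap usage grows with every call.
    void add(String key, int value) {
        buffer.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
    }

    // Emits all groups and clears the buffer (e.g. when a window closes).
    Map<String, List<Integer>> flush() {
        Map<String, List<Integer>> out = new HashMap<>(buffer);
        buffer.clear();
        return out;
    }

    // Number of elements currently held in memory.
    int bufferedElements() {
        int n = 0;
        for (List<Integer> values : buffer.values()) {
            n += values.size();
        }
        return n;
    }

    public static void main(String[] args) {
        InMemoryGroupByKey gbk = new InMemoryGroupByKey();
        for (int i = 0; i < 1000; i++) {
            gbk.add("key-" + (i % 3), i);
        }
        System.out.println(gbk.bufferedElements()); // 1000
        Map<String, List<Integer>> groups = gbk.flush();
        System.out.println(groups.size());          // 3
        System.out.println(gbk.bufferedElements()); // 0
    }
}
```

A real runner that can spill to disk or distribute state would not need the whole buffer on one heap, which is the gap this model exaggerates.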
You might want to try the Flink local runner, which should handle this case
better.
Andrew
On Sun, Oct 21, 2018 at 3:43 PM Martin Procházka wrote:
> Hello,
> I have an application that uses a Beam pipeline running on the Direct
> Runner. It contains an unbounded source. I also have a frontend that
> manually adds data to the pipeline with the same timestamp so that it is
> processed in the same window.
>
> The pipeline runs well; however, it eventually runs out of heap space. I
> profiled the application and noticed a hotspot in
> outputWatermark - holds - keyedHolds. Over time it fills up, mainly with
> values keyed by the anonymous 'empty' StructuralKey classes. It grows with
> every request and is never released.
>
> When I changed the empty structural key to a true singleton, it solved
> part of this issue, but I noticed that a specific test ensures that two
> empty keys (StructuralKey) are not equal, so my change would not be valid.
> When are those empty keys used, and when should they be removed in the
> Direct Runner? Is there some mechanism to prevent the inevitable heap
> out-of-memory error after a few requests?
>
> Regards,
> Martin Prochazka
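The keyedHolds growth Martin describes can be sketched with a toy Java model. `Key`, `KeyedHoldsDemo`, and `holdsAfter` are hypothetical stand-ins, not Beam's `StructuralKey` or the Direct Runner's watermark code. The sketch assumes, as Martin observed, that hold entries are never removed; whether and when the real runner removes them is not answered in this thread. It shows why keys with identity-only equality (one fresh anonymous instance per request) add a new map entry every time, why a singleton collapses them to one entry, and why the singleton breaks the "two empty keys are not equal" contract that the existing test checks.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a structural key (NOT Beam's StructuralKey).
// It does not override equals/hashCode, so equality is object identity.
class Key {
    // Returns a new anonymous instance on every call; it equals only itself,
    // mimicking the anonymous "empty" keys seen in the heap profile.
    static Key freshEmpty() {
        return new Key() {};
    }

    // A single shared instance: every caller gets the same object.
    static final Key SINGLETON_EMPTY = new Key();
}

class KeyedHoldsDemo {
    // Simulates one request adding a hold under an empty key.
    // Assumption: entries are never removed, mirroring the reported leak.
    static int holdsAfter(int requests, boolean useSingleton) {
        Map<Key, Long> keyedHolds = new HashMap<>();
        for (int i = 0; i < requests; i++) {
            Key k = useSingleton ? Key.SINGLETON_EMPTY : Key.freshEmpty();
            keyedHolds.put(k, (long) i); // i stands in for a hold timestamp
        }
        return keyedHolds.size();
    }

    public static void main(String[] args) {
        System.out.println(holdsAfter(10_000, false)); // 10000: one entry per request
        System.out.println(holdsAfter(10_000, true));  // 1: all requests share one entry
        // Identity keys satisfy "two empty keys are not equal"; a singleton does not.
        System.out.println(Key.freshEmpty().equals(Key.freshEmpty()));       // false
        System.out.println(Key.SINGLETON_EMPTY.equals(Key.SINGLETON_EMPTY)); // true
    }
}
```

In this model the singleton only hides the growth by merging entries; with identity keys, memory would stay bounded only if each entry were removed once its hold is released.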