Hey,
I've been banging my head against this for a while now and was hoping
someone could point me in the right direction :)
We are reading time-series data from a text file (TextIO), windowing it,
aggregating it with a custom CombineFn and writing the result to MongoDB.
The runner is Flink.
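For anyone less familiar with Beam, a custom CombineFn boils down to four methods: createAccumulator, addInput, mergeAccumulators and extractOutput. The sketch below is a hypothetical plain-Java stand-in, not MyCombineFn itself (which isn't shown). The class name MeanCombine and its "mean of readings" logic are assumptions, and it is written without the Beam dependency so it runs on its own. It keeps a fixed-size accumulator rather than a list of inputs.

```java
import java.util.Arrays;

// Hypothetical plain-Java stand-in for a custom CombineFn. It mirrors the four
// methods of Beam's Combine.CombineFn but has no Beam dependency. The "mean of
// readings" logic is an assumption: the post does not show MyCombineFn.
class MeanCombine {
    // Accumulator: a running sum and count. Its size stays constant no matter
    // how many inputs are added to it.
    static final class Acc {
        double sum;
        long count;
    }

    // Creates empty state, e.g. for a new window or a new partial combine.
    Acc createAccumulator() {
        return new Acc();
    }

    // Folds one input value into the accumulator.
    Acc addInput(Acc acc, double value) {
        acc.sum += value;
        acc.count++;
        return acc;
    }

    // Merges partial accumulators, e.g. ones built on different workers.
    Acc mergeAccumulators(Iterable<Acc> accs) {
        Acc merged = new Acc();
        for (Acc a : accs) {
            merged.sum += a.sum;
            merged.count += a.count;
        }
        return merged;
    }

    // Turns the final accumulator into the output value (NaN if empty).
    double extractOutput(Acc acc) {
        return acc.count == 0 ? Double.NaN : acc.sum / acc.count;
    }

    public static void main(String[] args) {
        MeanCombine fn = new MeanCombine();
        Acc a = fn.addInput(fn.addInput(fn.createAccumulator(), 1.0), 2.0);
        Acc b = fn.addInput(fn.createAccumulator(), 6.0);
        Acc merged = fn.mergeAccumulators(Arrays.asList(a, b));
        System.out.println(fn.extractOutput(merged)); // prints 3.0
    }
}
```

Whether per-window state stays small in the real pipeline depends on what MyCombineFn's accumulator holds: one that collects every input into a list grows with the contents of the window, whereas a sum/count pair like this one does not.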
All of this works in principle, but for a large file the memory fills up,
even with a tiny window. As far as I can tell, elements that have been read
never get released / GCed when windowing is involved. A simplified section
of the pipeline that exhibits the problem looks like this:
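    collection
        // Assign each element to a 1-second fixed window based on its timestamp
        .apply(Window.into(FixedWindows.of(Duration.standardSeconds(1))))
        // Combine the elements of each window with our custom CombineFn;
        // withoutDefaults() is required because the input is not globally windowed
        .apply(Combine.globally(new MyCombineFn<>()).withoutDefaults());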
Default trigger, nothing fancy.
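To make the windowing step concrete: a 1-second fixed window covers [start, start + 1s), and an element is placed in a window purely by its timestamp. The following is a simplified model in plain Java. The class FixedWindowModel is hypothetical, not Beam's FixedWindows, though it is intended to mimic the epoch-aligned assignment with a zero offset. It shows that elements with distinct timestamps spread across many small windows, while elements that all share one timestamp land in a single window, however small the window size.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of fixed-window assignment (not Beam's FixedWindows class):
// a timestamp t (millis) belongs to the window [start, start + size) where
// start = t - floorMod(t, size), i.e. windows are aligned to the epoch.
class FixedWindowModel {
    static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    }

    // Counts how many elements land in each window, keyed by window start.
    static Map<Long, Integer> countPerWindow(long[] timestamps, long sizeMillis) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (long t : timestamps) {
            counts.merge(windowStart(t, sizeMillis), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        long size = 1_000; // 1-second windows, as in the snippet above
        // Elements carrying distinct timestamps spread over several windows...
        System.out.println(countPerWindow(new long[] {100, 900, 1_200, 2_500}, size));
        // prints {0=2, 1000=1, 2000=1}
        // ...but elements that all share one timestamp end up in a single window.
        System.out.println(countPerWindow(new long[] {0, 0, 0, 0}, size));
        // prints {0=4}
    }
}
```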
I found it suspicious that Flink never records a watermark (as seen in the
UI) for data read from TextIO. Could that have something to do with it? I.e.
since the elements carry no event-time information, the runner can never
assume that there will definitely not be any more data for a given window?
If so, how can I fix this?
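The hypothesis above can be illustrated with a toy model: a runner can only emit a window's result and drop its state once the watermark reaches the end of that window, so if the watermark never advances, every window stays open and nothing is freed. The class WatermarkModel is hypothetical and single-threaded; it is a sketch of the idea under that assumption, not how Flink or Beam actually store window state. It keeps one running sum per 1-second window.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, single-threaded toy model of watermark-driven window cleanup.
// Each fixed window keeps one accumulator (a running sum); a window's result is
// emitted and its state dropped only once the watermark reaches the window end.
// This illustrates the idea only; it is not how Flink or Beam store state.
class WatermarkModel {
    private final long sizeMillis;
    // Open windows keyed by window start; value is that window's running sum.
    private final TreeMap<Long, Long> state = new TreeMap<>();
    private final List<String> emitted = new ArrayList<>();

    WatermarkModel(long sizeMillis) {
        this.sizeMillis = sizeMillis;
    }

    // Assigns the element to its epoch-aligned window and updates that window's sum.
    void add(long timestampMillis, long value) {
        long start = timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
        state.merge(start, value, Long::sum);
    }

    // Emits and frees every window whose end (start + size) is <= the watermark.
    void advanceWatermark(long watermarkMillis) {
        while (!state.isEmpty() && state.firstKey() + sizeMillis <= watermarkMillis) {
            Map.Entry<Long, Long> done = state.pollFirstEntry();
            long start = done.getKey();
            emitted.add("[" + start + "," + (start + sizeMillis) + ")=" + done.getValue());
        }
    }

    int openWindows() {
        return state.size();
    }

    List<String> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        WatermarkModel m = new WatermarkModel(1_000); // 1-second windows
        m.add(100, 1);
        m.add(900, 2);
        m.add(1_500, 5);
        m.advanceWatermark(1_000); // window [0,1000) is now complete
        System.out.println(m.emitted() + " open=" + m.openWindows()); // [[0,1000)=3] open=1
        // Without further watermark progress, new windows just pile up in memory.
        m.add(2_100, 7);
        System.out.println(m.openWindows()); // 2
        // End of input: a watermark at "end of time" flushes everything left.
        m.advanceWatermark(Long.MAX_VALUE);
        System.out.println(m.openWindows()); // 0
    }
}
```

In this model, all state is released only by the final advanceWatermark(Long.MAX_VALUE) call if the watermark does not move during the run, which would match the symptom described: memory growing until the input is exhausted.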
Otherwise, does anyone have another idea what might prevent the memory from
being released? (I realise I haven't shared much code here, but I'm happy to
share whatever may help.)
Many thanks,
Johannes