How are you invoking TextIO? When it produces bounded output, such as
when reading a static set of files, the watermark stays at -inf until all
the data has been read and then jumps to +inf. That would cause exactly
the problem you are seeing if the accumulated state is held in memory.
(Caveat: I'm not a Flink or FlinkRunner expert here)

One reason for this behavior is that the way bounded data is split up is
likely to make it very out-of-order, so any intermediate watermark would
be pretty meaningless. A corollary is that all the windows actually do
need to be kept around until the end; it isn't just a curiosity. Using a
tiny window size makes it worse, since one accumulator per window must be
retained.
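
To put rough numbers on that, here is a plain-Java sketch (not Beam or
Flink code; the class and method names are made up for illustration). It
assumes non-negative millisecond timestamps and models the watermark
sitting at -inf by never dropping an accumulator until the input ends:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration only; this is not how any runner is implemented.
class BoundedWindowState {
    // Start of the fixed window containing the timestamp (assumes timestamp >= 0).
    public static long windowStart(long timestampMillis, long windowSizeMillis) {
        return timestampMillis - (timestampMillis % windowSizeMillis);
    }

    // Counts elements per fixed window. Because no window is ever closed
    // before the end of input, every accumulator stays in the map, and the
    // peak number of live accumulators equals the number of distinct windows.
    public static int peakLiveAccumulators(long[] timestampsMillis, long windowSizeMillis) {
        Map<Long, Long> accumulators = new HashMap<>();
        int peak = 0;
        for (long ts : timestampsMillis) {
            accumulators.merge(windowStart(ts, windowSizeMillis), 1L, Long::sum);
            peak = Math.max(peak, accumulators.size());
        }
        return peak;
    }

    public static void main(String[] args) {
        // One element every 100 ms, covering 1000 seconds of event time.
        long[] timestamps = new long[10_000];
        for (int i = 0; i < timestamps.length; i++) {
            timestamps[i] = i * 100L;
        }
        System.out.println(peakLiveAccumulators(timestamps, 1_000L));  // 1-second windows: 1000
        System.out.println(peakLiveAccumulators(timestamps, 60_000L)); // 1-minute windows: 17
    }
}
```

The same input needs 1000 live accumulators with 1-second windows but only
17 with 1-minute windows, which is why shrinking the window increases the
retained state rather than reducing it.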

This is also partly because you are doing a global combine. With a per-key
combine, shuffled data often comes in key order (again, depends on details
I'm not sure of for Flink) and it is the keys that can be GCed as you go
along. It would probably be smart for a global combine over bounded data
and non-merging windows to shuffle by window and enable this form of GC. I
don't think any Beam runner does so.
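
As a sketch of what shuffling by window could buy, here is a plain-Java
illustration (again not Beam or Flink code; the names are hypothetical and
timestamps are assumed non-negative). An in-memory sort stands in for the
shuffle, which a real runner would do outside the worker's heap. Once the
data arrives in window order, a window's result can be emitted as soon as
the next window starts, so only one accumulator is live at a time:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical illustration only; no Beam runner is claimed to work this way.
class WindowOrderedCombine {
    // Each element is {timestampMillis, value}. Returns {windowStart, sum}
    // pairs in window order, using a single running accumulator.
    public static List<long[]> combineInWindowOrder(long[][] elements, long windowSizeMillis) {
        long[][] sorted = elements.clone();
        // Stand-in for a shuffle keyed by window: order elements by timestamp.
        Arrays.sort(sorted, Comparator.comparingLong(e -> e[0]));

        List<long[]> results = new ArrayList<>();
        boolean open = false;
        long currentWindow = 0;
        long sum = 0;
        for (long[] e : sorted) {
            long window = e[0] - (e[0] % windowSizeMillis);
            if (open && window != currentWindow) {
                // The previous window is complete: emit it and free its state.
                results.add(new long[] {currentWindow, sum});
                sum = 0;
            }
            currentWindow = window;
            open = true;
            sum += e[1];
        }
        if (open) {
            results.add(new long[] {currentWindow, sum});
        }
        return results;
    }

    public static void main(String[] args) {
        long[][] elements = {{2500, 1}, {100, 2}, {900, 3}, {1200, 4}, {2100, 5}};
        for (long[] r : combineInWindowOrder(elements, 1_000L)) {
            System.out.println(r[0] + " -> " + r[1]);
        }
        // Prints:
        // 0 -> 5
        // 1000 -> 4
        // 2000 -> 6
    }
}
```

This only works for non-merging windows, where an element's window is
fixed by its timestamp alone.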

Kenn

On Fri, Dec 15, 2017 at 10:30 AM, Johannes Lehmann <
[email protected]> wrote:

> Hey,
>
> I have banged my head against this for a little while now and I was hoping
> someone could point me in the right direction :):
>
> We are reading time-series data from a text file (TextIO), Windowing,
> aggregating it using a custom CombineFn and writing the result to MongoDB.
> The runner is Flink.
>
> All of this works in principle, but for a large file, the memory gets
> filled up even if we are using a tiny window. For all I can tell elements
> that are read never get released / GCed when there's windowing involved. A
> simplified section of the pipeline that exhibits the problem looks like
> this:
>
> collection
>     .apply(Window.into(FixedWindows.of(Duration.standardSeconds(1))))
>     .apply(Combine.globally(new MyCombineFn<>()).withoutDefaults())
>
> Default trigger, nothing fancy.
>
> I found it suspicious that Flink never records a watermark (as seen in the
> UI) for data read from TextIO. Could that have something to do with it - it
> doesn't have a processing time and therefore cannot make an assumption that
> there will definitely not be any more data from that window? If so how can
> I fix this?
>
> Otherwise (I realise I haven't shared much code here, but can share
> whatever may help) any other idea what might cause the memory to not be
> released?
>
> Many thanks,
> Johannes
>
>
>
