Thanks Kenn for the very helpful reply and explanation :)! That all makes
sense and I see why what I was hoping to happen doesn't really fit with the
computation model of Beam.
Our use case is that we have a pipeline that is designed for unbounded data
(and therefore doesn't suffer from this problem), but that we also want to
ingest some test data from a file. Since I'd like the same windowing,
aggregation etc. that is applied to the incoming unbounded data, we'd like
to use the same pipeline for it, but reading from a text file instead of
KafkaIO. The pipeline I showed in my previous email was actually just the
simplest one I could make that exhibited the problem; our real one does a
fair bit more.
I'm doing nothing special with TextIO:
TextIO.read()
    .withCompressionType(TextIO.CompressionType.ZIP)
    .from(file.getAbsolutePath())
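In case it matters: the event-time timestamps come from the data itself and
are assigned right after the read, roughly like this sketch (parseTimestamp
is just a placeholder for however the timestamp is extracted from each line,
not a real helper):

// lines is the PCollection<String> produced by the TextIO read above.
PCollection<String> timestamped = lines.apply(
    // parseTimestamp is an illustrative placeholder.
    WithTimestamps.of((String line) -> parseTimestamp(line)));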
We can do without the tiny windows (if the aggregators stay in memory it
ought to be fine for the amount of data in question) and also tolerate
duplicates, so I thought if I added this, I might get somewhere:
.triggering(
    Repeatedly.forever(AfterFirst.of(
        AfterWatermark.pastEndOfWindow(),
        AfterProcessingTime.pastFirstElementInPane()
            .plusDelayOf(Duration.standardSeconds(5)))))
.discardingFiredPanes()
i.e. if we can just fire at some regular interval (I also tried AfterPane;
see the sketch below) and then discard the fired panes, we may not GC the
aggregators until the end, but at least the buffered elements should be
released? In the Flink dashboard, though, I can see that even though the
'Records Sent' figure is climbing, 'Bytes Sent' remains at 0 and nothing
happens downstream. It seems like a bit of a hack anyway...
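The AfterPane variant I tried looked roughly like this (the element count is
arbitrary, just something small enough to fire often):

.triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1000)))
.discardingFiredPanes()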
Any idea how I can have a source that, for test purposes, ensures in-order
processing and sets a watermark, or how I can otherwise ingest my test data?
Worst case, I can split the file into multiple files and run them through
separate pipelines...
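To make the question concrete, this is roughly the kind of thing I imagine
(purely a sketch; I don't know whether the FlinkRunner even supports
TestStream, and linesInEventTimeOrder / parseTimestamp are placeholder
names):

TestStream.Builder<String> builder = TestStream.create(StringUtf8Coder.of());
Instant watermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
for (String line : linesInEventTimeOrder) {
  Instant ts = parseTimestamp(line);
  if (ts.isAfter(watermark)) {
    // Advance the watermark before emitting, so that earlier windows can
    // fire and their state can be released as the file is replayed.
    builder = builder.advanceWatermarkTo(ts);
    watermark = ts;
  }
  builder = builder.addElements(TimestampedValue.of(line, ts));
}
PCollection<String> input = pipeline.apply(builder.advanceWatermarkToInfinity());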
Thanks,
Johannes
On Fri, Dec 15, 2017 at 9:19 PM, Kenneth Knowles <[email protected]> wrote:
> How are you invoking TextIO? When it is producing a bounded output, such
> as just reading a static set of files, the watermark will just stay at -inf
> until all the data is read, and then jump to +inf, which would cause just
> the problem you are seeing if the accumulated state is held in memory.
> (Caveat: I'm not a Flink or FlinkRunner expert here)
>
> One reason for this behavior is that this reflects the likelihood that the
> way bounded data is split up would make it very out-of-order and any
> watermark pretty meaningless. A corollary is that all the windows actually
> do need to be kept around until the end; it isn't just a curiosity. Using a
> tiny window size makes it worse, since it is one accumulator per window
> that must be retained.
>
> This is also partly because you are doing a global combine. With a per-key
> combine, shuffled data often comes in key order (again, depends on details
> I'm not sure of for Flink) and it is the keys that can be GCed as you go
> along. It would probably be smart for a global combine over bounded data
> and non-merging windows to shuffle by window and enable this form of GC. I
> don't think any Beam runner does so.
>
> Kenn
>
> On Fri, Dec 15, 2017 at 10:30 AM, Johannes Lehmann <
> [email protected]> wrote:
>
>> Hey,
>>
>> I have banged my head against this for a little while now and I was
>> hoping someone could point me in the right direction :):
>>
>> We are reading time-series data from a text file (TextIO), windowing and
>> aggregating it using a custom CombineFn, and writing the result to MongoDB.
>> The runner is Flink.
>>
>> All of this works in principle, but for a large file memory fills up even
>> when we are using a tiny window. As far as I can tell, elements that are
>> read never get released / GCed when there's windowing involved. A
>> simplified section of the pipeline that exhibits the problem looks like
>> this:
>>
>> collection
>>     .apply(Window.into(FixedWindows.of(Duration.standardSeconds(1))))
>>     .apply(Combine.globally(new MyCombineFn<>()).withoutDefaults())
>>
>> Default trigger, nothing fancy.
>>
>> I found it suspicious that Flink never records a watermark (as seen in
>> the UI) for data read from TextIO. Could that have something to do with it
>> - without a watermark it cannot assume that there will definitely not be
>> any more data for a given window? If so, how can I fix this?
>>
>> Otherwise (I realise I haven't shared much code here, but can share
>> whatever may help) any other idea what might cause the memory to not be
>> released?
>>
>> Many thanks,
>> Johannes
>>
>>
>>
>
--
*Johannes Lehmann*
Managing Director, Software Engineer
symbiotech
+44 759 756 9842, +61 411 705 067
symbiotech.com.au
Connect with me on LinkedIn at linkedin.com/in/lehmannjohannes