Hello,

I am running into a strange issue with the KafkaIO streaming source.

The source keeps reading records from the Kafka topics before the
downstream DoFns in the pipeline have had a chance to process them. It
reads new data continuously, which quickly results in a heap
out-of-memory error, regardless of how large the heap is.

I read through the code and understood that the KafkaIO unbounded
reader reads from Kafka when its "advance()" method gets called (with some
optimization to pre-read a little bit in a separate thread). I couldn't
figure out, though, who calls this "advance()" method, or why that
caller calls it even though the previously read data hasn't been processed yet.
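
To make that mental model concrete, here is a toy sketch in plain Java (JDK
only). It is not KafkaIO's code: the class name PrefetchingReader, its
methods, the buffer capacity and the poll timeout are all made up for
illustration. A background thread pre-reads into a bounded buffer, and
advance() hands out one record per call.

```java
import java.util.Iterator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Toy model only, NOT KafkaIO's implementation. All names are hypothetical.
class PrefetchingReader implements AutoCloseable {
    // Bounded buffer: the background thread blocks once it is full.
    private final BlockingQueue<String> prefetched;
    private final Thread fetcher;
    private String current;

    PrefetchingReader(Iterator<String> source, int prefetchCapacity) {
        this.prefetched = new ArrayBlockingQueue<>(prefetchCapacity);
        // Stand-in for the separate thread that pre-reads "a little bit".
        this.fetcher = new Thread(() -> {
            try {
                while (source.hasNext()) {
                    prefetched.put(source.next()); // waits while the buffer is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        this.fetcher.setDaemon(true);
        this.fetcher.start();
    }

    // Called by whoever drives the reader. Returns true if a new record is
    // available via getCurrent(), false if nothing arrived within the timeout.
    boolean advance() {
        try {
            String next = prefetched.poll(1, TimeUnit.SECONDS);
            if (next == null) {
                return false;
            }
            current = next;
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    String getCurrent() {
        return current;
    }

    @Override
    public void close() {
        fetcher.interrupt();
    }
}
```

If this picture is roughly right, the pre-read buffer by itself stays bounded
(here, at most prefetchCapacity records), so the memory growth would have to
come from whoever keeps calling advance() and holding on to what it returns.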

In my pipeline, I have these:
1. A simple DoFn that does an outputWithTimestamp to assign timestamps to
records
2. Windowing into FixedWindows
3. A Stateful DoFn that collects incoming records into a BagState. This
does some processing with the records in the BagState and outputs the
records after a certain condition is met.
4. Another windowing function for the output from the stateful DoFn
5. FileIO to write to a remote destination.

I suspect that somewhere in this, probably in the stateful DoFn, Beam
might be advancing the watermark as soon as it has handed the record over
to the DoFn that puts it into the BagState. Or maybe it advances the
watermark without waiting for FileIO to write to the remote destination.

How do I debug this? Or is there a way to tell Beam that I have not yet
fully processed a particular record?

Thank you.
