Hello, I am running into a strange issue with the KafkaIO streaming source.
The source just keeps reading records from the Kafka topics even before the downstream DoFns in the pipeline have had a chance to process them. It reads new data continuously, which quickly results in a heap out-of-memory error, no matter how large the heap is.

I tried to read through the code, and my understanding is that the KafkaIO unbounded reader reads from Kafka whenever its advance() method is called (with an optimization that pre-reads a small amount of data in a separate thread). What I couldn't figure out is who calls this advance() method, and why that caller keeps calling it even though the previously read data hasn't been processed yet.

My pipeline has these steps:
1. A simple DoFn that calls outputWithTimestamp to assign timestamps to records.
2. Windowing into FixedWindows.
3. A stateful DoFn that collects incoming records into a BagState. It does some processing with the records in the BagState and outputs them once a certain condition is met.
4. Another windowing function applied to the output of the stateful DoFn.
5. FileIO to write to a remote destination.

I suspect that somewhere in this pipeline, probably in the stateful DoFn, Beam advances the watermark as soon as it has handed a record to the DoFn that puts it into the BagState. Or maybe it does not wait for FileIO to finish writing to the remote destination before advancing the watermark.

How do I debug this? Alternatively, is there a way to tell Beam that I have not yet fully processed a particular record?

Thank you.
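For reference, here is a toy, stdlib-only Java model of how I understand the read path: a pull-based reader with a bounded prefetch buffer filled by a background thread. It is only a sketch under my own assumptions, not KafkaIO's actual implementation; the class ToyPrefetchReader, the buffer capacity, and the 500 ms poll timeout are all hypothetical. In this model the caller pulls one record per advance() call, and the background thread blocks once the buffer is full, so it can never get more than `capacity` records ahead of the caller.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical toy model, NOT KafkaIO's implementation: a background "poller"
// thread pre-reads records into a bounded buffer, and the caller pulls one
// record at a time via advance().
public class ToyPrefetchReader {
    private final BlockingQueue<Integer> buffer;
    private Integer current;

    public ToyPrefetchReader(int capacity, int totalRecords) {
        this.buffer = new ArrayBlockingQueue<>(capacity);
        Thread poller = new Thread(() -> {
            try {
                for (int i = 0; i < totalRecords; i++) {
                    // put() blocks while the buffer is full, so the poller can
                    // never run more than `capacity` records ahead of advance().
                    buffer.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        poller.setDaemon(true); // do not keep the JVM alive after main() returns
        poller.start();
    }

    // Pulls the next pre-read record; returns false if none arrives within the
    // (hypothetical) 500 ms timeout, e.g. because the toy source is exhausted.
    public boolean advance() {
        try {
            current = buffer.poll(500, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            current = null;
        }
        return current != null;
    }

    // Only valid after advance() has returned true.
    public int getCurrent() {
        return current;
    }

    // Number of records pre-read but not yet pulled; never exceeds capacity.
    public int buffered() {
        return buffer.size();
    }

    public static void main(String[] args) {
        ToyPrefetchReader reader = new ToyPrefetchReader(4, 10);
        int count = 0;
        while (reader.advance()) {
            System.out.println("record " + reader.getCurrent() + ", buffered=" + reader.buffered());
            count++;
        }
        System.out.println("read " + count + " records");
    }
}
```

If the real reader's prefetch is bounded in a similar way, the prefetch alone should not be able to grow the heap without limit, which would point at records piling up somewhere downstream (for example, in the BagState) rather than in the source itself.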
