What you describe is not expected. Here are the relevant points, I think:

 - A window is expired when the watermark is past the end of the window + allowed lateness
 - An element is droppable when it is associated to an expired window
 - All droppable elements should be dropped before reaching the stateful ParDo step
 - The state and processing time timers for a particular key+window pair are garbage collected when the window is expired, because it is known that nothing can cause that state to be read
 - A key+window state is not cleared until all event time timers have been dispatched

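The first two points can be sketched in plain Java. This is only a simplified model of the rule as stated above, not Beam's actual implementation or API: the class and method names (`WindowExpirySketch`, `isExpired`, `isDroppable`) are made up for illustration, and it uses `java.time` rather than the Joda types the Beam Java SDK uses.

```java
import java.time.Duration;
import java.time.Instant;

/** Simplified, hypothetical model of the expiry rule; not Beam's API. */
public class WindowExpirySketch {

    /** A window is expired once the watermark is past windowEnd + allowedLateness. */
    public static boolean isExpired(Instant windowEnd, Duration allowedLateness, Instant watermark) {
        return watermark.isAfter(windowEnd.plus(allowedLateness));
    }

    /**
     * An element is droppable when the window it was assigned to is expired.
     * Note that the element's own timestamp does not appear here at all.
     */
    public static boolean isDroppable(Instant elementWindowEnd, Duration allowedLateness, Instant watermark) {
        return isExpired(elementWindowEnd, allowedLateness, watermark);
    }

    public static void main(String[] args) {
        Duration allowedLateness = Duration.ofDays(7);
        Instant windowEnd = Instant.parse("2019-03-01T00:00:00Z");

        // Watermark is 6 days past the window end: the element is behind the
        // watermark, but its window is still live, so it is not droppable.
        Instant watermarkA = Instant.parse("2019-03-07T00:00:00Z");
        System.out.println(isDroppable(windowEnd, allowedLateness, watermarkA)); // prints false

        // Watermark is 8 days past the window end: the window is expired.
        Instant watermarkB = Instant.parse("2019-03-09T00:00:00Z");
        System.out.println(isDroppable(windowEnd, allowedLateness, watermarkB)); // prints true
    }
}
```

The dates and the one-week lateness are just example values chosen to match the pipeline described in the quoted message.
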

Do these make sense? I'd love to see more detail of your pipeline code.

One thing to note is that an element being behind the watermark doesn't really matter. What matters is how the watermark relates to its window.

Kenn

On Wed, Mar 6, 2019 at 4:28 AM Amit Ziv-Kenet <[email protected]> wrote:

> Hello,
>
> I'm encountering an unexpected behavior in one of our pipelines, I hope you
> might help me make sense of it.
>
> This is a streaming pipeline implemented with the Java SDK (2.10) and
> running on Dataflow.
>
> * In the pipeline a FixedWindow is applied to the data with an allowed
> lateness of a week (doesn't really matter).
> * The windowed PCollection is then used in 2 separate branches: one
> regular GroupByKey and a second one which is a Transform that makes heavy
> use of State and Timers.
> * In the stateful transform, incoming elements are added to a BagState
> container until some condition on the elements is reached, and the elements
> in the BagState are dispatched downstream. A timer makes sure that the
> BagState is dispatched even if the condition is not met after some timeout
> has expired.
>
> Occasionally very late data enters the pipeline, with timestamps older
> than the allowed lateness.
> In these cases, the GroupByKey transform behaves as expected, and there
> aren't any panes with the late data output downstream.
> In the stateful transform, on the other hand, I see that these late
> elements are processed and added to the BagState, but at a later point in
> time (e.g. when the timer is triggered) the elements "disappear" from the
> BagState and are no longer observable. This can happen multiple times, i.e.
> late elements are added to the BagState and then disappear at a later time.
>
> Is this an expected behavior? Is there some sort of late data "garbage
> collection" which eventually removes late elements?
>
> Thank you for your help, I hope my description is clear enough.
>
> Regards,
> Amit.
