I recently noticed something about windows: they retain (in state) every 
element that they receive regardless of whether the user provides a fold/reduce 
function. I can tell that such an approach is necessary in order for evictors 
to work, but I'm not sure if there are other reasons.

I'll describe a use case where this approach is not optimal, and then maybe we 
can discuss ways to work around it or possible modifications to Flink. My jobs 
include windows that are much longer than the interval at which we want updates. 
For example, I might have a window that is one day long, but I might want an 
updated value to be emitted from that window within (say) one processing-time 
minute of a new event being assigned to it. I can accomplish that with a 
trigger that issues a FIRE after a processing-time delay as well as a 
FIRE_AND_PURGE at the end of the window in event time. Next, I want to gather 
those items into a bigger window: 
perhaps a month or a year wide. My fold function can ensure that multiple 
events from an upstream window overwrite each other so that they are not 
counted multiple times. However, as I mentioned, the wide window's state will 
hold all of these events: every result of the upstream processing-time FIREs as 
well as the final result of the upstream FIRE_AND_PURGE. That makes the state 
larger than it needs to be.
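
To make the size difference concrete, here is a minimal, self-contained Java 
sketch (no Flink dependency) that models what a one-day window might emit into 
the wide window. `DailyUpdate` and its fields are hypothetical stand-ins for the 
upstream output, and the numbers are illustrative only. It contrasts the list 
that the wide window buffers with the per-day map that the fold actually needs, 
in which a later firing for the same day overwrites an earlier one.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class WideWindowStateSketch {

    // Hypothetical element emitted by the upstream one-day window:
    // which day it covers and that day's running total at emission time.
    public record DailyUpdate(long dayStart, long total) {}

    // Summing every buffered element counts each day once per firing.
    public static long naiveSum(List<DailyUpdate> buffered) {
        return buffered.stream().mapToLong(DailyUpdate::total).sum();
    }

    // Keep only the latest update per upstream day: a later firing
    // overwrites the earlier one, so each day is counted once.
    public static Map<Long, Long> latestPerDay(List<DailyUpdate> updates) {
        Map<Long, Long> latest = new LinkedHashMap<>();
        for (DailyUpdate u : updates) {
            latest.put(u.dayStart(), u.total());
        }
        return latest;
    }

    public static long dedupSum(List<DailyUpdate> updates) {
        return latestPerDay(updates).values().stream().mapToLong(Long::longValue).sum();
    }

    public static void main(String[] args) {
        long day1 = 0L;
        long day2 = 86_400_000L; // one day in milliseconds
        // Early processing-time FIREs emit partial totals; the event-time
        // FIRE_AND_PURGE emits the final total for each day.
        List<DailyUpdate> emitted = List.of(
                new DailyUpdate(day1, 3),
                new DailyUpdate(day1, 7),
                new DailyUpdate(day1, 10),
                new DailyUpdate(day2, 4),
                new DailyUpdate(day2, 5));

        System.out.println("elements held by the wide window: " + emitted.size());
        System.out.println("entries the fold actually needs:  " + latestPerDay(emitted).size());
        System.out.println("naive sum: " + naiveSum(emitted)
                + ", deduplicated sum: " + dedupSum(emitted));
    }
}
```

Running `main` reports 5 buffered elements versus 2 needed entries, and a naive 
sum of 29 versus a deduplicated sum of 15.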

With regard to solutions within the bounds of the existing framework, I am 
considering using a regular fold() operation instead of a long window. The fold 
function would be responsible for performing the eviction that the window was 
previously responsible for. I could implement that as a RichFoldFunction with a 
ReducingState. The main difference is that there would be no triggering 
involved (each incoming item would immediately cause a new aggregate to be 
emitted). I could also implement my own operator. Are there other or better 
options I have not considered?
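
A rough sketch of the windowless alternative follows, assuming each upstream 
update carries the start of the day it covers. It is plain Java, not Flink's 
API: `RunningAggregateSketch`, `onElement`, and the 30-day period are 
hypothetical, and a `HashMap` stands in for the keyed state that a 
`RichFoldFunction` would use in Flink. Each element immediately yields a new 
aggregate, and the function itself clears days belonging to an earlier period, 
which is the eviction the window used to do.

```java
import java.util.HashMap;
import java.util.Map;

public class RunningAggregateSketch {

    static final long DAY_MS = 86_400_000L;

    // Per-key state: the wide period currently being aggregated and the
    // latest total received for each upstream day within that period.
    static final class PeriodState {
        long period = Long.MIN_VALUE;
        final Map<Long, Long> latestPerDay = new HashMap<>();
    }

    private final long periodLengthDays;
    // A plain map stands in for Flink's keyed state in this sketch.
    private final Map<String, PeriodState> stateByKey = new HashMap<>();

    public RunningAggregateSketch(long periodLengthDays) {
        this.periodLengthDays = periodLengthDays;
    }

    // Called once per incoming upstream update; returns the new aggregate
    // for the key's current period immediately (no trigger involved).
    public long onElement(String key, long dayStart, long total) {
        long period = Math.floorDiv(dayStart, DAY_MS * periodLengthDays);
        PeriodState s = stateByKey.computeIfAbsent(key, k -> new PeriodState());
        if (period > s.period) {
            // Manual "eviction": drop all days of the previous period.
            s.latestPerDay.clear();
            s.period = period;
        } else if (period < s.period) {
            // Assumption: updates for an already-evicted period are ignored.
            return sum(s);
        }
        s.latestPerDay.put(dayStart, total); // overwrite earlier firings for this day
        return sum(s);
    }

    private static long sum(PeriodState s) {
        return s.latestPerDay.values().stream().mapToLong(Long::longValue).sum();
    }

    public static void main(String[] args) {
        RunningAggregateSketch agg = new RunningAggregateSketch(30); // ~monthly periods
        System.out.println(agg.onElement("a", 0, 3));           // 3
        System.out.println(agg.onElement("a", 0, 10));          // 10: day 0 overwritten
        System.out.println(agg.onElement("a", DAY_MS, 5));      // 15
        System.out.println(agg.onElement("a", 30 * DAY_MS, 2)); // 2: new period, old days evicted
    }
}
```

One design consequence: without a watermark-driven trigger, nothing marks a 
period as final, so this sketch simply ignores updates that arrive for a period 
it has already evicted.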

Is it desirable to improve support for this use case within Flink? I can 
imagine that other people may want to get incremental/ongoing results from 
their windows as data comes in instead of waiting for the watermark to purge 
the window. In general, they might want better control over the window state. 
If so, what would the solution look like? Perhaps we could allow users to 
supply an additional function to the window operator that extracts the identity 
of each new event, and Flink would then ensure that new events overwrite 
existing events with the same identity in the window state, preventing it from 
growing unnecessarily. Or perhaps there is a way to do it based on the identity 
of the window that produces the event? Or, more generally, could we allow 
user-provided fold/reduce functions to eagerly reduce the state of the window, 
even though that might affect the evictor feature?
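
The identity-based proposal might look roughly like the following sketch of 
window state. `IdentityKeyedWindowBuffer` is a hypothetical class, not part of 
Flink, and its constructor argument plays the role of the user-supplied identity 
extractor described above. Elements with the same identity replace each other, 
so the state grows with the number of distinct identities rather than the number 
of firings, while individual elements remain available to anything that needs to 
inspect them.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical window buffer (not a Flink class): elements whose identity,
// as computed by a user-supplied function, matches replace each other.
public class IdentityKeyedWindowBuffer<T, K> {

    private final Function<T, K> identityOf;
    private final Map<K, T> elements = new LinkedHashMap<>();

    public IdentityKeyedWindowBuffer(Function<T, K> identityOf) {
        this.identityOf = identityOf;
    }

    // A new element overwrites any buffered element with the same identity.
    public void add(T element) {
        elements.put(identityOf.apply(element), element);
    }

    public int size() {
        return elements.size();
    }

    // Individual elements stay available, e.g. for a fold at firing time.
    public List<T> contents() {
        return new ArrayList<>(elements.values());
    }

    // Illustrative element type: which upstream window produced it, and its total.
    record Update(String upstreamWindow, long total) {}

    public static void main(String[] args) {
        IdentityKeyedWindowBuffer<Update, String> buffer =
                new IdentityKeyedWindowBuffer<>(Update::upstreamWindow);
        buffer.add(new Update("day-1", 3));
        buffer.add(new Update("day-1", 10)); // replaces the earlier firing for day-1
        buffer.add(new Update("day-2", 5));
        long total = buffer.contents().stream().mapToLong(Update::total).sum();
        System.out.println(buffer.size() + " elements, total " + total);
    }
}
```

In the use case above, the identity would be the upstream day window, so each 
day contributes at most one element to the wide window's state.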

Thanks for your thoughts,
Shannon
