Thanks! Yes, SlidingEventTimeWindows works, but is there any way to pre-aggregate with tumbling windows that emit results more often than their window size? Perhaps I could do this before I merge the streams? (But if ContinuousEventTimeTrigger is the only way to do that, it's a problem if it doesn't clean up its state.)
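To make concrete what I mean by a tumbling window that emits more often than its size, here is a rough plain-Java sketch. It is not Flink code and uses no Flink API; the class name, the method names and the output format are made up for illustration. It assumes timestamps arrive in order (0 out-of-orderness) and that the emit interval divides the window size evenly. The only per-window state is the running sum, which is dropped when the window ends.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only: a 30 s tumbling sum that also emits its
// partial result every 10 s of event time. Not part of Flink.
public class TumblingEarlyFireSketch {
    static final long WINDOW_MS = 30_000; // tumbling window size
    static final long EMIT_MS = 10_000;   // emit interval, must divide WINDOW_MS

    private long windowStart = -1; // -1 means no window is open
    private long nextEmit;         // next event-time boundary at which to emit
    private long sum;              // the only state kept for the open window
    final List<String> emitted = new ArrayList<>();

    // Advance event time: emit at every boundary that has been reached,
    // and discard the aggregate once the end of the window is emitted.
    void advanceTo(long time) {
        while (windowStart >= 0 && nextEmit <= time) {
            emitted.add(windowStart + "@" + nextEmit + "=" + sum);
            if (nextEmit == windowStart + WINDOW_MS) {
                windowStart = -1; // window is finished, clean up its state
                sum = 0;
            } else {
                nextEmit += EMIT_MS;
            }
        }
    }

    // Fold one event into the aggregate; the event itself is not stored.
    void add(long ts, long value) {
        advanceTo(ts);
        if (windowStart < 0) {
            windowStart = ts - ts % WINDOW_MS;
            nextEmit = ts - ts % EMIT_MS + EMIT_MS; // first boundary after ts
        }
        sum += value;
    }

    public static void main(String[] args) {
        TumblingEarlyFireSketch w = new TumblingEarlyFireSketch();
        w.add(1_000, 5);
        w.add(12_000, 7);
        w.add(31_000, 1);
        w.advanceTo(60_000);
        w.emitted.forEach(System.out::println);
    }
}
```

Each emitted line reads windowStart@emitTime=partialSum, so the first window reports its partial sum at 10 s and 20 s and its final sum at 30 s, after which its state is gone.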
I assume sliding window state will be larger and less efficient than tumbling window state, since a sliding fold needs to keep all events in the window and recompute the fold as events slide out of it, while a tumbling fold only needs to keep the aggregate and can discard events as it folds them. I am reviewing how one would replace a batch solution based on 3 bucketed aggregations of different window sizes. Tumbling windows seem like a perfect fit: they would need to keep only the 3 aggregations in memory, while sliding windows would need to keep up to 3 copies of all events (for at least the smallest window size) to compute the same type of results.

Regards!
William

----- Original Message -----
From: user@flink.apache.org
To: <user@flink.apache.org>
Cc:
Sent: Mon, 21 Nov 2016 08:22:16 +0000
Subject: Re: ContinuousEventTimeTrigger breaks coGrouped windowed streams?

Hi William,

I am wondering whether the ContinuousEventTimeTrigger is the best choice here (it never cleans up the state as far as I know). Have you tried the simple SlidingEventTimeWindows as your window assigner?

Cheers,
Gyula

William Saar <will...@saar.se [1]> wrote (on Sat, 19 Nov 2016, 18:28):

Hi!

My topology below seems to work when I comment out all the lines with ContinuousEventTimeTrigger, but prints nothing when those lines are in there. Can I coGroup two large time windows that use a different trigger time than the window size? (Even if the ContinuousEventTimeTrigger doesn't work for coGroups, I would not expect the output to be completely silent.)
The streams I'm cogrouping are from 2 different Kafka sources and use event time with 0 out-of-orderness, and I'm on Flink 1.1.3, if that helps.

DataStream<CommonType> stream1 = <stream of event type1>
    .window(TumblingEventTimeWindows.of(Time.seconds(30)))
    .trigger(ContinuousEventTimeTrigger.of(Time.seconds(10)))
    .fold(...);

DataStream<CommonType> stream2 = <stream of event type2>
    .window(TumblingEventTimeWindows.of(Time.seconds(30)))
    .trigger(ContinuousEventTimeTrigger.of(Time.seconds(10)))
    .fold(...);

stream1.coGroup(stream2).where(...).equalTo(...)
    .window(TumblingEventTimeWindows.of(Time.seconds(30)))
    .trigger(ContinuousEventTimeTrigger.of(Time.seconds(10)))
    .print()

Thanks,
William

Links:
------
[1] mailto:will...@saar.se