Hi,

if you are using ingestion time (or event time) as your stream time characteristic, i.e.:

    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) // or TimeCharacteristic.EventTime

you can apply several window transforms one after another and they will operate on the same "time windows", because they work on the element timestamps. What you can then do is have one window that does the aggregation and then another one (that has to be global, i.e. non-keyed) to select the top elements:

    result = input
      .keyBy(<some key>)
      .timeWindow(Time.minutes(1), Time.seconds(5))
      .sum(2)
      // notice how I put a non-sliding window here,
      // because the above will output a new window every 5 seconds
      .timeWindowAll(Time.seconds(5))
      .apply(<my custom window function>)

I hope this helps.

Cheers,
Aljoscha

On Fri, 1 Apr 2016 at 10:35 Balaji Rajagopalan <balaji.rajagopa...@olacabs.com> wrote:

> I had a similar use case and ended up writing the aggregation logic in the
> apply function; I could not find any better solution.
>
> On Fri, Apr 1, 2016 at 6:03 AM, Kanak Biscuitwala <kana...@hotmail.com>
> wrote:
>
>> Hi,
>>
>> I would like to write something that does something like a word count,
>> and then emits only the 10 highest counts for that window. Logically, I
>> would want to do something like:
>>
>>     stream.timeWindow(Time.of(1, TimeUnit.MINUTES), Time.of(5, TimeUnit.SECONDS))
>>       .sum(2)
>>       .apply(getTopK(10))
>>
>> However, the window context is lost after I do the sum aggregation. Is
>> there a straightforward way to express this logic in Flink 1.0? One way I
>> can think of is to have a complex function in apply() that has state, but I
>> would like to know if there is something a little cleaner than that.
>>
>> Thanks,
>> Kanak
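In Aljoscha's pipeline, the custom window function for the `timeWindowAll(...)` step is left as a placeholder. Below is a minimal sketch of the selection logic it could run, under stated assumptions: the aggregated records are modeled as (word, count) pairs, and `TopK`, `WordCount` and `topK` are hypothetical names, not Flink API. The code is plain Java with no Flink dependency. Inside an `AllWindowFunction`, you would presumably pass the window's `Iterable` of elements to `topK` and emit the result through the `Collector`; that wiring is not shown here.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical helper (not part of the Flink API): selects the top-k records
// among all (word, count) pairs that fall into one global 5-second window.
final class TopK {

    // Stand-in for the records emitted by .sum(2); in the real job these
    // would be tuples with the count in field 2 (assumption).
    static final class WordCount {
        final String word;
        final long count;

        WordCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return word + "=" + count;
        }
    }

    // Returns the k records with the highest counts, highest first.
    // A min-heap capped at k elements keeps this at O(n log k).
    static List<WordCount> topK(Iterable<WordCount> records, int k) {
        List<WordCount> result = new ArrayList<>();
        if (k <= 0) {
            return result;
        }
        PriorityQueue<WordCount> heap =
                new PriorityQueue<>(k, Comparator.comparingLong((WordCount wc) -> wc.count));
        for (WordCount wc : records) {
            if (heap.size() < k) {
                heap.add(wc);
            } else if (wc.count > heap.peek().count) {
                // Evict the current smallest of the top-k candidates.
                heap.poll();
                heap.add(wc);
            }
        }
        result.addAll(heap);
        result.sort(Comparator.comparingLong((WordCount wc) -> wc.count).reversed());
        return result;
    }

    public static void main(String[] args) {
        List<WordCount> window = new ArrayList<>();
        window.add(new WordCount("flink", 42));
        window.add(new WordCount("stream", 7));
        window.add(new WordCount("window", 19));
        window.add(new WordCount("state", 3));
        System.out.println(topK(window, 2)); // prints [flink=42, window=19]
    }
}
```

A bounded min-heap is one reasonable choice here because a window may contain many distinct words while only k of them are kept; sorting the whole window's contents would also work, at O(n log n).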