Hi,
if you are using ingestion-time (or event-time) as your stream time
characteristic, i.e.:

env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
// or TimeCharacteristic.EventTime

you can apply several window transforms one after another, and they will
all operate on the same "time window" because they work on the element
timestamps. What you can then do is have one window that does the
aggregation, followed by another one (which has to be global, i.e.
non-keyed) to select the top elements:

result = input
  .keyBy(<some key>)
  .timeWindow(Time.minutes(1), Time.seconds(5))
  .sum(2)
  // notice how I put a non-sliding window here, because the above will
  // output a new window every 5 seconds
  .timeWindowAll(Time.seconds(5))
  .apply(<my custom window function>)
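
For the custom window function, the selection step itself could look
roughly like the sketch below. It is plain Java with no Flink dependency:
the class name TopKSketch, the method topK and the use of Map.Entry for a
(word, count) pair are all made up for illustration, and wiring this into
the function you pass to apply() is left out.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

/** Hypothetical helper, not part of Flink: picks the k largest counts. */
final class TopKSketch {

    /** Returns up to k entries with the highest counts, highest first. */
    static <K> List<Map.Entry<K, Long>> topK(Iterable<Map.Entry<K, Long>> counts, int k) {
        List<Map.Entry<K, Long>> result = new ArrayList<>();
        if (k <= 0) {
            return result;
        }
        Comparator<Map.Entry<K, Long>> byCount =
            Comparator.comparingLong((Map.Entry<K, Long> e) -> e.getValue());
        // Min-heap on the count: the head is the smallest of the current top k.
        PriorityQueue<Map.Entry<K, Long>> heap = new PriorityQueue<>(byCount);
        for (Map.Entry<K, Long> entry : counts) {
            heap.offer(entry);
            if (heap.size() > k) {
                heap.poll(); // evict the smallest so only k entries remain
            }
        }
        result.addAll(heap);
        result.sort(byCount.reversed());
        return result;
    }
}
```

The heap keeps at most k elements at a time, so the function does not
need to buffer or fully sort everything that arrives in the window.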

I hope this helps.

Cheers,
Aljoscha

On Fri, 1 Apr 2016 at 10:35 Balaji Rajagopalan <
balaji.rajagopa...@olacabs.com> wrote:

> I had a similar use case and ended up writing the aggregation logic in the
> apply function; I could not find any better solution.
>
> On Fri, Apr 1, 2016 at 6:03 AM, Kanak Biscuitwala <kana...@hotmail.com>
> wrote:
>
>> Hi,
>>
>> I would like to write something that does something like a word count,
>> and then emits only the 10 highest counts for that window. Logically, I
>> would want to do something like:
>>
>> stream.timeWindow(Time.of(1, TimeUnit.MINUTES), Time.of(5,
>> TimeUnit.SECONDS)).sum(2).apply(getTopK(10))
>>
>> However, the window context is lost after I do the sum aggregation. Is
>> there a straightforward way to express this logic in Flink 1.0? One way I
>> can think of is to have a complex function in apply() that has state, but I
>> would like to know if there is something a little cleaner than that.
>>
>> Thanks,
>> Kanak
>
>
>
