Hi,

Can't you use a second keyed window (with the same size) and apply
.max(...)?
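
To make the difference concrete, here is a rough plain-Java simulation (not
Flink code; the class WindowMaxSketch and its methods are made up for
illustration). It assumes Fabian's five example records arrive in the order
listed. rollingMax mimics a reduce on a stream keyed by window time, which
emits one record per input. maxPerWindow mimics a second window of the same
size, which would emit only once per window.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper class: simulates the two approaches without Flink.
class WindowMaxSketch {

    /** One per-key window result, shaped like the records in Fabian's example. */
    static final class Agg {
        final int key;
        final long windowTime;
        final int value;

        Agg(int key, long windowTime, int value) {
            this.key = key;
            this.windowTime = windowTime;
            this.value = value;
        }

        @Override
        public String toString() {
            return "(key: " + key + ", w-time: " + windowTime + ", agg: " + value + ")";
        }
    }

    /** The five example records from the thread, in the order they were listed. */
    static List<Agg> sample() {
        List<Agg> in = new ArrayList<>();
        in.add(new Agg(1, 10, 17));
        in.add(new Agg(2, 10, 20));
        in.add(new Agg(1, 20, 30));
        in.add(new Agg(2, 20, 28));
        in.add(new Agg(3, 20, 5));
        return in;
    }

    /** Rolling reduce keyed by window time: emits the running max after every input. */
    static List<Agg> rollingMax(List<Agg> input) {
        Map<Long, Agg> best = new LinkedHashMap<>();
        List<Agg> out = new ArrayList<>();
        for (Agg a : input) {
            Agg current = best.get(a.windowTime);
            if (current == null || a.value > current.value) {
                current = a;
                best.put(a.windowTime, a);
            }
            out.add(current); // one output per input record
        }
        return out;
    }

    /** Second window of the same size: buffers all inputs, emits one max per window. */
    static List<Agg> maxPerWindow(List<Agg> input) {
        Map<Long, Agg> best = new LinkedHashMap<>();
        for (Agg a : input) {
            Agg current = best.get(a.windowTime);
            if (current == null || a.value > current.value) {
                best.put(a.windowTime, a);
            }
        }
        return new ArrayList<>(best.values()); // one output per window time
    }

    public static void main(String[] args) {
        List<Agg> in = sample();
        System.out.println("rolling reduce: " + rollingMax(in));
        System.out.println("windowed max:   " + maxPerWindow(in));
    }
}
```

With the sample input, rollingMax yields five records (the list Konstantin
expects) and maxPerWindow yields the two records Fabian described. In a real
job the second window would only fire once event time passes the end of the
window, so this sketch says nothing about latency.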

-Matthias

On 11/23/2015 11:00 AM, Konstantin Knauf wrote:
> Hi Fabian,
> 
> thanks for your answer. Yes, that's what I want.
> 
> The solution you suggest is what I am doing right now (see the last
> bullet point in my question).
> 
> But given your example, I would expect the following output:
> 
> (key: 1, w-time: 10, agg: 17)
> (key: 2, w-time: 10, agg: 20)
> (key: 1, w-time: 20, agg: 30)
> (key: 1, w-time: 20, agg: 30)
> (key: 1, w-time: 20, agg: 30)
> 
> Because the reduce function is evaluated for every incoming event (i.e.
> each key), right?
> 
> Cheers,
> 
> Konstantin
> 
> On 23.11.2015 10:47, Fabian Hueske wrote:
>> Hi Konstantin,
>>
>> let me first summarize to make sure I understood what you are looking for.
>> You computed an aggregate over a keyed event-time window and you are
>> looking for the maximum aggregate for each group of windows over the
>> same period of time.
>> So if you have
>> (key: 1, w-time: 10, agg: 17)
>> (key: 2, w-time: 10, agg: 20)
>> (key: 1, w-time: 20, agg: 30)
>> (key: 2, w-time: 20, agg: 28)
>> (key: 3, w-time: 20, agg: 5)
>>
>> you would like to get:
>> (key: 2, w-time: 10, agg: 20)
>> (key: 1, w-time: 20, agg: 30)
>>
>> If this is correct, you can do this as follows.
>> You can extract the window start and end time from the TimeWindow
>> parameter of the WindowFunction and key the stream either by start or
>> end time and apply a ReduceFunction on the keyed stream.
>>
>> Best, Fabian
>>
>> 2015-11-23 8:41 GMT+01:00 Konstantin Knauf <konstantin.kn...@tngtech.com
>> <mailto:konstantin.kn...@tngtech.com>>:
>>
>>     Hi everyone,
>>
>>     me again :) Let's say you have a stream, and for every window and key
>>     you compute some aggregate value, like this:
>>
>>     DataStream.keyBy(..)
>>               .timeWindow(..)
>>               .apply(...)
>>
>>
>>     Now I want to get the maximum aggregate value for every window over the
>>     keys. This feels like a pretty natural use case. How can I achieve this
>>     with Flink in the most compact way?
>>
>>     The options I thought of so far are:
>>
>>     * Use a timeWindowAll, obviously. The drawback is that the WindowFunction
>>     would not be distributed by keys anymore.
>>
>>     * use a windowAll after the WindowFunction to create windows of the
>>     aggregates, which originated from the same timeWindow. This could be
>>     done either with a TimeWindow or with a GlobalWindow with DeltaTrigger.
>>     Drawback: Seems unnecessarily complicated and doubles the latency (at
>>     least in my naive implementation ;)).
>>
>>     * Of course, you could also just keyBy the start time of the window
>>     after the WindowFunction, but then you get more than one event for each
>>     window.
>>
>>     Is there some easy way I am missing? If not, is there a technical
>>     reason why such a "reduceByKeyAndWindow" operator is not available in
>>     Flink?
>>
>>     Cheers,
>>
>>     Konstantin
>>
>>
> 
