Hi,

can't you use a second keyed window (with the same size) and apply .max(...)?
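As a rough illustration outside of Flink, here is a plain-Java simulation of what the second window stage should produce. It is not Flink code: `Agg`, `maxPerWindow` and `WindowMaxSketch` are hypothetical names. It assumes the first keyed window has already emitted its complete results as (key, w-time, agg) records, and that the second stage groups them by window time. Each group yields exactly one record, the one with the largest aggregate. Note that in Flink, `max(...)` returns the maximum field value, while `maxBy(...)` returns the whole element that holds it, so `maxBy` is probably the better fit if the winning key should be kept.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class WindowMaxSketch {

    // Hypothetical stand-in for one result of the first keyed window:
    // (key, window time, aggregate), as in Fabian's example.
    static final class Agg {
        final int key;
        final long wTime;
        final int agg;

        Agg(int key, long wTime, int agg) {
            this.key = key;
            this.wTime = wTime;
            this.agg = agg;
        }

        @Override
        public String toString() {
            return "(key: " + key + ", w-time: " + wTime + ", agg: " + agg + ")";
        }
    }

    // Simulates the second window stage: group the first-stage results by
    // window time and keep the whole record with the largest aggregate.
    // Exactly one result is produced per window time (ties keep the first).
    static List<Agg> maxPerWindow(List<Agg> firstStage) {
        Map<Long, Agg> best = new LinkedHashMap<>();
        for (Agg a : firstStage) {
            best.merge(a.wTime, a, (cur, next) -> next.agg > cur.agg ? next : cur);
        }
        return new ArrayList<>(best.values());
    }

    public static void main(String[] args) {
        List<Agg> firstStage = List.of(
                new Agg(1, 10, 17),
                new Agg(2, 10, 20),
                new Agg(1, 20, 30),
                new Agg(2, 20, 28),
                new Agg(3, 20, 5));
        // Prints one line per window time.
        for (Agg a : maxPerWindow(firstStage)) {
            System.out.println(a);
        }
    }
}
```

With Fabian's example input this prints (key: 2, w-time: 10, agg: 20) and (key: 1, w-time: 20, agg: 30). That is one result per window, unlike a rolling reduce.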
-Matthias

On 11/23/2015 11:00 AM, Konstantin Knauf wrote:
> Hi Fabian,
>
> thanks for your answer. Yes, that's what I want.
>
> The solution you suggest is what I am doing right now (see the last
> bullet point in my question).
>
> But given your example, I would expect the following output:
>
> (key: 1, w-time: 10, agg: 17)
> (key: 2, w-time: 10, agg: 20)
> (key: 1, w-time: 20, agg: 30)
> (key: 1, w-time: 20, agg: 30)
> (key: 1, w-time: 20, agg: 30)
>
> Because the reduce function is evaluated for every incoming event (i.e.
> each key), right?
>
> Cheers,
>
> Konstantin
>
> On 23.11.2015 10:47, Fabian Hueske wrote:
>> Hi Konstantin,
>>
>> let me first summarize to make sure I understood what you are looking for.
>> You computed an aggregate over a keyed event-time window and you are
>> looking for the maximum aggregate for each group of windows over the
>> same period of time.
>> So if you have
>> (key: 1, w-time: 10, agg: 17)
>> (key: 2, w-time: 10, agg: 20)
>> (key: 1, w-time: 20, agg: 30)
>> (key: 2, w-time: 20, agg: 28)
>> (key: 3, w-time: 20, agg: 5)
>>
>> you would like to get:
>> (key: 2, w-time: 10, agg: 20)
>> (key: 1, w-time: 20, agg: 30)
>>
>> If this is correct, you can do this as follows.
>> You can extract the window start and end time from the TimeWindow
>> parameter of the WindowFunction, key the stream either by start or
>> end time, and apply a ReduceFunction on the keyed stream.
>>
>> Best, Fabian
>>
>> 2015-11-23 8:41 GMT+01:00 Konstantin Knauf <konstantin.kn...@tngtech.com>:
>>
>> Hi everyone,
>>
>> me again :) Let's say you have a stream, and for every window and key
>> you compute some aggregate value, like this:
>>
>>     DataStream.keyBy(..)
>>         .timeWindow(..)
>>         .apply(...)
>>
>> Now I want to get the maximum aggregate value for every window over the
>> keys. This feels like a pretty natural use case. How can I achieve this
>> with Flink in the most compact way?
>>
>> The options I thought of so far are:
>>
>> * Use a timeWindowAll, obviously. The drawback is that the WindowFunction
>>   would no longer be distributed by key.
>>
>> * Use a windowAll after the WindowFunction to create windows of the
>>   aggregates that originated from the same timeWindow. This could be
>>   done either with a TimeWindow or with a GlobalWindow with a DeltaTrigger.
>>   Drawback: it seems unnecessarily complicated and doubles the latency (at
>>   least in my naive implementation ;)).
>>
>> * Of course, you could also just keyBy the start time of the window
>>   after the WindowFunction, but then you get more than one event for each
>>   window.
>>
>> Is there some easy way I am missing? If not, is there a technical
>> reason why such a "reduceByKeyAndWindow" operator is not available in
>> Flink?
>>
>> Cheers,
>>
>> Konstantin
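The difference Konstantin points out can be reproduced with a small plain-Java simulation of Fabian's suggestion. It is not Flink code: `Agg`, `rollingMax` and `RollingReduceSketch` are hypothetical names. The sketch assumes the first-stage results are keyed by window time and then passed through a rolling reduce that keeps the larger aggregate. A rolling reduce on a keyed stream emits its current result for every incoming element, so Fabian's five example records give five outputs. Only the last emission per window time is the final maximum.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RollingReduceSketch {

    // Hypothetical stand-in for one result of the first keyed window:
    // (key, window time, aggregate).
    static final class Agg {
        final int key;
        final long wTime;
        final int agg;

        Agg(int key, long wTime, int agg) {
            this.key = key;
            this.wTime = wTime;
            this.agg = agg;
        }

        @Override
        public String toString() {
            return "(key: " + key + ", w-time: " + wTime + ", agg: " + agg + ")";
        }
    }

    // Simulates keyBy(window time) followed by a rolling reduce that keeps
    // the larger aggregate: per-key state holds the running maximum, and
    // every incoming record emits the current running maximum.
    static List<Agg> rollingMax(List<Agg> firstStage) {
        Map<Long, Agg> state = new HashMap<>();
        List<Agg> emitted = new ArrayList<>();
        for (Agg a : firstStage) {
            Agg cur = state.get(a.wTime);
            Agg next = (cur == null || a.agg > cur.agg) ? a : cur;
            state.put(a.wTime, next);
            emitted.add(next);
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Agg> firstStage = List.of(
                new Agg(1, 10, 17),
                new Agg(2, 10, 20),
                new Agg(1, 20, 30),
                new Agg(2, 20, 28),
                new Agg(3, 20, 5));
        // Prints one line per incoming record, not one per window.
        for (Agg a : rollingMax(firstStage)) {
            System.out.println(a);
        }
    }
}
```

With Fabian's input order, the printed sequence matches the output Konstantin expected: (key: 1, w-time: 10, agg: 17), (key: 2, w-time: 10, agg: 20), then (key: 1, w-time: 20, agg: 30) three times.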