Hi Fabian,

your proposed solution for multiple window aggregations was:

> You can construct a data flow of cascading window operators and fork off
> (to emit or for further processing) the result after each window.
>
> Input -> window(15 secs) -> window(1 min) -> window(15 min) -> ...
>                         \-> out_1       \-> out_2         \-> out_3

Unfortunately, this does not work for me; am I missing something? First I tried the following:

    DataStream<Reading> values = input
        .assignTimestampsAndWatermarks(new StrictWatermarkAssigner()); // force lateness

    DataStream<ReadingAggregate> aggregatesPerMinute = values
        .keyBy("id")
        .timeWindow(Time.minutes(1))
        .allowedLateness(Time.minutes(2))
        .apply(new ReadingAggregate(), new AggregateReadings(), new AggregateReadings());

    DataStream<ReadingAggregate> aggregatesPerHour = aggregatesPerMinute
        .keyBy("id")
        .timeWindow(Time.hours(1))
        .allowedLateness(Time.hours(2))
        .apply(new AggregateReadingAggregates(), new AggregateReadingAggregates());

However, due to late data the first fold function emits two rolling aggregates (one without and one with the late element), so the late window's contribution is counted twice by the second reducer. Therefore I tried:

    WindowedStream<Reading, Tuple, TimeWindow> readingsPerMinute = input
        .assignTimestampsAndWatermarks(new StrictWatermarkAssigner()) // force lateness
        .keyBy("id")
        .timeWindow(Time.minutes(1))
        .allowedLateness(Time.hours(2));

    WindowedStream<Reading, Tuple, TimeWindow> readingsPerHours = readingsPerMinute
        .timeWindow(Time.hours(1))
        .allowedLateness(Time.hours(2));

    DataStream<ReadingAggregate> aggregatesPerMinute = readingsPerMinute
        .apply(new ReadingAggregate(), new AggregateReadings(), new AggregateReadings());

    DataStream<ReadingAggregate> aggregatesPerHour = readingsPerHours
        .apply(new ReadingAggregate(), new AggregateReadings(), new AggregateReadings());

which gives a compiler error, as WindowedStream does not provide a timeWindow method.
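To make the double counting concrete, here is a minimal plain-Java simulation of the arithmetic (this is deliberately not Flink API; the class and variable names are illustrative only). It assumes the minute window fires once on time and once more, with a corrected total, after a late element arrives, while the hour window naively sums every firing it receives:

```java
import java.util.ArrayList;
import java.util.List;

public class CascadeDoubleCount {
    public static void main(String[] args) {
        // The 1-minute window receives readings 5 and 7 and fires: sum = 12.
        List<Integer> minuteEmissions = new ArrayList<>();
        minuteEmissions.add(5 + 7);

        // A late reading 3 arrives within the allowed lateness; the same
        // window fires again with an UPDATED total, not a delta: sum = 15.
        minuteEmissions.add(5 + 7 + 3);

        // The downstream 1-hour window sums everything it receives, so the
        // first firing's 12 is counted again on top of the corrected 15.
        int hourTotal = minuteEmissions.stream().mapToInt(Integer::intValue).sum();
        System.out.println("hour total = " + hourTotal); // 27, but the true sum is 15
    }
}
```

The problem is that a window refiring replaces its earlier result, while a downstream sum treats both firings as independent inputs.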
Finally I settled on this:

    KeyedStream<Reading, Tuple> readings = input
        .assignTimestampsAndWatermarks(new StrictWatermarkAssigner()) // force lateness
        .keyBy("id");

    DataStream<ReadingAggregate> aggregatesPerMinute = readings
        .timeWindow(Time.minutes(1))
        .allowedLateness(Time.hours(2))
        .apply(new ReadingAggregate(), new AggregateReadings(), new AggregateReadings());

    DataStream<ReadingAggregate> aggregatesPerHour = readings
        .timeWindow(Time.hours(1))
        .allowedLateness(Time.hours(2))
        .apply(new ReadingAggregate(), new AggregateReadings(), new AggregateReadings());

Feedback is very welcome.

best,
Stephan

> On 11 Nov 2016, at 00:29, Fabian Hueske-2 [via Apache Flink User Mailing List archive.] <ml-node+s2336050n10033...@n4.nabble.com> wrote:
>
> Hi Stephan,
>
> I just wrote an answer to your SO question.
>
> Best, Fabian
>
> 2016-11-10 11:01 GMT+01:00 Stephan Epping <[hidden email]>:
> Hello,
>
> I found this question in the Nabble archive
> (http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Maintaining-watermarks-per-key-instead-of-per-operator-instance-tp7288.html)
> but did not know how to reply. Here is my question regarding the mentioned thread:
>
>> Hello,
>>
>> I have similar requirements (see my StackOverflow question:
>> http://stackoverflow.com/questions/40465335/apache-flink-multiple-window-aggregations-and-late-data).
>> I am pretty new to Flink; could you elaborate on a possible solution?
>> We can guarantee good ordering by sensor_id, so watermarking by key would
>> be the only reasonable way for us
>> (sensorData.keyBy('id').timeWindow(1.minute).sum('value')). Could I do my
>> own watermarking after sensorData.keyBy('id').overwriteWatermarking(), per
>> key? Or maybe use custom state plus a custom trigger? What happens if a
>> sensor dies or is removed completely; how can this be detected, given that
>> its watermarks would no longer advance window garbage collection? Or could
>> we dynamically schedule a job for each sensor? That would result in 1000 jobs.
>
> Thanks,
> Stephan
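Coming back to the final variant at the top of this mail: the same plain-Java arithmetic (again illustrative names, not Flink API) shows why it is safe. Both windows aggregate the raw readings independently, so a late element produces one corrected total per window instead of being re-summed downstream:

```java
import java.util.Arrays;
import java.util.List;

public class IndependentWindows {
    static int sum(List<Integer> readings) {
        return readings.stream().mapToInt(Integer::intValue).sum();
    }

    public static void main(String[] args) {
        List<Integer> onTime = Arrays.asList(5, 7);
        List<Integer> withLate = Arrays.asList(5, 7, 3); // late reading 3 included

        // First firing of each window, built from the raw readings:
        System.out.println("minute = " + sum(onTime) + ", hour = " + sum(onTime));
        // Refirings after the late element; both totals are corrected, not doubled:
        System.out.println("minute = " + sum(withLate) + ", hour = " + sum(withLate));
    }
}
```

The trade-off is that the hour window keeps all raw readings in state instead of pre-aggregated minute results, which costs more memory but never double-counts.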