Hi Fabian,

Your proposed solution for multiple window aggregations,

> You can construct a data flow of cascading window operators and fork off (to
> emit or for further processing) the result after each window.
>
> Input -> window(15 secs) -> window(1 min) -> window(15 min) -> ...
>                         \-> out_1        \-> out_2         \-> out_3

does not work for me, am I missing something?

First, I tried the following:

DataStream<Reading> values = input
        .assignTimestampsAndWatermarks(new StrictWatermarkAssigner()); // force lateness

DataStream<ReadingAggregate> aggregatesPerMinute = values
        .keyBy("id")
        .timeWindow(Time.minutes(1))
        .allowedLateness(Time.minutes(2))
        .apply(new ReadingAggregate(), new AggregateReadings(), new AggregateReadings());

DataStream<ReadingAggregate> aggregatesPerHour = aggregatesPerMinute
        .keyBy("id")
        .timeWindow(Time.hours(1))
        .allowedLateness(Time.hours(2))
        .apply(new AggregateReadingAggregates(), new AggregateReadingAggregates());
However, due to late data the first fold function emits two rolling aggregates for the same window (one without and one with the late element), and the second reducer consequently counts that window twice. Therefore I tried:
WindowedStream<Reading, Tuple, TimeWindow> readingsPerMinute = input
        .assignTimestampsAndWatermarks(new StrictWatermarkAssigner()) // force lateness
        .keyBy("id")
        .timeWindow(Time.minutes(1))
        .allowedLateness(Time.hours(2));

WindowedStream<Reading, Tuple, TimeWindow> readingsPerHours = readingsPerMinute
        .timeWindow(Time.hours(1))
        .allowedLateness(Time.hours(2));

DataStream<ReadingAggregate> aggregatesPerMinute = readingsPerMinute
        .apply(new ReadingAggregate(), new AggregateReadings(), new AggregateReadings());
DataStream<ReadingAggregate> aggregatesPerHour = readingsPerHours
        .apply(new ReadingAggregate(), new AggregateReadings(), new AggregateReadings());
which gives a compiler error, as WindowedStream does not provide a timeWindow method.
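To make the double-counting problem from the first attempt concrete, here is a dependency-free Java sketch (no Flink; the reading/aggregate types are reduced to plain counts, and the emission behaviour of a window with allowedLateness is simulated by hand, which is my reading of the semantics, not something taken from the docs):

```java
import java.util.ArrayList;
import java.util.List;

public class DoubleCountingDemo {
    public static void main(String[] args) {
        // What the minute window emits downstream to the hour window.
        List<Integer> minuteEmissions = new ArrayList<>();

        // The minute window receives 3 readings and fires.
        int count = 3;
        minuteEmissions.add(count);

        // A late element arrives within allowedLateness: the window fires
        // AGAIN with the updated aggregate (not a delta).
        count += 1;
        minuteEmissions.add(count);

        // The hour window naively sums everything it receives.
        int hourTotal = minuteEmissions.stream().mapToInt(Integer::intValue).sum();
        System.out.println(hourTotal); // prints 7, although only 4 readings exist
    }
}
```

Because the second firing is a full updated aggregate rather than a delta, the downstream sum sees 3 + 4 = 7 instead of 4.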

Finally, I settled on this:
KeyedStream<Reading, Tuple> readings = input
        .assignTimestampsAndWatermarks(new StrictWatermarkAssigner()) // force lateness
        .keyBy("id");

DataStream<ReadingAggregate> aggregatesPerMinute = readings
        .timeWindow(Time.minutes(1))
        .allowedLateness(Time.hours(2))
        .apply(new ReadingAggregate(), new AggregateReadings(), new AggregateReadings());

DataStream<ReadingAggregate> aggregatesPerHour = readings
        .timeWindow(Time.hours(1))
        .allowedLateness(Time.hours(2))
        .apply(new ReadingAggregate(), new AggregateReadings(), new AggregateReadings());
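An alternative I am still considering for the cascading variant (this is my own idea, not something from the Flink docs): let the hour-level aggregation key incoming minute aggregates by their minute-window start and keep only the latest value per minute, so that a late re-firing replaces the earlier aggregate instead of being added on top. Stripped of Flink, the composition logic would look like this:

```java
import java.util.HashMap;
import java.util.Map;

public class CascadingAggregateDemo {
    public static void main(String[] args) {
        // Hypothetical dedup state: latest minute aggregate per window start.
        Map<Long, Integer> latestPerMinute = new HashMap<>();

        // First firing of the minute window starting at t=0: 3 readings.
        latestPerMinute.put(0L, 3);
        // Minute window starting at t=60: 5 readings.
        latestPerMinute.put(60L, 5);
        // Late element -> window t=0 fires again with the UPDATED count;
        // the put replaces the stale value instead of adding to it.
        latestPerMinute.put(0L, 4);

        int hourTotal = latestPerMinute.values().stream().mapToInt(Integer::intValue).sum();
        System.out.println(hourTotal); // prints 9 = 4 + 5, no double counting
    }
}
```

In Flink terms this would presumably mean custom state in the hour window keyed by (id, minuteWindowStart), which is why I have not tried it yet.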


Feedback is very welcome.

Best, Stephan



> On 11 Nov 2016, at 00:29, Fabian Hueske-2 [via Apache Flink User Mailing List 
> archive.] <ml-node+s2336050n10033...@n4.nabble.com> wrote:
> 
> Hi Stephan,
> 
> I just wrote an answer to your SO question. 
> 
> Best, Fabian
> 
> 2016-11-10 11:01 GMT+01:00 Stephan Epping <[hidden email]>:
> Hello,
> 
> I found this question in the Nabble archive
> (http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Maintaining-watermarks-per-key-instead-of-per-operator-instance-tp7288.html)
> but was unable to reply there.
> 
> Here is my question regarding the mentioned thread:
> 
>> Hello, 
>> 
>> I have similar requirements (see Stack Overflow:
>> http://stackoverflow.com/questions/40465335/apache-flink-multiple-window-aggregations-and-late-data).
>> I am pretty new to Flink; could you elaborate on a possible solution? We
>> can guarantee good ordering by sensor_id, so watermarking by key would be
>> the only reasonable way for us
>> (sensorData.keyBy('id').timeWindow(1.minute).sum('value')). Could I do my
>> own watermarking after sensorData.keyBy('id').overwriteWatermarking(), per
>> key? Or maybe using custom state plus a custom trigger? What happens if a
>> sensor dies or is removed completely? How can this be detected, given that
>> watermarks would be ignored for window garbage collection? Or could we
>> dynamically schedule a job for each sensor, which would result in 1000 jobs?
> 
> 
> Thanks,
> Stephan
> 
> 



