Hi,
why did you settle for the last solution?

Cheers,
Aljoscha

On Thu, 17 Nov 2016 at 15:57 kaelumania <stephan.epp...@zweitag.de> wrote:

> Hi Fabian,
>
> your proposed solution for:
>
>
>    1. Multiple window aggregations
>
> You can construct a data flow of cascading window operators and fork off
> (to emit or further processing) the result after each window.
>
> Input -> window(15 secs) -> window(1 min) -> window(15 min) -> ...
>                         \-> out_1        \-> out_2         \-> out_3
>
> does not work, am I missing something?
>
> First I tried the following
>
> DataStream<Reading> values = input
>         .assignTimestampsAndWatermarks(new StrictWatermarkAssigner()); // force lateness
>
> DataStream<ReadingAggregate> aggregatesPerMinute = values
>         .keyBy("id")
>         .timeWindow(Time.minutes(1))
>         .allowedLateness(Time.minutes(2))
>         .apply(new ReadingAggregate(), new AggregateReadings(), new AggregateReadings());
>
> DataStream<ReadingAggregate> aggregatesPerHour = aggregatesPerMinute
>         .keyBy("id")
>         .timeWindow(Time.hours(1))
>         .allowedLateness(Time.hours(2))
>         .apply(new AggregateReadingAggregates(), new AggregateReadingAggregates());
>
> but because of late data, the first fold function emits two rolling
> aggregates (one without and one with the late element), so the same
> readings are counted twice in the second reducer. Therefore I tried
>
> WindowedStream<Reading, Tuple, TimeWindow> readingsPerMinute = input
>         .assignTimestampsAndWatermarks(new StrictWatermarkAssigner()) // force lateness
>         .keyBy("id")
>         .timeWindow(Time.minutes(1))
>         .allowedLateness(Time.hours(2));
>
> WindowedStream<Reading, Tuple, TimeWindow> readingsPerHours = readingsPerMinute
>         .timeWindow(Time.hours(1))
>         .allowedLateness(Time.hours(2));
>
> DataStream<ReadingAggregate> aggregatesPerMinute = readingsPerMinute
>         .apply(new ReadingAggregate(), new AggregateReadings(), new AggregateReadings());
> DataStream<ReadingAggregate> aggregatesPerHour = readingsPerHours
>         .apply(new ReadingAggregate(), new AggregateReadings(), new AggregateReadings());
>
> which gives me a compiler error as WindowedStream does not provide a
> timeWindow method.
>
> Finally I settled on this:
>
> KeyedStream<Reading, Tuple> readings = input
>         .assignTimestampsAndWatermarks(new StrictWatermarkAssigner()) // force lateness
>         .keyBy("id");
>
> DataStream<ReadingAggregate> aggregatesPerMinute = readings
>         .timeWindow(Time.minutes(1))
>         .allowedLateness(Time.hours(2))
>         .apply(new ReadingAggregate(), new AggregateReadings(), new AggregateReadings());
>
> DataStream<ReadingAggregate> aggregatesPerHour = readings
>         .timeWindow(Time.hours(1))
>         .allowedLateness(Time.hours(2))
>         .apply(new ReadingAggregate(), new AggregateReadings(), new AggregateReadings());
>
>
>
> Feedback is very welcome.
>
> best, Stephan
>
>
>
> On 11 Nov 2016, at 00:29, Fabian Hueske-2 [via Apache Flink User Mailing
> List archive.] <[hidden email]> wrote:
>
> Hi Stephan,
>
> I just wrote an answer to your SO question.
>
> Best, Fabian
>
>
> 2016-11-10 11:01 GMT+01:00 Stephan Epping <[hidden email]>:
>
>
> Hello,
>
> I found this question in the Nabble archive (
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Maintaining-watermarks-per-key-instead-of-per-operator-instance-tp7288.html)
> but was unable to reply there, as I don't know how.
>
> Here is my question regarding the mentioned thread:
>
> Hello,
>
> I have similar requirements (see Stack Overflow
> http://stackoverflow.com/questions/40465335/apache-flink-multiple-window-aggregations-and-late-data).
> I am pretty new to Flink; could you elaborate on a possible solution? We
> can guarantee good ordering by sensor_id, so watermarking by key would be
> the only reasonable way for us
> (sensorData.keyBy('id').timeWindow(1.minute).sum('value')). Could I do
> my own watermarking after sensorData.keyBy('id').overwriteWatermarking()...
> per key? Or maybe use custom state plus a custom trigger? What happens if
> a sensor dies or is removed completely? How can this be detected, given
> that watermarks would be ignored for window garbage collection? Or could
> we dynamically schedule a job for each sensor, which would result in
> 1000 jobs?
>
>
> Thanks,
> Stephan
>
>
>
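
The double counting described in the quoted message for the cascaded variant (minute window feeding the hour window) can be illustrated without Flink. The following is only a minimal sketch in plain Java, not Flink code: the class LateFiringDoubleCount, the Firing type and the sample numbers are all hypothetical, and it assumes the firings of one upstream window arrive in order. A naive downstream sum adds every firing, so a window that fires again after a late element is counted twice; keeping only the latest firing per upstream window avoids that.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model (no Flink) of a late window firing being counted twice downstream. */
public class LateFiringDoubleCount {

    /** One result emitted by a hypothetical per-minute window: window start and its running sum. */
    public static final class Firing {
        public final long windowStart;
        public final double sum;

        public Firing(long windowStart, double sum) {
            this.windowStart = windowStart;
            this.sum = sum;
        }
    }

    /** Naive downstream aggregate: adds up every firing it receives. */
    public static double naiveTotal(List<Firing> firings) {
        double total = 0.0;
        for (Firing f : firings) {
            total += f.sum;
        }
        return total;
    }

    /** Keeps only the most recently received firing per upstream window, then sums those. */
    public static double latestPerWindowTotal(List<Firing> firings) {
        Map<Long, Double> latest = new HashMap<>();
        for (Firing f : firings) {
            latest.put(f.windowStart, f.sum); // a later firing replaces the earlier one
        }
        double total = 0.0;
        for (double s : latest.values()) {
            total += s;
        }
        return total;
    }

    /** Minute 0 fires with 10, fires again with 13 after a late element of 3; minute 1 fires with 5. */
    public static List<Firing> sampleFirings() {
        List<Firing> firings = new ArrayList<>();
        firings.add(new Firing(0L, 10.0));
        firings.add(new Firing(0L, 13.0)); // re-firing caused by the late element
        firings.add(new Firing(60_000L, 5.0));
        return firings;
    }

    public static void main(String[] args) {
        List<Firing> firings = sampleFirings();
        System.out.println("naive total:  " + naiveTotal(firings));
        System.out.println("latest total: " + latestPerWindowTotal(firings));
    }
}
```

With the sample data the readings sum to 18 (10 + 3 + 5); the naive total is 28.0 because the first firing of minute 0 is added again, while the latest-per-window total is 18.0. The last variant in the quoted message sidesteps this particular problem because the hourly window reads the raw keyed readings rather than the per-minute results.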
