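Hi,

I think conceptually the pipeline could look something like this:

    env
      .addSource(...)
      .keyBy("device_id")
      .window(SlidingEventTimeWindows.of(Time.minutes(15), Time.seconds(10)))
      .trigger(new Trigger[Event, TimeWindow] { // Event: placeholder for your element type
        override def onElement(el: Event, timestamp: Long, window: TimeWindow,
                               ctx: Trigger.TriggerContext): TriggerResult = {
          // Only the newest sliding window containing the element registers a timer.
          // 10000L must equal the slide in milliseconds.
          if (window.getStart == TimeWindow.getWindowStartWithOffset(timestamp, 0, 10000L)) {
            // maxTimestamp (end - 1 ms), as in the built-in EventTimeTrigger
            ctx.registerEventTimeTimer(window.maxTimestamp)
          }
          TriggerResult.CONTINUE
        }
        override def onEventTime(time: Long, window: TimeWindow,
                                 ctx: Trigger.TriggerContext): TriggerResult = TriggerResult.FIRE
        override def onProcessingTime(time: Long, window: TimeWindow,
                                      ctx: Trigger.TriggerContext): TriggerResult = TriggerResult.CONTINUE
        override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit =
          ctx.deleteEventTimeTimer(window.maxTimestamp)
      })
      .aggregate(...)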
(the 10 s slide needs to be adjusted)

Regards,
Roman

On Tue, Feb 25, 2020 at 3:44 PM Avinash Tripathy <avinash.tripa...@stellapps.com> wrote:

> Hi Theo,
>
> We also have the same scenario. It would be great if you could provide
> some examples or more details about the Flink process function.
>
> Thanks,
> Avinash
>
> On Tue, Feb 25, 2020 at 12:29 PM theo.diefent...@scoop-software.de <theo.diefent...@scoop-software.de> wrote:
>
>> Hi,
>>
>> At the last Flink Forward in Berlin I spoke with some people about the same
>> problem. They had construction devices as IoT sensors which could
>> even be offline for multiple days.
>>
>> They told me that the major problem for them was that the watermark in
>> Flink is maintained per operator/subtask, even if you group by key.
>>
>> They solved their problem via a Flink process function where they have
>> full control over state and timers, so you can deal with each device as you
>> like and can, e.g., maintain something similar to a per-device watermark. I
>> also think that it is the best way to go for this use case.
>>
>> Best regards
>> Theo
>>
>> -------- Original message --------
>> From: hemant singh <hemant2...@gmail.com>
>> Date: Tue, Feb 25, 2020, 06:19
>> To: Marco Villalobos <mvillalo...@beyond.ai>
>> Cc: user@flink.apache.org
>> Subject: Re: Timeseries aggregation with many IoT devices off of one Kafka topic.
>>
>> Hello,
>>
>> I am also working on something similar. Below is the pipeline design I
>> have; I am sharing it in case it is helpful.
>>
>> topic -> keyed stream on device-id -> window operation -> sink.
>>
>> You can PM me for further details.
>>
>> Thanks,
>> Hemant
>>
>> On Tue, Feb 25, 2020 at 1:54 AM Marco Villalobos <mvillalo...@beyond.ai> wrote:
>>
>> I need to collect timeseries data from thousands of IoT devices. Each
>> device publishes a name, value, and timestamp to one Kafka topic.
>> The event time timestamps are in order only relative to the individual
>> device, but out of order with respect to other devices.
>>
>> Is there a way to aggregate a 15 minute window of this in which each IoT
>> device gets aggregated with its own event time?
>>
>> I would deeply appreciate it if somebody could guide me to an approach for
>> solving this in Flink.
>>
>> I wish there was a group chat for these types of problems.
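
Since an example of the process-function idea was requested in the thread, here is a rough sketch of the per-device logic only, in plain Scala with no Flink dependency. It assumes tumbling 15-minute windows and relies on the statement that timestamps are in order per device. The names Reading, WindowResult and PerDeviceAggregator are hypothetical, and the mutable map is only a stand-in for the keyed state that a Flink KeyedProcessFunction would hold. It is not the implementation used by the people Theo mentions.

```scala
object PerDeviceWindows {
  // Hypothetical record types; the field names are assumptions, not from the thread.
  final case class Reading(deviceId: String, value: Double, timestampMs: Long)
  final case class WindowResult(deviceId: String, windowStartMs: Long, count: Long, sum: Double)

  /** Tumbling-window aggregation in which each device advances its own event-time clock. */
  final class PerDeviceAggregator(windowSizeMs: Long) {
    // Stand-in for keyed state: device id -> (open window start, count, sum).
    private val open = scala.collection.mutable.Map.empty[String, (Long, Long, Double)]

    /** Feeds one reading; returns the window it closed for that device, if any. */
    def add(r: Reading): Option[WindowResult] = {
      // Align the timestamp to the start of its window.
      val start = r.timestampMs - Math.floorMod(r.timestampMs, windowSizeMs)
      open.get(r.deviceId) match {
        case Some((curStart, count, sum)) if start == curStart =>
          // Same window: just accumulate.
          open(r.deviceId) = (curStart, count + 1, sum + r.value)
          None
        case Some((curStart, count, sum)) if start > curStart =>
          // This device's own time moved past its open window: emit it, open a new one.
          open(r.deviceId) = (start, 1L, r.value)
          Some(WindowResult(r.deviceId, curStart, count, sum))
        case Some(_) =>
          // Older than the device's open window: dropped as late in this sketch.
          None
        case None =>
          // First reading seen for this device.
          open(r.deviceId) = (start, 1L, r.value)
          None
      }
    }
  }
}
```

Here a window for one device closes only when that same device sends a reading belonging to a later window, so a device that is far ahead or far behind the others has no effect on it. A device that goes silent never closes its last window in this sketch; in a Flink job a timer registered from the process function could flush such windows.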