Re: Event time in Flink streaming

2015-09-08 Thread Gyula Fóra
This is actually simpler than you think. You can just use the Time.of(...) helper: ds.window(Time.of(long windowSize, Timestamp yourTimeStampExtractor, long startTime))... Gyula Martin Neumann wrote (on Tue, Sep 8, 2015, 20:20): > Hej, > > I want to give TimeTriggerPolicy a try and
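
To make the role of the three arguments concrete, here is a minimal plain-Java sketch of how a window size, a timestamp extractor, and a start time can map events to tumbling windows by their own timestamps rather than by the system clock. It is not Flink code and does not claim to mirror Flink's internals; TimestampExtractor, LogEvent, windowIndex, and assign are hypothetical names introduced only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration only; none of these types are Flink classes.
final class EventTimeWindowSketch {

    // Hypothetical stand-in for a user-supplied timestamp extractor.
    interface TimestampExtractor<T> {
        long getTimestamp(T value);
    }

    // Hypothetical event type that carries its own timestamp in milliseconds.
    static final class LogEvent {
        final String machine;
        final long timestampMillis;

        LogEvent(String machine, long timestampMillis) {
            this.machine = machine;
            this.timestampMillis = timestampMillis;
        }
    }

    // Index of the tumbling window that contains the given timestamp.
    // floorDiv puts timestamps before startTime into negative-numbered windows.
    static long windowIndex(long timestamp, long windowSize, long startTime) {
        return Math.floorDiv(timestamp - startTime, windowSize);
    }

    // Groups events by window index using the extracted timestamp, not the wall clock.
    static <T> Map<Long, List<T>> assign(List<T> events, TimestampExtractor<T> extractor,
                                         long windowSize, long startTime) {
        Map<Long, List<T>> windows = new TreeMap<>();
        for (T event : events) {
            long index = windowIndex(extractor.getTimestamp(event), windowSize, startTime);
            windows.computeIfAbsent(index, k -> new ArrayList<>()).add(event);
        }
        return windows;
    }

    public static void main(String[] args) {
        List<LogEvent> events = new ArrayList<>();
        events.add(new LogEvent("a", 1000L));
        events.add(new LogEvent("b", 1900L));
        events.add(new LogEvent("a", 2100L));
        Map<Long, List<LogEvent>> windows =
                assign(events, e -> e.timestampMillis, 1000L, 1000L);
        for (Map.Entry<Long, List<LogEvent>> entry : windows.entrySet()) {
            System.out.println("window " + entry.getKey() + ": "
                    + entry.getValue().size() + " event(s)");
        }
    }
}
```

Grouping by index assumes every event eventually lands in the right bucket, which only holds trivially when timestamps arrive in order; out-of-order input needs some rule for when a window may be closed.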

Re: Event time in Flink streaming

2015-09-08 Thread Martin Neumann
Hej, I want to give TimeTriggerPolicy a try and see how much of a problem it will be in this use case. Is there an example of how to use it? I looked at the API descriptions, but I'm confused now. cheers Martin On Fri, Aug 28, 2015 at 5:35 PM, Martin Neumann wrote: > The stream consists of log

Re: Event time in Flink streaming

2015-08-28 Thread Martin Neumann
The stream consists of logs from different machines with synchronized clocks. As a result, timestamps are not strictly increasing, but there is a bound on how much out of order they can be. (One aim is to detect events that go out of order by more than a certain amount, indicating some problem in the system s
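
The bounded out-of-order check described here can be sketched in plain Java as follows: keep the highest timestamp seen so far and flag any event that lags behind it by more than the allowed bound. This is an illustration under assumptions, not Flink code; OutOfOrderDetector, maxDelayMillis, and isTooLate are hypothetical names, and the millisecond unit is assumed.

```java
// Plain-Java illustration only; not part of any Flink API.
final class OutOfOrderDetector {

    private final long maxDelayMillis; // assumed bound on tolerated disorder
    private long maxSeenMillis;        // highest timestamp observed so far
    private boolean seenAny = false;

    OutOfOrderDetector(long maxDelayMillis) {
        this.maxDelayMillis = maxDelayMillis;
    }

    // Returns true if the event lags the highest timestamp seen so far
    // by more than the allowed bound; the first event is never flagged.
    boolean isTooLate(long timestampMillis) {
        boolean tooLate = seenAny && maxSeenMillis - timestampMillis > maxDelayMillis;
        if (!seenAny || timestampMillis > maxSeenMillis) {
            maxSeenMillis = timestampMillis;
            seenAny = true;
        }
        return tooLate;
    }

    public static void main(String[] args) {
        OutOfOrderDetector detector = new OutOfOrderDetector(500L);
        long[] timestamps = {1000L, 1200L, 800L, 600L, 1300L};
        for (long ts : timestamps) {
            System.out.println(ts + " too late: " + detector.isTooLate(ts));
        }
    }
}
```

With a bound of 500 ms, an event at 800 after one at 1200 is tolerated (lag 400), while an event at 600 is flagged (lag 600).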

Re: Event time in Flink streaming

2015-08-28 Thread Aljoscha Krettek
Hi Martin, the answer depends, because the current windowing implementation has some problems. We are working on improving it in the 0.10 release, though. If your elements arrive with strictly increasing timestamps and you have parallelism=1 or don't perform any re-partitioning of data (which a gr

Re: Event time in Flink streaming

2015-08-28 Thread Matthias J. Sax
Hi Martin, you need to implement your own policy. However, this should not be complicated. Have a look at "TimeTriggerPolicy". You just need to provide a "Timestamp" implementation that extracts your ts-attribute from the tuples. -Matthias On 08/28/2015 03:58 PM, Martin Neumann wrote: > Hej, > > I

Event time in Flink streaming

2015-08-28 Thread Martin Neumann
Hej, I have a stream of timestamped events I want to process in Flink streaming. Do I have to write my own policies to do so, or can I define time-based windows that use the timestamps instead of the system time? cheers Martin