Hi, yes, once this PR is merged https://github.com/apache/flink/pull/1238 you can switch between time characteristics and also use the aggregations functions such as sum(...). I'm hoping to merge this by tonight. The tests are still running right now. :D
Cheers, Aljoscha On Wed, 7 Oct 2015 at 17:45 Alexander Kolb <alexander.k...@mni.fh-giessen.de> wrote: > Thanks! > > This works with the exception that I have to use the reduceWindow() method > when summing up my the content of the window. > There still seems to be some work to do. > > With the finished Api will I be able to switch from event-time to > processing- or ingestion-time without having to adjust my code? > > Best, > Alex > > Aljoscha Krettek <aljos...@apache.org> schrieb am Mi., 7. Okt. 2015, > 17:23: > >> Hi, >> right now, the 0.10-SNAPSHOT is in a bit of a weird state. We still have >> the old windowing API in there alongside the new one. To make your example >> use the new API that actually uses the timestamps and watermarks you would >> use the following code: >> >> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) >> val stream = env.addSource(SourceWithEventTime) >> >> stream >> .timeWindowAll(Time.of(5,TimeUnit.SECONDS)) >> // or .windowAll(SlidingTimeWindows.of(Time.of(5, TimeUnit.SECONDS))) >> .sum(0) >> .print() >> >> the version for keyed streams would be: >> stream >> .keyBy(...) >> .timeWindow(Time.of(5,TimeUnit.SECONDS)) >> // or .window(SlidingTimeWindows.of(Time.of(5, TimeUnit.SECONDS))) >> .sum(0) >> .print() >> >> I hope this helps. :D >> >> Cheers, >> Aljoscha >> >> >> On Wed, 7 Oct 2015 at 16:54 Alexander Kolb < >> alexander.k...@mni.fh-giessen.de> wrote: >> >>> Hi Guys, >>> >>> I'm trying to use the event-time windowing feature. But the windowing >>> does not work as expected. >>> >>> What I've been doing is to write my own source which implements the >>> EventTimeSourceFunction and uses the collectWithTimeStamp method. >>> Additionally I'm emitting a watermark after each element. >>> >>> My job to test this looks like this: >>> >>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) >>> val stream = env.addSource(SourceWithEventTime) >>> >>> stream >>> .window(Time.of(5,TimeUnit.SECONDS)) >>> .sum(0) >>> .flatten() >>> .print() >>> >>> env.execute() >>> >>> The Input are some tuples with TimeStamps set 10 seconds apart: >>> >>> value: (1,test) timestamp: 1444228980390 >>> value: (2,foo) timestamp: 1444228990390 >>> value: (3,bar) timestamp: 1444229000390 >>> >>> What I'm expecting is that each tuple goes into a separate window. >>> The actual output is the sum of all tuples, hence all tuples are >>> collected in the same window. >>> >>> Thanks in advance! >>> Alex >>> >>> >>>