Hi Guys,

I'm trying to use the event-time windowing feature, but the windowing does
not work as expected.

What I've done is write my own source, which implements
EventTimeSourceFunction and emits elements with the collectWithTimestamp
method. Additionally, I'm emitting a watermark after each element.

My job to test this looks like this:

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val stream = env.addSource(SourceWithEventTime)

stream
  .window(Time.of(5,TimeUnit.SECONDS))
  .sum(0)
  .flatten()
  .print()

env.execute()

The input is a few tuples with timestamps set 10 seconds apart:

value: (1,test) timestamp: 1444228980390
value: (2,foo) timestamp: 1444228990390
value: (3,bar) timestamp: 1444229000390

I expect each tuple to land in its own window, since the timestamps are
10 seconds apart and the window is only 5 seconds long.
Instead, the output is the sum over all tuples, so they all seem to be
collected in the same window.
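
To make the expectation concrete, here is a minimal sketch in plain Scala
(no Flink involved) of how I'd assume aligned 5-second tumbling windows
are assigned by event time. The windowStart helper and the
WindowAssignmentSketch object are hypothetical names I made up for
illustration, and the "timestamp minus timestamp modulo size" rule is my
assumption about the assignment, not something taken from the Flink code:

```scala
object WindowAssignmentSketch {
  // Hypothetical helper: start of the aligned tumbling window that
  // contains the given event timestamp (both values in milliseconds).
  def windowStart(timestampMs: Long, sizeMs: Long): Long =
    timestampMs - (timestampMs % sizeMs)

  // Window size from the job above: 5 seconds.
  val sizeMs: Long = 5000L

  // The three input tuples with their event timestamps.
  val input: Seq[((Int, String), Long)] = Seq(
    ((1, "test"), 1444228980390L),
    ((2, "foo"), 1444228990390L),
    ((3, "bar"), 1444229000390L)
  )

  // Group the tuples by window start and sum field 0, like sum(0).
  val sums: Map[Long, Int] =
    input
      .groupBy { case (_, ts) => windowStart(ts, sizeMs) }
      .map { case (start, elems) => start -> elems.map(_._1._1).sum }

  def main(args: Array[String]): Unit =
    sums.toSeq.sortBy(_._1).foreach { case (start, sum) =>
      println(s"window [$start, ${start + sizeMs}): sum=$sum")
    }
}
```

Under that assumption the three tuples fall into three different windows,
with sums 1, 2 and 3, which is the output I was hoping to get from the job.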

Thanks in advance!
Alex
