Hi,
yes, once this PR is merged https://github.com/apache/flink/pull/1238 you
can switch between time characteristics and also use the aggregation
functions such as sum(...). I'm hoping to merge this by tonight. The tests
are still running right now. :D

Cheers,
Aljoscha

On Wed, 7 Oct 2015 at 17:45 Alexander Kolb <alexander.k...@mni.fh-giessen.de>
wrote:

> Thanks!
>
> This works with the exception that I have to use the reduceWindow() method
> when summing up the content of the window.
> There still seems to be some work to do.
>
> With the finished API, will I be able to switch from event-time to
> processing- or ingestion-time without having to adjust my code?
>
> Best,
> Alex
>
> Aljoscha Krettek <aljos...@apache.org> wrote on Wed, 7 Oct 2015,
> 17:23:
>
>> Hi,
>> right now, the 0.10-SNAPSHOT is in a bit of a weird state. We still have
>> the old windowing API in there alongside the new one. To make your example
>> use the new API that actually uses the timestamps and watermarks you would
>> use the following code:
>>
>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>> val stream = env.addSource(SourceWithEventTime)
>>
>> stream
>>   .timeWindowAll(Time.of(5,TimeUnit.SECONDS))
>>   // or .windowAll(TumblingTimeWindows.of(Time.of(5, TimeUnit.SECONDS)))
>>   .sum(0)
>>   .print()
>>
>> the version for keyed streams would be:
>> stream
>>   .keyBy(...)
>>   .timeWindow(Time.of(5,TimeUnit.SECONDS))
>>   // or .window(TumblingTimeWindows.of(Time.of(5, TimeUnit.SECONDS)))
>>   .sum(0)
>>   .print()
>>
>> I hope this helps. :D
>>
>> Cheers,
>> Aljoscha
>>
>>
>> On Wed, 7 Oct 2015 at 16:54 Alexander Kolb <
>> alexander.k...@mni.fh-giessen.de> wrote:
>>
>>> Hi Guys,
>>>
>>> I'm trying to use the event-time windowing feature. But the windowing
>>> does not work as expected.
>>>
>>> What I've been doing is to write my own source which implements the
>>> EventTimeSourceFunction and uses the collectWithTimestamp method.
>>> Additionally I'm emitting a watermark after each element.
>>>
>>> My job to test this looks like this:
>>>
>>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>> val stream = env.addSource(SourceWithEventTime)
>>>
>>> stream
>>>   .window(Time.of(5,TimeUnit.SECONDS))
>>>   .sum(0)
>>>   .flatten()
>>>   .print()
>>>
>>> env.execute()
>>>
>>> The input consists of tuples with timestamps set 10 seconds apart:
>>>
>>> value: (1,test) timestamp: 1444228980390
>>> value: (2,foo) timestamp: 1444228990390
>>> value: (3,bar) timestamp: 1444229000390
>>>
>>> What I'm expecting is that each tuple goes into a separate window.
>>> The actual output is the sum of all tuples, hence all tuples are
>>> collected in the same window.
>>>
>>> Thanks in advance!
>>> Alex
>>>
>>>
>>>
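
To illustrate the expectation in the original question (each tuple ending up in its own 5-second event-time window), here is a minimal sketch in plain Scala with no Flink dependency. It assumes tumbling windows aligned to the epoch, so that a window starts at timestamp - (timestamp % size). The names TumblingWindowSketch, windowStart and sumPerWindow are hypothetical and exist only for this illustration; this is not Flink's implementation.

```scala
object TumblingWindowSketch {
  // The three sample elements from the question: (value, label, event timestamp in ms).
  val input: Seq[(Int, String, Long)] = Seq(
    (1, "test", 1444228980390L),
    (2, "foo", 1444228990390L),
    (3, "bar", 1444229000390L)
  )

  // Start of the epoch-aligned tumbling window that contains `timestamp`.
  // Assumption: timestamps are non-negative and sizeMs is positive.
  def windowStart(timestamp: Long, sizeMs: Long): Long =
    timestamp - (timestamp % sizeMs)

  // Groups elements by window start and sums field 0, like .sum(0) per window.
  // Returns (windowStart, sum) pairs ordered by window start.
  def sumPerWindow(elems: Seq[(Int, String, Long)], sizeMs: Long): Seq[(Long, Int)] =
    elems
      .groupBy { case (_, _, ts) => windowStart(ts, sizeMs) }
      .toSeq
      .sortBy(_._1)
      .map { case (start, group) => (start, group.map(_._1).sum) }

  def main(args: Array[String]): Unit = {
    val sizeMs = 5000L
    sumPerWindow(input, sizeMs).foreach { case (start, sum) =>
      println(s"window [$start, ${start + sizeMs}) sum=$sum")
    }
  }
}
```

With a window size of 5000 ms, the three timestamps map to the window starts 1444228980000, 1444228990000 and 1444229000000, so each tuple is summed on its own (1, 2 and 3). A single result of 6 would mean the windows were not being formed from the event timestamps.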
