Thanks Timo. Basically, my requirement is that the windows are created based
on event time, but a window has to fire either when an element with a
timestamp more than 10 s later arrives or when 10 s have passed, exactly as
you described. Let me try the AssignerWithPeriodicWatermarks approach.

Thanks,
Govind

> On Aug 2, 2017, at 7:46 AM, Timo Walther <[email protected]> wrote:
> 
> I forgot about the AssignerWithPeriodicWatermarks [1]. I think it could solve 
> your problem easily.
> 
> Timo
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/event_timestamps_watermarks.html#with-periodic-watermarks
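A minimal sketch of the periodic-assigner idea, written as plain Java so it runs without Flink. In a real job this logic would live in an AssignerWithPeriodicWatermarks, whose getCurrentWatermark() Flink polls at the interval set with ExecutionConfig#setAutoWatermarkInterval. The class name IdleAdvancingWatermarks, the idle timeout, and the rule "while no events arrive, assume event time advances at wall-clock speed" are assumptions for illustration, not Flink API.

```java
// Plain-Java model of logic that could sit inside an AssignerWithPeriodicWatermarks.
// Assumption: the caller passes wall-clock time explicitly; a real assigner would
// read System.currentTimeMillis() in getCurrentWatermark().
class IdleAdvancingWatermarks {
    private final long maxOutOfOrdernessMs; // e.g. the 100 ms bound from the original code
    private final long idleTimeoutMs;       // hypothetical: wall-clock wait before advancing
    private long maxTimestamp = Long.MIN_VALUE;
    private long lastArrivalWallClock = Long.MIN_VALUE;
    private long lastEmitted = Long.MIN_VALUE;

    IdleAdvancingWatermarks(long maxOutOfOrdernessMs, long idleTimeoutMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        this.idleTimeoutMs = idleTimeoutMs;
    }

    /** Mirrors extractTimestamp(): remember the highest event time and when it arrived. */
    long onEvent(long eventTimestamp, long nowWallClock) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        lastArrivalWallClock = nowWallClock;
        return eventTimestamp;
    }

    /** Mirrors getCurrentWatermark(), which Flink polls periodically. */
    long currentWatermark(long nowWallClock) {
        if (maxTimestamp == Long.MIN_VALUE) {
            return lastEmitted; // no event seen yet
        }
        long candidate = maxTimestamp - maxOutOfOrdernessMs;
        long idleFor = nowWallClock - lastArrivalWallClock;
        if (idleFor > idleTimeoutMs) {
            // Assumption: while the source is idle, event time advances at wall-clock speed.
            candidate += idleFor;
        }
        // Watermarks must never move backwards.
        lastEmitted = Math.max(lastEmitted, candidate);
        return lastEmitted;
    }
}
```

With a 10 s idle timeout, the watermark can pass the end of a window roughly 10 s after the last event, even if the next message only arrives a minute later. The trade-off is that an event arriving after such a jump may be treated as late.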
> 
>> On 02.08.17 at 16:30, Timo Walther wrote:
>> The question is what defines your `10 seconds`. In event time, the incoming
>> events determine when 10 seconds have passed. Your description sounds like
>> you want results after 10 seconds of wall-clock (processing) time. So
>> either you use a processing-time window, or you implement a custom trigger
>> that fires on whichever comes first: event time, or a processing-time
>> timer that you set 10 s ahead.
>> 
>> Timo
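The custom-trigger option can be modeled as below. This is a plain-Java sketch of the firing decision only, not Flink's Trigger API. In Flink one would subclass Trigger, register an event-time timer at window.maxTimestamp() and a processing-time timer in onElement, return FIRE_AND_PURGE from onEventTime and onProcessingTime, and delete both timers in clear(). The class EventTimeOrTimeoutTrigger and the choice to start the 10 s timeout at the window's first element are assumptions.

```java
// Plain-Java model of the firing decision only, not Flink's Trigger API.
// Fires a window once, at whichever comes first: the watermark reaching the
// window's last millisecond, or a processing-time timeout armed by its first element.
class EventTimeOrTimeoutTrigger {
    private final long windowEnd; // exclusive end of the event-time window
    private final long timeoutMs; // e.g. 10_000 for "10 s of wall-clock time"
    private long deadline = Long.MAX_VALUE; // processing-time timer; MAX_VALUE = not armed
    private boolean fired = false;

    EventTimeOrTimeoutTrigger(long windowEnd, long timeoutMs) {
        this.windowEnd = windowEnd;
        this.timeoutMs = timeoutMs;
    }

    /** Like onElement: arm the processing-time timer when the first element arrives. */
    void onElement(long nowProcessingTime) {
        if (deadline == Long.MAX_VALUE) {
            deadline = nowProcessingTime + timeoutMs;
        }
    }

    /** Returns true at most once, when either "timer" is due. */
    boolean shouldFire(long watermark, long nowProcessingTime) {
        if (fired || deadline == Long.MAX_VALUE) {
            return false; // already fired, or no element yet (Flink would not have this window)
        }
        boolean eventTimeDue = watermark >= windowEnd - 1; // like EventTimeTrigger
        boolean timeoutDue = nowProcessingTime >= deadline;
        if (eventTimeDue || timeoutDue) {
            fired = true;
            return true;
        }
        return false;
    }
}
```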
>> 
>> 
>>> On 02.08.17 at 16:20, Govindarajan Srinivasaraghavan wrote:
>>> Thanks Timo. The next message will arrive only after a minute or so. Is
>>> there a way to evict whatever is in the window buffer after 10 seconds,
>>> regardless of whether a new message arrives?
>>> 
>>> Thanks,
>>> Govind
>>> 
>>> On Aug 2, 2017, at 6:56 AM, Timo Walther <[email protected]> wrote:
>>> 
>>>> Hi Govind,
>>>> 
>>>> If the window is not triggered, this usually indicates that your timestamp
>>>> and watermark assignment is not correct. Based on your description, I
>>>> don't think you need a custom trigger/evictor. How often do events arrive
>>>> from one device? There must be another event from the same device with a
>>>> timestamp beyond the end of the window (plus the 100 ms out-of-orderness
>>>> bound) in order to trigger the window evaluation.
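To see why the window stays silent, here is a small plain-Java model (illustrative names, not Flink classes) of a 10 s tumbling event-time window combined with the 100 ms bounded-out-of-orderness watermark from the original code. The window [0, 10000) fires only once the watermark reaches its last millisecond, 9999, which requires a seen timestamp of at least 10099. Note that Flink tracks watermarks per source instance rather than per device, so a later event from any device read by the same source instance can also advance it.

```java
// Plain-Java model (not Flink classes) of a 10 s tumbling event-time window
// whose watermark follows the bounded-out-of-orderness rule:
// watermark = highest timestamp seen so far - bound.
class TumblingWindowModel {
    /** Start of the tumbling window containing the timestamp (zero window offset). */
    static long windowStart(long timestamp, long size) {
        return timestamp - Math.floorMod(timestamp, size);
    }

    /** Watermark after seeing maxSeenTimestamp with the given out-of-orderness bound. */
    static long watermark(long maxSeenTimestamp, long bound) {
        return maxSeenTimestamp - bound;
    }

    /** The window fires once the watermark reaches its last millisecond (end - 1). */
    static boolean fires(long windowStart, long size, long watermark) {
        return watermark >= windowStart + size - 1;
    }

    /** Smallest event timestamp that, once seen, makes the window fire. */
    static long minTimestampToFire(long windowStart, long size, long bound) {
        return windowStart + size - 1 + bound;
    }
}
```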
>>>> 
>>>> Instead of using the Kafka timestamp, maybe you could also convert your 
>>>> timestamps to UTC in the TimestampExtractor.
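A minimal sketch of that conversion, assuming (hypothetically) that each device sends an ISO-8601 timestamp with its UTC offset, such as 2017-08-02T06:54:00-07:00. java.time.OffsetDateTime turns it into an absolute instant, so the resulting epoch milliseconds are comparable across timezones, and a TimestampExtractor's extractTimestamp(Message) could return this value. DeviceTimestamps is an illustrative name. If devices already send epoch milliseconds, no conversion is needed, since epoch time is timezone-independent.

```java
import java.time.OffsetDateTime;

// Hypothetical helper: converts a device timestamp carrying its own UTC offset
// into epoch milliseconds, which are the same for every timezone.
class DeviceTimestamps {
    static long toEpochMillisUtc(String isoWithOffset) {
        return OffsetDateTime.parse(isoWithOffset).toInstant().toEpochMilli();
    }
}
```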
>>>> 
>>>> There is no official limitation. However, each window comes with some
>>>> overhead, so you should choose your state backend (e.g. memory vs.
>>>> RocksDB) and parallelism accordingly.
>>>> 
>>>> Hope that helps.
>>>> 
>>>> Timo
>>>> 
>>>> 
>>>>> On 02.08.17 at 06:54, Govindarajan Srinivasaraghavan wrote:
>>>>> Hi,
>>>>> 
>>>>> I have a few questions regarding event-time windowing. In my scenario,
>>>>> devices in various timezones send messages with timestamps, and I need to
>>>>> create a 10-second window per device. The messages will mostly arrive in
>>>>> order.
>>>>> 
>>>>> Here is my sample code that windows the messages and aggregates each
>>>>> window's contents for further processing.
>>>>> 
>>>>> streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>>>> FlinkKafkaConsumer010<Message> consumer = new FlinkKafkaConsumer010<>(
>>>>>         "STREAM1",
>>>>>         new DeserializationSchema(),
>>>>>         kafkaConsumerProperties);
>>>>>
>>>>> // TimestampExtractor extends BoundedOutOfOrdernessTimestampExtractor<Message>
>>>>> DataStream<Message> msgStream = streamEnv
>>>>>         .addSource(consumer)
>>>>>         .assignTimestampsAndWatermarks(
>>>>>                 new TimestampExtractor(Time.of(100, TimeUnit.MILLISECONDS)));
>>>>>
>>>>> KeyedStream<Message, String> keyByStream =
>>>>>         msgStream.keyBy(new CustomKeySelector());
>>>>>
>>>>> WindowedStream<Message, String, TimeWindow> windowedStream =
>>>>>         keyByStream.window(TumblingEventTimeWindows.of(
>>>>>                 org.apache.flink.streaming.api.windowing.time.Time.seconds(10)));
>>>>>
>>>>> SingleOutputStreamOperator<Message> aggregatedStream =
>>>>>         windowedStream.apply(new AggregateEntries());
>>>>> 
>>>>> My questions are
>>>>> 
>>>>> - In the above code, data reaches the window operator, but even after the
>>>>> window time has passed, nothing arrives in the apply function. Do I have
>>>>> to supply a custom evictor or trigger?
>>>>> 
>>>>> - Since the data is received from multiple timezones and each device has
>>>>> a different local time, would it be OK to use the timestamp at which the
>>>>> message was received at the source (Kafka) as the event timestamp? Will
>>>>> there be any issues with this?
>>>>> 
>>>>> - Are there any limitations on the number of time windows that can exist
>>>>> at any given time? In my scenario, if there are 1 million devices, there
>>>>> will be 1 million tumbling windows.
>>>>> 
>>>>> Thanks,
>>>>> Govind
>>>> 
>> 
> 
