Hi Fabian,
I want to extract timestamps from my events. However, the event stream can
be sparse at times (e.g., 2 days without any events).
What's the best strategy for generating watermarks if I want events to be
processed in real time as they enter the stream?

Jayant Ameta

On Thu, Jan 11, 2018 at 4:53 PM, Fabian Hueske <fhue...@gmail.com> wrote:

> Another thing to point out is that watermarks are usually data-driven,
> i.e., they depend on the timestamps of the events and not on the clock of
> the machine.
> Otherwise, you might observe a lot of late data, i.e., events with
> timestamps smaller than the last watermark.
>
> If you assign timestamps and watermarks based on the clock of the machine,
> you might as well use ingestion time instead of event time.
>
> 2018-01-11 11:49 GMT+01:00 Jayant Ameta <wittyam...@gmail.com>:
>
>> Thanks Gary,
>> I was only trying with a fixed set of events, so the Watermark was not
>> advancing, like you said.
>>
>>
>> Jayant Ameta
>>
>> On Thu, Jan 11, 2018 at 3:36 PM, Gary Yao <g...@data-artisans.com> wrote:
>>
>>> Hi Jayant,
>>>
>>> The difference is that the Watermarks from
>>> BoundedOutOfOrdernessTimestampExtractor are based on the greatest
>>> timestamp of all previous events. That is, if you do not receive new
>>> events, the Watermark will not advance. In contrast, your custom
>>> implementation of AssignerWithPeriodicWatermarks always advances the
>>> Watermark based on the wall clock.
>>>
>>> Maybe this will already help you to debug your application. If not, it
>>> would be great to see a minimal working example.
>>>
>>> Best,
>>> Gary
>>>
>>> On Wed, Jan 10, 2018 at 4:46 PM, Jayant Ameta <wittyam...@gmail.com> wrote:
>>>
>>>> Hi,
>>>> When using a BoundedOutOfOrdernessTimestampExtractor, the trigger is
>>>> not firing. However, the trigger fires when using a custom timestamp
>>>> extractor with a similar watermark.
>>>>
>>>> Sample code below:
>>>> 1. Assigner as an anonymous class, which works fine
>>>>
>>>> AssignerWithPeriodicWatermarks<Tuple2<Rule, T>> assigner =
>>>>         new AssignerWithPeriodicWatermarks<Tuple2<Rule, T>>() {
>>>>
>>>>     @Override
>>>>     public long extractTimestamp(Tuple2<Rule, T> element,
>>>>                                  long previousElementTimestamp) {
>>>>         return System.currentTimeMillis();
>>>>     }
>>>>
>>>>     @Override
>>>>     public final Watermark getCurrentWatermark() {
>>>>         // this guarantees that the watermark never goes backwards.
>>>>         return new Watermark(System.currentTimeMillis() - 100);
>>>>     }
>>>> };
>>>>
>>>>
>>>> 2. BoundedOutOfOrdernessTimestampExtractor assigner, which doesn't work
>>>>
>>>> AssignerWithPeriodicWatermarks<Tuple2<Rule, T>> assigner =
>>>>         new BoundedOutOfOrdernessTimestampExtractor<Tuple2<Rule, T>>(
>>>>                 Time.milliseconds(100)) {
>>>>
>>>>     @Override
>>>>     public long extractTimestamp(Tuple2<Rule, T> element) {
>>>>         return System.currentTimeMillis();
>>>>     }
>>>> };
>>>>
>>>>
>>>> Do you see any difference in the approaches?
>>>>
>>>> - Jayant
>>>>
>>>
>>>
>>
>
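
The difference Gary describes in the quoted thread can be illustrated without
Flink at all. The following is a minimal plain-Java sketch, not Flink's actual
implementation: the class name WatermarkSketch, both helper methods, the
sample timestamps, and the fireThreshold value are hypothetical and chosen only
for illustration. It compares a watermark derived from the greatest event
timestamp seen so far with one derived from the wall clock, for a fixed set of
events followed by silence.

```java
public class WatermarkSketch {

    /**
     * Data-driven rule (assumed simplification): the watermark trails the
     * greatest event timestamp seen so far by maxOutOfOrderness.
     */
    public static long eventDrivenWatermark(long[] eventTimestamps, long maxOutOfOrderness) {
        long maxTimestamp = Long.MIN_VALUE;
        for (long ts : eventTimestamps) {
            maxTimestamp = Math.max(maxTimestamp, ts);
        }
        // Without any events there is nothing to derive a watermark from.
        if (maxTimestamp == Long.MIN_VALUE) {
            return Long.MIN_VALUE;
        }
        return maxTimestamp - maxOutOfOrderness;
    }

    /** Clock-driven rule: the watermark trails the wall clock, events or not. */
    public static long wallClockWatermark(long nowMillis, long lagMillis) {
        return nowMillis - lagMillis;
    }

    public static void main(String[] args) {
        long[] events = {1_000L, 1_050L, 1_020L}; // fixed set of events, then silence
        long fireThreshold = 2_000L;              // hypothetical point at which a window would fire

        // Let the (simulated) wall clock advance while no new events arrive.
        for (long now = 1_500L; now <= 2_500L; now += 500L) {
            long eventDriven = eventDrivenWatermark(events, 100L);
            long clockDriven = wallClockWatermark(now, 100L);
            System.out.println("now=" + now
                    + " eventDriven=" + eventDriven
                    + " fires=" + (eventDriven >= fireThreshold)
                    + " clockDriven=" + clockDriven
                    + " fires=" + (clockDriven >= fireThreshold));
        }
    }
}
```

In this sketch the event-driven watermark stays at 950 on every iteration
because no new events arrive, so it never reaches the threshold, whereas the
clock-driven watermark keeps moving and eventually passes it. That matches the
behaviour reported in the thread for a fixed set of events.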
