This depends on the requirements of your application.
With the usual watermark generation strategies, which are purely
data-driven, a stream that does not produce data does not advance its
watermarks.
Without advancing watermarks, the program cannot make progress.

This might be fine if your program consumes a single stream: if this
stream does not produce data, your program also has nothing to compute
(although there might still be data left, such as a window that has not
been computed yet).
The situation becomes trickier if your program has multiple sources that
become inactive at some point, or a source where a partition can become
inactive.
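
Multiple inputs are trickier because an operator with several inputs advances its event-time clock only to the minimum of the watermarks it has received from them, so a single silent input holds back everything downstream. The following plain-Java model shows this; `MinWatermarkCombiner` is a hypothetical name and the class is a simplification, not Flink's internal implementation.

```java
import java.util.Arrays;

// Plain-Java model, NOT Flink code: an operator's watermark is the
// minimum of the watermarks of all of its inputs.
final class MinWatermarkCombiner {
    private final long[] inputWatermarks;

    MinWatermarkCombiner(int numInputs) {
        inputWatermarks = new long[numInputs];
        // An input that has never sent a watermark counts as "no progress at all".
        Arrays.fill(inputWatermarks, Long.MIN_VALUE);
    }

    // Record a watermark from one input; per-input watermarks only move forward.
    void onWatermark(int input, long watermarkMs) {
        inputWatermarks[input] = Math.max(inputWatermarks[input], watermarkMs);
    }

    // The slowest (or silent) input determines the combined watermark.
    long combinedWatermark() {
        return Arrays.stream(inputWatermarks).min().getAsLong();
    }
}
```

With two inputs, if input 0 has reached 10,000 but input 1 has only reached 1,000 (or has never produced anything), the combined watermark stays at 1,000 (or at `Long.MIN_VALUE`).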

AFAIK, there is a mechanism to mark partitions (and maybe complete sources)
as inactive.
@Gordon (in CC) knows more about this feature.
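
As far as I know, this refers to Flink's idleness handling: in releases of this period a source function can call `markAsTemporarilyIdle()` on its `SourceContext`, and later releases added `WatermarkStrategy.withIdleness(...)`. Please check the documentation of your Flink version for the exact API. The effect can be sketched with the plain-Java model below, which excludes idle inputs from the minimum. `IdleAwareWatermarkCombiner` and its methods are hypothetical names, and the rule that the combined watermark never moves backwards is a simplifying assumption of this sketch.

```java
import java.util.Arrays;
import java.util.OptionalLong;

// Plain-Java model, NOT Flink code: inputs marked idle are left out of the
// minimum, so a silent partition no longer holds back event time.
final class IdleAwareWatermarkCombiner {
    private final long[] inputWatermarks;
    private final boolean[] idle;
    private long lastEmitted = Long.MIN_VALUE;

    IdleAwareWatermarkCombiner(int numInputs) {
        inputWatermarks = new long[numInputs];
        idle = new boolean[numInputs];
        Arrays.fill(inputWatermarks, Long.MIN_VALUE);
    }

    // Comparable in spirit to a source declaring itself temporarily idle.
    void markIdle(int input) {
        idle[input] = true;
    }

    // Receiving a new watermark means the input is active again.
    void onWatermark(int input, long watermarkMs) {
        idle[input] = false;
        inputWatermarks[input] = Math.max(inputWatermarks[input], watermarkMs);
    }

    // Minimum over ACTIVE inputs only; empty if every input is idle.
    // The emitted value never moves backwards (simplifying assumption).
    OptionalLong combinedWatermark() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < inputWatermarks.length; i++) {
            if (!idle[i]) {
                anyActive = true;
                min = Math.min(min, inputWatermarks[i]);
            }
        }
        if (!anyActive) {
            return OptionalLong.empty();
        }
        lastEmitted = Math.max(lastEmitted, min);
        return OptionalLong.of(lastEmitted);
    }
}
```

Ignoring idle inputs trades completeness for progress: if an idle input resumes with older timestamps, its events can be late with respect to a watermark that has already been emitted.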

Best, Fabian

2018-01-15 14:51 GMT+01:00 Jayant Ameta <wittyam...@gmail.com>:

> Hi Fabian,
> I want to extract timestamps from my events. However, the event stream can
> be sparse at times (e.g., 2 days without any events).
> What's the best strategy to create watermarks if I want real-time
> processing of the events which enter the stream?
>
> Jayant Ameta
>
> On Thu, Jan 11, 2018 at 4:53 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Another thing to point out is that watermarks are usually data-driven,
>> i.e., they depend on the timestamps of the events and not on the clock of
>> the machine.
>> Otherwise, you might observe a lot of late data, i.e., events with
>> timestamps smaller than the last watermark.
>>
>> If you assign timestamps and watermarks based on the clock of the
>> machine, you could just as well use ingestion time instead of event time.
>>
>> 2018-01-11 11:49 GMT+01:00 Jayant Ameta <wittyam...@gmail.com>:
>>
>>> Thanks Gary,
>>> I was only testing with a fixed set of events, so the Watermark was not
>>> advancing, like you said.
>>>
>>>
>>> Jayant Ameta
>>>
>>> On Thu, Jan 11, 2018 at 3:36 PM, Gary Yao <g...@data-artisans.com> wrote:
>>>
>>>> Hi Jayant,
>>>>
>>>> The difference is that the Watermarks from
>>>> BoundedOutOfOrdernessTimestampExtractor are based on the greatest
>>>> timestamp of all previous events. That is, if you do not receive new
>>>> events, the Watermark will not advance. In contrast, your custom
>>>> implementation of AssignerWithPeriodicWatermarks always advances the
>>>> Watermark based on the wall clock.
>>>>
>>>> Maybe this will already help you to debug your application. If not,
>>>> it would be great to see a minimal working example.
>>>>
>>>> Best,
>>>> Gary
>>>>
>>>> On Wed, Jan 10, 2018 at 4:46 PM, Jayant Ameta <wittyam...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>> When using a BoundedOutOfOrdernessTimestampExtractor, the trigger is
>>>>> not firing. However, the trigger fires when using a custom timestamp
>>>>> extractor with a similar watermark.
>>>>>
>>>>> Sample code below:
>>>>> 1. Assigner as an anonymous class, which works fine:
>>>>>
>>>>> AssignerWithPeriodicWatermarks<Tuple2<Rule, T>> assigner = new AssignerWithPeriodicWatermarks<Tuple2<Rule, T>>() {
>>>>>
>>>>>     @Override
>>>>>     public long extractTimestamp(Tuple2<Rule, T> element, long previousElementTimestamp) {
>>>>>         return System.currentTimeMillis();
>>>>>     }
>>>>>
>>>>>     @Override
>>>>>     public final Watermark getCurrentWatermark() {
>>>>>         // wall-clock based: advances even when no events arrive
>>>>>         return new Watermark(System.currentTimeMillis()-100);
>>>>>     }
>>>>> };
>>>>>
>>>>>
>>>>> 2. BoundedOutOfOrdernessTimestampExtractor assigner, which doesn't work:
>>>>>
>>>>> AssignerWithPeriodicWatermarks<Tuple2<Rule, T>> assigner = new BoundedOutOfOrdernessTimestampExtractor<Tuple2<Rule, T>>(Time.milliseconds(100)) {
>>>>>
>>>>>     @Override
>>>>>     public long extractTimestamp(Tuple2<Rule, T> element) {
>>>>>         return System.currentTimeMillis();
>>>>>     }
>>>>> };
>>>>>
>>>>>
>>>>> Do you see any difference in the approaches?
>>>>>
>>>>> - Jayant
>>>>>
>>>>
>>>>
>>>
>>
>
