Re: Trigger not firing when using BoundedOutOfOrdernessTimestampExtractor

2018-01-26 Thread Aljoscha Krettek
A while back I wrote this slightly more elaborate extractor that will advance 
the watermark independently after the stream is idle for a while: 
https://github.com/aljoscha/flink/blob/6e4419e550caa0e5b162bc0d2ccc43f6b0b3860f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/timestamps/ProcessingTimeTrailingBoundedOutOfOrdernessTimestampExtractor.java
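The general idea can be sketched in plain Java, without Flink dependencies. This is a hypothetical, simplified model and not a reproduction of the linked class: the names IdleAdvancingAssigner, onEvent and currentWatermark and the idle-timeout rule are assumptions made for illustration. While events arrive it behaves like a bounded out-of-orderness extractor. Once no event has been seen for idleTimeout milliseconds of processing time, it lets the watermark trail forward with the clock.

```java
import java.util.function.LongSupplier;

/** Hypothetical sketch: bounded out-of-orderness watermarks that keep advancing when the stream is idle. */
class IdleAwareWatermarks {

    static final class IdleAdvancingAssigner {
        private final long maxOutOfOrderness; // how far (ms) events may lag behind the max timestamp
        private final long idleTimeout;       // processing time (ms) without events before the fallback applies
        private final LongSupplier clock;     // processing-time source, injectable for testing

        private long maxEventTimestamp = Long.MIN_VALUE;
        private long lastEventProcessingTime;
        private long lastEmittedWatermark = Long.MIN_VALUE;

        IdleAdvancingAssigner(long maxOutOfOrderness, long idleTimeout, LongSupplier clock) {
            this.maxOutOfOrderness = maxOutOfOrderness;
            this.idleTimeout = idleTimeout;
            this.clock = clock;
            this.lastEventProcessingTime = clock.getAsLong();
        }

        /** Called for every event with its event-time timestamp. */
        void onEvent(long eventTimestamp) {
            maxEventTimestamp = Math.max(maxEventTimestamp, eventTimestamp);
            lastEventProcessingTime = clock.getAsLong();
        }

        /** Called periodically, in the spirit of AssignerWithPeriodicWatermarks#getCurrentWatermark(). */
        long currentWatermark() {
            if (maxEventTimestamp == Long.MIN_VALUE) {
                return lastEmittedWatermark; // no events yet, nothing to base a watermark on
            }
            long candidate = maxEventTimestamp - maxOutOfOrderness;
            long idleFor = clock.getAsLong() - lastEventProcessingTime;
            if (idleFor > idleTimeout) {
                // Idle: advance event time by the processing time elapsed beyond the timeout.
                candidate += idleFor - idleTimeout;
            }
            // Never let the watermark go backwards.
            lastEmittedWatermark = Math.max(lastEmittedWatermark, candidate);
            return lastEmittedWatermark;
        }
    }

    public static void main(String[] args) {
        long[] now = {0L}; // simulated processing-time clock
        IdleAdvancingAssigner assigner = new IdleAdvancingAssigner(100, 1_000, () -> now[0]);
        assigner.onEvent(1_000);
        System.out.println(assigner.currentWatermark()); // 900: plain bounded out-of-orderness
        now[0] = 3_000; // 3 s without any events
        System.out.println(assigner.currentWatermark()); // 2900: advanced by the 2 s beyond the timeout
    }
}
```

The trade-off of such a fallback: an event that shows up after a long idle period may already be behind the artificially advanced watermark and be treated as late.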
 


Best,
Aljoscha




Re: Trigger not firing when using BoundedOutOfOrdernessTimestampExtractor

2018-01-16 Thread Fabian Hueske
This depends on the requirements of your application.
With the usual watermark generation strategies, which are purely
data-driven, a stream that does not produce data does not advance its
watermarks.
Without advancing watermarks, the program cannot make progress.

This might be fine if your program consumes a single stream: if this
stream does not produce data, your program doesn't have anything to
compute either (although there might still be data left, such as a window
that has not been computed yet).
The situation becomes trickier if your program has multiple sources that
become inactive at some point, or a source in which a partition can become
inactive.

AFAIK, there is a mechanism to mark partitions (and maybe complete sources)
as inactive.
@Gordon (in CC) knows more about this feature.
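Why inactive sources or partitions are a problem: an operator with several inputs advances its event-time clock to the minimum of its inputs' watermarks, so a single silent input holds everything back. Below is a simplified plain-Java model of that rule with an "idle" flag that excludes an input from the minimum. The class and method names are hypothetical and this is not Flink's implementation. On the source side, the DataStream API hook for this is, as far as I know, SourceFunction.SourceContext#markAsTemporarilyIdle().

```java
import java.util.Arrays;

/**
 * Simplified model of an operator with several inputs: its watermark is the
 * minimum watermark over all inputs that are not marked as idle.
 */
class MinWatermarkCombiner {
    private final long[] inputWatermarks;
    private final boolean[] idle;
    private long combined = Long.MIN_VALUE;

    MinWatermarkCombiner(int numInputs) {
        inputWatermarks = new long[numInputs];
        Arrays.fill(inputWatermarks, Long.MIN_VALUE); // no watermark received yet
        idle = new boolean[numInputs];
    }

    /** A watermark arrives on an input; this also marks the input as active again. */
    long onWatermark(int input, long watermark) {
        inputWatermarks[input] = Math.max(inputWatermarks[input], watermark);
        idle[input] = false;
        return recompute();
    }

    /** The input (e.g. an empty partition) declares itself idle. */
    long markIdle(int input) {
        idle[input] = true;
        return recompute();
    }

    private long recompute() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < inputWatermarks.length; i++) {
            if (!idle[i]) {
                anyActive = true;
                min = Math.min(min, inputWatermarks[i]);
            }
        }
        // If every input is idle there is nothing to advance; the result never goes backwards.
        if (anyActive) {
            combined = Math.max(combined, min);
        }
        return combined;
    }

    public static void main(String[] args) {
        MinWatermarkCombiner op = new MinWatermarkCombiner(2);
        System.out.println(op.onWatermark(0, 5_000)); // input 1 has sent nothing: stuck at Long.MIN_VALUE
        System.out.println(op.markIdle(1));           // input 1 marked idle: 5000
    }
}
```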

Best, Fabian



Re: Trigger not firing when using BoundedOutOfOrdernessTimestampExtractor

2018-01-15 Thread Jayant Ameta
Hi Fabian,
I want to extract timestamps from my events. However, the event stream can
be sparse at times (e.g. 2 days without any events).
What's the best strategy for generating watermarks if I want real-time
processing of the events that enter the stream?

Jayant Ameta



Re: Trigger not firing when using BoundedOutOfOrdernessTimestampExtractor

2018-01-11 Thread Fabian Hueske
Another thing to point out is that watermarks are usually data-driven,
i.e., they depend on the timestamps of the events and not on the clock of
the machine.
Otherwise, you might observe a lot of late data, i.e., events with
timestamps smaller than the last watermark.
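A small plain-Java simulation of that effect. The numbers and the countLate helper are made up for illustration and do not come from this thread. The same four events are checked against a watermark derived from their own timestamps and against one derived from the machine clock at arrival, each with a 100 ms bound. With a few seconds of transport delay, nearly every event lands behind the wall-clock watermark.

```java
/**
 * Counts late events under a watermark of (max basis value of earlier events) - bound,
 * where the basis is either the events' own timestamps or their arrival (wall-clock) times.
 */
class LateDataComparison {

    static int countLate(long[] eventTimestamps, long[] basis, long bound) {
        int late = 0;
        long maxBasis = Long.MIN_VALUE;
        for (int i = 0; i < eventTimestamps.length; i++) {
            // An event is late if its timestamp is smaller than the current watermark.
            if (maxBasis != Long.MIN_VALUE && eventTimestamps[i] < maxBasis - bound) {
                late++;
            }
            maxBasis = Math.max(maxBasis, basis[i]);
        }
        return late;
    }

    public static void main(String[] args) {
        long[] eventTs = {1_000, 1_050, 1_020, 1_100}; // when the events happened
        long[] arrival = {5_000, 5_010, 5_020, 5_030}; // machine clock on arrival (~4 s delay)
        System.out.println(countLate(eventTs, eventTs, 100)); // data-driven watermark: 0 late
        System.out.println(countLate(eventTs, arrival, 100)); // wall-clock watermark: 3 late
    }
}
```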

If you assign timestamps and watermarks based on the clock of the machine,
you might as well use ingestion time instead of event time.

2018-01-11 11:49 GMT+01:00 Jayant Ameta :

> Thanks Gary,
> I was only trying with a fixed set of events, so the Watermark was not
> advancing, like you said.
>
>
> Jayant Ameta


Re: Trigger not firing when using BoundedOutOfOrdernessTimestampExtractor

2018-01-11 Thread Gary Yao
Hi Jayant,

The difference is that the Watermarks from
BoundedOutOfOrdernessTimestampExtractor are based on the greatest timestamp
of all previous events. That is, if you do not receive new events, the
Watermark will not advance. In contrast, your custom implementation of
AssignerWithPeriodicWatermarks always advances the Watermark based on the
wall clock.

Maybe this will already help you to debug your application. If not, it
would be great to see a minimal working example.
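A plain-Java model of that difference (hypothetical class and method names; it mirrors the behavior described above rather than Flink's source). A window [start, end) is treated as complete once the watermark reaches end - 1, which is how Flink's event-time trigger treats a window's max timestamp. The window end and timestamps below are made up: after the last event of a fixed input set, the bounded out-of-orderness watermark stays at the max timestamp minus 100 ms, while the wall-clock watermark keeps moving past the window end.

```java
/** Plain-Java model of the two assigners discussed in this thread (not Flink code). */
class WatermarkComparison {

    /** Watermark = greatest timestamp seen so far minus the bound; only events move it. */
    static final class BoundedOutOfOrderness {
        private final long bound;
        private long maxTimestamp;

        BoundedOutOfOrderness(long bound) {
            this.bound = bound;
            this.maxTimestamp = Long.MIN_VALUE + bound; // avoid underflow before the first event
        }

        void onEvent(long timestamp) {
            maxTimestamp = Math.max(maxTimestamp, timestamp);
        }

        long watermark() {
            return maxTimestamp - bound;
        }
    }

    /** Watermark = current wall-clock time minus the bound; advances even without events. */
    static long wallClockWatermark(long now, long bound) {
        return now - bound;
    }

    /** An event-time window [start, end) fires once the watermark reaches end - 1. */
    static boolean windowFires(long windowEnd, long watermark) {
        return watermark >= windowEnd - 1;
    }

    public static void main(String[] args) {
        BoundedOutOfOrderness boo = new BoundedOutOfOrderness(100);
        boo.onEvent(4_000);
        boo.onEvent(4_500); // last event of a fixed input set; window is [0, 5000)
        long now = 60_000;  // a minute later, no further events
        System.out.println(windowFires(5_000, boo.watermark()));              // false: stuck at 4400
        System.out.println(windowFires(5_000, wallClockWatermark(now, 100))); // true: 59900
    }
}
```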

Best,
Gary

On Wed, Jan 10, 2018 at 4:46 PM, Jayant Ameta  wrote:

> Hi,
> When using a BoundedOutOfOrdernessTimestampExtractor, the trigger is not
> firing. However, the trigger fires when using a custom timestamp extractor
> with a similar watermark.
>
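> Sample code below:
>
> // Flink imports used by both snippets
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
> import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
> import org.apache.flink.streaming.api.watermark.Watermark;
> import org.apache.flink.streaming.api.windowing.time.Time;
>
> // K is a placeholder: the first Tuple2 type argument is missing from the archived mail.
>
> 1. Assigner as an anonymous class, which works fine:
>
> AssignerWithPeriodicWatermarks<Tuple2<K, T>> assigner =
>         new AssignerWithPeriodicWatermarks<Tuple2<K, T>>() {
>
>     @Override
>     public long extractTimestamp(Tuple2<K, T> element, long previousElementTimestamp) {
>         return System.currentTimeMillis();
>     }
>
>     @Override
>     public final Watermark getCurrentWatermark() {
>         // this guarantees that the watermark never goes backwards.
>         return new Watermark(System.currentTimeMillis() - 100);
>     }
> };
>
>
> 2. BoundedOutOfOrdernessTimestampExtractor assigner, which doesn't work:
>
> AssignerWithPeriodicWatermarks<Tuple2<K, T>> assigner =
>         new BoundedOutOfOrdernessTimestampExtractor<Tuple2<K, T>>(Time.milliseconds(100)) {
>
>     @Override
>     public long extractTimestamp(Tuple2<K, T> element) {
>         return System.currentTimeMillis();
>     }
> };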
>
>
> Do you see any difference in the approaches?
>
> - Jayant
>