Re: Timestamp Watermark Assigner bound question

2019-04-14 Guowei Ma
Sorry, I left out a "not". :(
Whether a watermark generated by an
AssignerWithPunctuatedWatermarks/AssignerWithPeriodicWatermarks is sent
downstream is controlled by the framework. If an operator returns a
watermark that goes backwards, Flink will *not* send it downstream: it only
forwards a watermark whose timestamp is greater than the last one it
forwarded.
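The framework-side check can be pictured as a small gate sitting after the assigner. This is a framework-free sketch, not Flink's operator code: `WatermarkGate` and `offer` are hypothetical names, watermarks are plain `long` milliseconds, and the strictly-greater comparison is modeled on (as far as I can tell) what Flink's timestamp/watermark operators do before emitting.

```java
import java.util.ArrayList;
import java.util.List;

// Toy, framework-free model of the "never go backwards" check.
// Hypothetical names; watermarks are plain epoch-millisecond longs.
class WatermarkGate {
    // Last watermark forwarded downstream; nothing has been forwarded yet.
    private long currentWatermark = Long.MIN_VALUE;
    // What was actually forwarded, kept for inspection.
    private final List<Long> emitted = new ArrayList<>();

    // Returns true if the candidate was forwarded, false if it was dropped.
    // A null candidate means the assigner produced no watermark for this call.
    boolean offer(Long candidate) {
        if (candidate != null && candidate > currentWatermark) {
            currentWatermark = candidate;
            emitted.add(candidate);
            return true;
        }
        return false; // null, equal or smaller: event time must not go back
    }

    long currentWatermark() {
        return currentWatermark;
    }

    List<Long> emitted() {
        return emitted;
    }
}
```

Offering 1,000, 3,000, 2,000, 3,000 and 4,000 forwards only 1,000, 3,000 and 4,000: 2,000 would move event time backwards, and the repeated 3,000 does not advance it.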


Best,
Guowei


On Mon, Apr 15, 2019 at 9:44 AM Guowei Ma wrote:

> Hi, Vijay
>
> >>> Then the Operator progresses to the next Watermark as a starting point
> >>> for events after event time reaches currWatermark ?
> AFAIK, the operator that generates watermarks is called by the framework.
> When it is called depends on the operator itself. For example, an
> operator that implements the AssignerWithPunctuatedWatermarks interface
> would be called for every element.
>
> >>> How does it guarantee that watermark never goes backwards ?
> Whether a watermark generated by an
> AssignerWithPunctuatedWatermarks/AssignerWithPeriodicWatermarks is sent
> downstream is controlled by the framework. If an operator returns a
> watermark that goes backwards, Flink would send it downstream.
>
> Best,
> Guowei

Re: Timestamp Watermark Assigner bound question

2019-04-14 Guowei Ma
Hi, Vijay

>>> Then the Operator progresses to the next Watermark as a starting point
>>> for events after event time reaches currWatermark ?
AFAIK, the operator that generates watermarks is called by the framework.
When it is called depends on the operator itself. For example, an
operator that implements the AssignerWithPunctuatedWatermarks interface
would be called for every element.
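To make "when the operator is called" concrete, here is a toy model, not Flink code: `AssignerCallModel` and its methods are hypothetical, and processing time is simulated by giving each element an arrival time in milliseconds. A punctuated assigner is consulted once per element, whereas a periodic one (AssignerWithPeriodicWatermarks) is polled on a processing-time timer whose interval Flink takes from `ExecutionConfig#setAutoWatermarkInterval`.

```java
// Toy model (not Flink code) of how often the framework asks each assigner
// style for a watermark. Element i arrives at arrivalMillis[i] (processing time).
class AssignerCallModel {

    // Punctuated style (like AssignerWithPunctuatedWatermarks):
    // checkAndGetNextWatermark runs once for every element.
    static int punctuatedQueries(long[] arrivalMillis) {
        int queries = 0;
        for (long ignored : arrivalMillis) {
            queries++; // one watermark check per element
        }
        return queries;
    }

    // Periodic style (like AssignerWithPeriodicWatermarks):
    // getCurrentWatermark runs on a timer every intervalMillis,
    // no matter how many elements arrived in between.
    static int periodicQueries(long[] arrivalMillis, long intervalMillis) {
        if (arrivalMillis.length == 0) {
            return 0;
        }
        int queries = 0;
        long nextTick = arrivalMillis[0] + intervalMillis;
        for (long arrival : arrivalMillis) {
            // Fire every timer tick that is due before this element is processed.
            while (arrival >= nextTick) {
                queries++;
                nextTick += intervalMillis;
            }
        }
        return queries;
    }
}
```

With one element per millisecond for one second (arrivals 0 to 999 ms) and an illustrative 200 ms interval, the punctuated style is asked 1,000 times and the periodic style 4 times (ticks at 200, 400, 600 and 800 ms).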

>>> How does it guarantee that watermark never goes backwards ?
Whether a watermark generated by an
AssignerWithPunctuatedWatermarks/AssignerWithPeriodicWatermarks is sent
downstream is controlled by the framework. If an operator returns a
watermark that goes backwards, Flink would send it downstream.

Best,
Guowei


On Wed, Apr 10, 2019 at 11:44 PM Vijay Balakrishnan wrote:

> Hi Guowei,
> Thx for your reply.
> I am trying to understand the logic behind point 1, i.e. the current
> watermark being currMaxTimestamp minus the bound.
> So, does this mean that, for the operator processing a task, current event
> time < current watermark < currMaxTimestamp? And does the operator then
> move on to the next watermark, as a starting point for events, once event
> time reaches currWatermark?
> Also, I saw this comment in BoundedOutOfOrdernessTimestampExtractor.java:
>
> // this guarantees that the watermark never goes backwards.
> long potentialWM = currentMaxTimestamp - maxOutOfOrderness;
>
>
> How does it guarantee that the watermark never goes backwards?
>
> TIA,
>
> Vijay


Re: Timestamp Watermark Assigner bound question

2019-04-10 Vijay Balakrishnan
Hi Guowei,
Thx for your reply.
I am trying to understand the logic behind point 1, i.e. the current
watermark being currMaxTimestamp minus the bound.
So, does this mean that, for the operator processing a task, current event
time < current watermark < currMaxTimestamp? And does the operator then
move on to the next watermark, as a starting point for events, once event
time reaches currWatermark?
Also, I saw this comment in BoundedOutOfOrdernessTimestampExtractor.java:

// this guarantees that the watermark never goes backwards.
long potentialWM = currentMaxTimestamp - maxOutOfOrderness;


How does it guarantee that the watermark never goes backwards?
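One way to see why the comment holds: `currentMaxTimestamp` is only ever replaced by a larger value, so `currentMaxTimestamp - maxOutOfOrderness` cannot decrease, and, as I read the 1.7 source, the extractor additionally keeps the last emitted watermark and only moves it forward. The framework-free sketch below mirrors that logic under hypothetical names (times in milliseconds). It also shows the relation asked about: the watermark trails the largest timestamp seen so far by the bound, and an element counts as late once its timestamp is at or below the watermark.

```java
// Framework-free sketch of bounded-out-of-orderness watermarking.
// Hypothetical class, modeled on BoundedOutOfOrdernessTimestampExtractor's logic.
class BoundedOutOfOrdernessSketch {
    private final long maxOutOfOrderness;
    // Largest event timestamp seen so far; starts low enough that
    // (currentMaxTimestamp - maxOutOfOrderness) cannot underflow.
    private long currentMaxTimestamp;
    private long lastEmittedWatermark = Long.MIN_VALUE;

    BoundedOutOfOrdernessSketch(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrderness;
    }

    // Called per element: the maximum only ever increases.
    long extractTimestamp(long eventTimestamp) {
        if (eventTimestamp > currentMaxTimestamp) {
            currentMaxTimestamp = eventTimestamp;
        }
        return eventTimestamp;
    }

    // Called periodically: trail the maximum by the bound, never moving backwards.
    long getCurrentWatermark() {
        long potentialWM = currentMaxTimestamp - maxOutOfOrderness;
        if (potentialWM >= lastEmittedWatermark) {
            lastEmittedWatermark = potentialWM;
        }
        return lastEmittedWatermark;
    }

    // Per the watermark definition, an element at or below the watermark is late.
    boolean isLate(long eventTimestamp) {
        return eventTimestamp <= lastEmittedWatermark;
    }
}
```

With a 5,000 ms bound: after an element at 10,000 ms the watermark is 5,000; an element at 7,000 ms arriving next is still on time; after 12,000 ms the watermark moves to 7,000; a straggler at 3,000 ms neither raises the maximum nor pulls the watermark back. So the watermark is the largest timestamp seen minus the bound, which is never above the largest timestamp seen.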

TIA,

Vijay



On Tue, Apr 9, 2019 at 10:50 PM Guowei Ma  wrote:

> Hi,
> 1. From doc[1]: a Watermark(t) declares that event time has reached time t
> in that stream, meaning that there should be no more elements from the
> stream with a timestamp t’ <= t (i.e. events with timestamps older than or
> equal to the watermark). So I think it is counterintuitive to generate a
> watermark that is larger than the timestamp of the current element. At the
> very least you should subtract the bound.
> 2. From the definition of a watermark, I think the watermark is not related
> to the length of the window. The bound depends on your application, i.e. on
> how far out of order your events can arrive.
> 3. In your case AssignerWithPunctuatedWatermarks might not be a good
> choice. Watermarks are not free, and you might send too many of them. If
> your source could emit some special "watermark" elements, I think you could
> use that interface; otherwise you could choose
> AssignerWithPeriodicWatermarks. You can find an example in doc[2].
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/event_time.html#event-time-and-watermarks
> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/event_timestamps_watermarks.html#timestamp-assigners--watermark-generators
> Best,
> Guowei


Re: Timestamp Watermark Assigner bound question

2019-04-09 Guowei Ma
Hi,
1. From doc[1]: a Watermark(t) declares that event time has reached time t
in that stream, meaning that there should be no more elements from the
stream with a timestamp t’ <= t (i.e. events with timestamps older than or
equal to the watermark). So I think it is counterintuitive to generate a
watermark that is larger than the timestamp of the current element. At the
very least you should subtract the bound.
2. From the definition of a watermark, I think the watermark is not related
to the length of the window. The bound depends on your application, i.e. on
how far out of order your events can arrive.
3. In your case AssignerWithPunctuatedWatermarks might not be a good
choice. Watermarks are not free, and you might send too many of them. If
your source could emit some special "watermark" elements, I think you could
use that interface; otherwise you could choose
AssignerWithPeriodicWatermarks. You can find an example in doc[2].
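Point 1 can be checked with a toy model (not Flink code; `LateCounter` and `countLate` are hypothetical names). It assumes a punctuated-style assigner that emits a watermark after every element at the element's timestamp plus an offset, kept monotonic as the framework does, and it counts elements whose timestamp is at or below the watermark already emitted, i.e. elements the watermark had declared would not arrive any more.

```java
// Toy comparison (not Flink code): a watermark after every element,
// placed either ahead of (+bound) or behind (-bound) the element's timestamp.
class LateCounter {
    // Counts elements whose timestamp is at or below the current watermark,
    // i.e. elements that arrive after event time was declared past them.
    static int countLate(long[] eventTimestamps, long offset) {
        long watermark = Long.MIN_VALUE;
        int late = 0;
        for (long ts : eventTimestamps) {
            if (ts <= watermark) {
                late++;
            }
            long candidate = ts + offset;  // offset is +bound or -bound
            if (candidate > watermark) {   // watermarks never move backwards
                watermark = candidate;
            }
        }
        return late;
    }
}
```

For the out-of-order timestamps 1,000, 4,000, 2,000, 6,000, 5,000, 9,000 and 8,000 ms with a 5,000 ms bound, adding the bound makes every element after the first late (6 of 7), while subtracting it makes none late, because no element is more than 5,000 ms behind the largest timestamp seen before it.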

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/event_time.html#event-time-and-watermarks
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/event_timestamps_watermarks.html#timestamp-assigners--watermark-generators
Best,
Guowei


On Wed, Apr 10, 2019 at 7:41 AM Vijay Balakrishnan wrote:

> Hi,
> I have created a TimestampAssigner as follows.
> I want to use monitoring.getEventTimestamp() for event-time processing and
> collect aggregated stats over time window intervals of 5 secs, 5 mins, etc.
> Is this the right way to create the TimeWaterMarkAssigner with a bound? I
> want to collect the stats for each eventTimestamp + window interval.
> My understanding: *the generated watermark, which is eventTimestamp + bound,
> will collect all the eventTimestamps that arrive within that watermark
> inside each eventTimestamp + 5s (etc.) window interval. Or does this bound
> have to be based on the window interval, i.e. extractedTimestamp +
> windowInterval + bound*??
>
>
>> public class MonitoringTSWAssigner implements AssignerWithPunctuatedWatermarks<Monitoring> {
>>     private long bound = 5 * (long) 1000;
>>
>>     public long extractTimestamp(Monitoring monitoring, long previousTS) {
>>         return monitoring.getEventTimestamp();
>>     }
>>
>>     public Watermark checkAndGetNextWatermark(Monitoring monitoring, long extractedTimestamp) {
>>         return new Watermark(extractedTimestamp + bound); // <-- should it be - bound ?
>>     }
>> }
>
>
> Used here:
>
>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>> final DataStreamSource<Monitoring> monitoringDataStreamSource =
>>         env.addSource();
>> DataStream<Monitoring> kinesisStream =
>>         monitoringDataStreamSource.assignTimestampsAndWatermarks(new MonitoringTSWAssigner());
>> KeyedStream<Monitoring, Tuple> monitoringTupleKeyedStream =
>>         kinesisStream.keyBy("deployment", .);
>> final WindowedStream<Monitoring, Tuple, TimeWindow> windowStream =
>>         monitoringTupleKeyedStream.timeWindow(Time.seconds(5)); // 5 sec time window
>
>
> TIA,
>


Timestamp Watermark Assigner bound question

2019-04-09 Vijay Balakrishnan
Hi,
I have created a TimestampAssigner as follows.
I want to use monitoring.getEventTimestamp() for event-time processing and
collect aggregated stats over time window intervals of 5 secs, 5 mins, etc.
Is this the right way to create the TimeWaterMarkAssigner with a bound? I
want to collect the stats for each eventTimestamp + window interval.
My understanding: *the generated watermark, which is eventTimestamp + bound,
will collect all the eventTimestamps that arrive within that watermark
inside each eventTimestamp + 5s (etc.) window interval. Or does this bound
have to be based on the window interval, i.e. extractedTimestamp +
windowInterval + bound*??


> public class MonitoringTSWAssigner implements AssignerWithPunctuatedWatermarks<Monitoring> {
>     private long bound = 5 * (long) 1000;
>
>     public long extractTimestamp(Monitoring monitoring, long previousTS) {
>         return monitoring.getEventTimestamp();
>     }
>
>     public Watermark checkAndGetNextWatermark(Monitoring monitoring, long extractedTimestamp) {
>         return new Watermark(extractedTimestamp + bound); // <-- should it be - bound ?
>     }
> }


Used here:

> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> final DataStreamSource<Monitoring> monitoringDataStreamSource =
>         env.addSource();
> DataStream<Monitoring> kinesisStream =
>         monitoringDataStreamSource.assignTimestampsAndWatermarks(new MonitoringTSWAssigner());
> KeyedStream<Monitoring, Tuple> monitoringTupleKeyedStream =
>         kinesisStream.keyBy("deployment", .);
> final WindowedStream<Monitoring, Tuple, TimeWindow> windowStream =
>         monitoringTupleKeyedStream.timeWindow(Time.seconds(5)); // 5 sec time window


TIA,
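On whether the bound should include the window interval: an event-time window is emitted once the watermark passes the end of that window, so one bounded watermark can serve both the 5 s and the 5 min windows, each completing at its own end. The toy model below is not Flink code (`TumblingWindowModel` and its methods are hypothetical). It assumes tumbling windows aligned to the epoch with offset 0, as `timeWindow(Time.seconds(5))` produces, computes a window start the way Flink's `TimeWindow.getWindowStartWithOffset` does for non-negative timestamps, and treats a window as complete once the watermark reaches its last millisecond.

```java
// Toy model (not Flink code) of tumbling event-time windows; times in epoch millis.
class TumblingWindowModel {
    // Start of the tumbling window of length sizeMillis that contains ts (offset 0).
    static long windowStart(long ts, long sizeMillis) {
        return ts - Math.floorMod(ts, sizeMillis);
    }

    // The window [start, start + size) is complete once the watermark
    // reaches its last millisecond, start + size - 1.
    static boolean fires(long ts, long sizeMillis, long watermark) {
        long end = windowStart(ts, sizeMillis) + sizeMillis;
        return watermark >= end - 1;
    }
}
```

With a largest timestamp of 12,000 ms and a 5,000 ms bound, the watermark is 7,000: it completes the 5 s window [0, 5,000) but neither the 5 s window [5,000, 10,000) nor the 5 min window [0, 300,000). The bound itself never had to mention the window size.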