Hi Guowei,
Thanks for your reply.
I am trying to understand the logic behind point 1, i.e. the current
watermark being currMaxTimestamp minus the bound.
Does this mean that the operator processing a task has current event time
< current watermark < currMaxTimestamp? And once event time reaches
currWatermark, does the operator move on to the next watermark as the
starting point for subsequent events?
Also, I saw this comment in BoundedOutOfOrdernessTimestampExtractor.java.

// this guarantees that the watermark never goes backwards.
long potentialWM = currentMaxTimestamp - maxOutOfOrderness;


How does it guarantee that the watermark never goes backwards?
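
For reference, here is a minimal plain-Java sketch of how I read the logic around that line. It is not the Flink source: the class WatermarkSketch and its methods onEvent/currentWatermark are hypothetical names for illustration. I am assuming that currentMaxTimestamp only ever takes the maximum of the timestamps seen, and that the candidate watermark replaces the last emitted one only when it is not smaller.

```java
// Plain-Java sketch; WatermarkSketch and its method names are hypothetical, not Flink API.
class WatermarkSketch {
    private final long maxOutOfOrderness;
    private long currentMaxTimestamp;
    private long lastEmittedWatermark = Long.MIN_VALUE;

    WatermarkSketch(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
        // Start high enough that the subtraction below cannot underflow.
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrderness;
    }

    // Called per element: remember the largest event timestamp seen so far.
    void onEvent(long eventTimestamp) {
        if (eventTimestamp > currentMaxTimestamp) {
            currentMaxTimestamp = eventTimestamp;
        }
    }

    // Called periodically: the candidate replaces the last watermark only if it is not smaller.
    long currentWatermark() {
        long potentialWM = currentMaxTimestamp - maxOutOfOrderness;
        if (potentialWM >= lastEmittedWatermark) {
            lastEmittedWatermark = potentialWM;
        }
        return lastEmittedWatermark;
    }
}
```

With a bound of 5000 ms and events arriving with timestamps 10000, 7000, 12000, this sketch yields watermarks 5000, 5000, 7000: the late event at 7000 does not lower currentMaxTimestamp, so the watermark stays where it was.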

TIA,

Vijay



On Tue, Apr 9, 2019 at 10:50 PM Guowei Ma <guowei....@gmail.com> wrote:

> Hi,
> 1. From doc[1]: a Watermark(t) declares that event time has reached time t
> in that stream, meaning that there should be no more elements from the
> stream with a timestamp t’ <= t (i.e. events with timestamps older or equal
> to the watermark). So I think it is counterintuitive to generate a
> watermark that is larger than the timestamp of the current element. At the
> very least you should subtract the bound.
> 2. From the definition of a watermark, I think the watermark is not related
> to the length of the window. The bound depends on your application.
> 3. In your case AssignerWithPunctuatedWatermarks might not be a good
> choice. Watermarks are not free, and you might send too many of them. If
> your source can generate special "watermark" elements, I think you could use
> that interface. Otherwise you could choose AssignerWithPeriodicWatermarks.
> You can find an example in doc[2].
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/event_time.html#event-time-and-watermarks
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/event_timestamps_watermarks.html#timestamp-assigners--watermark-generators
> Best,
> Guowei
>
>
> On Wed, Apr 10, 2019 at 7:41 AM Vijay Balakrishnan <bvija...@gmail.com> wrote:
>
>> Hi,
>> I have created a TimestampAssigner as follows.
>> I want to use monitoring.getEventTimestamp() with event-time processing
>> and collect aggregated stats over time window intervals of 5 secs, 5 mins
>> etc. Is this the right way to create the TimeWaterMarkAssigner with a
>> bound? I want to collect the stats for each eventTimestamp + window
>> interval. My understanding: *the generated watermark, which is
>> eventTimestamp + bound, will collect all the events whose timestamps arrive
>> within that watermark inside each eventTimestamp + 5s etc. window interval.
>> Or does this bound have to be based on the windowInterval, i.e.
>> extractedTimestamp + windowInterval + bound?*
>>
>>
>>> public class MonitoringTSWAssigner implements
>>>         AssignerWithPunctuatedWatermarks<Monitoring> {
>>>     private long bound = 5 * (long) 1000;
>>>
>>>     public long extractTimestamp(Monitoring monitoring, long previousTS) {
>>>         return monitoring.getEventTimestamp();
>>>     }
>>>
>>>     public Watermark checkAndGetNextWatermark(Monitoring monitoring,
>>>             long extractedTimestamp) {
>>>         return new Watermark(extractedTimestamp + bound); // <==== should it be - bound ?
>>>     }
>>> }
>>
>>
>> Used here:
>>
>>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>> final DataStreamSource<Monitoring> monitoringDataStreamSource =
>>>         env.addSource(....);
>>> DataStream<Monitoring> kinesisStream =
>>>         monitoringDataStreamSource.assignTimestampsAndWatermarks(
>>>                 new MonitoringTSWAssigner());
>>> KeyedStream<Monitoring, Tuple> monitoringTupleKeyedStream =
>>>         kinesisStream.keyBy("deployment", .....);
>>> final WindowedStream<Monitoring, Tuple, TimeWindow> windowStream =
>>>         monitoringTupleKeyedStream.timeWindow(Time.seconds(5)); // 5 sec time window
>>
>>
>> TIA,
>>
>
