Hi Guowei,

Thanks for your reply. I am trying to understand the logic behind point 1, i.e. the current watermark being currMaxTimestamp minus the bound. Does this mean that the operator processing a task has current event time < current watermark < currMaxTimestamp? And does the operator then move on to the next watermark as the starting point for later events once event time reaches currWatermark?

Also, I saw this comment in BoundedOutOfOrdernessTimestampExtractor.java:
    // this guarantees that the watermark never goes backwards.
    long potentialWM = currentMaxTimestamp - maxOutOfOrderness;

How does it guarantee that the watermark never goes backwards?

TIA,
Vijay

On Tue, Apr 9, 2019 at 10:50 PM Guowei Ma <guowei....@gmail.com> wrote:

> Hi,
> 1. From doc[1]: a Watermark(t) declares that event time has reached
>    time t in that stream, meaning that there should be no more elements
>    from the stream with a timestamp t’ <= t (i.e. events with timestamps
>    older or equal to the watermark). So I think it is counterintuitive to
>    generate a watermark that is greater than the timestamp of the current
>    element. At the least, you should subtract the bound.
> 2. From the definition of a watermark, I think the watermark is not
>    related to the length of the window. The bound depends on your
>    application.
> 3. In your case AssignerWithPunctuatedWatermarks might not be a good
>    choice. Watermarks are not free, and you might send too many of them.
>    If your source could generate some "watermark" element, I think you
>    could use that interface. Instead, you could choose
>    AssignerWithPeriodicWatermarks. You can find an example in doc[2].
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/event_time.html#event-time-and-watermarks
> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/event_timestamps_watermarks.html#timestamp-assigners--watermark-generators
>
> Best,
> Guowei
>
> Vijay Balakrishnan <bvija...@gmail.com> wrote on Wed, Apr 10, 2019 at 7:41 AM:
>
>> Hi,
>> I have created a TimestampAssigner as follows.
>> I want to use monitoring.getEventTimestamp() with event-time processing
>> and collect aggregated stats over time window intervals of 5 secs,
>> 5 mins etc. Is this the right way to create the TimeWaterMarkAssigner
>> with a bound? I want to collect the stats for each eventTimestamp +
>> window interval.
>> My understanding: the generated watermark, which is eventTimestamp +
>> bound, will collect all the eventTimestamps which arrive within that
>> watermark inside each eventTimestamp + 5s etc. window interval. Or does
>> this bound have to be based on the windowInterval, i.e.
>> extractedTimestamp + windowInterval + bound?
>>
>>> public class MonitoringTSWAssigner implements
>>>         AssignerWithPunctuatedWatermarks<Monitoring> {
>>>     private long bound = 5 * (long) 1000;
>>>
>>>     public long extractTimestamp(Monitoring monitoring, long previousTS) {
>>>         return monitoring.getEventTimestamp();
>>>     }
>>>
>>>     public Watermark checkAndGetNextWatermark(Monitoring monitoring,
>>>             long extractedTimestamp) {
>>>         return new Watermark(extractedTimestamp + bound); // <==== should it be - bound ?
>>>     }
>>> }
>>
>> Used here:
>>
>>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>> final DataStreamSource<Monitoring> monitoringDataStreamSource =
>>>         env.addSource(....);
>>> DataStream<Monitoring> kinesisStream =
>>>         monitoringDataStreamSource.assignTimestampsAndWatermarks(
>>>                 new MonitoringTSWAssigner());
>>> KeyedStream<Monitoring, Tuple> monitoringTupleKeyedStream =
>>>         kinesisStream.keyBy("deployment", .....);
>>> final WindowedStream<Monitoring, Tuple, TimeWindow> windowStream =
>>>         monitoringTupleKeyedStream.timeWindow(Time.seconds(5)); // 5 sec time window
>>
>> TIA,
>>
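
On the "never goes backwards" question at the top of this thread, a small standalone sketch may help. It does not use any Flink classes: BoundedWatermarkSketch, onEvent and currentWatermark are hypothetical names made up for illustration. It assumes, as I understand the extractor, that the largest timestamp seen so far is tracked separately and that the emitted watermark is only replaced when the new candidate is at least as large as the last one emitted.

```java
/**
 * Hypothetical stand-in for a bounded-out-of-orderness watermark generator.
 * Not a Flink class; it only mirrors the bookkeeping discussed in this thread.
 */
class BoundedWatermarkSketch {
    private final long maxOutOfOrderness;                 // the "bound", in milliseconds
    private long currentMaxTimestamp;                     // largest event timestamp seen so far
    private long lastEmittedWatermark = Long.MIN_VALUE;   // last watermark handed out

    BoundedWatermarkSketch(long maxOutOfOrdernessMillis) {
        if (maxOutOfOrdernessMillis < 0) {
            throw new IllegalArgumentException("bound must be non-negative");
        }
        this.maxOutOfOrderness = maxOutOfOrdernessMillis;
        // Start high enough that the subtraction in currentWatermark() cannot underflow.
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMillis;
    }

    /** Records an event timestamp; an out-of-order (smaller) one leaves the maximum unchanged. */
    long onEvent(long eventTimestamp) {
        if (eventTimestamp > currentMaxTimestamp) {
            currentMaxTimestamp = eventTimestamp;
        }
        return eventTimestamp;
    }

    /** Returns max-seen-timestamp minus the bound, but never less than the last value returned. */
    long currentWatermark() {
        long potentialWM = currentMaxTimestamp - maxOutOfOrderness;
        if (potentialWM >= lastEmittedWatermark) {
            lastEmittedWatermark = potentialWM;
        }
        return lastEmittedWatermark;
    }

    public static void main(String[] args) {
        BoundedWatermarkSketch wm = new BoundedWatermarkSketch(5_000L);
        long[] timestamps = {10_000L, 7_000L, 12_000L, 11_000L};
        for (long ts : timestamps) {
            wm.onEvent(ts);
            System.out.println("event=" + ts + " watermark=" + wm.currentWatermark());
        }
    }
}
```

With a 5 s bound, the watermark in this sketch always trails the largest timestamp seen by 5 s, and an out-of-order event such as 7000 arriving after 10000 neither lowers the maximum nor the watermark. The real extractor may differ in details, so treat this only as a reading aid.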