Re: Timestamp Watermark Assigner bound question
Sorry for missing a "not". :(

Whether a watermark generated by an AssignerWithPunctuatedWatermarks or AssignerWithPeriodicWatermarks is sent downstream is controlled by the framework. If an operator returns a watermark that goes backwards, Flink would *not* send it downstream.

Best,
Guowei

Guowei Ma wrote on Mon, Apr 15, 2019 at 9:44 AM:

> Whether the watermark, which is generated by the
> AssignerWithPunctuatedWatermarks/AssignerWithPeriodicWatermarks is send to
> the downstream is controlled by the framework. If an operator returns a
> watermark going back Flink would send it to the downstream.
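The framework-side check described in this message can be pictured with a minimal sketch. This is not Flink's operator code: the class `MonotonicWatermarkEmitter` and its methods are hypothetical names, and the only assumption is the behaviour stated above, namely that a candidate watermark is forwarded only when it advances past the last one sent.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the framework side; not Flink's actual operator class. */
final class MonotonicWatermarkEmitter {
    // Last watermark that was forwarded downstream.
    private long currentWatermark = Long.MIN_VALUE;
    // Records what "downstream" would have received, for illustration only.
    private final List<Long> emitted = new ArrayList<>();

    /** Forwards the candidate only if it is strictly greater than the last one sent. */
    boolean offer(long candidate) {
        if (candidate > currentWatermark) {
            currentWatermark = candidate;
            emitted.add(candidate);
            return true;
        }
        return false; // a repeated or regressing watermark is dropped
    }

    List<Long> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        MonotonicWatermarkEmitter emitter = new MonotonicWatermarkEmitter();
        // The assigner returns 2000 after 3000, i.e. it tries to go backwards.
        long[] candidates = {1000L, 3000L, 2000L, 3000L, 4000L};
        for (long candidate : candidates) {
            emitter.offer(candidate);
        }
        System.out.println(emitter.emitted()); // [1000, 3000, 4000]
    }
}
```

With a check like this in place, an assigner that occasionally returns a smaller value cannot move event time backwards downstream; the smaller value is simply ignored.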
Re: Timestamp Watermark Assigner bound question
Hi Vijay,

>>> Then the Operator progresses to the next Watermark as a starting point for events after event time reaches currWatermark ?

AFAIK, the operator that generates watermarks is called by the framework. When it is called depends on the operator itself. For example, an operator that implements the AssignerWithPunctuatedWatermarks interface is called for every element.

>>> How does it guarantee that watermark never goes backwards ?

Whether a watermark generated by an AssignerWithPunctuatedWatermarks or AssignerWithPeriodicWatermarks is sent downstream is controlled by the framework. If an operator returns a watermark that goes backwards, Flink would [not] send it downstream.

Best,
Guowei

Vijay Balakrishnan wrote on Wed, Apr 10, 2019 at 11:44 PM:

> Also, I saw this comment in BoundedOutOfOrdernessTimestampExtractor.java.
>
> // this guarantees that the watermark never goes backwards.
> long potentialWM = currentMaxTimestamp - maxOutOfOrderness;
>
> How does it guarantee that watermark never goes backwards ?
Re: Timestamp Watermark Assigner bound question
Hi Guowei,

Thx for your reply.

I am trying to understand the logic behind Point 1, i.e. the current Watermark being currMaxTimestamp minus the bound. So, does this mean the Operator processing a task has current Event time < current Watermark < currMaxTimestamp ? Then the Operator progresses to the next Watermark as a starting point for events after event time reaches currWatermark ?

Also, I saw this comment in BoundedOutOfOrdernessTimestampExtractor.java:

// this guarantees that the watermark never goes backwards.
long potentialWM = currentMaxTimestamp - maxOutOfOrderness;

How does it guarantee that the watermark never goes backwards ?

TIA,
Vijay

On Tue, Apr 9, 2019 at 10:50 PM Guowei Ma wrote:

> 1. From doc[1], A Watermark(t) declares that event time has reached time t
> in that stream, meaning that there should be no more elements from the
> stream with a timestamp t’ <= t (i.e. events with timestamps older or equal
> to the watermark). So I think it might be counterintuitive that generating
> a watermark, which is bigger than the timestamp of current element. At
> least you should minus the bound.
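One way to read the quoted comment: the subtraction alone guarantees nothing, but it stays monotonic if currentMaxTimestamp is a running maximum and the bound is constant. The sketch below is a simplified stand-in written for this thread, not the Flink source; the class name `BoundedLatenessTracker` and the extra guard on the last returned value are assumptions made for illustration.

```java
/** Simplified stand-in for a bounded-out-of-orderness generator; names are hypothetical. */
final class BoundedLatenessTracker {
    private final long maxOutOfOrderness;
    private long currentMaxTimestamp;
    private long lastEmittedWatermark = Long.MIN_VALUE;

    BoundedLatenessTracker(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrderness = maxOutOfOrdernessMillis;
        // Start high enough that (max - bound) cannot underflow.
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMillis;
    }

    /** Called for every element: only ever raises the running maximum. */
    long extractTimestamp(long eventTimestamp) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
        return eventTimestamp;
    }

    /** Running maximum minus a constant bound, additionally guarded against regressions. */
    long currentWatermark() {
        long potentialWM = currentMaxTimestamp - maxOutOfOrderness;
        if (potentialWM >= lastEmittedWatermark) {
            lastEmittedWatermark = potentialWM;
        }
        return lastEmittedWatermark;
    }

    public static void main(String[] args) {
        BoundedLatenessTracker tracker = new BoundedLatenessTracker(5000L);
        // 9000 arrives out of order, after 12000.
        long[] events = {10000L, 12000L, 9000L, 15000L};
        for (long ts : events) {
            tracker.extractTimestamp(ts);
            System.out.println("event " + ts + " -> watermark " + tracker.currentWatermark());
        }
        // Prints watermarks 5000, 7000, 7000, 10000: the late event 9000 does not pull it back.
    }
}
```

Under these assumptions the out-of-order element leaves the running maximum untouched, so the watermark holds at 7000 rather than dropping to 4000.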
Re: Timestamp Watermark Assigner bound question
Hi,

1. From doc[1]: A Watermark(t) declares that event time has reached time t in that stream, meaning that there should be no more elements from the stream with a timestamp t’ <= t (i.e. events with timestamps older or equal to the watermark). So I think it is counterintuitive to generate a watermark that is bigger than the timestamp of the current element. At the least, you should subtract the bound.
2. From the definition of a watermark, I think the watermark is not related to the length of the window. The bound depends on your application.
3. In your case AssignerWithPunctuatedWatermarks might not be a good choice. Watermarks are not free, and you might send too many of them. If your source could generate some "watermark" element, I think you could use that interface. You could choose AssignerWithPeriodicWatermarks instead. You can find an example in doc[2].

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/event_time.html#event-time-and-watermarks
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/event_timestamps_watermarks.html#timestamp-assigners--watermark-generators

Best,
Guowei

Vijay Balakrishnan wrote on Wed, Apr 10, 2019 at 7:41 AM:

> Is this the right way to create the TimeWaterMarkAssigner with a bound ?
> I want to collect the stats for each eventTimestamp + window intervals.
> My understanding - *the generated watermark which is eventTimestamp +
> bound will collect all the eventTimestamp's which arrive within that
> Watermark inside each eventTimestamp + 5s etc window interval. Or does this
> bound have to be based on the windowInterval i.e extractedTimestamp +
> windowInterval + bound* ??
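To make points 1 and 2 concrete, here is a small arithmetic sketch. It uses no Flink classes; `WatermarkBoundDemo` and its helpers are hypothetical, timestamps are assumed non-negative, and the firing rule (a window [start, start + size) can be evaluated once the watermark reaches start + size - 1) is stated here as an assumption about event-time windows rather than quoted from the thread.

```java
/** Hypothetical helper for comparing "timestamp + bound" with "timestamp - bound". */
final class WatermarkBoundDemo {
    /** Start of the tumbling window containing the timestamp (non-negative timestamps assumed). */
    static long windowStart(long timestamp, long windowSize) {
        return timestamp - (timestamp % windowSize);
    }

    /** Watermark(t) promises no more elements with timestamp <= t, so such an element breaks it. */
    static boolean violatesWatermark(long eventTimestamp, long watermark) {
        return eventTimestamp <= watermark;
    }

    /** Assumed firing rule: the watermark has reached the last timestamp inside the window. */
    static boolean windowCanFire(long windowStart, long windowSize, long watermark) {
        return watermark >= windowStart + windowSize - 1;
    }

    public static void main(String[] args) {
        long bound = 5000L;
        long windowSize = 5000L;
        long first = 10000L;
        long second = 12000L; // arrives after `first`, same 5 s window

        long plus = first + bound;   // 15000
        long minus = first - bound;  // 5000
        long window = windowStart(second, windowSize); // 10000

        // With "+ bound" the window is already complete before `second` arrives.
        System.out.println(violatesWatermark(second, plus));         // true
        System.out.println(windowCanFire(window, windowSize, plus)); // true

        // With "- bound" the element is on time and the window is still open.
        System.out.println(violatesWatermark(second, minus));         // false
        System.out.println(windowCanFire(window, windowSize, minus)); // false
    }
}
```

The window size never enters the watermark computation here; it only decides which watermark value is large enough to close a given window, which matches point 2.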
Timestamp Watermark Assigner bound question
Hi,

I have created a TimestampAssigner as follows. I want to use monitoring.getEventTimestamp() with Event Time processing and collect aggregated stats over time window intervals of 5 secs, 5 mins etc. Is this the right way to create the TimeWaterMarkAssigner with a bound ? I want to collect the stats for each eventTimestamp + window interval.

My understanding - *the generated watermark, which is eventTimestamp + bound, will collect all the eventTimestamps which arrive within that Watermark inside each eventTimestamp + 5s etc. window interval. Or does this bound have to be based on the windowInterval, i.e. extractedTimestamp + windowInterval + bound* ??

> public class MonitoringTSWAssigner implements AssignerWithPunctuatedWatermarks {
>     private long bound = 5 * (long) 1000;
>
>     public long extractTimestamp(Monitoring monitoring, long previousTS) {
>         return monitoring.getEventTimestamp();
>     }
>
>     public Watermark checkAndGetNextWatermark(Monitoring monitoring, long extractedTimestamp) {
>         return new Watermark(extractedTimestamp + bound); //< should it be - bound ?
>     }
> }

Used here:

> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> final DataStreamSource monitoringDataStreamSource = env.addSource();
> DataStream kinesisStream =
>     monitoringDataStreamSource.assignTimestampsAndWatermarks(new MonitoringTSWAssigner());
> KeyedStream monitoringTupleKeyedStream = kinesisStream.keyBy("deployment", .);
> final WindowedStream windowStream =
>     monitoringTupleKeyedStream.timeWindow(Time.seconds(5)); // 5 sec time window

TIA,