各位大佬, 今天看flink 指派Timestamp 和watermark 的源码,发现周期性创建
watermark 确实是周期性的,从打印到控制台的时间可以看到差不多是200毫秒执行一
次, 200毫秒是在哪里控制的,在debug 的调用栈找不到(源码位置)?

 

周期新创建watermark  方法如下:

            .assignAscendingTimestamps(element =>
sdf.parse(element.createTime).getTime)

            .assignTimestampsAndWatermarks(new
BoundedOutOfOrdernessTimestampExtractor[Event](Time.milliseconds(50))

 

 

生成Timestamp的方法:

TimestampsAndPeriodicWatermarksOperator 类的 :

 


@Override
public void processElement(StreamRecord<T> element) throws Exception {
   final long newTimestamp =
userFunction.extractTimestamp(element.getValue(),
         element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE);

   output.collect(element.replace(element.getValue(), newTimestamp));
}

 

 

生成watermark的方法:

TimestampsAndPeriodicWatermarksOperator 类的 :


@Override
public void onProcessingTime(long timestamp) throws Exception {
   // 从这里可以看到,每200ms 打印一次
   System.out.println("timestamp : " + timestamp + ", system.current : " +
System.currentTimeMillis());
   // register next timer
   Watermark newWatermark = userFunction.getCurrentWatermark();
   if (newWatermark != null && newWatermark.getTimestamp() >
currentWatermark) {
      currentWatermark = newWatermark.getTimestamp();
      // emit watermark
      output.emitWatermark(newWatermark);
   }

   long now = getProcessingTimeService().getCurrentProcessingTime();
   getProcessingTimeService().registerTimer(now + watermarkInterval, this);
}

 

 

 

感谢各位大佬

回复