Hi venn,
With event time, the watermark interval defaults to 200 ms. You can change it with the setAutoWatermarkInterval method of ExecutionConfig; see StreamExecutionEnvironment:
/**
 * Sets the time characteristic for all streams create from this environment, e.g., processing
 * time, event time, or ingestion time.
 *
 * <p>If you set the characteristic to IngestionTime of EventTime this will set a default
 * watermark update interval of 200 ms. If this is not applicable for your application
 * you should change it using {@link ExecutionConfig#setAutoWatermarkInterval(long)}.
 *
 * @param characteristic The time characteristic.
 */
@PublicEvolving
public void setStreamTimeCharacteristic(TimeCharacteristic characteristic) {
    this.timeCharacteristic = Preconditions.checkNotNull(characteristic);
    if (characteristic == TimeCharacteristic.ProcessingTime) {
        getConfig().setAutoWatermarkInterval(0);
    } else {
        getConfig().setAutoWatermarkInterval(200);
    }
}
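
Note that setStreamTimeCharacteristic overwrites the interval, so if 200 ms does not suit your job, call env.getConfig().setAutoWatermarkInterval(...) after it rather than before. As for why the value does not show up in your call stack: as far as I recall, the operator reads the configured interval once when it starts and after that only re-registers its own timer, which is what the last two lines of your onProcessingTime show. Below is a minimal sketch of that re-registration loop on a virtual clock. It is not Flink code; the class and method names are made up for illustration, and treating an interval of 0 as "no periodic timer" is my assumption.

```java
// Hypothetical model of a self-rescheduling processing-time timer.
// NOT Flink code: class and method names are invented for illustration.
final class PeriodicWatermarkTimerSketch {

    /**
     * Returns the virtual-clock times (ms) at which the timer fires within
     * [startMs, endMs]. The first timer is registered at startMs + intervalMs,
     * and every firing registers the next one at "now + intervalMs".
     * Assumption: an interval <= 0 means no periodic timer is registered.
     */
    static long[] firingTimes(long intervalMs, long startMs, long endMs) {
        if (intervalMs <= 0 || endMs < startMs) {
            return new long[0];
        }
        int count = (int) ((endMs - startMs) / intervalMs);
        long[] fired = new long[count];
        long nextTimer = startMs + intervalMs;  // registered once up front
        for (int i = 0; i < count; i++) {
            long now = nextTimer;               // the timer fires
            fired[i] = now;                     // a watermark could be emitted here
            nextTimer = now + intervalMs;       // re-register from inside the callback
        }
        return fired;
    }

    public static void main(String[] args) {
        // 200 ms is the interval the quoted method sets for event time.
        System.out.println(java.util.Arrays.toString(firingTimes(200L, 0L, 1000L)));
        // prints [200, 400, 600, 800, 1000]

        // 0 is the interval the quoted method sets for processing time.
        System.out.println(firingTimes(0L, 0L, 1000L).length);
        // prints 0
    }
}
```

With a different interval, for example 500, the same loop fires at 500 and 1000 in the first second, which is the effect you would expect to see in your println timestamps after changing the setting.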
On Tue, Sep 3, 2019 at 2:39 PM venn <[email protected]> wrote:
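> Hi all, today I was reading the Flink source code that assigns timestamps and
> watermarks, and found that periodic watermarks really are generated periodically:
> the times printed to the console show the method runs roughly every 200 ms.
> Where is that 200 ms controlled? I cannot find it (the source location) in the
> debug call stack.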
>
>
>
> The periodic watermarks are assigned as follows:
>
> .assignAscendingTimestamps(element => sdf.parse(element.createTime).getTime)
>
> .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[Event](Time.milliseconds(50))
>
>
>
>
>
> The method that generates the timestamp:
>
> In the TimestampsAndPeriodicWatermarksOperator class:
>
>
>
>
> @Override
> public void processElement(StreamRecord<T> element) throws Exception {
>     final long newTimestamp = userFunction.extractTimestamp(element.getValue(),
>             element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE);
>
>     output.collect(element.replace(element.getValue(), newTimestamp));
> }
>
>
>
>
>
> The method that generates the watermark:
>
> In the TimestampsAndPeriodicWatermarksOperator class:
>
>
> @Override
> public void onProcessingTime(long timestamp) throws Exception {
>     // The output here shows that this runs once every 200 ms
>     System.out.println("timestamp : " + timestamp + ", system.current : "
>             + System.currentTimeMillis());
>     // register next timer
>     Watermark newWatermark = userFunction.getCurrentWatermark();
>     if (newWatermark != null && newWatermark.getTimestamp() > currentWatermark) {
>         currentWatermark = newWatermark.getTimestamp();
>         // emit watermark
>         output.emitWatermark(newWatermark);
>     }
>
>     long now = getProcessingTimeService().getCurrentProcessingTime();
>     getProcessingTimeService().registerTimer(now + watermarkInterval, this);
> }
>
>
>
>
>
>
>
> Thanks, everyone.
>
>