是这样的,可以产生,问题从watermarksState里面拿出来的时间戳,会变成是默认值,全局除过这里有设置过,再无任何关于watermark的逻辑。
[image: image.png]
val dataLoadStream = data
.map(new EventMapFunction(config))
// Add watermark
.assignTimestampsAndWatermarks(
WatermarkStrategy
.forBoundedOutOfOrderness[EventData](Duration.ofMinutes(1))
.withTimestampAssigner(new SerializableTimestampAssigner[EventData] {
override def extractTimestamp(element: EventData, recordTimestamp: Long)
: Long = element.getEventTimestamp
})
)
On Sun, Mar 7, 2021 at 10:38 PM tison <[email protected]> wrote:
> 可以中途产生,走这个接口
>
>
> org.apache.flink.streaming.api.datastream.DataStream#assignTimestampsAndWatermarks(org.apache.flink.api.common.eventtime.WatermarkStrategy<T>)
>
> 麻烦贴一下你加 watermark 的代码和 pipeline 看一下啥情况
>
> Best,
> tison.
>
>
> Xavier <[email protected]> 于2021年3月7日周日 下午7:51写道:
>
> > 想问下社区,watermark必须加在数据源上吗?顺便想问下一般用什么方式来调watermark,我自己本地有发现加在map
> > function之后,watermark会自动重置为默认值的情况。
> > 谢谢!
> > --
> >
> > Best Regards,
> > *Xavier*
> >
>
--
Best Regards,
*Xavier*