是这样的,可以产生,问题从watermarksState里面拿出来的时间戳,会变成是默认值,全局除过这里有设置过,再无任何关于watermark的逻辑。
[image: image.png]

val dataLoadStream = data
  .map(new EventMapFunction(config))
  // Add watermark
  .assignTimestampsAndWatermarks(
    WatermarkStrategy
      .forBoundedOutOfOrderness[EventData](Duration.ofMinutes(1))
      .withTimestampAssigner(new SerializableTimestampAssigner[EventData] {
        override def extractTimestamp(element: EventData, recordTimestamp: Long)
        : Long = element.getEventTimestamp
      })
  )


On Sun, Mar 7, 2021 at 10:38 PM tison <[email protected]> wrote:

> 可以中途产生,走这个接口
>
>
> org.apache.flink.streaming.api.datastream.DataStream#assignTimestampsAndWatermarks(org.apache.flink.api.common.eventtime.WatermarkStrategy<T>)
>
> 麻烦贴一下你加 watermark 的代码和 pipeline 看一下啥情况
>
> Best,
> tison.
>
>
> Xavier <[email protected]> 于2021年3月7日周日 下午7:51写道:
>
> >    想问下社区,watermark必须加在数据源上吗?顺便想问下一般用什么方式来调watermark,我自己本地有发现加在map
> > function之后,watermark会自动重置为默认值的情况。
> >     谢谢!
> > --
> >
> > Best Regards,
> > *Xavier*
> >
>


-- 

Best Regards,
*Xavier*

回复