watermark 的计算是跟数据上的 event-time 相关的。你的数据是不是间隔一小时来一波的呢?
比如 10:00 的数据之后,就是 11:00 的数据,但是要1小时后才到来?


Best,
Jark

On Tue, 7 Jul 2020 at 17:20, lgs <[email protected]> wrote:

> source是kafka,有一个rowtime定义:
>
>             .field("rowtime", DataTypes.TIMESTAMP(0))
>             .rowtime(Rowtime()
>                 .timestamps_from_field("actionTime")
>                 .watermarks_periodic_bounded(60000)
>             )
>
> 有两个sink,第一个sink是直接把kafa的数据保存到postgres。
> 第二个sink是定义一个1小时的tumble window,然后定义了一个udf,udf里面去查询第一个sink保存的数据。
>     st_env.scan("source") \
>          .window(Tumble.over("1.hour").on("rowtime").alias("hourlywindow"))
> \
>          .group_by("hourlywindow") \
>          .select("udf(...)")
>          ...
>
>
> 现在的问题是:第二个sink的tumble window触发的时候,数据库里面的数据已经保存了下一个小时的数据了。
>
> 有什么办法让tumble window在一个小时结束后马上触发?现在观察的是需要下一个小时的数据来了,才能触发上一个小时的窗口。
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>

回复