Could you try it this way?
from pyflink.table import Table, Tumble

# Aggregate Insert events into one-hour tumbling windows.
tmp_table = st_env.scan("source") \
    .where("action === 'Insert'") \
    .window(Tumble.over("1.hour").on("actionTime").alias("hourlywindow")) \
    .group_by("hourlywindow") \
    .select("action.max as action1, conv_string(eventTime.collect) as etlist, "
            "hourlywindow.start as time1") \
    .select("action1 as action, hbf_thres(etlist) as eventtime, "
            "time1 as actiontime")
# Round-trip through a DataStream before filtering on the UDF result.
ds = st_env._j_tenv.toAppendStream(
    tmp_table._j_table, tmp_table._j_table.getSchema().toRowType())
table = Table(st_env._j_tenv.fromDataStream(ds, "action, eventtime, actiontime"))
table.filter("eventtime.isNotNull").insert_into("alarm_ad")
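
For context, here is a rough pure-Python sketch of why the round trip might help. It assumes the double call comes from the planner merging the projection with the filter, so that the UDF expression is evaluated once in the predicate and once for the output column. I have not verified this against the actual plan. Every name below (hbf_thres_stub, merged_plan, materialized_plan) is hypothetical and only stands in for the real job.

```python
# Pure-Python illustration, no Flink required. All names are hypothetical.
calls = {"n": 0}


def hbf_thres_stub(value):
    """Stand-in for the real hbf_thres UDF; counts how often it runs."""
    calls["n"] += 1
    return value if value > 10 else None


rows = [5, 20, 30]


def merged_plan(rows):
    # Projection inlined into the filter: the UDF expression appears twice,
    # once in the predicate and once in the output column.
    return [hbf_thres_stub(r) for r in rows if hbf_thres_stub(r) is not None]


def materialized_plan(rows):
    # The projection is computed once and stored (the role the
    # toAppendStream/fromDataStream round trip is assumed to play),
    # then the filter reads the stored column.
    projected = [hbf_thres_stub(r) for r in rows]
    return [v for v in projected if v is not None]


calls["n"] = 0
merged_result = merged_plan(rows)
merged_calls = calls["n"]

calls["n"] = 0
materialized_result = materialized_plan(rows)
materialized_calls = calls["n"]

print(merged_result, merged_calls)
print(materialized_result, materialized_calls)
```

Both variants return the same rows, but the merged one invokes the stub once per input row plus once more for every row that passes the filter.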
> On Jul 10, 2020, at 2:44 PM, lgs <[email protected]> wrote:
>
> Thanks for the suggestion.
> I tried the code as written and got the same result.
> The UDF is still called twice.