Try it like this?

tmp_table = st_env.scan("source") \
        .where("action === 'Insert'") \
        .window(Tumble.over("1.hour").on("actionTime").alias("hourlywindow")) \
        .group_by("hourlywindow") \
        .select("action.max as action1, conv_string(eventTime.collect) as etlist, hourlywindow.start as time1") \
        .select("action1 as action, hbf_thres(etlist) as eventtime, time1 as actiontime")

ds = st_env._j_tenv.toAppendStream(tmp_table._j_table, tmp_table._j_table.getSchema().toRowType())
table = Table(st_env._j_tenv.fromDataStream(ds, "action, eventtime, actiontime"))
table.filter("eventtime.isNotNull").insert_into("alarm_ad")
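
The idea behind the Table-to-DataStream round-trip above is, presumably, to materialize the UDF's output before the filter: when both a `select` and a downstream `filter` reference the same expression in one lazy plan, the planner may inline and evaluate the UDF in both places. A minimal pure-Python analogy (not the Flink API; the names `udf` and `expr` are illustrative only) of lazy re-evaluation versus materialization:

```python
# Analogy for the double-invocation problem: a lazy "expression" is
# re-run by every consumer, while a materialized value is computed once.

calls = {"n": 0}

def udf(x):
    calls["n"] += 1
    return x * 2

# Lazy plan: the expression is a closure, so each consumer re-runs it.
expr = lambda: udf(21)
selected = expr()        # first evaluation
passed = expr() is not None  # second evaluation -> UDF called twice
assert calls["n"] == 2

# Materialize once (analogous to the Table -> DataStream round-trip):
calls["n"] = 0
value = udf(21)          # evaluated a single time
selected = value
passed = value is not None
assert calls["n"] == 1
```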

> On Jul 10, 2020, at 2:44 PM, lgs <[email protected]> wrote:
> 
> Thanks for the suggestion.
> I tried the code as shown, but the result is the same:
> the UDF is still called twice.
> 
> 
> 
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
