OK, for your case this is a known issue: https://issues.apache.org/jira/browse/FLINK-15973, which has not been fixed yet.


You can rewrite the job like this, which should work around the issue:

table = st_env.scan("source") \
    .where("action === 'Insert'") \
    .window(Tumble.over("1.hour").on("actionTime").alias("hourlywindow")) \
    .group_by("hourlywindow") \
    .select("action.max as action1, conv_string(eventTime.collect) as etlist, hourlywindow.start as time1") \
    .select("action1 as action, hbf_thres(etlist) as eventtime, time1 as actiontime")

st_env.create_temporary_view("tmp", table)
st_env.scan("tmp").filter("eventtime.isNotNull").insert_into("alarm_ad")
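For intuition, the plan you posted shows why the split helps: the planner inlines the Python UDF into both the filter predicate and the projection, so the UDF runs twice for every row that passes the filter. A plain-Python sketch of that effect (no Flink API involved; `hbf_thres_like` is a hypothetical stand-in for your UDF, with an invocation counter added):

```python
# Sketch of FLINK-15973's double-evaluation effect, outside Flink.
calls = {"count": 0}

def hbf_thres_like(x):
    """Stand-in for the Python UDF; counts how often it is invoked."""
    calls["count"] += 1
    return x if x % 2 == 0 else None

rows = [1, 2, 3, 4]

# "Inlined" plan: the filter predicate and the projection each call the UDF,
# so rows that pass the filter pay for two invocations.
naive = [hbf_thres_like(r) for r in rows if hbf_thres_like(r) is not None]
naive_calls = calls["count"]        # 4 filter calls + 2 projection calls

# Workaround plan (what the temporary view achieves): materialize the UDF
# result once per row, then filter on the materialized column.
calls["count"] = 0
materialized = [hbf_thres_like(r) for r in rows]
filtered = [v for v in materialized if v is not None]
workaround_calls = calls["count"]   # exactly one call per input row
```

The `create_temporary_view` boundary plays the role of the `materialized` list here: the filter then references the already-computed `eventtime` column instead of re-invoking the UDF.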


> On 10 Jul 2020, at 10:08 AM, lgs <[email protected]> wrote:
> 
> Thanks for the hint.
> I printed the explain() output and found that the UDF is indeed called twice; the condition is that eventtime.isNotNull filter:
> 
> 
> 
>    st_env.scan("source") \
>         .where("action === 'Insert'") \
>         .window(Tumble.over("1.hour").on("actionTime").alias("hourlywindow")) \
>         .group_by("hourlywindow") \
>         .select("action.max as action1, conv_string(eventTime.collect) as etlist, hourlywindow.start as time1") \
>         .select("action1 as action, hbf_thres(etlist) as eventtime, time1 as actiontime") \
> *       .filter("eventtime.isNotNull") \
> *       .insert_into("alarm_ad")
> 
> 
> LegacySink(name=[`default_catalog`.`default_database`.`alarm_ad`], fields=[action, eventtime, actiontime])
> +- Calc(select=[EXPR$0 AS action, f0 AS eventtime, EXPR$2 AS actiontime])
> *   +- PythonCalc(select=[EXPR$0, EXPR$2, simple_udf(f0) AS f0])
>        +- Calc(select=[EXPR$0, EXPR$2, UDFLength(EXPR$1) AS f0], where=[IS NOT NULL(f0)])
> *         +- PythonCalc(select=[EXPR$0, EXPR$1, EXPR$2, simple_udf(f0) AS f0])
>              +- Calc(select=[EXPR$0, EXPR$1, EXPR$2, UDFLength(EXPR$1) AS f0])
>                 +- GroupWindowAggregate(window=[TumblingGroupWindow('hourlywindow, actionTime, 3600000)], properties=[EXPR$2], select=[MAX(action) AS EXPR$0, COLLECT(eventTime) AS EXPR$1, start('hourlywindow) AS EXPR$2])
>                    +- Exchange(distribution=[single])
>                       +- Calc(select=[recordId, action, originalState, newState, originalCause, newCause, ser_name, enb, eventTime, ceasedTime, duration, acked, pmdId, pmdTime, actionTime], where=[=(action, _UTF-16LE'Insert')])
>                          +- Reused(reference_id=[1])
> 
> What I want here is to filter on the Python UDF's return value: if it returns null, I don't want to sink the row. Did I write my SQL wrong?
> 
> 
> 
> 
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
