Hi 社区。
版本 : Flink 1.12.1
在Flink SQL 中调用自定义 UDF,UDF中调用了SDK的方法,复用的字段,发现SDK被重复调用了。
e.g.
INSERT INTO table_a
SELECT
update_time,
    MD5(p_key) AS id,
    p_key
FROM
(
    SELECT
        LOCALTIMESTAMP AS update_time ,
        findkeyudf(p_name) AS p_key
    FROM table_b
) T
WHERE COALESCE(p_key, '')<> ''
;

== Physical Execution Plan ==
Stage 1 : Data Source
content : Source: TableSourceScan(table=[[default_catalog,
default_database, table_b]], fields=[p_name, xxx, ...])

Stage 2 : Operator
content : Calc(select=[CAST(()) AS update_date,
CAST(MD5(findkeyudf(p_name))) AS comp_name, findkeyudf(p_name) AS p_key],
where=[(findkeyudf(p_name) IS NOT NULL CASE (CAST(findkeyudf(p_name)) <>
_UTF-16LE'':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") CASE false)])
ship_strategy : FORWARD

Stage 3 : Data Sink
content : Sink: Sink(table=[default_catalog.default_database.table_a],
fields=[update_date, comp_name, p_key])
ship_strategy : FORWARD

查看 explain,  udf 调用有四次,但是从日志发现同一个Key 执行了 8次。

现在有2个问题:
1. udf 调用不会被优化成一次,结果复用吗?
2. 查看 explain,不应该是四次吗,执行了八次有点不理解,没有加 过滤条件( WHERE COALESCE(p_key, '')<> ''
)是执行了2次的。
3. 顺便问下,JDBC 维表异步Join的 Feature 有对应的规划吗?

回复