Hi, Chai Kelun, 你的 filter condition 里面包含了你自定义的 UDF,是不满足 filter push down
的条件的,因为对于优化器来说 UDF 是不确定的,优化器不能从里面提取到可以下推的条件, 如果你想实现下推,可以尝试抽取下确定性的
condition,如 product.id > 10 etc.。另外,Flink 是支持 broadcast hash join
的,如果你想控制某两个表的 join type,你可以通过 join hint 来指定 join 类型为 broadcast。()

Chai Kelun <chaike...@hotmail.com> 于2023年7月3日周一 17:58写道:

> 有一张 kafka 流表 logClient(id int, name string, price double),一张实现了
> SupportsFilterPushDown 的 customConnector 维表 product(id int, name string,
> value double),实现了自定义函数 MyUDF(double d1, double d2) 用于自定义逻辑计算并支持该算子的下推。
> 在 Stream-Table Join 的场景下,下列 SQL 并没有将算子进行下推,而是通过 TableScan 将所有算子提到 Join
> 节点进行计算,请问是否有什么选项可以开启下推?(类似与 nestedloop-join,计算推到 product 表数据源进行)
> SELECT A.id, A.name, B.name FROM logClient AS A, product AS B WHERE
> MyUDF(B.value, A.price) < xxx;
> 另外,Kafka 和 customConnector 均支持并行,在 Join 计算时期望使用 BROADCAST 模式,将 product 表在
> logClient 流表的每个 partition 上进行计算,但似乎目前 Flink 流-表 Join 的 distribution 模式仅支持
> SINGLETON 和 HASH[KEY](StreamExecExchange.java Line106 的 switch
> case),后续社区是否会考虑支持更多的 distributionType?
>
> 非常感谢!

回复