Hi!

Are Orders and Customers the source tables themselves? Or is there a filter on
customer_id or id somewhere between the source tables and the dimension table
(lookup) join?

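It is possible that an earlier filter on customer_id or id (for example
customer_id = 1) caused the equality condition of the dimension table join to be
optimized into customer_id = 1 and id = 1, with each predicate pushed down below
the join. That would leave the join itself without an equality key, which matches
the error message.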
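
To make the scenario concrete, here is a rough sketch of the shape I have in
mind. The view name FilteredOrders and its WHERE clause are made up for
illustration; only Orders, Customers and the column names come from your query.
I have not run this against 1.12.2, so treat it as a guess at the pattern rather
than a confirmed reproduction:

```sql
-- Hypothetical view: a constant filter sits between the source table and the join.
CREATE VIEW FilteredOrders AS
SELECT order_id, total, customer_id, proc_time
FROM Orders
WHERE customer_id = 1;

-- Same lookup join as in your mail, but reading from the filtered view.
-- With customer_id fixed to 1, the planner may rewrite o.customer_id = c.id
-- into o.customer_id = 1 and c.id = 1, leaving no equality key on the join.
SELECT o.order_id, o.total, c.country, c.zip
FROM FilteredOrders AS o
  JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
    ON o.customer_id = c.id;
```

If your Orders (or Customers) is defined with a filter like this, that would be
the first thing I would check.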

Wayne <[email protected]> wrote on Wed, Sep 1, 2021 at 6:27 PM:

> My Flink version is flink-1.12.2-bin-scala_2.12
> My SQL is
> SELECT o.order_id, o.total, c.country, c.zip
> FROM Orders AS o
>   JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
>     ON o.customer_id = c.id and o.customer_id is not null and c.id is not null ;
> or
> SELECT o.order_id, o.total, c.country, c.zip
> FROM Orders AS o
>   JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
>     ON o.customer_id = c.id ;
>
> Both fail with the error below. Could you help me see what the correct way to
> write this is? Thanks a lot.
>
> Exception in thread "main" org.apache.flink.table.api.ValidationException: Currently the join key in Temporal Table Join can not be empty.
>     at org.apache.flink.table.planner.plan.rules.logical.LogicalCorrelateToJoinFromGeneralTemporalTableRule.onMatch(LogicalCorrelateToJoinFromTemporalTableRule.scala:272)
>     at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333)
>     at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542)
>     at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407)
>     at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243)
>     at org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
>     at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202)
>     at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$2(FlinkGroupProgram.scala:63)
>     at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
>     at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
>     at scala.collection.Iterator.foreach(Iterator.scala:943)
>     at scala.collection.Iterator.foreach$(Iterator.scala:943)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
>     at scala.collection.IterableLike.foreach(IterableLike.scala:74)
>     at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
>     at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
>     at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
>     at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1(FlinkGroupProgram.scala:60)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1$adapted(FlinkGroupProgram.scala:55)
>     at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
>     at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
>     at scala.collection.immutable.Range.foreach(Range.scala:158)
>     at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
>     at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
>     at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62)
>     at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
>     at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
>     at scala.collection.Iterator.foreach(Iterator.scala:943)
>     at scala.collection.Iterator.foreach$(Iterator.scala:943)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
>     at scala.collection.IterableLike.foreach(IterableLike.scala:74)
>     at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
>     at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
>     at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
>     at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58)
>     at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163)
>     at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83)
>     at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
>     at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:287)
>     at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:160)
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1329)
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:707)
>     at org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:577)