Hi! Are Orders and Customers the source tables directly, or is there a filter on customer_id or id between the source tables and the dimension-table join?
It is quite possible that an earlier filter on customer_id or id (for example customer_id = 1) let the planner rewrite the equality condition of the dimension-table join into customer_id = 1 and id = 1 and push each predicate down below the join, so the temporal join itself is left without an equi-join key.
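For example, something along these lines could trigger that rewrite (just a sketch to illustrate the guess; the subquery and the WHERE customer_id = 1 filter are my assumptions, not taken from your job):

    SELECT o.order_id, o.total, c.country, c.zip
    FROM (SELECT * FROM Orders WHERE customer_id = 1) AS o
    JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
    ON o.customer_id = c.id;

Here the planner can fold o.customer_id = c.id into the constant predicates customer_id = 1 and id = 1, push them below the temporal join, and the join ends up with an empty join key, which is exactly what the ValidationException complains about.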
On Wed, Sep 1, 2021 at 6:27 PM, Wayne <[email protected]> wrote:

> My Flink version is flink-1.12.2-bin-scala_2.12.
> My SQL is
>
> SELECT o.order_id, o.total, c.country, c.zip
> FROM Orders AS o
> JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
> ON o.customer_id = c.id and o.customer_id is not null and c.id is not null;
>
> or
>
> SELECT o.order_id, o.total, c.country, c.zip
> FROM Orders AS o
> JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
> ON o.customer_id = c.id;
>
> Both fail with the error below. Could you tell me what the correct way to write this is? Many thanks.
>
> Exception in thread "main" org.apache.flink.table.api.ValidationException: Currently the join key in Temporal Table Join can not be empty.
> at org.apache.flink.table.planner.plan.rules.logical.LogicalCorrelateToJoinFromGeneralTemporalTableRule.onMatch(LogicalCorrelateToJoinFromTemporalTableRule.scala:272)
> at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333)
> at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542)
> at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407)
> at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243)
> at org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
> at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202)
> at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189)
> at org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
> at org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
> at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$2(FlinkGroupProgram.scala:63)
> at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
> at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
> at scala.collection.Iterator.foreach(Iterator.scala:943)
> at scala.collection.Iterator.foreach$(Iterator.scala:943)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
> at scala.collection.IterableLike.foreach(IterableLike.scala:74)
> at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
> at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
> at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
> at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
> at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1(FlinkGroupProgram.scala:60)
> at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1$adapted(FlinkGroupProgram.scala:55)
> at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
> at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
> at scala.collection.immutable.Range.foreach(Range.scala:158)
> at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
> at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
> at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
> at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55)
> at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62)
> at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
> at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
> at scala.collection.Iterator.foreach(Iterator.scala:943)
> at scala.collection.Iterator.foreach$(Iterator.scala:943)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
> at scala.collection.IterableLike.foreach(IterableLike.scala:74)
> at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
> at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
> at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
> at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
> at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58)
> at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163)
> at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83)
> at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
> at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:287)
> at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:160)
> at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1329)
> at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:707)
> at org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:577)
