Is it possible to join a table with two other temporal tables to achieve this? I'm trying, but I get a NullPointerException deep inside Flink's Calcite code. I've attached some sample code that demonstrates the problem. Is my SQL incorrect or invalid (in which case Flink should ideally detect the problem and provide a better error message), or is the SQL correct and this is a bug or limitation in Flink? If it's the latter, how can I achieve a similar result?
The SQL I'm trying to run:

    SELECT o.id AS orderId, o.productId, o.currency, o.quantity, f.rate, p.price
    FROM Orders AS o,
      LATERAL TABLE (FxRateLookup(o.rowtime)) AS f,
      LATERAL TABLE (PriceLookup(o.rowtime)) AS p
    WHERE o.currency = f.currency
      AND o.productId = p.productId

The exception I get:

    Exception in thread "main" java.lang.NullPointerException
        at org.apache.flink.table.calcite.FlinkRelBuilder$.of(FlinkRelBuilder.scala:129)
        at org.apache.flink.table.plan.rules.logical.LogicalCorrelateToTemporalTableJoinRule.onMatch(LogicalCorrelateToTemporalTableJoinRule.scala:91)
        at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:319)
        at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:560)
        at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:419)
        at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:284)
        at org.apache.calcite.plan.hep.HepInstruction$RuleCollection.execute(HepInstruction.java:74)
        at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:215)
        at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:202)
        at org.apache.flink.table.plan.Optimizer.runHepPlanner(Optimizer.scala:228)
        at org.apache.flink.table.plan.Optimizer.runHepPlannerSimultaneously(Optimizer.scala:212)
        at org.apache.flink.table.plan.Optimizer.optimizeExpandPlan(Optimizer.scala:138)
        at org.apache.flink.table.plan.StreamOptimizer.optimize(StreamOptimizer.scala:61)
        at org.apache.flink.table.planner.StreamPlanner.translateToType(StreamPlanner.scala:410)
        at org.apache.flink.table.planner.StreamPlanner.translate(StreamPlanner.scala:187)
        at org.apache.flink.table.planner.StreamPlanner.$anonfun$translate$1(StreamPlanner.scala:127)
        at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
        at scala.collection.Iterator.foreach(Iterator.scala:937)
        at scala.collection.Iterator.foreach$(Iterator.scala:937)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
        at scala.collection.IterableLike.foreach(IterableLike.scala:70)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableLike.map(TraversableLike.scala:233)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at org.apache.flink.table.planner.StreamPlanner.translate(StreamPlanner.scala:127)
        at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:319)
        at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:227)
        at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:218)
        at test.PojoTest.run(PojoTest.java:96)
        at test.PojoTest.main(PojoTest.java:23)

FlinkTest.java