This may be due to my not understanding lateral joins in Flink – perhaps they can
only be used with temporal tables – but I figured I’d ask since the error
message isn’t intuitive.
I am trying to do a combination of a lateral join and a top-N query. Part of my
ordering is based upon whether a value on the left side of the join matches up.
I’m trying to do this in the general form of:
SELECT
  t1.id,
  t1.attr1,
  t2.attr2
FROM table1 t1
LEFT JOIN LATERAL (
  SELECT
    id,
    attr2
  FROM (
    SELECT
      id,
      attr2,
      ROW_NUMBER() OVER (
        PARTITION BY id
        ORDER BY
          attr3 DESC,
          t1.attr4 = attr4 DESC
      ) AS row_num
    FROM table2
  )
  WHERE row_num = 1
) t2 ON (t1.id = t2.id)
I am getting an error that looks like:
Exception in thread "main" org.apache.flink.table.api.TableException: unexpected correlate variable $cor2 in the plan
  at org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.checkCorrelVariableExists(FlinkDecorrelateProgram.scala:58)
  at org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.optimize(FlinkDecorrelateProgram.scala:42)
  at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62)
  at scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:187)
  at scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:185)
  at scala.collection.Iterator.foreach(Iterator.scala:943)
  at scala.collection.Iterator.foreach$(Iterator.scala:943)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
  at scala.collection.IterableLike.foreach(IterableLike.scala:74)
  at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
  at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
  at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:189)
  at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:184)
  at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
  at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58)
  at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163)
  at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83)
  at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
  at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:294)
  at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:164)
  at org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.scala:178)
  at org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.scala:113)
  at org.apache.flink.table.api.bridge.scala.TableConversions.toRetractStream(TableConversions.scala:97)
  at io.oseberg.flink.well.ok.Job$.main(Job.scala:57)
  at io.oseberg.flink.well.ok.Job.main(Job.scala)
The only other thing I can think of doing is creating a Table Aggregate
function to pull this off. But I wanted to check first that I’m not doing
something wrong in the above, and that there isn’t another approach I’m not
thinking of.
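For what it’s worth, one rewrite I’ve been sketching (untested, same table and
column names as above, and assuming the ordering term can move into a window
over the joined result) would drop the lateral entirely and deduplicate after a
plain join:

SELECT
  id,
  attr1,
  attr2
FROM (
  SELECT
    t1.id,
    t1.attr1,
    t2.attr2,
    ROW_NUMBER() OVER (
      PARTITION BY t1.id
      ORDER BY
        t2.attr3 DESC,
        CASE WHEN t1.attr4 = t2.attr4 THEN 1 ELSE 0 END DESC
    ) AS row_num
  FROM table1 t1
  LEFT JOIN table2 t2 ON t1.id = t2.id
)
WHERE row_num = 1

My guess is that this avoids the problem because the t1.attr4 = t2.attr4
comparison no longer lives inside a correlated subquery, but I don’t know if
it’s equivalent in all cases.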
Regards,
Dylan Forciea