Godfrey,
I was using Flink 1.11.2, but I just tried switching to 1.12-SNAPSHOT and am
still having the same issue. Note that I am using the JDBC Connector for the
input tables, and table1 and table2 are actually created from queries on those
connector tables and not directly.
Since you indicated what I did should work, I played around a bit more, and
determined it’s something inside of the table2 query that is triggering the
error. The id field there is generated by a table function. Removing that piece
made the plan start working. Table 2 is formulated as follows:
SELECT
T.id,
attr2,
attr3,
attr4
FROM table3 t3, LATERAL TABLE(SplitStringToRows(t3.id, ';')) AS T(id)
Where SplitStringToRows is defined as:
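import org.apache.flink.table.annotation.{DataTypeHint, FunctionHint}
import org.apache.flink.table.functions.TableFunction
import org.apache.flink.types.Row

// Emits one row per separator-delimited piece of `str`, trimmed.
@FunctionHint(output = new DataTypeHint("ROW<val STRING>"))
class SplitStringToRows extends TableFunction[Row] {
  def eval(str: String, separator: String = ";"): Unit = {
    // A null input produces no rows.
    if (str != null) {
      // Note: String.split interprets `separator` as a regular expression.
      str.split(separator).foreach(s => collect(Row.of(s.trim())))
    }
  }
}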
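As a quick way to check what table2 actually contains, here is a plain-Scala model (no Flink dependency) of SplitStringToRows.eval and of the `FROM table3 t3, LATERAL TABLE(...) AS T(id)` expansion. The split-and-trim logic is copied from the function above; the object name, the `Table3Row` case class, and the `splitLiteral` helper are hypothetical and only there for illustration.

```scala
import java.util.regex.Pattern

object SplitStringToRowsModel {
  // Same logic as SplitStringToRows.eval, but returning the values instead of
  // calling collect(Row.of(...)), so it runs without Flink on the classpath.
  def split(str: String, separator: String = ";"): List[String] =
    if (str == null) Nil
    else str.split(separator).map(_.trim).toList

  // Hypothetical variant: String.split treats its argument as a regex,
  // so quote the separator if it may contain characters such as '|' or '.'.
  def splitLiteral(str: String, separator: String): List[String] =
    if (str == null) Nil
    else str.split(Pattern.quote(separator)).map(_.trim).toList

  // Hypothetical table3 row shape, keeping only the columns the query uses.
  final case class Table3Row(id: String, attr2: String, attr3: String, attr4: String)

  // Models "FROM table3 t3, LATERAL TABLE(SplitStringToRows(t3.id, ';')) AS T(id)":
  // each table3 row is repeated once per value produced for its id, and that
  // value replaces id in the output. The comma join is an inner join, so a
  // row whose id is null (no values produced) disappears entirely.
  def table2(table3: Seq[Table3Row]): Seq[Table3Row] =
    for {
      t3 <- table3
      id <- split(t3.id, ";")
    } yield t3.copy(id = id)
}
```

One thing worth knowing when joining on the generated id: an empty segment such as the middle of "a;;b" becomes an empty-string id rather than being skipped, while trailing empty segments are dropped by String.split.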
Removing the LATERAL TABLE join from the table2 query made the original query
plan work correctly.
I greatly appreciate your assistance!
Regards,
Dylan Forciea
From: godfrey he
Date: Wednesday, November 18, 2020 at 7:33 AM
To: Dylan Forciea
Subject: Re: Lateral join not finding correlate variable
Hi Dylan,
Could you tell me which Flink version you found the problem with?
I tested the above query on master and got the plan without any errors.
Here is my test case:
@Test
def testLateralJoin(): Unit = {
  util.addTableSource[(String, String, String, String, String)]("table1",
    'id, 'attr1, 'attr2, 'attr3, 'attr4)
  util.addTableSource[(String, String, String, String, String)]("table2",
    'id, 'attr1, 'attr2, 'attr3, 'attr4)
  val query =
    """
      |SELECT
      |  t1.id,
      |  t1.attr1,
      |  t2.attr2
      |FROM table1 t1
      |LEFT JOIN LATERAL (
      |  SELECT
      |    id,
      |    attr2
      |  FROM (
      |    SELECT
      |      id,
      |      attr2,
      |      ROW_NUMBER() OVER (
      |        PARTITION BY id
      |        ORDER BY
      |          attr3 DESC,
      |          t1.attr4 = attr4 DESC
      |      ) AS row_num
      |    FROM table2)
      |  WHERE row_num = 1) t2
      |ON t1.id = t2.id
      |""".stripMargin
  util.verifyPlan(query)
}
Best,
Godfrey
On Wednesday, November 18, 2020, at 7:44 AM, Dylan Forciea wrote:
This may be due to my misunderstanding lateral joins in Flink (perhaps you can
only do so on temporal variables), but I figured I'd ask since the error
message isn't intuitive.
I am trying to combine a lateral join with a Top-N query. Part of my ordering
is based on whether a value on the left side of the query matches up. I'm
trying to do this in the general form of:
SELECT
  t1.id,
  t1.attr1,
  t2.attr2
FROM table1 t1
LEFT JOIN LATERAL (
  SELECT
    id,
    attr2
  FROM (
    SELECT
      id,
      attr2,
      ROW_NUMBER() OVER (
        PARTITION BY id
        ORDER BY
          attr3 DESC,
          t1.attr4 = attr4 DESC
      ) AS row_num
    FROM table2)
  WHERE row_num = 1) t2
ON (t1.id = t2.id)
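To pin down what the correlated ORDER BY is meant to compute, here is a plain-Scala model of the query's semantics (not a Flink workaround): for each t1 row it ranks the table2 rows with the same id by attr3 DESC, then by whether attr4 equals t1.attr4, and keeps the first, which is what PARTITION BY id plus the ON t1.id = t2.id condition amounts to. The case classes and sample values are hypothetical; the model assumes no NULL columns and breaks ties by input order, whereas ROW_NUMBER over tied rows is not deterministic.

```scala
object LateralTopOneModel {
  // Hypothetical row shapes containing only the columns used by the query.
  final case class Row1(id: String, attr1: String, attr4: String)
  final case class Row2(id: String, attr2: String, attr3: String, attr4: String)

  // For each t1 row: take the table2 rows with the same id, order them by
  // attr3 DESC and then by (t1.attr4 = attr4) DESC, keep the first one, and
  // emit None when nothing matches, as the LEFT JOIN would emit NULL.
  def query(t1: Seq[Row1], t2: Seq[Row2]): Seq[(String, String, Option[String])] =
    t1.map { left =>
      // The ordering depends on `left`, which is what makes the subquery correlated.
      val ranking: Ordering[Row2] =
        Ordering.by[Row2, (String, Boolean)](r => (r.attr3, r.attr4 == left.attr4))(
          Ordering.Tuple2(Ordering[String].reverse, Ordering[Boolean].reverse))
      val best = t2.filter(_.id == left.id).sorted(ranking).headOption
      (left.id, left.attr1, best.map(_.attr2))
    }
}
```

With table2 rows (k, p, 5, x) and (k, q, 5, y), a t1 row with id k and attr4 = x picks p while one with attr4 = y picks q, so the ranking cannot be computed once for all of table2 independently of t1.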
I am getting an error that looks like:
Exception in thread "main" org.apache.flink.table.api.TableException: unexpected correlate variable $cor2 in the plan
at org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.checkCorrelVariableExists(FlinkDecorrelateProgram.scala:58)
at org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.optimize(FlinkDecorrelateProgram.scala:42)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62)
at scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:187)
at scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:185)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:189)
at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:184)
at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58)
at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163)
at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83)
at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:294)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:164)
at org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.scala:178)
at org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.scala:113)
at org.apache.flink.table.api.bridge.scala.TableConversions.toRetractStream(TableConversions.scala:97)
at io.oseberg.flink.well.ok.Job$.main(Job.scala:57)
at io.oseberg.flink.well.ok.Job.main(Job.scala)
The only other approach I can think of is writing a table aggregate function to
pull this off. But first I wanted to check whether I was doing something wrong
above, or whether there is something I'm not thinking of.
Regards,
Dylan Forciea