Hi Dylan,

I have reproduced your issue based on your code. Flink currently does not support such a nested correlate pattern in a query. I have created an issue to track this [1]. Thanks for your report and your help.

[1] https://issues.apache.org/jira/browse/FLINK-20255

Best,
Godfrey
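Until FLINK-20255 is resolved, one possible workaround is to move the correlated comparison out of the lateral subquery: join the two views first, then rank the joined rows per left-hand row. The following is a rough, untested sketch against the views defined in the quoted program below; the name view3_workaround is made up here, and whether the streaming Top-N planner accepts the boolean ordering expression would need checking.

    val q3Workaround = streamTableEnv.sqlQuery("""
      SELECT attr1, attr3
      FROM (
        SELECT
          w.id,
          w.attr1,
          p.attr3,
          ROW_NUMBER() OVER (
            -- Partition by the left row's key so that each view1 row
            -- keeps only its single best view2 match.
            PARTITION BY w.id
            ORDER BY
              p.attr4 DESC NULLS LAST,
              (w.attr2 = p.attr2) DESC NULLS LAST
          ) AS row_num
        FROM view1 w
        LEFT JOIN view2 p ON w.attr1 = p.attr1)
      WHERE row_num = 1
    """)
    streamTableEnv.createTemporaryView("view3_workaround", q3Workaround)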
On Thu, Nov 19, 2020 at 12:10 PM Dylan Forciea <dy...@oseberg.io> wrote:

> Godfrey,
>
> I confirmed that in Flink 1.11.2 and in 1.12-SNAPSHOT I get the stack trace running exactly this code:
>
> import org.apache.flink.api.scala._
> import org.apache.flink.core.fs.FileSystem.WriteMode
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
> import org.apache.flink.table.api._
> import org.apache.flink.table.api.bridge.scala._
> import org.apache.flink.types.Row
> import org.apache.flink.table.annotation.FunctionHint
> import org.apache.flink.table.annotation.DataTypeHint
> import org.apache.flink.table.functions.TableFunction
>
> @FunctionHint(output = new DataTypeHint("ROW<val STRING>"))
> class SplitStringToRows extends TableFunction[Row] {
>   def eval(str: String, separator: String = ";"): Unit = {
>     if (str != null) {
>       str.split(separator).foreach(s => collect(Row.of(s.trim())))
>     }
>   }
> }
>
> object Job {
>
>   def main(args: Array[String]): Unit = {
>     val settings = EnvironmentSettings
>       .newInstance().useBlinkPlanner().inStreamingMode().build()
>     val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
>     val streamTableEnv = StreamTableEnvironment.create(streamEnv, settings)
>
>     streamTableEnv.createTemporarySystemFunction(
>       "SplitStringToRows",
>       classOf[SplitStringToRows]
>     ) // Class defined in previous email
>
>     streamTableEnv.executeSql("""
>       CREATE TABLE table1 (
>         id_source BIGINT PRIMARY KEY,
>         attr1_source STRING,
>         attr2 STRING
>       ) WITH (
>         'connector' = 'jdbc',
>         'url' = 'jdbc:postgresql://host.domain.com/db1?ssl=true',
>         'table-name' = '<table>',
>         'username' = '<username>',
>         'password' = '<password>',
>         'scan.fetch-size' = '500',
>         'scan.auto-commit' = 'false')
>     """)
>
>     streamTableEnv.executeSql("""
>       CREATE TABLE table2 (
>         attr1_source STRING,
>         attr2 STRING,
>         attr3 DECIMAL,
>         attr4 DATE
>       ) WITH (
>         'connector' = 'jdbc',
>         'url' = 'jdbc:postgresql://host.domain.com/db1?ssl=true',
>         'table-name' = '<table>',
>         'username' = '<username>',
>         'password' = '<password>',
>         'scan.fetch-size' = '500',
>         'scan.auto-commit' = 'false')
>     """)
>
>     val q1 = streamTableEnv.sqlQuery("""
>       SELECT
>         id_source AS id,
>         attr1_source AS attr1,
>         attr2
>       FROM table1
>     """)
>     streamTableEnv.createTemporaryView("view1", q1)
>
>     val q2 = streamTableEnv.sqlQuery("""
>       SELECT
>         a.attr1 AS attr1,
>         attr2,
>         attr3,
>         attr4
>       FROM table2 p, LATERAL TABLE(SplitStringToRows(p.attr1_source, ';')) AS a(attr1)
>     """)
>     streamTableEnv.createTemporaryView("view2", q2)
>
>     val q3 = streamTableEnv.sqlQuery("""
>       SELECT
>         w.attr1,
>         p.attr3
>       FROM view1 w
>       LEFT JOIN LATERAL (
>         SELECT
>           attr1,
>           attr3
>         FROM (
>           SELECT
>             attr1,
>             attr3,
>             ROW_NUMBER() OVER (
>               PARTITION BY attr1
>               ORDER BY
>                 attr4 DESC NULLS LAST,
>                 w.attr2 = attr2 DESC NULLS LAST
>             ) AS row_num
>           FROM view2)
>         WHERE row_num = 1) p
>       ON (w.attr1 = p.attr1)
>     """)
>     streamTableEnv.createTemporaryView("view3", q3)
>
>     val view3 = streamTableEnv.from("view3")
>
>     view3
>       .toRetractStream[Row]
>       .writeAsCsv("./view3.csv", WriteMode.OVERWRITE)
>       .setParallelism(1)
>
>     streamEnv.execute()
>   }
> }
>
> Thanks,
> Dylan Forciea
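A side note on the UDTF in the quoted program above: Scala default parameter values are generally not visible to Flink's reflection-based eval resolution, so a SQL call such as SplitStringToRows(s) with the separator omitted would likely fail to resolve, even though SplitStringToRows(s, ';') works. This does not affect the bug report, since every call in the thread passes ';' explicitly, but if a one-argument form is wanted, an explicit overload is the safer shape. A sketch:

    @FunctionHint(output = new DataTypeHint("ROW<val STRING>"))
    class SplitStringToRows extends TableFunction[Row] {
      // Explicit one-argument overload instead of a Scala default argument;
      // Flink matches eval methods by their JVM signatures, and the synthetic
      // default-value method Scala generates is not considered.
      def eval(str: String): Unit = eval(str, ";")

      def eval(str: String, separator: String): Unit = {
        if (str != null) {
          str.split(separator).foreach(s => collect(Row.of(s.trim())))
        }
      }
    }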
> From: godfrey he <godfre...@gmail.com>
> Date: Wednesday, November 18, 2020 at 8:29 PM
> To: Dylan Forciea <dy...@oseberg.io>
> Cc: "user@flink.apache.org" <user@flink.apache.org>
> Subject: Re: Lateral join not finding correlate variable
>
> Dylan,
>
> Thanks for your feedback. If the planner encounters an "unexpected correlate variable $cor2 in the plan" exception, there is a high probability that FlinkDecorrelateProgram has a bug, or that the query pattern is not supported yet. I tried using the JDBC connector for the input tables, but I still could not reproduce the exception. Could you provide your full code, including the DDL, queries, etc.? Thanks so much.
>
> Best,
> Godfrey
>
> On Wed, Nov 18, 2020 at 10:09 PM Dylan Forciea <dy...@oseberg.io> wrote:
>
> Godfrey,
>
> I was using Flink 1.11.2, but I just tried switching to 1.12-SNAPSHOT and am still having the same issue. Note that I am using the JDBC connector for the input tables, and that table1 and table2 are actually created from queries on those connector tables and not directly.
>
> Since you indicated that what I did should work, I played around a bit more and determined that it is something inside the table2 query that is triggering the error. The id field there is generated by a table function. Removing that piece made the plan start working. Table 2 is formulated as follows:
>
> SELECT
>   T.id,
>   attr2,
>   attr3,
>   attr4
> FROM table3 t3, LATERAL TABLE(SplitStringToRows(t3.id, ';')) AS T(id)
>
> where SplitStringToRows is defined as:
>
> @FunctionHint(output = new DataTypeHint("ROW<val STRING>"))
> class SplitStringToRows extends TableFunction[Row] {
>   def eval(str: String, separator: String = ";"): Unit = {
>     if (str != null) {
>       str.split(separator).foreach(s => collect(Row.of(s.trim())))
>     }
>   }
> }
>
> Removing the lateral table bit in that first table made the original query plan work correctly.
>
> I greatly appreciate your assistance!
>
> Regards,
> Dylan Forciea
>
> From: godfrey he <godfre...@gmail.com>
> Date: Wednesday, November 18, 2020 at 7:33 AM
> To: Dylan Forciea <dy...@oseberg.io>
> Cc: "user@flink.apache.org" <user@flink.apache.org>
> Subject: Re: Lateral join not finding correlate variable
>
> Hi Dylan,
>
> Could you tell me which Flink version you found the problem with? I tested the above query on master, and I get the plan; no errors occur. Here is my test case:
>
> @Test
> def testLateralJoin(): Unit = {
>   util.addTableSource[(String, String, String, String, String)]("table1",
>     'id, 'attr1, 'attr2, 'attr3, 'attr4)
>   util.addTableSource[(String, String, String, String, String)]("table2",
>     'id, 'attr1, 'attr2, 'attr3, 'attr4)
>   val query =
>     """
>       |SELECT
>       |  t1.id,
>       |  t1.attr1,
>       |  t2.attr2
>       |FROM table1 t1
>       |LEFT JOIN LATERAL (
>       |  SELECT
>       |    id,
>       |    attr2
>       |  FROM (
>       |    SELECT
>       |      id,
>       |      attr2,
>       |      ROW_NUMBER() OVER (
>       |        PARTITION BY id
>       |        ORDER BY
>       |          attr3 DESC,
>       |          t1.attr4 = attr4 DESC
>       |      ) AS row_num
>       |    FROM table2)
>       |  WHERE row_num = 1) t2
>       |ON t1.id = t2.id
>       |""".stripMargin
>   util.verifyPlan(query)
> }
>
> Best,
> Godfrey
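Combining the two observations in this thread (the plain version of the query plans fine, and the failure appears once one input goes through a table function), a self-contained reproduction that needs neither the planner test harness nor a live database might look like the following. This is an untested sketch: it reuses the SplitStringToRows class and the imports from Dylan's program above, the 'datagen' tables are stand-ins for the real sources, and explainSql (available since Flink 1.11) is used because the error is raised during plan optimization.

    object Repro {
      def main(args: Array[String]): Unit = {
        val settings = EnvironmentSettings
          .newInstance().useBlinkPlanner().inStreamingMode().build()
        val tEnv = StreamTableEnvironment.create(
          StreamExecutionEnvironment.getExecutionEnvironment, settings)

        tEnv.createTemporarySystemFunction("SplitStringToRows", classOf[SplitStringToRows])

        // Stand-in sources: 'datagen' rows are random but sufficient to build a plan.
        tEnv.executeSql(
          """CREATE TABLE table1 (id STRING, attr1 STRING, attr2 STRING,
            |                     attr3 STRING, attr4 STRING)
            |WITH ('connector' = 'datagen')""".stripMargin)
        tEnv.executeSql(
          """CREATE TABLE raw2 (ids STRING, attr2 STRING, attr3 STRING, attr4 STRING)
            |WITH ('connector' = 'datagen')""".stripMargin)

        // The ingredient Dylan identified: table2 is derived through a UDTF.
        tEnv.createTemporaryView("table2", tEnv.sqlQuery(
          "SELECT t.id, attr2, attr3, attr4 " +
            "FROM raw2, LATERAL TABLE(SplitStringToRows(ids, ';')) AS t(id)"))

        // Optimization runs inside explainSql, so this alone should surface
        // the "unexpected correlate variable" exception if the bug is present.
        println(tEnv.explainSql(
          """SELECT t1.id, t1.attr1, t2.attr2
            |FROM table1 t1
            |LEFT JOIN LATERAL (
            |  SELECT id, attr2 FROM (
            |    SELECT id, attr2,
            |      ROW_NUMBER() OVER (
            |        PARTITION BY id
            |        ORDER BY attr3 DESC, t1.attr4 = attr4 DESC) AS row_num
            |    FROM table2)
            |  WHERE row_num = 1) t2
            |ON t1.id = t2.id""".stripMargin))
      }
    }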
On Wed, Nov 18, 2020 at 7:44 AM Dylan Forciea <dy...@oseberg.io> wrote:

> This may be due to not understanding lateral joins in Flink – perhaps you can only do so on temporal variables – but I figured I'd ask since the error message isn't intuitive.
>
> I am trying to do a combination of a lateral join and a top N query. Part of my ordering is based upon whether a value in the left side of the query matches up. I'm trying to do this in the general form of:
>
> SELECT
>   t1.id,
>   t1.attr1,
>   t2.attr2
> FROM table1 t1
> LEFT JOIN LATERAL (
>   SELECT
>     id,
>     attr2
>   FROM (
>     SELECT
>       id,
>       attr2,
>       ROW_NUMBER() OVER (
>         PARTITION BY id
>         ORDER BY
>           attr3 DESC,
>           t1.attr4 = attr4 DESC
>       ) AS row_num
>     FROM table2)
>   WHERE row_num = 1) t2
> ON (t1.id = t2.id)
>
> I am getting an error that looks like:
>
> Exception in thread "main" org.apache.flink.table.api.TableException: unexpected correlate variable $cor2 in the plan
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.checkCorrelVariableExists(FlinkDecorrelateProgram.scala:58)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.optimize(FlinkDecorrelateProgram.scala:42)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62)
>     at scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:187)
>     at scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:185)
>     at scala.collection.Iterator.foreach(Iterator.scala:943)
>     at scala.collection.Iterator.foreach$(Iterator.scala:943)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
>     at scala.collection.IterableLike.foreach(IterableLike.scala:74)
>     at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
>     at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:189)
>     at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:184)
>     at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58)
>     at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163)
>     at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83)
>     at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
>     at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:294)
>     at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:164)
>     at org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.scala:178)
>     at org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.scala:113)
>     at org.apache.flink.table.api.bridge.scala.TableConversions.toRetractStream(TableConversions.scala:97)
>     at io.oseberg.flink.well.ok.Job$.main(Job.scala:57)
>     at io.oseberg.flink.well.ok.Job.main(Job.scala)
>
> The only other thing I can think of doing is creating a Table Aggregate function to pull this off. But I wanted to check first that I wasn't doing something wrong in the above, or whether there is something I'm not thinking of.
>
> Regards,
> Dylan Forciea
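On the Table Aggregate idea mentioned above: a TableAggregateFunction can emit the single best row per group, though in these Flink versions it is callable only from the Table API via flatAggregate, not from SQL. Below is a rough sketch modeled on the Top2 example in the Flink documentation. The assumptions are loud: attr4 is taken to surface as java.time.LocalDate, and the correlated tie-breaker from the SQL version (w.attr2 = attr2) is left out, since it would have to be threaded in as an extra argument after joining the two sides.

    import java.math.{BigDecimal => JBigDecimal}
    import java.time.LocalDate
    import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
    import org.apache.flink.table.functions.TableAggregateFunction
    import org.apache.flink.util.Collector

    // Accumulator: the best (attr2, attr3, attr4) seen so far for the group.
    class Top1Acc {
      var attr2: String = _
      var attr3: JBigDecimal = _
      var attr4: LocalDate = _
      var seen: Boolean = false
    }

    // Keeps only the row with the greatest attr4 per group ("top 1").
    class Top1ByAttr4 extends TableAggregateFunction[JTuple2[String, JBigDecimal], Top1Acc] {

      override def createAccumulator(): Top1Acc = new Top1Acc

      def accumulate(acc: Top1Acc, attr2: String, attr3: JBigDecimal, attr4: LocalDate): Unit = {
        // DESC NULLS LAST semantics: any non-null attr4 beats a null one.
        val better = !acc.seen ||
          (attr4 != null && (acc.attr4 == null || attr4.isAfter(acc.attr4)))
        if (better) {
          acc.seen = true
          acc.attr2 = attr2
          acc.attr3 = attr3
          acc.attr4 = attr4
        }
      }

      def emitValue(acc: Top1Acc, out: Collector[JTuple2[String, JBigDecimal]]): Unit = {
        if (acc.seen) out.collect(JTuple2.of(acc.attr2, acc.attr3))
      }
    }

    // Table API usage (sketch):
    //   val top1 = new Top1ByAttr4
    //   view2Table
    //     .groupBy('attr1)
    //     .flatAggregate(top1('attr2, 'attr3, 'attr4) as ('bestAttr2, 'bestAttr3))
    //     .select('attr1, 'bestAttr3)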