Aitozi created FLINK-32320:
------------------------------

             Summary: Same correlate can not be reused due to the different correlationId
                 Key: FLINK-32320
                 URL: https://issues.apache.org/jira/browse/FLINK-32320
             Project: Flink
          Issue Type: Improvement
          Components: Table SQL / Planner
            Reporter: Aitozi


As described in SubplanReuserTest:


{code:java}
  @Test
  def testSubplanReuseOnCorrelate(): Unit = {
    util.addFunction("str_split", new StringSplit())
    val sqlQuery =
      """
        |WITH r AS (SELECT a, b, c, v FROM x, LATERAL TABLE(str_split(c, '-')) AS T(v))
        |SELECT * FROM r r1, r r2 WHERE r1.v = r2.v
      """.stripMargin
    // TODO the sub-plan of Correlate should be reused,
    // however the digests of Correlates are different
    util.verifyExecPlan(sqlQuery)
  }
{code}

This produces the following plan:


{code:java}
HashJoin(joinType=[InnerJoin], where=[(f0 = f00)], select=[a, b, c, f0, a0, b0, c0, f00], build=[right])
:- Exchange(distribution=[hash[f0]])
:  +- Correlate(invocation=[str_split($cor0.c, _UTF-16LE'-')], correlate=[table(str_split($cor0.c,'-'))], select=[a,b,c,f0], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)], joinType=[INNER])
:     +- LegacyTableSourceScan(table=[[default_catalog, default_database, x, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+- Exchange(distribution=[hash[f0]])
   +- Correlate(invocation=[str_split($cor1.c, _UTF-16LE'-')], correlate=[table(str_split($cor1.c,'-'))], select=[a,b,c,f0], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)], joinType=[INNER])
      +- LegacyTableSourceScan(table=[[default_catalog, default_database, x, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
{code}

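The two Correlate nodes are identical except for their correlation IDs ($cor0 vs. $cor1). Because the ID is part of each node's digest, the digests differ and the Correlate sub-plan cannot be reused.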
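
To make the mismatch concrete, here is a minimal sketch of one possible direction: canonicalize the correlation variable names before comparing digests. It assumes that digests are compared as plain strings and that correlation variables always print as `$corN`. The class `CorrelIdNormalizer` and its `normalize` method are hypothetical and are not part of Flink or Calcite.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper for illustration only; not a Flink or Calcite API. */
final class CorrelIdNormalizer {
    // Assumption: correlation variables appear in digests as "$cor" followed by digits.
    private static final Pattern COR_ID = Pattern.compile("\\$cor(\\d+)");

    private CorrelIdNormalizer() {}

    /** Renames each distinct $corN to $cor0, $cor1, ... in order of first appearance. */
    static String normalize(String digest) {
        Map<String, String> renamed = new HashMap<>();
        Matcher m = COR_ID.matcher(digest);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String canonical = renamed.get(m.group());
            if (canonical == null) {
                canonical = "$cor" + renamed.size();
                renamed.put(m.group(), canonical);
            }
            // quoteReplacement keeps the literal '$' from being read as a group reference.
            m.appendReplacement(out, Matcher.quoteReplacement(canonical));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        String left = "Correlate(invocation=[str_split($cor0.c, _UTF-16LE'-')])";
        String right = "Correlate(invocation=[str_split($cor1.c, _UTF-16LE'-')])";
        // The raw digests differ only in the correlation ID.
        System.out.println(left.equals(right));
        // After normalization the two digests compare equal.
        System.out.println(normalize(left).equals(normalize(right)));
    }
}
```

String rewriting is used here only to show why the two digests stop matching. An actual fix in the planner would more likely renumber correlation IDs on the relational expression tree itself, or exclude them when computing the digest used for sub-plan reuse.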





