Dawid Wysakowicz created FLINK-34910:
----------------------------------------

             Summary: Can not plan window join without projections
                 Key: FLINK-34910
                 URL: https://issues.apache.org/jira/browse/FLINK-34910
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
    Affects Versions: 1.19.0
            Reporter: Dawid Wysakowicz
            Assignee: Dawid Wysakowicz
             Fix For: 1.20.0


When running:
{code}
  @Test
  def testWindowJoinWithoutProjections(): Unit = {
    val sql =
      """
        |SELECT *
        |FROM
        |  TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE)) AS L
        |JOIN
        |  TABLE(TUMBLE(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE)) AS R
        |ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = R.a
      """.stripMargin
    util.verifyRelPlan(sql)
  }
{code}

It fails with:
{code}
FlinkLogicalCalc(select=[a, b, c, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime, window_start, window_end, window_time, a0, b0, c0, rowtime0, PROCTIME_MATERIALIZE(proctime0) AS proctime0, window_start0, window_end0, window_time0])
+- FlinkLogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
   :- FlinkLogicalTableFunctionScan(invocation=[TUMBLE(DESCRIPTOR($3), 900000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, VARCHAR(2147483647) b, BIGINT c, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)])
   :  +- FlinkLogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 1000:INTERVAL SECOND)])
   :     +- FlinkLogicalCalc(select=[a, b, c, rowtime, PROCTIME() AS proctime])
   :        +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, rowtime])
   +- FlinkLogicalTableFunctionScan(invocation=[TUMBLE(DESCRIPTOR(CAST($3):TIMESTAMP(3)), 900000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, VARCHAR(2147483647) b, BIGINT c, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)])
      +- FlinkLogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 1000:INTERVAL SECOND)])
         +- FlinkLogicalCalc(select=[a, b, c, rowtime, PROCTIME() AS proctime])
            +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[a, b, c, rowtime])

Failed to get time attribute index from DESCRIPTOR(CAST($3):TIMESTAMP(3)). This is a bug, please file a JIRA issue.
Please check the documentation for the set of currently supported SQL features.
{code}

In prior versions this query hit a different problem, an ambiguous {{rowtime}}
column, which has been fixed by [FLINK-32648]. In versions < 1.19,
WindowTableFunctions were incorrectly scoped because they did not extend
Calcite's SqlWindowTableFunction and the scoping implemented in
SqlValidatorImpl#convertFrom was incorrect.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
