Jing Zhang created FLINK-13321:
----------------------------------

             Summary: Join a udf with constant arguments or without argument in 
TableAPI query of Blink Planner does not work now
                 Key: FLINK-13321
                 URL: https://issues.apache.org/jira/browse/FLINK-13321
             Project: Flink
          Issue Type: Task
          Components: Table SQL / API
            Reporter: Jing Zhang


In blink planner, Join a udf with constant arguments or without argument in 
TableAPI query does not work now, for example: error will be thrown if run the 
following two TableAPI query in Blink planner:

{code:java}
leftT.select('c).joinLateral(func0("1", "2"))

// leftT.select('c).joinLateral(func0())
{code}

The following error will be thrown:

{code:java}
org.apache.flink.table.api.TableException: Cannot generate a valid execution 
plan for the given query: 

FlinkLogicalSink(name=[5771dc74-8986-4ffa-828f-8ed40602593a], fields=[c, f0])
+- FlinkLogicalCorrelate(correlation=[$cor3], joinType=[inner], 
requiredColumns=[{}])
   :- FlinkLogicalCalc(select=[c])
   :  +- FlinkLogicalDataStreamTableScan(table=[[default_catalog, 
default_database, 15cbb5bf-816b-4319-9be8-6c648c868843]])
   +- FlinkLogicalCorrelate(correlation=[$cor4], joinType=[inner], 
requiredColumns=[{}])
      :- FlinkLogicalValues(tuples=[[{  }]])
      +- 
FlinkLogicalTableFunctionScan(invocation=[org$apache$flink$table$util$VarArgsFunc0$2ad590150fcbadcd9e420797d27a5eb1(_UTF-16LE'1',
 _UTF-16LE'2')], rowType=[RecordType(VARCHAR(2147483647) f0)], 
elementType=[class [Ljava.lang.Object;])

This exception indicates that the query uses an unsupported SQL feature.
Please check the documentation for the set of currently supported SQL features.

        at 
org.apache.flink.table.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:72)
        at 
org.apache.flink.table.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:63)
        at 
org.apache.flink.table.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
        at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
...
....
{code}

The root cause is the `FlinkLogicalTableFunctionScan`.CONVERTER translates a 
`TableFunctionScan` to a `Correlate`. Which will translate the original 
`RelNode` tree to a `RelNode` with two Cascaded ·Correlate` (could be found in 
the above thrown message), which could not translate to Physical `RelNode`.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)

Reply via email to