[ 
https://issues.apache.org/jira/browse/FLINK-16108?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu reassigned FLINK-16108:
-------------------------------

    Assignee: Jark Wu

> StreamSQLExample fails when run with the blink planner
> ------------------------------------------------------
>
>                 Key: FLINK-16108
>                 URL: https://issues.apache.org/jira/browse/FLINK-16108
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>            Reporter: Jark Wu
>            Assignee: Jark Wu
>            Priority: Critical
>             Fix For: 1.10.1
>
>
> {{StreamSQLExample}} in flink-examples fails if the specified planner is the 
> blink planner. The exception is as follows:
> {code}
> Exception in thread "main" org.apache.flink.table.api.ValidationException: Field types of query result and registered TableSink  do not match.
> Query schema: [user: BIGINT, product: STRING, amount: INT]
> Sink schema: [amount: INT, product: STRING, user: BIGINT]
>       at org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyImplicitCast(TableSinkUtils.scala:96)
>       at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:229)
>       at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
>       at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
>       at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>       at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>       at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>       at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>       at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>       at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>       at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>       at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>       at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:150)
>       at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:361)
>       at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:269)
>       at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:260)
>       at org.apache.flink.table.examples.java.StreamSQLExample.main(StreamSQLExample.java:90)
> Process finished with exit code 1
> {code}
> That's because the blink planner also validates the sink schema, even when the 
> sink comes from {{toAppendStream()}}. However, 
> {{TableSinkUtils#inferSinkPhysicalDataType}} should derive the sink schema from 
> the query schema when the requested type is a POJO [1], because the field order 
> of a POJO is not deterministic.
> [1]: 
> https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sinks/TableSinkUtils.scala#L237
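
To illustrate the idea in the description, here is a minimal sketch in plain Java with no Flink dependency. It is not the code in {{TableSinkUtils}}; the class {{PojoSchemaSketch}} and the helpers {{matchesByPosition}} and {{deriveSinkSchema}} are hypothetical names, and type names are modelled as plain strings. It shows why a positional comparison rejects the two schemas from the exception, and how matching POJO fields by name lets the sink schema follow the query's field order.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration only; not part of Flink.
class PojoSchemaSketch {

    // Positional check: same field names and types in the same order.
    static boolean matchesByPosition(Map<String, String> a, Map<String, String> b) {
        return new ArrayList<>(a.entrySet()).equals(new ArrayList<>(b.entrySet()));
    }

    // Derive the sink schema from the query schema: keep the query's field
    // order and look each field up in the POJO by name.
    static Map<String, String> deriveSinkSchema(
            Map<String, String> querySchema, Map<String, String> pojoFields) {
        // Set equality ignores order, so this only checks the field names.
        if (!querySchema.keySet().equals(pojoFields.keySet())) {
            throw new IllegalArgumentException("Field names of query and POJO differ");
        }
        Map<String, String> derived = new LinkedHashMap<>();
        for (Map.Entry<String, String> field : querySchema.entrySet()) {
            String pojoType = pojoFields.get(field.getKey());
            if (!pojoType.equals(field.getValue())) {
                throw new IllegalArgumentException(
                        "Type mismatch for field " + field.getKey());
            }
            derived.put(field.getKey(), pojoType);
        }
        return derived;
    }

    public static void main(String[] args) {
        // Query schema as reported in the exception.
        Map<String, String> query = new LinkedHashMap<>();
        query.put("user", "BIGINT");
        query.put("product", "STRING");
        query.put("amount", "INT");

        // Sink schema as reported in the exception (fields in a different order).
        Map<String, String> pojo = new LinkedHashMap<>();
        pojo.put("amount", "INT");
        pojo.put("product", "STRING");
        pojo.put("user", "BIGINT");

        System.out.println("positional match: " + matchesByPosition(query, pojo));
        System.out.println("derived sink schema: " + deriveSinkSchema(query, pojo));
    }
}
```

Because the derived schema takes its order from the query, a positional validation of query schema against sink schema would pass for a POJO sink regardless of the order in which the POJO's fields are discovered.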



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
