[jira] [Commented] (FLINK-13653) ResultStore should avoid using RowTypeInfo when creating a result
[ https://issues.apache.org/jira/browse/FLINK-13653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16906816#comment-16906816 ] Rui Li commented on FLINK-13653: Just created a PoC pull request to get some feedbacks. It seems we don't have to worry about the type system inconsistency between source and sink, because we create the sink using the table schema of the source table. Therefore I think we're good as long as we avoid the conversion between {{DataType}} and {{TypeInformation}} in the sinks. [~lzljs3620320] [~xuefuz] [~phoenixjiangnan] could you please share your opinions about this issue? Thanks. > ResultStore should avoid using RowTypeInfo when creating a result > - > > Key: FLINK-13653 > URL: https://issues.apache.org/jira/browse/FLINK-13653 > Project: Flink > Issue Type: Bug > Components: Table SQL / Client >Reporter: Rui Li >Assignee: Rui Li >Priority: Major > Labels: pull-request-available > Fix For: 1.9.0 > > Time Spent: 10m > Remaining Estimate: 0h > > Creating a RowTypeInfo from a TableSchema can lose type parameters. As a > result, querying a Hive table with decimal column from SQL CLI will hit the > following exception: > {noformat} > Caused by: org.apache.flink.table.api.ValidationException: Field types of > query result and registered TableSink [default_catalog, default_database, > default: select * from foo] do not match. > Query result schema: [x: BigDecimal] > TableSink schema:[x: BigDecimal] > at > org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSink(TableSinkUtils.scala:69) > at > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:179) > at > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:178) > at scala.Option.map(Option.scala:146) > at > org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:178) > at > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:146) > at > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:146) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at > scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > at > org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:146) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:439) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.insertInto(TableEnvironmentImpl.java:327) > at > org.apache.flink.table.api.internal.TableImpl.insertInto(TableImpl.java:428) > at > org.apache.flink.table.client.gateway.local.LocalExecutor.lambda$executeQueryInternal$10(LocalExecutor.java:477) > at > org.apache.flink.table.client.gateway.local.ExecutionContext.wrapClassLoader(ExecutionContext.java:216) > at > org.apache.flink.table.client.gateway.local.LocalExecutor.executeQueryInternal(LocalExecutor.java:475) > ... 8 more > {noformat} -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Commented] (FLINK-13653) ResultStore should avoid using RowTypeInfo when creating a result
[ https://issues.apache.org/jira/browse/FLINK-13653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16903527#comment-16903527 ] Rui Li commented on FLINK-13653: I thought we should just use the new type system for {{CollectBatchTableSink}} and {{CollectStreamTableSink}}. However, according to the JavaDoc of {{TableSink.getOutputType()}} and {{TableSource.getReturnType()}}, user should "_use either the old or the new type system consistently to avoid unintended behavior_". And if the table sinks created in SQL client need to support table sources that may use new or old type systems, I'm not sure whether we have to create different sinks for new and old type systems respectively? > ResultStore should avoid using RowTypeInfo when creating a result > - > > Key: FLINK-13653 > URL: https://issues.apache.org/jira/browse/FLINK-13653 > Project: Flink > Issue Type: Bug > Components: Table SQL / Client >Reporter: Rui Li >Priority: Major > > Creating a RowTypeInfo from a TableSchema can lose type parameters. As a > result, querying a Hive table with decimal column from SQL CLI will hit the > following exception: > {noformat} > Caused by: org.apache.flink.table.api.ValidationException: Field types of > query result and registered TableSink [default_catalog, default_database, > default: select * from foo] do not match. > Query result schema: [x: BigDecimal] > TableSink schema:[x: BigDecimal] > at > org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSink(TableSinkUtils.scala:69) > at > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:179) > at > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:178) > at scala.Option.map(Option.scala:146) > at > org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:178) > at > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:146) > at > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:146) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at > scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > at > org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:146) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:439) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.insertInto(TableEnvironmentImpl.java:327) > at > org.apache.flink.table.api.internal.TableImpl.insertInto(TableImpl.java:428) > at > org.apache.flink.table.client.gateway.local.LocalExecutor.lambda$executeQueryInternal$10(LocalExecutor.java:477) > at > org.apache.flink.table.client.gateway.local.ExecutionContext.wrapClassLoader(ExecutionContext.java:216) > at > org.apache.flink.table.client.gateway.local.LocalExecutor.executeQueryInternal(LocalExecutor.java:475) > ... 8 more > {noformat} -- This message was sent by Atlassian JIRA (v7.6.14#76016)