[jira] [Commented] (FLINK-13653) ResultStore should avoid using RowTypeInfo when creating a result

2019-08-13 Thread Rui Li (JIRA)


[ https://issues.apache.org/jira/browse/FLINK-13653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16906816#comment-16906816 ]

Rui Li commented on FLINK-13653:


Just created a PoC pull request to get some feedback.
It seems we don't have to worry about type system inconsistency between the 
source and the sink, because we create the sink from the table schema of the 
source table. Therefore I think we're fine as long as we avoid converting 
between {{DataType}} and {{TypeInformation}} in the sinks.
[~lzljs3620320] [~xuefuz] [~phoenixjiangnan] could you please share your 
opinions on this issue? Thanks.
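To make the information loss concrete, here is a minimal, self-contained sketch in plain Java. The classes {{LogicalDecimalType}} and {{LegacyTypeInfo}} are made up for illustration and are not Flink classes; the point is that once a parameterized DECIMAL(10, 2) is represented only by its Java class (as happens when a RowTypeInfo is built from a TableSchema), precision and scale cannot be recovered:

```java
import java.math.BigDecimal;

// Toy model of the two type systems; LogicalDecimalType and LegacyTypeInfo
// are hypothetical names for this sketch, not Flink's actual classes.
public class TypeLossSketch {

    // "New" type system: keeps the type parameters (precision, scale).
    record LogicalDecimalType(int precision, int scale) {}

    // "Legacy" type system: only the Java class survives the conversion.
    record LegacyTypeInfo(Class<?> clazz) {}

    // Converting to the legacy descriptor drops precision and scale,
    // analogous to building a RowTypeInfo from a TableSchema.
    static LegacyTypeInfo toLegacy(LogicalDecimalType type) {
        return new LegacyTypeInfo(BigDecimal.class);
    }

    public static void main(String[] args) {
        LogicalDecimalType source = new LogicalDecimalType(10, 2);
        LogicalDecimalType other  = new LogicalDecimalType(38, 18);

        // Two distinct DECIMAL types collapse to the same legacy descriptor,
        // so a sink schema rebuilt from it cannot be validated against the
        // source's parameterized type.
        System.out.println(toLegacy(source).equals(toLegacy(other))); // true
        System.out.println(source.equals(other));                     // false
    }
}
```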

> ResultStore should avoid using RowTypeInfo when creating a result
> -----------------------------------------------------------------
>
> Key: FLINK-13653
> URL: https://issues.apache.org/jira/browse/FLINK-13653
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Reporter: Rui Li
>Assignee: Rui Li
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Creating a RowTypeInfo from a TableSchema can lose type parameters. As a 
> result, querying a Hive table with a decimal column from the SQL CLI hits 
> the following exception:
> {noformat}
> Caused by: org.apache.flink.table.api.ValidationException: Field types of 
> query result and registered TableSink [default_catalog, default_database, 
> default: select * from foo] do not match.
> Query result schema: [x: BigDecimal]
> TableSink schema: [x: BigDecimal]
> 	at org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSink(TableSinkUtils.scala:69)
> 	at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:179)
> 	at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:178)
> 	at scala.Option.map(Option.scala:146)
> 	at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:178)
> 	at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:146)
> 	at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:146)
> 	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> 	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> 	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> 	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> 	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> 	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> 	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> 	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> 	at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:146)
> 	at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:439)
> 	at org.apache.flink.table.api.internal.TableEnvironmentImpl.insertInto(TableEnvironmentImpl.java:327)
> 	at org.apache.flink.table.api.internal.TableImpl.insertInto(TableImpl.java:428)
> 	at org.apache.flink.table.client.gateway.local.LocalExecutor.lambda$executeQueryInternal$10(LocalExecutor.java:477)
> 	at org.apache.flink.table.client.gateway.local.ExecutionContext.wrapClassLoader(ExecutionContext.java:216)
> 	at org.apache.flink.table.client.gateway.local.LocalExecutor.executeQueryInternal(LocalExecutor.java:475)
> 	... 8 more
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (FLINK-13653) ResultStore should avoid using RowTypeInfo when creating a result

2019-08-08 Thread Rui Li (JIRA)


[ https://issues.apache.org/jira/browse/FLINK-13653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16903527#comment-16903527 ]

Rui Li commented on FLINK-13653:


I thought we could just use the new type system for {{CollectBatchTableSink}} 
and {{CollectStreamTableSink}}. However, according to the JavaDoc of 
{{TableSink.getOutputType()}} and {{TableSource.getReturnType()}}, users should 
"_use either the old or the new type system consistently to avoid unintended 
behavior_". If the table sinks created in the SQL Client need to support table 
sources that may use either the new or the old type system, I'm not sure 
whether we have to create separate sinks for the two type systems.
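As a toy illustration of that question (the interfaces below are hypothetical and much simpler than Flink's real {{TableSource}}/{{TableSink}} contracts): if a source may describe its output in either type system, the sink-creation code would have to branch on which system the source actually uses, so that validation compares like with like:

```java
import java.math.BigDecimal;
import java.util.Optional;

// Hypothetical model for this sketch only -- not Flink's actual API.
public class SinkPerTypeSystem {

    // New type system keeps the parameters; the legacy one only the class.
    record NewType(Class<?> clazz, int precision, int scale) {}
    record LegacyType(Class<?> clazz) {}

    interface Source {
        Optional<NewType> newTypeSystem(); // present if the source uses it
        LegacyType legacyTypeSystem();     // fallback descriptor
    }

    // To honor "use either the old or the new type system consistently",
    // build the sink from whichever descriptor the source provides.
    static Object sinkDescriptorFor(Source source) {
        return source.newTypeSystem()
                .<Object>map(t -> t)                  // new-type-system sink
                .orElseGet(source::legacyTypeSystem); // legacy sink
    }

    public static void main(String[] args) {
        Source hiveSource = new Source() {
            public Optional<NewType> newTypeSystem() {
                return Optional.of(new NewType(BigDecimal.class, 10, 2));
            }
            public LegacyType legacyTypeSystem() {
                return new LegacyType(BigDecimal.class);
            }
        };
        // Precision and scale survive because the sink descriptor comes from
        // the same (new) type system the source uses.
        System.out.println(sinkDescriptorFor(hiveSource));
    }
}
```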



