[jira] [Commented] (FLINK-17313) Validation error when insert decimal/timestamp/varchar with precision into sink using TypeInformation of row

2020-04-23 Thread Jark Wu (Jira)


[ https://issues.apache.org/jira/browse/FLINK-17313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17090453#comment-17090453 ]

Jark Wu commented on FLINK-17313:
-

[~dwysakowicz] Thanks for taking care of this PR. I'm also fine with the 
proposed PR. 

> Validation error when insert decimal/timestamp/varchar with precision into 
> sink using TypeInformation of row
> 
>
> Key: FLINK-17313
> URL: https://issues.apache.org/jira/browse/FLINK-17313
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: Terry Wang
>Priority: Major
>  Labels: pull-request-available
>
> Test code like the following (in the blink planner):
> {code:java}
> tEnv.sqlUpdate("create table randomSource (" +
>         "  a varchar(10)," +
>         "  b decimal(20,2)" +
>         ") with (" +
>         "  'type' = 'random'," +
>         "  'count' = '10'" +
>         ")");
> tEnv.sqlUpdate("create table printSink (" +
>         "  a varchar(10)," +
>         "  b decimal(22,2)," +
>         "  c timestamp(3)" +
>         ") with (" +
>         "  'type' = 'print'" +
>         ")");
> tEnv.sqlUpdate("insert into printSink select *, current_timestamp from randomSource");
> tEnv.execute("");
> {code}
> The Print TableSink implements UpsertStreamTableSink and its getRecordType is as 
> follows:
> {code:java}
> public TypeInformation<Row> getRecordType() {
>     return getTableSchema().toRowType();
> }
> {code}
> Varchar column validation exception is:
> org.apache.flink.table.api.ValidationException: Type VARCHAR(10) of table field 'a' does not match with the physical type STRING of the 'a' field of the TableSink consumed type.
>   at org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:165)
>   at org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:278)
>   at org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:255)
>   at org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor.visit(LogicalTypeDefaultVisitor.java:67)
>   at org.apache.flink.table.types.logical.VarCharType.accept(VarCharType.java:157)
>   at org.apache.flink.table.utils.TypeMappingUtils.checkIfCompatible(TypeMappingUtils.java:255)
>   at org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:161)
>   at org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$validateLogicalPhysicalTypesCompatible$1.apply$mcVI$sp(TableSinkUtils.scala:315)
>   at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
>   at org.apache.flink.table.planner.sinks.TableSinkUtils$.validateLogicalPhysicalTypesCompatible(TableSinkUtils.scala:308)
>   at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:195)
>   at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:191)
>   at scala.Option.map(Option.scala:146)
>   at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:191)
>   at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
>   at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>   at ...

[jira] [Commented] (FLINK-17313) Validation error when insert decimal/timestamp/varchar with precision into sink using TypeInformation of row

2020-04-23 Thread Dawid Wysakowicz (Jira)


[ https://issues.apache.org/jira/browse/FLINK-17313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17090326#comment-17090326 ]

Dawid Wysakowicz commented on FLINK-17313:
--

What is your opinion, [~jark]? I do agree with what you said in principle. The 
relaxation proposed in the PR does not stretch the assumptions too much and 
mainly aims to improve support for the old types, which, truth be told, did not 
care about precision at all. It still verifies that none of the mismatches 
requires a cast. If you are OK with it, I will take care of the PR.
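
(For reference: a minimal sketch of the relaxed check being discussed here, built on the public {{LogicalTypeCasts}} utility. This only illustrates the idea; it is not the PR's actual code.)

{code:java}
import org.apache.flink.table.types.logical.LogicalType;

import static org.apache.flink.table.types.logical.utils.LogicalTypeCasts.supportsAvoidingCast;

public class RelaxedCheckSketch {

    // Tolerate a logical/physical mismatch only when at least one direction
    // needs no cast, i.e. the values of one type are directly valid as
    // values of the other.
    static boolean compatibleWithoutCast(LogicalType logical, LogicalType physical) {
        return supportsAvoidingCast(logical, physical)
                || supportsAvoidingCast(physical, logical);
    }
}
{code}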


[jira] [Commented] (FLINK-17313) Validation error when insert decimal/timestamp/varchar with precision into sink using TypeInformation of row

2020-04-22 Thread Dawid Wysakowicz (Jira)


[ https://issues.apache.org/jira/browse/FLINK-17313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17089590#comment-17089590 ]

Dawid Wysakowicz commented on FLINK-17313:
--

[~twalthr] raised a good point that this method will not be used for the new 
sources. Therefore I think we can relax it here for now to make the old <> new 
type system integration a bit smoother.


[jira] [Commented] (FLINK-17313) Validation error when insert decimal/timestamp/varchar with precision into sink using TypeInformation of row

2020-04-22 Thread Timo Walther (Jira)


[ https://issues.apache.org/jira/browse/FLINK-17313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17089565#comment-17089565 ]

Timo Walther commented on FLINK-17313:
--

I'm also fine with the proposed PR. The bug around {{supportsAvoidingCast}} 
should definitely be fixed. And if it helps legacy connectors, we can also 
be more lenient around decimal. However, we should not start relaxing the new 
FLIP-95 behavior. Changing the precision and scale of a decimal type will 
result in completely different data. No user wants that, and it is also very 
hard to debug. Luckily, with FLIP-95 the sink must implement the schema of the 
catalog table completely. There is no possibility of a mismatch because there 
is no {{getProducedDataType()}}.
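
(A small, self-contained illustration of why a silent precision/scale change on decimals is dangerous, using plain {{java.math.BigDecimal}} and independent of any Flink API.)

{code:java}
import java.math.BigDecimal;
import java.math.RoundingMode;

public class DecimalScaleDemo {
    public static void main(String[] args) {
        BigDecimal value = new BigDecimal("123.456"); // fits DECIMAL(6,3)

        // Forcing the value into a smaller scale, e.g. DECIMAL(5,2), silently
        // rounds it: a sink doing this would emit data that never existed.
        BigDecimal rescaled = value.setScale(2, RoundingMode.HALF_UP);
        System.out.println(rescaled); // prints 123.46, not 123.456

        System.out.println(value.precision() + "," + value.scale()); // 6,3
    }
}
{code}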


[jira] [Commented] (FLINK-17313) Validation error when insert decimal/timestamp/varchar with precision into sink using TypeInformation of row

2020-04-22 Thread Terry Wang (Jira)


[ https://issues.apache.org/jira/browse/FLINK-17313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17089530#comment-17089530 ]

Terry Wang commented on FLINK-17313:


The {{supportsAvoidingCast}} method behavior looks right to me; the relaxed 
check will improve the user experience of old-style connectors and doesn't 
affect correctness.


[jira] [Commented] (FLINK-17313) Validation error when insert decimal/timestamp/varchar with precision into sink using TypeInformation of row

2020-04-22 Thread Jark Wu (Jira)


[ https://issues.apache.org/jira/browse/FLINK-17313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17089491#comment-17089491 ]

Jark Wu commented on FLINK-17313:
-

[~dwysakowicz], yes, my point is that the change from {{areTypeCompatible}} to 
{{supportsAvoidingCast}} changed the behavior of this class; was that a 
mistake? 


[jira] [Commented] (FLINK-17313) Validation error when insert decimal/timestamp/varchar with precision into sink using TypeInformation of row

2020-04-22 Thread Dawid Wysakowicz (Jira)


[ https://issues.apache.org/jira/browse/FLINK-17313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17089489#comment-17089489 ]

Dawid Wysakowicz commented on FLINK-17313:
--

[~jark] What you are describing was the original purpose of this class. The 
introduction of {{supportsAvoidingCast}} changed it a bit: since that change, 
it lets some of the mismatches pass through.

I am not sure how we should proceed further with it.


[jira] [Commented] (FLINK-17313) Validation error when insert decimal/timestamp/varchar with precision into sink using TypeInformation of row

2020-04-22 Thread Jark Wu (Jira)


[ https://issues.apache.org/jira/browse/FLINK-17313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17089484#comment-17089484 ]

Jark Wu commented on FLINK-17313:
-

I still think the root cause is FLINK-15469, and the proposed way in the PR is 
just a workaround. 

The purpose of {{TypeMappingUtils.checkPhysicalLogicalTypeCompatible}} is to 
check that the logical and physical types are compatible, i.e. that the types 
are equal ignoring field names. It is used to guarantee that the sink connector 
complies with the DDL. If the connector wants to support precision, then it has 
to use the new type system, not the legacy TypeInformation, which loses the 
precision. 

Any precision mismatch between the logical and physical types should fail 
(except for legacy decimal), so I think {{supportsAvoidingCast}} here is a 
mistake and we should use something stricter. {{supportsAvoidingCast}} should 
be used in the checks from the query schema to the sink schema. 
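
(A hypothetical sketch of the stricter check described above: equality of logical types up to ROW field names. {{equalIgnoringFieldNames}} is an illustration, not an existing Flink method.)

{code:java}
import org.apache.flink.table.types.logical.LogicalType;

public class StrictCheckSketch {

    // Recursively compare two logical types, ignoring field names but keeping
    // every precision, scale and length significant. Simplified: attributes of
    // nested container types other than their children are not compared.
    static boolean equalIgnoringFieldNames(LogicalType a, LogicalType b) {
        if (a.getTypeRoot() != b.getTypeRoot()
                || a.getChildren().size() != b.getChildren().size()) {
            return false;
        }
        if (a.getChildren().isEmpty()) {
            return a.equals(b); // leaf types: precision mismatches fail here
        }
        for (int i = 0; i < a.getChildren().size(); i++) {
            if (!equalIgnoringFieldNames(a.getChildren().get(i), b.getChildren().get(i))) {
                return false;
            }
        }
        return true;
    }
}
{code}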


[jira] [Commented] (FLINK-17313) Validation error when insert decimal/timestamp/varchar with precision into sink using TypeInformation of row

2020-04-22 Thread Wenlong Lyu (Jira)


[ https://issues.apache.org/jira/browse/FLINK-17313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17089472#comment-17089472 ]

Wenlong Lyu commented on FLINK-17313:
-

[~dwysakowicz] Regarding LEGACY DECIMAL, I think it is a special case that is 
good to support: the physical representation of LEGACY DECIMAL is BigDecimal, 
which can hold any precision and scale, so allowing such a conversion will not 
actually break anything, and the final precision and scale of the output 
BigDecimal are still limited by the logical data type, so no data with a wrong 
precision will be generated. 

What's more, with such support we can easily fill in decimal support for all 
kinds of sinks that use the old interface. 
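
(A minimal sketch of the argument above, using plain {{java.math.BigDecimal}}: the legacy physical type is an unbounded BigDecimal, so any value that already satisfies the logical {{DECIMAL(22,2)}} can be handed to the sink unchanged.)

{code:java}
import java.math.BigDecimal;

public class LegacyDecimalDemo {
    public static void main(String[] args) {
        // A value produced under the logical type DECIMAL(22,2): the planner
        // has already bounded its precision and scale.
        BigDecimal value = new BigDecimal("12345678901234567890.12");
        System.out.println(value.precision()); // 22
        System.out.println(value.scale());     // 2

        // BigDecimal itself imposes no precision/scale limit, so passing the
        // value to a BigDecimal-typed legacy sink loses nothing; the logical
        // type remains the only bound on what is emitted.
    }
}
{code}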


[jira] [Commented] (FLINK-17313) Validation error when insert decimal/timestamp/varchar with precision into sink using TypeInformation of row

2020-04-22 Thread Dawid Wysakowicz (Jira)


[ https://issues.apache.org/jira/browse/FLINK-17313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17089411#comment-17089411 ]

Dawid Wysakowicz commented on FLINK-17313:
--

I must admit I had not fully understood the change in the beginning. I was not 
aware of the earlier introduction of {{supportsAvoidingCast}} in that method. 
Having understood that now, I think it's fine to add the bidirectional check.

I am still skeptical about allowing any precision of the {{DECIMAL}} type for 
the legacy decimal type. Such conversions might require a cast. 

Moreover ...


[jira] [Commented] (FLINK-17313) Validation error when insert decimal/timestamp/varchar with precision into sink using TypeInformation of row

2020-04-22 Thread Wenlong Lyu (Jira)


[ https://issues.apache.org/jira/browse/FLINK-17313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17089382#comment-17089382 ]

Wenlong Lyu commented on FLINK-17313:
-

[~dwysakowicz] I think it is not necessary for the logical schema and the 
physical schema to match exactly:

Currently, we allow a source column whose logical type is varchar(10) while 
its physical type is varchar(5); see the {{CastAvoidanceChecker}} used in the 
compatibility check. 
The requirement on a source is that we must be able to convert a physical 
source record into an internal record according to the physical data type and 
the logical data type. 

On sinks, the requirement is reversed: we must be able to convert an internal 
record into a physical record for the sink, so we can allow a sink column 
whose logical type is varchar(5) but whose physical type is varchar(10).

On validation: we already have a validation to make sure that the schema of 
the query feeding a sink matches the logical type, so the validation between 
the logical and physical types can be much looser, I think.
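
(A sketch of the direction-sensitive checks described above, again based on the public {{LogicalTypeCasts.supportsAvoidingCast}}; the argument order shown is my reading of the comment, not verified against the planner code.)

{code:java}
import org.apache.flink.table.types.logical.VarCharType;

import static org.apache.flink.table.types.logical.utils.LogicalTypeCasts.supportsAvoidingCast;

public class DirectionalCheckSketch {
    public static void main(String[] args) {
        VarCharType v5 = new VarCharType(5);
        VarCharType v10 = new VarCharType(10);

        // Source side: a physical varchar(5) record must be readable as the
        // logical varchar(10), i.e. check physical -> logical.
        System.out.println(supportsAvoidingCast(v5, v10)); // true

        // Sink side: an internal record of logical varchar(5) must be
        // writable into a physical varchar(10), i.e. the direction reverses
        // to logical -> physical, here again varchar(5) -> varchar(10).
        System.out.println(supportsAvoidingCast(v5, v10)); // true

        // Shrinking the length in either scenario would require a real cast.
        System.out.println(supportsAvoidingCast(v10, v5)); // false
    }
}
{code}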

[jira] [Commented] (FLINK-17313) Validation error when insert decimal/timestamp/varchar with precision into sink using TypeInformation of row

2020-04-22 Thread Terry Wang (Jira)


[ https://issues.apache.org/jira/browse/FLINK-17313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17089371#comment-17089371 ]

Terry Wang commented on FLINK-17313:


We cannot forbid using old connectors in DDL, and an old-type-style TableSink 
can consume the DECIMAL(38,18) and STRING (= VARCHAR(Integer.MAX_VALUE)) types. 
So it makes sense to pass the check when the logical type is 
varchar(10)/decimal(22,2).


[jira] [Commented] (FLINK-17313) Validation error when insert decimal/timestamp/varchar with precision into sink using TypeInformation of row

2020-04-22 Thread Terry Wang (Jira)


[ https://issues.apache.org/jira/browse/FLINK-17313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17089370#comment-17089370 ]

Terry Wang commented on FLINK-17313:


I think you may have misunderstood my fix. Source and sink should follow two 
different validation logics. If the logical type defined in the DDL can be 
consumed by the physical type returned by the table sink, why must we match 
the two schemas exactly? The check logic is a relaxed check: 
https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeCasts.java#L232.
 I don't know if I have expressed this clearly; let me know if you have more 
questions. [~dwysakowicz]
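
(For the concrete types from this issue, the linked relaxed check behaves as follows; a small sketch, assuming {{STRING}} maps to {{VARCHAR(Integer.MAX_VALUE)}} as in the new type system.)

{code:java}
import org.apache.flink.table.types.logical.VarCharType;

import static org.apache.flink.table.types.logical.utils.LogicalTypeCasts.supportsAvoidingCast;

public class Varchar10VsString {
    public static void main(String[] args) {
        // The DDL declares VARCHAR(10); the legacy sink consumes STRING.
        VarCharType ddl = new VarCharType(10);
        VarCharType sink = new VarCharType(VarCharType.MAX_LENGTH);

        // Every VARCHAR(10) value is a valid STRING value, so no cast is
        // needed in this direction and the relaxed check passes.
        System.out.println(supportsAvoidingCast(ddl, sink)); // true

        // The opposite direction would require a cast.
        System.out.println(supportsAvoidingCast(sink, ddl)); // false
    }
}
{code}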


[jira] [Commented] (FLINK-17313) Validation error when insert decimal/timestamp/varchar with precision into sink using TypeInformation of row

2020-04-22 Thread Dawid Wysakowicz (Jira)


[ https://issues.apache.org/jira/browse/FLINK-17313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17089366#comment-17089366 ]

Dawid Wysakowicz commented on FLINK-17313:
--

That is incorrect. The old type system does not support 
{{varchar(10)/decimal(22, 2)}}. It should have failed earlier.



[jira] [Commented] (FLINK-17313) Validation error when insert decimal/timestamp/varchar with precision into sink using TypeInformation of row

2020-04-22 Thread Terry Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17089362#comment-17089362
 ] 

Terry Wang commented on FLINK-17313:


Hi [~dwysakowicz], the types that originate from the old type system for 
{{varchar(10)}}/{{decimal(22, 2)}}/{{timestamp(3)}} are 
String/legacy(Decimal)/timestamp(3), and they should be able to accept the 
corresponding logical types in DDL, which is also what the 
[PR|https://github.com/apache/flink/pull/11848] aims to solve.


[jira] [Commented] (FLINK-17313) Validation error when insert decimal/timestamp/varchar with precision into sink using TypeInformation of row

2020-04-22 Thread Dawid Wysakowicz (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17089361#comment-17089361
 ] 

Dawid Wysakowicz commented on FLINK-17313:
--

The validation does not distinguish between sources and sinks. It validates 
that a logical type can be mapped to a physical type. The proposed fix 
(https://github.com/apache/flink/pull/11848) breaks that: it treats a logical 
type as a physical type for sinks.

The purpose of this class is to check that the schema from the DDL (the 
logical schema) matches the physical schema of the source (that schema exists 
primarily to provide bridging classes). Those schemas must match exactly. 
That said, I don't think this is a valid solution.
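
To make the exact-match requirement concrete, a minimal sketch using only the 
public {{DataTypes}}/{{LogicalType}} API (this mirrors what the compatibility 
check ultimately rejects, not the actual code in {{TypeMappingUtils}}):

{code:java}
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.logical.LogicalType;

public class ExactMatchDemo {
    public static void main(String[] args) {
        // Logical type declared in the DDL.
        LogicalType ddlType = DataTypes.VARCHAR(10).getLogicalType();
        // Physical type derived from the sink's legacy TypeInformation.
        LogicalType physicalType = DataTypes.STRING().getLogicalType();

        // Same type root (VARCHAR), but different length attributes,
        // so an exact-match validation must fail.
        System.out.println(ddlType.getTypeRoot() == physicalType.getTypeRoot()); // true
        System.out.println(ddlType.equals(physicalType));                        // false
    }
}
{code}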


[jira] [Commented] (FLINK-17313) Validation error when insert decimal/timestamp/varchar with precision into sink using TypeInformation of row

2020-04-22 Thread Terry Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17089357#comment-17089357
 ] 

Terry Wang commented on FLINK-17313:


[~lzljs3620320] There isn't much need to introduce a new interface in 
TableSink to solve this ticket. Just as [~wenlong.lwl] said, it's a validation 
bug that prevents connectors using the old TableSink from working normally.


[jira] [Commented] (FLINK-17313) Validation error when insert decimal/timestamp/varchar with precision into sink using TypeInformation of row

2020-04-22 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17089350#comment-17089350
 ] 

Wenlong Lyu commented on FLINK-17313:
-

Hi all, this ticket is only trying to fix the bug in the validation between 
the PhysicalDataType and LogicalDataType of a TableSink. I think that is much 
clearer and cleaner, and worth considering as a fix. Currently the validation 
on the sink side reuses the validation on the source side, while they should 
actually be different.
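
As an illustration of how a sink-side check could differ in direction, here 
is a sketch using {{LogicalTypeCasts}} from flink-table-common. It is only 
meant to show the idea of a direction-aware check, not the fix actually 
proposed in the PR:

{code:java}
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.utils.LogicalTypeCasts;

public class SinkDirectionDemo {
    public static void main(String[] args) {
        // Logical type produced by the query / declared in the DDL.
        LogicalType queryType = DataTypes.VARCHAR(10).getLogicalType();
        // Physical type the sink consumes (STRING from legacy TypeInformation).
        LogicalType consumedType = DataTypes.STRING().getLogicalType();

        // Sink direction: can query results be handed over to the type the
        // sink consumes?
        System.out.println(LogicalTypeCasts.supportsImplicitCast(queryType, consumedType));

        // Source direction: the same check would have to run the other way
        // around, which is why reusing one validation for both sides is
        // problematic.
        System.out.println(LogicalTypeCasts.supportsImplicitCast(consumedType, queryType));
    }
}
{code}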


[jira] [Commented] (FLINK-17313) Validation error when insert decimal/timestamp/varchar with precision into sink using TypeInformation of row

2020-04-22 Thread Jingsong Lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17089338#comment-17089338
 ] 

Jingsong Lee commented on FLINK-17313:
--

A possible work-around is here: https://issues.apache.org/jira/browse/FLINK-15469

If it is not too hacky, maybe we can consider it.

CC: [~docete]


[jira] [Commented] (FLINK-17313) Validation error when insert decimal/timestamp/varchar with precision into sink using TypeInformation of row

2020-04-22 Thread Dawid Wysakowicz (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17089331#comment-17089331
 ] 

Dawid Wysakowicz commented on FLINK-17313:
--

Hi [~Terry1897], first of all I do agree the type system rework is not 
finished yet and has more rough edges than we would like. This is one such 
case.

I agree with [~jark] here. UpsertStreamTableSink does not support the new 
type system, therefore it can support only the types that originate from the 
old type system. Consequently, e.g. {{VARCHAR(10)}} or {{DECIMAL}} with 
precision are not supported in this case. As Jark has already said, it should 
be fixed with FLIP-95.
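
Until then, a hypothetical workaround (untested, following the issue's own 
snippet) is to declare the sink columns with the types the legacy round-trip 
can actually represent, so the logical and physical schemas match exactly. 
The {{decimal(22,2)}} column is omitted because plain BigDecimal has no 
precision-carrying counterpart in the old type system:

{code:java}
// Hypothetical workaround: use types the old type system can round-trip.
tEnv.sqlUpdate("create table printSink (" +
        "  a string," +        // STRING instead of varchar(10)
        "  c timestamp(3)" +   // timestamp(3) maps back to the legacy SQL timestamp
        ") with (" +
        "  'type' = 'print'" +
        ")");
{code}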


[jira] [Commented] (FLINK-17313) Validation error when insert decimal/timestamp/varchar with precision into sink using TypeInformation of row

2020-04-22 Thread Terry Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17089326#comment-17089326
 ] 

Terry Wang commented on FLINK-17313:


[~jark] I agree with you that the new sink interface of FLIP-95 can solve the 
problem, but there are still a lot of connectors that use the old interface. 
It's harmless to support such compatibility, and useful for users who cannot 
migrate their connectors in time, right?


[jira] [Commented] (FLINK-17313) Validation error when insert decimal/timestamp/varchar with precision into sink using TypeInformation of row

2020-04-21 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17089300#comment-17089300
 ] 

Jark Wu commented on FLINK-17313:
-

I think the root cause is that you are using the legacy type information, 
which can't connect to the planner smoothly.
Because of the complexity, we don't plan to support the new type system for 
UpsertStreamTableSink, but we will fully support it in the new FLIP-95 sink 
interface. I think your problem will be solved once FLIP-95 is finished.
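
For reference, a minimal sketch of such a sink against the FLIP-95 interfaces 
as they later shipped in Flink 1.11+ ({{org.apache.flink.table.connector.sink}}; 
at the time of this comment the API was still under development). The planner 
derives the consumed type from the catalog schema, so no TypeInformation-based 
physical schema is involved and the precision mismatch cannot arise:

{code:java}
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.table.data.RowData;

/** Minimal FLIP-95 print sink sketch consuming Flink's internal RowData. */
public class PrintDynamicSink implements DynamicTableSink {

    @Override
    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
        // Accept whatever changelog the query produces.
        return requestedMode;
    }

    @Override
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
        return SinkFunctionProvider.of(new SinkFunction<RowData>() {
            @Override
            public void invoke(RowData value, Context context) {
                System.out.println(value);
            }
        });
    }

    @Override
    public DynamicTableSink copy() {
        return new PrintDynamicSink();
    }

    @Override
    public String asSummaryString() {
        return "print";
    }
}
{code}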



[jira] [Commented] (FLINK-17313) Validation error when insert decimal/timestamp/varchar with precision into sink using TypeInformation of row

2020-04-21 Thread Terry Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17089258#comment-17089258
 ] 

Terry Wang commented on FLINK-17313:


I opened [a PR|https://github.com/apache/flink/pull/11848] to help understand 
and solve this validation exception.


[jira] [Commented] (FLINK-17313) Validation error when insert decimal/timestamp/varchar with precision into sink using TypeInformation of row

2020-04-21 Thread Terry Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17089251#comment-17089251
 ] 

Terry Wang commented on FLINK-17313:


cc [~ykt836] [~jark] [~dwysakowicz] Please have a look at this issue.
