hehuiyuan created FLINK-20301:
---------------------------------

             Summary: Flink SQL 1.10: legacy DECIMAL and DECIMAL element type for ARRAY are not compatible
                 Key: FLINK-20301
                 URL: https://issues.apache.org/jira/browse/FLINK-20301
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
            Reporter: hehuiyuan
         Attachments: image-2020-11-23-23-48-02-102.png

The error log:

 
{code:java}
Exception in thread "main" org.apache.flink.table.api.ValidationException: Type ARRAY<DECIMAL(38, 18)> of table field 'numbers' does not match with the physical type ARRAY<LEGACY('DECIMAL', 'DECIMAL')> of the 'numbers' field of the TableSource return type.
	at org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:160)
	at org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:185)
	at org.apache.flink.table.utils.TypeMappingUtils.lambda$computeInCompositeType$8(TypeMappingUtils.java:246)
	at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321)
	at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
	at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
	at org.apache.flink.table.utils.TypeMappingUtils.computeInCompositeType(TypeMappingUtils.java:228)
	at org.apache.flink.table.utils.TypeMappingUtils.computePhysicalIndices(TypeMappingUtils.java:206)
	at org.apache.flink.table.utils.TypeMappingUtils.computePhysicalIndicesOrTimeAttributeMarkers(TypeMappingUtils.java:110)
	at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.computeIndexMapping(StreamExecTableSourceScan.scala:212)
	at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:107)
	at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:62)
	at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
	at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:62)
	at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:184)
	at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:118)
	at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
	at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
	at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
	at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
	at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
	at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
	at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:685)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
{code}
 

 

 

Background:

Flink SQL 1.10 with the Blink planner.

The JSON schema of the TableSource:

{code:java}
 "{type:'object',properties:{age:{type:'number'},numbers: { type: 'array', 
items: { type: 'number' } },name:{type:'string'},dt:{type: 'string', format: 
'date-time'},timehour:{type: 'string', format: 'time'} }}"
{code}
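
For context, a minimal sketch (not from the report, which does not include the job code) of how such a schema produces the physical type seen in the exception. The schema string is rewritten here in standard double-quoted JSON, and the conversion goes through flink-json's JsonRowSchemaConverter, which the JSON format uses to derive the TableSource return type:
{code:java}
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.json.JsonRowSchemaConverter;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.Row;

public class PhysicalTypeSketch {
	public static void main(String[] args) {
		// The schema from the report, rewritten with standard double quotes.
		String jsonSchema =
			"{\"type\":\"object\",\"properties\":{"
				+ "\"age\":{\"type\":\"number\"},"
				+ "\"numbers\":{\"type\":\"array\",\"items\":{\"type\":\"number\"}},"
				+ "\"name\":{\"type\":\"string\"},"
				+ "\"dt\":{\"type\":\"string\",\"format\":\"date-time\"},"
				+ "\"timehour\":{\"type\":\"string\",\"format\":\"time\"}}}";

		// JSON 'number' is converted to BigDecimal, so 'numbers' becomes an
		// array of BigDecimal on the physical (TableSource return type) side.
		TypeInformation<Row> physicalInfo = JsonRowSchemaConverter.convert(jsonSchema);

		// Converting the legacy TypeInformation to a DataType shows the
		// LEGACY('DECIMAL', ...) element type mentioned in the exception,
		// because BigDecimal carries no fixed precision/scale.
		DataType physicalType = TypeConversions.fromLegacyInfoToDataType(physicalInfo);
		System.out.println(physicalType);
	}
}
{code}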
 

Validation then throws:

Type ARRAY<DECIMAL(38, 18)> of table field 'numbers' does not match with the physical type ARRAY<LEGACY('DECIMAL', 'DECIMAL')> of the 'numbers' field of the TableSource return type.

The case of an ARRAY field whose element type is a (legacy) DECIMAL should be handled.
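
For illustration only (the report does not show how the table schema was declared or derived), the logical side of the mismatch corresponds to a column typed ARRAY<DECIMAL(38, 18)>, e.g.:
{code:java}
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;

public class LogicalSchemaSketch {
	public static void main(String[] args) {
		// Hypothetical logical schema (not from the report): the column type
		// for 'numbers' is ARRAY<DECIMAL(38, 18)>, which validation compares
		// against the physical ARRAY<LEGACY('DECIMAL', 'DECIMAL')>.
		TableSchema logicalSchema = TableSchema.builder()
			.field("numbers", DataTypes.ARRAY(DataTypes.DECIMAL(38, 18)))
			.build();
		System.out.println(logicalSchema);
	}
}
{code}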

!image-2020-11-23-23-48-02-102.png|width=594,height=364!

 

I think a `visit(ArrayType sourceType)` override should be added to the compatibility check, for example:
{code:java}
@Override
public Boolean visit(ArrayType sourceType) {
	// The argument is already an ArrayType, so only its element type needs checking.
	if (sourceType.getElementType().getTypeRoot() == LogicalTypeRoot.DECIMAL) {
		DecimalType elementType = (DecimalType) sourceType.getElementType();
		// Same rule as for a top-level field: a legacy decimal can only be
		// mapped to DECIMAL(38, 18).
		if (elementType.getPrecision() != DecimalType.MAX_PRECISION ||
				elementType.getScale() != 18) {
			throw new ValidationException(
				"Legacy decimal type can only be mapped to DECIMAL(38, 18).");
		}
		return true;
	}
	return false;
}
{code}
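
This mirrors the existing rule that a legacy decimal can only be mapped to DECIMAL(38, 18), just applied to the array's element type. A more general fix could recurse into the element and field types of ARRAY, MAP and ROW so nested legacy types are validated uniformly; the snippet above is only a minimal sketch of the missing case.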
 

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
