[GitHub] [flink] JingsongLi commented on a change in pull request #10667: [FLINK-15313][table] Fix can't insert decimal data into sink using TypeInformation

2019-12-24 Thread GitBox
JingsongLi commented on a change in pull request #10667: [FLINK-15313][table] 
Fix can't insert decimal data into sink using TypeInformation
URL: https://github.com/apache/flink/pull/10667#discussion_r361246115
 
 

 ##
 File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeys.scala
 ##
 @@ -135,8 +138,8 @@ class FlinkRelMdUniqueKeys private extends MetadataHandler[BuiltInMetadata.Uniqu
 }
   case _ => // ignore
 }
-  //rename
-  case a: RexCall if a.getKind.equals(SqlKind.AS) &&
+  //rename or cast
+  case a: RexCall if (a.getKind.equals(SqlKind.AS) || isFidelityCast(a)) &&
 
 Review comment:
   Do we have to add this?
   There are some casts that break uniqueness. For example, if a cast fails, it returns null.
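
   To illustrate the concern (a sketch, not the PR's actual `isFidelityCast`): a cast that can fail maps several distinct inputs to NULL, so a unique key on the input column is no longer unique after the cast. A conservative key-preserving check, assuming `supportsImplicitCast` as a proxy for "cannot fail at runtime", could look like this:

```scala
import org.apache.calcite.rex.RexCall
import org.apache.flink.table.planner.calcite.FlinkTypeFactory
import org.apache.flink.table.types.logical.utils.LogicalTypeCasts

// Hypothetical sketch, not the PR's isFidelityCast: only treat a cast as
// key-preserving when the operand type implicitly casts to the result type
// (e.g. INT -> BIGINT). A cast that can fail (e.g. VARCHAR -> INT) turns
// several distinct inputs into NULL and therefore breaks uniqueness.
def preservesUniqueness(cast: RexCall): Boolean = {
  val fromType = FlinkTypeFactory.toLogicalType(cast.getOperands.get(0).getType)
  val toType = FlinkTypeFactory.toLogicalType(cast.getType)
  LogicalTypeCasts.supportsImplicitCast(fromType, toType)
}
```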




[GitHub] [flink] JingsongLi commented on a change in pull request #10667: [FLINK-15313][table] Fix can't insert decimal data into sink using TypeInformation

2019-12-24 Thread GitBox
JingsongLi commented on a change in pull request #10667: [FLINK-15313][table] 
Fix can't insert decimal data into sink using TypeInformation
URL: https://github.com/apache/flink/pull/10667#discussion_r361121651
 
 

 ##
 File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSink.scala
 ##
 @@ -115,9 +114,9 @@ class BatchExecSink[T](
   private def translateToTransformation(
   withChangeFlag: Boolean,
   planner: BatchPlanner): Transformation[T] = {
+
 
 Review comment:
   Remove this added blank line.




[GitHub] [flink] JingsongLi commented on a change in pull request #10667: [FLINK-15313][table] Fix can't insert decimal data into sink using TypeInformation

2019-12-24 Thread GitBox
JingsongLi commented on a change in pull request #10667: [FLINK-15313][table] 
Fix can't insert decimal data into sink using TypeInformation
URL: https://github.com/apache/flink/pull/10667#discussion_r361124987
 
 

 ##
 File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sinks/TableSinkUtils.scala
 ##
 @@ -18,67 +18,99 @@
 
 package org.apache.flink.table.planner.sinks
 
-import org.apache.flink.table.api.ValidationException
-import org.apache.flink.table.catalog.ObjectIdentifier
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.{GenericTypeInfo, TupleTypeInfo}
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
+import org.apache.flink.table.api.{TableException, TableSchema, Types, ValidationException}
+import org.apache.flink.table.catalog.{CatalogTable, ObjectIdentifier}
+import org.apache.flink.table.dataformat.BaseRow
 import org.apache.flink.table.operations.CatalogSinkModifyOperation
-import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromDataTypeToLogicalType
-import org.apache.flink.table.runtime.types.PlannerTypeUtils
-import org.apache.flink.table.sinks.{PartitionableTableSink, TableSink}
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory
+import org.apache.flink.table.planner.plan.utils.RelOptUtils
+import org.apache.flink.table.sinks._
+import org.apache.flink.table.types.DataType
+import org.apache.flink.table.types.inference.TypeTransformations.{legacyDecimalToDefaultDecimal, toNullable}
+import org.apache.flink.table.types.logical.utils.{LogicalTypeCasts, LogicalTypeChecks}
+import org.apache.flink.table.types.logical.{LegacyTypeInformationType, RowType}
+import org.apache.flink.table.types.utils.DataTypeUtils
+import org.apache.flink.table.types.utils.TypeConversions.{fromLegacyInfoToDataType, fromLogicalToDataType}
+import org.apache.flink.table.utils.{TableSchemaUtils, TypeMappingUtils}
+import org.apache.flink.types.Row
+import org.apache.calcite.rel.RelNode
 
 import scala.collection.JavaConversions._
 
 object TableSinkUtils {
 
   /**
-* Checks if the given [[CatalogSinkModifyOperation]]'s query can be written to
-* the given [[TableSink]]. It checks if the names & the field types match. If the table
-* sink is a [[PartitionableTableSink]], also check that the partitions are valid.
+* Checks if the given query can be written into the given sink. It checks the field types
+* should be compatible (types should equal including precisions). If types are not compatible,
+* but can be implicitly casted, a cast projection will be applied. Otherwise, an exception will
+* be thrown.
+*
+* @param query the query to be checked
+* @param sinkSchema the schema of sink to be checked
+* @param typeFactory type factory
+* @return the query RelNode which may be applied the implicitly cast projection.
+*/
+  def validateSchemaAndApplyImplicitCast(
+  query: RelNode,
+  sinkSchema: TableSchema,
+  typeFactory: FlinkTypeFactory,
+  sinkIdentifier: Option[String] = None): RelNode = {
+
+val queryLogicalType = DataTypeUtils
+  // convert type to nullable, because we ignore nullability when writing query into sink
+  .transform(FlinkTypeFactory.toTableSchema(query.getRowType).toRowDataType, toNullable)
+  .getLogicalType
+  .asInstanceOf[RowType]
+val sinkLogicalType = DataTypeUtils
+  // convert type to nullable, because we ignore nullability when writing query into sink
+  .transform(sinkSchema.toRowDataType, legacyDecimalToDefaultDecimal, toNullable)
+  .getLogicalType
+  .asInstanceOf[RowType]
+if (LogicalTypeChecks.areTypesCompatible(queryLogicalType, sinkLogicalType)) {
+  // types are compatible, do nothing
+  query
+} else if (LogicalTypeCasts.supportsImplicitCast(queryLogicalType, sinkLogicalType)) {
+  // types can be implicit casted, add a cast project
+  val castedDataType = typeFactory.buildRelNodeRowType(
+sinkLogicalType.getFieldNames,
+sinkLogicalType.getFields.map(_.getType))
+  RelOptUtils.createCastRel(query, castedDataType)
+} else {
+  // format query and sink schema strings
+  val srcSchema = queryLogicalType.getFields
+.map(f => s"${f.getName}: ${f.getType}")
+.mkString("[", ", ", "]")
+  val sinkSchema = sinkLogicalType.getFields
+.map(f => s"${f.getName}: ${f.getType}")
+.mkString("[", ", ", "]")
+
+  val sinkDesc: String = sinkIdentifier.getOrElse("")
+
+  throw new ValidationException(
+s"Field types of query result and registered TableSink $sinkDesc do 
not match.\n" +
+  s"Query schema: $srcSchema\n" +
+  s"Sink schema: $sinkSchema")
+}
+  }
+
+  /**
+* It checks whether the [[TableSink]] is compatible to the INSERT INTO clause, e.g.
+* whether the sink is 

[GitHub] [flink] JingsongLi commented on a change in pull request #10667: [FLINK-15313][table] Fix can't insert decimal data into sink using TypeInformation

2019-12-24 Thread GitBox
JingsongLi commented on a change in pull request #10667: [FLINK-15313][table] 
Fix can't insert decimal data into sink using TypeInformation
URL: https://github.com/apache/flink/pull/10667#discussion_r361119627
 
 

 ##
 File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sinks/TableSinkUtils.scala
 ##
 @@ -18,67 +18,99 @@
 
 package org.apache.flink.table.planner.sinks
 
-import org.apache.flink.table.api.ValidationException
-import org.apache.flink.table.catalog.ObjectIdentifier
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.{GenericTypeInfo, TupleTypeInfo}
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
+import org.apache.flink.table.api.{TableException, TableSchema, Types, ValidationException}
+import org.apache.flink.table.catalog.{CatalogTable, ObjectIdentifier}
+import org.apache.flink.table.dataformat.BaseRow
 import org.apache.flink.table.operations.CatalogSinkModifyOperation
-import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromDataTypeToLogicalType
-import org.apache.flink.table.runtime.types.PlannerTypeUtils
-import org.apache.flink.table.sinks.{PartitionableTableSink, TableSink}
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory
+import org.apache.flink.table.planner.plan.utils.RelOptUtils
+import org.apache.flink.table.sinks._
+import org.apache.flink.table.types.DataType
+import org.apache.flink.table.types.inference.TypeTransformations.{legacyDecimalToDefaultDecimal, toNullable}
+import org.apache.flink.table.types.logical.utils.{LogicalTypeCasts, LogicalTypeChecks}
+import org.apache.flink.table.types.logical.{LegacyTypeInformationType, RowType}
+import org.apache.flink.table.types.utils.DataTypeUtils
+import org.apache.flink.table.types.utils.TypeConversions.{fromLegacyInfoToDataType, fromLogicalToDataType}
+import org.apache.flink.table.utils.{TableSchemaUtils, TypeMappingUtils}
+import org.apache.flink.types.Row
+import org.apache.calcite.rel.RelNode
 
 import scala.collection.JavaConversions._
 
 object TableSinkUtils {
 
   /**
-* Checks if the given [[CatalogSinkModifyOperation]]'s query can be written to
-* the given [[TableSink]]. It checks if the names & the field types match. If the table
-* sink is a [[PartitionableTableSink]], also check that the partitions are valid.
+* Checks if the given query can be written into the given sink. It checks the field types
+* should be compatible (types should equal including precisions). If types are not compatible,
+* but can be implicitly casted, a cast projection will be applied. Otherwise, an exception will
+* be thrown.
+*
+* @param query the query to be checked
+* @param sinkSchema the schema of sink to be checked
+* @param typeFactory type factory
+* @return the query RelNode which may be applied the implicitly cast projection.
+*/
+  def validateSchemaAndApplyImplicitCast(
+  query: RelNode,
+  sinkSchema: TableSchema,
+  typeFactory: FlinkTypeFactory,
+  sinkIdentifier: Option[String] = None): RelNode = {
+
+val queryLogicalType = DataTypeUtils
+  // convert type to nullable, because we ignore nullability when writing query into sink
 
 Review comment:
   Another option is to do the check in `if (LogicalTypeCasts.supportsImplicitCast(queryLogicalType, sinkLogicalType))`?
   You can take a look at `supportsCasting`; it already contains some nullability-related checks.
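
   For reference, a sketch of how that alternative might read (assumed, not the PR's code); note the behavior may differ for a nullable query field written into a NOT NULL sink field, which the cast rules treat more strictly than the toNullable normalization does:

```scala
import org.apache.calcite.rel.RelNode
import org.apache.flink.table.api.TableSchema
import org.apache.flink.table.planner.calcite.FlinkTypeFactory
import org.apache.flink.table.types.logical.RowType
import org.apache.flink.table.types.logical.utils.LogicalTypeCasts

// Sketch only: compare the raw row types of query and sink (no toNullable
// normalization) and let the cast check decide, including its nullability rules.
def canWriteWithoutNullabilityNormalization(
    query: RelNode,
    sinkSchema: TableSchema): Boolean = {
  val queryLogicalType = FlinkTypeFactory.toLogicalType(query.getRowType).asInstanceOf[RowType]
  val sinkLogicalType = sinkSchema.toRowDataType.getLogicalType.asInstanceOf[RowType]
  queryLogicalType == sinkLogicalType ||
    LogicalTypeCasts.supportsImplicitCast(queryLogicalType, sinkLogicalType)
}
```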




[GitHub] [flink] JingsongLi commented on a change in pull request #10667: [FLINK-15313][table] Fix can't insert decimal data into sink using TypeInformation

2019-12-24 Thread GitBox
JingsongLi commented on a change in pull request #10667: [FLINK-15313][table] 
Fix can't insert decimal data into sink using TypeInformation
URL: https://github.com/apache/flink/pull/10667#discussion_r361121577
 
 

 ##
 File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeys.scala
 ##
 @@ -135,8 +135,8 @@ class FlinkRelMdUniqueKeys private extends MetadataHandler[BuiltInMetadata.Uniqu
 }
   case _ => // ignore
 }
-  //rename
-  case a: RexCall if a.getKind.equals(SqlKind.AS) &&
+  //rename or cast
+  case a: RexCall if (a.getKind.equals(SqlKind.AS) || a.getKind.equals(SqlKind.CAST)) &&
 
 Review comment:
   Is this for the upsert sink?




[GitHub] [flink] JingsongLi commented on a change in pull request #10667: [FLINK-15313][table] Fix can't insert decimal data into sink using TypeInformation

2019-12-24 Thread GitBox
JingsongLi commented on a change in pull request #10667: [FLINK-15313][table] 
Fix can't insert decimal data into sink using TypeInformation
URL: https://github.com/apache/flink/pull/10667#discussion_r361123259
 
 

 ##
 File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/typeutils/BaseRowTypeInfo.java
 ##
 @@ -60,7 +60,8 @@ public BaseRowTypeInfo(LogicalType... logicalTypes) {
 
   public BaseRowTypeInfo(LogicalType[] logicalTypes, String[] fieldNames) {
   super(BaseRow.class, Arrays.stream(logicalTypes)
-   .map(TypeInfoLogicalTypeConverter::fromLogicalTypeToTypeInfo)
+   .map(TypeConversions::fromLogicalToDataType)
 
 Review comment:
   Is this only needed for tests?




[GitHub] [flink] JingsongLi commented on a change in pull request #10667: [FLINK-15313][table] Fix can't insert decimal data into sink using TypeInformation

2019-12-24 Thread GitBox
JingsongLi commented on a change in pull request #10667: [FLINK-15313][table] 
Fix can't insert decimal data into sink using TypeInformation
URL: https://github.com/apache/flink/pull/10667#discussion_r361122091
 
 

 ##
 File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sinks/TableSinkUtils.scala
 ##
 @@ -98,5 +130,148 @@ object TableSinkUtils {
 }
   }
 }
+
+sink match {
+  case overwritableTableSink: OverwritableTableSink =>
+overwritableTableSink.setOverwrite(sinkOperation.isOverwrite)
+  case _ =>
+assert(!sinkOperation.isOverwrite, "INSERT OVERWRITE requires " +
+  s"${classOf[OverwritableTableSink].getSimpleName} but actually got " 
+
+  sink.getClass.getName)
+}
+  }
+
+  /**
+* Inferences the physical schema of [[TableSink]], the physical schema ignores change flag
+* field and normalizes physical types (can be generic type or POJO type) into [[TableSchema]].
+* @param queryLogicalType the logical type of query, will be used to full-fill sink physical
+* schema if the sink physical type is not specified.
+* @param sink the instance of [[TableSink]]
+*/
+  def inferSinkPhysicalSchema(
+  queryLogicalType: RowType,
+  sink: TableSink[_]): TableSchema = {
+val withChangeFlag = sink match {
+  case _: RetractStreamTableSink[_] | _: UpsertStreamTableSink[_] => true
+  case _: StreamTableSink[_] => false
+  case dsts: DataStreamTableSink[_] => dsts.withChangeFlag
+}
+inferSinkPhysicalSchema(sink.getConsumedDataType, queryLogicalType, withChangeFlag)
+  }
+
+  /**
+* Inferences the physical schema of [[TableSink]], the physical schema ignores change flag
+* field and normalizes physical types (can be generic type or POJO type) into [[TableSchema]].
+*
+* @param consumedDataType the consumed data type of sink
+* @param queryLogicalType the logical type of query, will be used to full-fill sink physical
+* schema if the sink physical type is not specified.
+* @param withChangeFlag true if the emitted records contains change flags.
+*/
+  def inferSinkPhysicalSchema(
+  consumedDataType: DataType,
+  queryLogicalType: RowType,
+  withChangeFlag: Boolean): TableSchema = {
+// the requested output physical type which ignores the flag field
+val requestedOutputType = inferSinkPhysicalDataType(
+  consumedDataType,
+  queryLogicalType,
+  withChangeFlag)
+if (LogicalTypeChecks.isCompositeType(requestedOutputType.getLogicalType)) {
+  DataTypeUtils.expandCompositeTypeToSchema(requestedOutputType)
+} else {
+  // atomic type
+  TableSchema.builder().field("f0", requestedOutputType).build()
+}
+  }
+
+  /**
+* Inferences the physical data type of [[TableSink]], the physical data type ignores
+* the change flag field.
+*
+* @param consumedDataType the consumed data type of sink
+* @param queryLogicalType the logical type of query, will be used to full-fill sink physical
+* schema if the sink physical type is not specified.
+* @param withChangeFlag true if the emitted records contains change flags.
+*/
+  def inferSinkPhysicalDataType(
 
 Review comment:
   Could we provide a single method that returns both `sinkPhysicalDataType` and `outputType`?
   I don't think it is good to extract the physicalDataType from the output type and then construct the output type from the physicalDataType again.
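
   A possible shape for that combined helper (name assumed, reusing `inferSinkPhysicalDataType` from this change), so the physical data type is computed once and the schema is derived from it:

```scala
import org.apache.flink.table.api.TableSchema
import org.apache.flink.table.types.DataType
import org.apache.flink.table.types.logical.RowType
import org.apache.flink.table.types.logical.utils.LogicalTypeChecks
import org.apache.flink.table.types.utils.DataTypeUtils

// Hypothetical helper: return both the sink physical data type and the schema
// derived from it, instead of extracting one from the other and back again.
def inferSinkPhysicalDataTypeAndSchema(
    consumedDataType: DataType,
    queryLogicalType: RowType,
    withChangeFlag: Boolean): (DataType, TableSchema) = {
  val physicalDataType =
    inferSinkPhysicalDataType(consumedDataType, queryLogicalType, withChangeFlag)
  val schema =
    if (LogicalTypeChecks.isCompositeType(physicalDataType.getLogicalType)) {
      DataTypeUtils.expandCompositeTypeToSchema(physicalDataType)
    } else {
      // atomic type, wrap in a single-field schema as the PR does
      TableSchema.builder().field("f0", physicalDataType).build()
    }
  (physicalDataType, schema)
}
```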




[GitHub] [flink] JingsongLi commented on a change in pull request #10667: [FLINK-15313][table] Fix can't insert decimal data into sink using TypeInformation

2019-12-23 Thread GitBox
JingsongLi commented on a change in pull request #10667: [FLINK-15313][table] 
Fix can't insert decimal data into sink using TypeInformation
URL: https://github.com/apache/flink/pull/10667#discussion_r361082465
 
 

 ##
 File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sinks/TableSinkUtils.scala
 ##
 @@ -18,67 +18,99 @@
 
 package org.apache.flink.table.planner.sinks
 
-import org.apache.flink.table.api.ValidationException
-import org.apache.flink.table.catalog.ObjectIdentifier
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.{GenericTypeInfo, TupleTypeInfo}
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
+import org.apache.flink.table.api.{TableException, TableSchema, Types, ValidationException}
+import org.apache.flink.table.catalog.{CatalogTable, ObjectIdentifier}
+import org.apache.flink.table.dataformat.BaseRow
 import org.apache.flink.table.operations.CatalogSinkModifyOperation
-import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromDataTypeToLogicalType
-import org.apache.flink.table.runtime.types.PlannerTypeUtils
-import org.apache.flink.table.sinks.{PartitionableTableSink, TableSink}
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory
+import org.apache.flink.table.planner.plan.utils.RelOptUtils
+import org.apache.flink.table.sinks._
+import org.apache.flink.table.types.DataType
+import org.apache.flink.table.types.inference.TypeTransformations.{legacyDecimalToDefaultDecimal, toNullable}
+import org.apache.flink.table.types.logical.utils.{LogicalTypeCasts, LogicalTypeChecks}
+import org.apache.flink.table.types.logical.{LegacyTypeInformationType, RowType}
+import org.apache.flink.table.types.utils.DataTypeUtils
+import org.apache.flink.table.types.utils.TypeConversions.{fromLegacyInfoToDataType, fromLogicalToDataType}
+import org.apache.flink.table.utils.{TableSchemaUtils, TypeMappingUtils}
+import org.apache.flink.types.Row
+import org.apache.calcite.rel.RelNode
 
 import scala.collection.JavaConversions._
 
 object TableSinkUtils {
 
   /**
-* Checks if the given [[CatalogSinkModifyOperation]]'s query can be written to
-* the given [[TableSink]]. It checks if the names & the field types match. If the table
-* sink is a [[PartitionableTableSink]], also check that the partitions are valid.
+* Checks if the given query can be written into the given sink. It checks the field types
+* should be compatible (types should equal including precisions). If types are not compatible,
+* but can be implicitly casted, a cast projection will be applied. Otherwise, an exception will
+* be thrown.
+*
+* @param query the query to be checked
+* @param sinkSchema the schema of sink to be checked
+* @param typeFactory type factory
+* @return the query RelNode which may be applied the implicitly cast projection.
+*/
+  def validateSchemaAndApplyImplicitCast(
+  query: RelNode,
+  sinkSchema: TableSchema,
+  typeFactory: FlinkTypeFactory,
+  sinkIdentifier: Option[String] = None): RelNode = {
+
+val queryLogicalType = DataTypeUtils
+  // convert type to nullable, because we ignore nullability when writing query into sink
 
 Review comment:
   Why do we need to convert to nullable?
   Does it not work without this?
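
   For context, a minimal illustration of what the normalization buys (example types assumed): nullability is meant to be ignored when writing into a sink, and the two types only compare equal after both sides are made nullable:

```scala
import org.apache.flink.table.api.DataTypes

// Example only: the query produces INT (nullable) while the sink declares
// INT NOT NULL. The two DataTypes are not equal as-is, but they are equal
// once both sides are normalized to nullable, so the write is accepted
// without a cast projection or a validation error.
val queryField = DataTypes.INT()
val sinkField = DataTypes.INT().notNull()
assert(!queryField.equals(sinkField))
assert(queryField.equals(sinkField.nullable()))
```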




[GitHub] [flink] JingsongLi commented on a change in pull request #10667: [FLINK-15313][table] Fix can't insert decimal data into sink using TypeInformation

2019-12-23 Thread GitBox
JingsongLi commented on a change in pull request #10667: [FLINK-15313][table] 
Fix can't insert decimal data into sink using TypeInformation
URL: https://github.com/apache/flink/pull/10667#discussion_r361055773
 
 

 ##
 File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/transforms/LegacyDecimalTypeTransformation.java
 ##
 @@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference.transforms;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.TypeTransformation;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LegacyTypeInformationType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+
+/**
+ * This type transformation transforms the legacy decimal type (usually converted from
+ * {@link org.apache.flink.api.common.typeinfo.Types#BIG_DEC}) to DECIMAL(38, 18).
+ */
+public class LegacyDecimalTypeTransformation implements TypeTransformation {
+
+   public static final TypeTransformation INSTANCE = new LegacyDecimalTypeTransformation();
+
+   @Override
+   public DataType transform(DataType typeToTransform) {
+   LogicalType logicalType = typeToTransform.getLogicalType();
+   if (logicalType instanceof LegacyTypeInformationType && logicalType.getTypeRoot() == LogicalTypeRoot.DECIMAL) {
+   DataType decimalType = DataTypes
+   .DECIMAL(DecimalType.MAX_PRECISION, 18)
+   .bridgedTo(typeToTransform.getConversionClass());
+   if (!logicalType.isNullable()) {
 
 Review comment:
   return logicalType.isNullable() ? decimalType : decimalType.notNull();
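
   For reference, a small usage sketch (context assumed): a legacy BigDecimal type becomes DECIMAL(38, 18), and with the ternary above the input's nullability is preserved:

```scala
import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.table.types.inference.transforms.LegacyDecimalTypeTransformation
import org.apache.flink.table.types.utils.TypeConversions

// Usage sketch only: transform the legacy BIG_DEC type handled by this PR.
val legacyDecimal = TypeConversions.fromLegacyInfoToDataType(Types.BIG_DEC)
val transformed = LegacyDecimalTypeTransformation.INSTANCE.transform(legacyDecimal)
// transformed.getLogicalType is DECIMAL(38, 18); a NOT NULL input would stay NOT NULL.
```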

