dongjoon-hyun commented on a change in pull request #31319:
URL: https://github.com/apache/spark/pull/31319#discussion_r563986618



##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
##########
@@ -88,7 +88,7 @@ class ParquetReadSupport(
       SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.defaultValue.get)
     val parquetFileSchema = context.getFileSchema
     val parquetClippedSchema = 
ParquetReadSupport.clipParquetSchema(parquetFileSchema,
-      catalystRequestedSchema, caseSensitive)
+      catalystRequestedSchema, caseSensitive, enableVectorizedReader)

Review comment:
       This is used to throw an exception instead of returning an incorrect 
result, @cloud-fan .

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
##########
@@ -88,7 +88,7 @@ class ParquetReadSupport(
       SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.defaultValue.get)
     val parquetFileSchema = context.getFileSchema
     val parquetClippedSchema = 
ParquetReadSupport.clipParquetSchema(parquetFileSchema,
-      catalystRequestedSchema, caseSensitive)
+      catalystRequestedSchema, caseSensitive, enableVectorizedReader)

Review comment:
       We already throw `SchemaColumnConvertNotSupportedException` as a runtime 
exception. This is the same approach.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
##########
@@ -167,20 +168,33 @@ object ParquetReadSupport {
   }
 
   private def clipParquetType(
-      parquetType: Type, catalystType: DataType, caseSensitive: Boolean): Type 
= {
+      parquetType: Type,
+      catalystType: DataType,
+      caseSensitive: Boolean,
+      enableVectorizedReader: Boolean): Type = {
     catalystType match {
       case t: ArrayType if !isPrimitiveCatalystType(t.elementType) =>
         // Only clips array types with nested type as element type.
-        clipParquetListType(parquetType.asGroupType(), t.elementType, 
caseSensitive)
+        clipParquetListType(
+          parquetType.asGroupType(), t.elementType, caseSensitive, 
enableVectorizedReader)
 
       case t: MapType
         if !isPrimitiveCatalystType(t.keyType) ||
            !isPrimitiveCatalystType(t.valueType) =>
         // Only clips map types with nested key type or value type
-        clipParquetMapType(parquetType.asGroupType(), t.keyType, t.valueType, 
caseSensitive)
+        clipParquetMapType(
+          parquetType.asGroupType(), t.keyType, t.valueType, caseSensitive, 
enableVectorizedReader)
 
       case t: StructType =>
-        clipParquetGroup(parquetType.asGroupType(), t, caseSensitive)
+        clipParquetGroup(parquetType.asGroupType(), t, caseSensitive, 
enableVectorizedReader)
+
+      case t: DecimalType if enableVectorizedReader =>
+        val p = parquetType.asPrimitiveType().getDecimalMetadata()
+        if (t.precision == p.getPrecision && t.scale == p.getScale) {
+          parquetType
+        } else {
+          throw new UnsupportedOperationException("Schema evolution not 
supported.")

Review comment:
       If we add complex-type columns such as nested structures, the read will 
succeed because the vectorized reader is only enabled for atomic columns.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
##########
@@ -167,20 +168,33 @@ object ParquetReadSupport {
   }
 
   private def clipParquetType(
-      parquetType: Type, catalystType: DataType, caseSensitive: Boolean): Type 
= {
+      parquetType: Type,
+      catalystType: DataType,
+      caseSensitive: Boolean,
+      enableVectorizedReader: Boolean): Type = {
     catalystType match {
       case t: ArrayType if !isPrimitiveCatalystType(t.elementType) =>
         // Only clips array types with nested type as element type.
-        clipParquetListType(parquetType.asGroupType(), t.elementType, 
caseSensitive)
+        clipParquetListType(
+          parquetType.asGroupType(), t.elementType, caseSensitive, 
enableVectorizedReader)
 
       case t: MapType
         if !isPrimitiveCatalystType(t.keyType) ||
            !isPrimitiveCatalystType(t.valueType) =>
         // Only clips map types with nested key type or value type
-        clipParquetMapType(parquetType.asGroupType(), t.keyType, t.valueType, 
caseSensitive)
+        clipParquetMapType(
+          parquetType.asGroupType(), t.keyType, t.valueType, caseSensitive, 
enableVectorizedReader)
 
       case t: StructType =>
-        clipParquetGroup(parquetType.asGroupType(), t, caseSensitive)
+        clipParquetGroup(parquetType.asGroupType(), t, caseSensitive, 
enableVectorizedReader)
+
+      case t: DecimalType if enableVectorizedReader =>
+        val p = parquetType.asPrimitiveType().getDecimalMetadata()
+        if (t.precision == p.getPrecision && t.scale == p.getScale) {
+          parquetType
+        } else {
+          throw new UnsupportedOperationException("Schema evolution not 
supported.")

Review comment:
       Although this follows the previous exception style, we may add a more 
detailed reason such as: `Parquet vectorized reader doesn't support schema 
evolution on decimal types. You may try to use 
${PARQUET_VECTORIZED_READER_ENABLED.key}=false`. Does that sound clearer?

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
##########
@@ -167,20 +168,33 @@ object ParquetReadSupport {
   }
 
   private def clipParquetType(
-      parquetType: Type, catalystType: DataType, caseSensitive: Boolean): Type 
= {
+      parquetType: Type,
+      catalystType: DataType,
+      caseSensitive: Boolean,
+      enableVectorizedReader: Boolean): Type = {
     catalystType match {
       case t: ArrayType if !isPrimitiveCatalystType(t.elementType) =>
         // Only clips array types with nested type as element type.
-        clipParquetListType(parquetType.asGroupType(), t.elementType, 
caseSensitive)
+        clipParquetListType(
+          parquetType.asGroupType(), t.elementType, caseSensitive, 
enableVectorizedReader)
 
       case t: MapType
         if !isPrimitiveCatalystType(t.keyType) ||
            !isPrimitiveCatalystType(t.valueType) =>
         // Only clips map types with nested key type or value type
-        clipParquetMapType(parquetType.asGroupType(), t.keyType, t.valueType, 
caseSensitive)
+        clipParquetMapType(
+          parquetType.asGroupType(), t.keyType, t.valueType, caseSensitive, 
enableVectorizedReader)
 
       case t: StructType =>
-        clipParquetGroup(parquetType.asGroupType(), t, caseSensitive)
+        clipParquetGroup(parquetType.asGroupType(), t, caseSensitive, 
enableVectorizedReader)
+
+      case t: DecimalType if enableVectorizedReader =>
+        val p = parquetType.asPrimitiveType().getDecimalMetadata()
+        if (t.precision == p.getPrecision && t.scale == p.getScale) {
+          parquetType
+        } else {
+          throw new UnsupportedOperationException("Schema evolution not 
supported.")

Review comment:
       Although this follows the previous exception style, we may add a more 
detailed reason such as: `Parquet vectorized reader doesn't support schema 
evolution on decimal types. You may try to use 
${PARQUET_VECTORIZED_READER_ENABLED.key}=false`. Does that sound clearer?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to