dongjoon-hyun commented on a change in pull request #31319:
URL: https://github.com/apache/spark/pull/31319#discussion_r563986618
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
##########
@@ -88,7 +88,7 @@ class ParquetReadSupport(
SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.defaultValue.get)
val parquetFileSchema = context.getFileSchema
val parquetClippedSchema =
ParquetReadSupport.clipParquetSchema(parquetFileSchema,
- catalystRequestedSchema, caseSensitive)
+ catalystRequestedSchema, caseSensitive, enableVectorizedReader)
Review comment:
This is used to throw exceptions instead of returning incorrect results,
@cloud-fan.
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
##########
@@ -88,7 +88,7 @@ class ParquetReadSupport(
SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.defaultValue.get)
val parquetFileSchema = context.getFileSchema
val parquetClippedSchema =
ParquetReadSupport.clipParquetSchema(parquetFileSchema,
- catalystRequestedSchema, caseSensitive)
+ catalystRequestedSchema, caseSensitive, enableVectorizedReader)
Review comment:
We already throw `SchemaColumnConvertNotSupportedException` as a runtime
exception. This change follows the same approach.
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
##########
@@ -167,20 +168,33 @@ object ParquetReadSupport {
}
private def clipParquetType(
- parquetType: Type, catalystType: DataType, caseSensitive: Boolean): Type
= {
+ parquetType: Type,
+ catalystType: DataType,
+ caseSensitive: Boolean,
+ enableVectorizedReader: Boolean): Type = {
catalystType match {
case t: ArrayType if !isPrimitiveCatalystType(t.elementType) =>
// Only clips array types with nested type as element type.
- clipParquetListType(parquetType.asGroupType(), t.elementType,
caseSensitive)
+ clipParquetListType(
+ parquetType.asGroupType(), t.elementType, caseSensitive,
enableVectorizedReader)
case t: MapType
if !isPrimitiveCatalystType(t.keyType) ||
!isPrimitiveCatalystType(t.valueType) =>
// Only clips map types with nested key type or value type
- clipParquetMapType(parquetType.asGroupType(), t.keyType, t.valueType,
caseSensitive)
+ clipParquetMapType(
+ parquetType.asGroupType(), t.keyType, t.valueType, caseSensitive,
enableVectorizedReader)
case t: StructType =>
- clipParquetGroup(parquetType.asGroupType(), t, caseSensitive)
+ clipParquetGroup(parquetType.asGroupType(), t, caseSensitive,
enableVectorizedReader)
+
+ case t: DecimalType if enableVectorizedReader =>
+ val p = parquetType.asPrimitiveType().getDecimalMetadata()
+ if (t.precision == p.getPrecision && t.scale == p.getScale) {
+ parquetType
+ } else {
+ throw new UnsupportedOperationException("Schema evolution not
supported.")
Review comment:
If we add complex-type columns, such as nested structs, the read will
succeed because the vectorized reader is only enabled for atomic columns.
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
##########
@@ -167,20 +168,33 @@ object ParquetReadSupport {
}
private def clipParquetType(
- parquetType: Type, catalystType: DataType, caseSensitive: Boolean): Type
= {
+ parquetType: Type,
+ catalystType: DataType,
+ caseSensitive: Boolean,
+ enableVectorizedReader: Boolean): Type = {
catalystType match {
case t: ArrayType if !isPrimitiveCatalystType(t.elementType) =>
// Only clips array types with nested type as element type.
- clipParquetListType(parquetType.asGroupType(), t.elementType,
caseSensitive)
+ clipParquetListType(
+ parquetType.asGroupType(), t.elementType, caseSensitive,
enableVectorizedReader)
case t: MapType
if !isPrimitiveCatalystType(t.keyType) ||
!isPrimitiveCatalystType(t.valueType) =>
// Only clips map types with nested key type or value type
- clipParquetMapType(parquetType.asGroupType(), t.keyType, t.valueType,
caseSensitive)
+ clipParquetMapType(
+ parquetType.asGroupType(), t.keyType, t.valueType, caseSensitive,
enableVectorizedReader)
case t: StructType =>
- clipParquetGroup(parquetType.asGroupType(), t, caseSensitive)
+ clipParquetGroup(parquetType.asGroupType(), t, caseSensitive,
enableVectorizedReader)
+
+ case t: DecimalType if enableVectorizedReader =>
+ val p = parquetType.asPrimitiveType().getDecimalMetadata()
+ if (t.precision == p.getPrecision && t.scale == p.getScale) {
+ parquetType
+ } else {
+ throw new UnsupportedOperationException("Schema evolution not
supported.")
Review comment:
Although this follows the previous exception style, we may add a more
detailed reason, like `Parquet vectorized reader doesn't support schema
evolution on decimal types. You may try to use
${PARQUET_VECTORIZED_READER_ENABLED.key}=false`. Does that sound clearer?
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
##########
@@ -167,20 +168,33 @@ object ParquetReadSupport {
}
private def clipParquetType(
- parquetType: Type, catalystType: DataType, caseSensitive: Boolean): Type
= {
+ parquetType: Type,
+ catalystType: DataType,
+ caseSensitive: Boolean,
+ enableVectorizedReader: Boolean): Type = {
catalystType match {
case t: ArrayType if !isPrimitiveCatalystType(t.elementType) =>
// Only clips array types with nested type as element type.
- clipParquetListType(parquetType.asGroupType(), t.elementType,
caseSensitive)
+ clipParquetListType(
+ parquetType.asGroupType(), t.elementType, caseSensitive,
enableVectorizedReader)
case t: MapType
if !isPrimitiveCatalystType(t.keyType) ||
!isPrimitiveCatalystType(t.valueType) =>
// Only clips map types with nested key type or value type
- clipParquetMapType(parquetType.asGroupType(), t.keyType, t.valueType,
caseSensitive)
+ clipParquetMapType(
+ parquetType.asGroupType(), t.keyType, t.valueType, caseSensitive,
enableVectorizedReader)
case t: StructType =>
- clipParquetGroup(parquetType.asGroupType(), t, caseSensitive)
+ clipParquetGroup(parquetType.asGroupType(), t, caseSensitive,
enableVectorizedReader)
+
+ case t: DecimalType if enableVectorizedReader =>
+ val p = parquetType.asPrimitiveType().getDecimalMetadata()
+ if (t.precision == p.getPrecision && t.scale == p.getScale) {
+ parquetType
+ } else {
+ throw new UnsupportedOperationException("Schema evolution not
supported.")
Review comment:
Although this follows the previous exception style, we may add a more
detailed reason, like `Parquet vectorized reader doesn't support schema
evolution on decimal types. You may try to use
${PARQUET_VECTORIZED_READER_ENABLED.key}=false`. Does that sound clearer?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]