dongjoon-hyun commented on a change in pull request #34199:
URL: https://github.com/apache/spark/pull/34199#discussion_r740727131
##########
File path:
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
##########
@@ -152,6 +152,7 @@ protected void initialize(String path, List<String> columns) throws IOException
Configuration config = new Configuration();
config.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING().key() , false);
config.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP().key(), false);
+ config.setBoolean(SQLConf.CASE_SENSITIVE().key(), false);
Review comment:
Do we need to turn off `case-sensitivity` always?
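If not, one option might be to take it from the active session conf instead of hard-coding `false`, e.g. (just a sketch, not code from this PR):
```scala
import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.internal.SQLConf

// Sketch: propagate the session's case-sensitivity setting rather than forcing `false`.
val config = new Configuration()
config.setBoolean(SQLConf.CASE_SENSITIVE.key, SQLConf.get.caseSensitiveAnalysis)
```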
##########
File path:
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
##########
@@ -152,6 +152,7 @@ protected void initialize(String path, List<String> columns) throws IOException
Configuration config = new Configuration();
config.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING().key() , false);
config.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP().key(), false);
+ config.setBoolean(SQLConf.CASE_SENSITIVE().key(), false);
Review comment:
If this is recovered somewhere else (like `ParquetToSparkSchemaConverter`), could you add a comment before this line, please?
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
##########
@@ -609,7 +610,13 @@ private[parquet] class ParquetRowConverter(
//
// If the element type does not match the Catalyst type and the underlying repeated type
// does not belong to the legacy LIST type, then it is case 1; otherwise, it is case 2.
- val guessedElementType = schemaConverter.convertField(repeatedType)
+ //
+ // Since `convertField` method requires a Parquet `ColumnIO` as input, here we first create
+ // a dummy message type which wraps the given repeated type, and then convert it to the
+ // `ColumnIO` using Parquet API.
+ val messageType = Types.buildMessage().addField(repeatedType).named("foo")
Review comment:
Shall we define a string constant and use it?
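For example, something along these lines (the object and constant names below are only illustrative):
```scala
import org.apache.parquet.schema.{MessageType, Type, Types}

object DummyMessage {
  // Hypothetical constant for the dummy parent name; the PR currently inlines the literal "foo".
  val DummyParentName: String = "foo"

  // Same call chain as in the diff above, just referencing the constant.
  def wrap(repeatedType: Type): MessageType =
    Types.buildMessage().addField(repeatedType).named(DummyParentName)
}
```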
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
##########
@@ -609,7 +610,13 @@ private[parquet] class ParquetRowConverter(
//
// If the element type does not match the Catalyst type and the underlying repeated type
// does not belong to the legacy LIST type, then it is case 1; otherwise, it is case 2.
- val guessedElementType = schemaConverter.convertField(repeatedType)
+ //
+ // Since `convertField` method requires a Parquet `ColumnIO` as input, here we first create
+ // a dummy message type which wraps the given repeated type, and then convert it to the
+ // `ColumnIO` using Parquet API.
+ val messageType = Types.buildMessage().addField(repeatedType).named("foo")
+ val column = new ColumnIOFactory().getColumnIO(messageType)
+ val guessedElementType = schemaConverter.convertField(column.getChild(0)).sparkType
Review comment:
> Yes, it will return the same type as before.

If it does, shall we keep the previous code `val guessedElementType = schemaConverter.convertField(repeatedType)`?
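For example, a `Type`-accepting overload could keep the old call site and delegate to the new `ColumnIO`-based path (a sketch only; the overload and its signature are assumed from the diff above, not part of the PR):
```scala
import org.apache.parquet.io.ColumnIOFactory
import org.apache.parquet.schema.{Type, Types}

// Sketch: wrap the repeated type in a dummy message, build the ColumnIO, and reuse the new
// ColumnIO-based convertField. Assumes ParquetColumn and the new convertField are in scope.
def convertField(parquetType: Type): ParquetColumn = {
  val messageType = Types.buildMessage().addField(parquetType).named("foo")
  val column = new ColumnIOFactory().getColumnIO(messageType)
  convertField(column.getChild(0))
}
```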
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
##########
@@ -43,57 +41,128 @@ import org.apache.spark.sql.types._
* [[StringType]] fields.
* @param assumeInt96IsTimestamp Whether unannotated INT96 fields should be assumed to be Spark SQL
* [[TimestampType]] fields.
+ * @param caseSensitive Whether use case sensitive analysis when comparing Spark catalyst read
+ * schema with Parquet schema
*/
class ParquetToSparkSchemaConverter(
assumeBinaryIsString: Boolean = SQLConf.PARQUET_BINARY_AS_STRING.defaultValue.get,
- assumeInt96IsTimestamp: Boolean = SQLConf.PARQUET_INT96_AS_TIMESTAMP.defaultValue.get) {
+ assumeInt96IsTimestamp: Boolean = SQLConf.PARQUET_INT96_AS_TIMESTAMP.defaultValue.get,
+ caseSensitive: Boolean = SQLConf.CASE_SENSITIVE.defaultValue.get) {
def this(conf: SQLConf) = this(
assumeBinaryIsString = conf.isParquetBinaryAsString,
- assumeInt96IsTimestamp = conf.isParquetINT96AsTimestamp)
+ assumeInt96IsTimestamp = conf.isParquetINT96AsTimestamp,
+ caseSensitive = conf.caseSensitiveAnalysis)
def this(conf: Configuration) = this(
assumeBinaryIsString = conf.get(SQLConf.PARQUET_BINARY_AS_STRING.key).toBoolean,
- assumeInt96IsTimestamp = conf.get(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key).toBoolean)
+ assumeInt96IsTimestamp = conf.get(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key).toBoolean,
+ caseSensitive = conf.get(SQLConf.CASE_SENSITIVE.key).toBoolean)
/**
* Converts Parquet [[MessageType]] `parquetSchema` to a Spark SQL [[StructType]].
*/
- def convert(parquetSchema: MessageType): StructType = convert(parquetSchema.asGroupType())
+ def convert(parquetSchema: MessageType): StructType = {
+ val column = new ColumnIOFactory().getColumnIO(parquetSchema)
+ val converted = convertInternal(column)
+ converted.sparkType.asInstanceOf[StructType]
+ }
- private def convert(parquetSchema: GroupType): StructType = {
- val fields = parquetSchema.getFields.asScala.map { field =>
- field.getRepetition match {
- case OPTIONAL =>
- StructField(field.getName, convertField(field), nullable = true)
+ /**
+ * Convert `parquetSchema` into a [[ParquetColumn]] which contains its corresponding Spark
+ * SQL [[StructType]] along with other information such as the maximum repetition and definition
+ * level of each node, column descriptor for the leave nodes, etc.
+ *
+ * If `sparkReadSchema` is not empty, when deriving Spark SQL type from a Parquet field this will
+ * check if the same field also exists in the schema. If so, it will use the Spark SQL type.
+ * This is necessary since conversion from Parquet to Spark could cause precision loss. For
+ * instance, Spark read schema is smallint/tinyint but Parquet only support int.
+ */
+ def convertParquetColumn(
+ parquetSchema: MessageType,
+ sparkReadSchema: Option[StructType] = None): ParquetColumn = {
+ val column = new ColumnIOFactory().getColumnIO(parquetSchema)
+ convertInternal(column, sparkReadSchema)
+ }
- case REQUIRED =>
- StructField(field.getName, convertField(field), nullable = false)
+ private def convertInternal(
+ groupColumn: GroupColumnIO,
+ sparkReadSchema: Option[StructType] = None): ParquetColumn = {
+ val converted = (0 until groupColumn.getChildrenCount).map { i =>
+ val field = groupColumn.getChild(i)
+ val fieldFromReadSchema = sparkReadSchema.flatMap { schema =>
+ schema.find(f => isSameFieldName(f.name, field.getName, caseSensitive))
+ }
+ var fieldReadType = fieldFromReadSchema.map(_.dataType)
+
+ // If a field is repeated here then it is neither contained by a `LIST` nor `MAP`
+ // annotated group (these should've been handled in `convertGroupField`), e.g.:
+ //
+ // message schema {
+ // repeated int32 int_array;
+ // }
+ // or
+ // message schema {
+ // repeated group struct_array {
+ // optional int32 field;
+ // }
+ // }
+ //
+ // the corresponding Spark read type should be an array and we should pass the element type
+ // to the group or primitive type conversion method.
+ if (field.getType.getRepetition == REPEATED) {
+ fieldReadType = fieldReadType.flatMap {
+ case at: ArrayType => Some(at.elementType)
+ case _ =>
+ throw QueryCompilationErrors.illegalParquetTypeError(groupColumn.toString)
+ }
+ }
+
+ val convertedField = convertField(field, fieldReadType)
+ val fieldName = fieldFromReadSchema.map(_.name).getOrElse(field.getType.getName)
+
+ field.getType.getRepetition match {
+ case OPTIONAL | REQUIRED =>
+ val nullable = field.getType.getRepetition == OPTIONAL
+ (StructField(fieldName, convertedField.sparkType, nullable = nullable),
+ convertedField)
case REPEATED =>
// A repeated field that is neither contained by a `LIST`- or `MAP`-annotated group nor
// annotated by `LIST` or `MAP` should be interpreted as a required list of required
// elements where the element type is the type of the field.
- val arrayType = ArrayType(convertField(field), containsNull = false)
- StructField(field.getName, arrayType, nullable = false)
+ val arrayType = ArrayType(convertedField.sparkType, containsNull = false)
+ (StructField(fieldName, arrayType, nullable = false),
+ ParquetColumn(arrayType, None, convertedField.repetitionLevel - 1,
+ convertedField.definitionLevel - 1, required = true, convertedField.path,
+ Seq(convertedField.copy(required = true))))
}
}
- StructType(fields.toSeq)
+ ParquetColumn(StructType(converted.map(_._1)), groupColumn, converted.map(_._2))
}
+ private def isSameFieldName(left: String, right: String, caseSensitive: Boolean): Boolean =
+ if (!caseSensitive) left.equalsIgnoreCase(right)
+ else left == right
+
/**
* Converts a Parquet [[Type]] to a Spark SQL [[DataType]].
Review comment:
This looks like the reverse direction now. Shall we update this comment, since the new `convertField` converts a `ColumnIO` to a `ParquetColumn`?
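Something along these lines might work (a sketch only; the signature is inferred from the diff above, not quoted from the PR):
```scala
/**
 * Converts a Parquet [[ColumnIO]] to a [[ParquetColumn]], which carries the corresponding
 * Spark SQL [[DataType]] along with the repetition/definition levels and, for leaf nodes,
 * the column descriptor.
 */
def convertField(field: ColumnIO, sparkReadType: Option[DataType] = None): ParquetColumn = ???
```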
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
##########
@@ -243,20 +338,30 @@ class ParquetToSparkSchemaConverter(
ParquetSchemaConverter.checkConversionRequirement(
field.getFieldCount == 1 && !field.getType(0).isPrimitive,
s"Invalid map type: $field")
+ ParquetSchemaConverter.checkConversionRequirement(
+ sparkReadType.forall(_.isInstanceOf[MapType]),
+ s"Invalid Spark read type: expected $field to be map type but found
$sparkReadType")
- val keyValueType = field.getType(0).asGroupType()
+ val keyValue = groupColumn.getChild(0).asInstanceOf[GroupColumnIO]
+ val keyValueType = keyValue.getType.asGroupType()
ParquetSchemaConverter.checkConversionRequirement(
keyValueType.isRepetition(REPEATED) && keyValueType.getFieldCount == 2,
s"Invalid map type: $field")
- val keyType = keyValueType.getType(0)
- val valueType = keyValueType.getType(1)
+ val key = keyValue.getChild(0)
+ val value = keyValue.getChild(1)
+ val sparkReadKeyType = sparkReadType.map(_.asInstanceOf[MapType].keyType)
+ val sparkReadValueType = sparkReadType.map(_.asInstanceOf[MapType].valueType)
+ val valueType = value.getType
val valueOptional = valueType.isRepetition(OPTIONAL)
Review comment:
nit. Maybe.
```scala
- val valueType = value.getType
- val valueOptional = valueType.isRepetition(OPTIONAL)
+ val valueOptional = value.getType.isRepetition(OPTIONAL)
```
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
##########
@@ -222,17 +297,37 @@ class ParquetToSparkSchemaConverter(
case _: ListLogicalTypeAnnotation =>
ParquetSchemaConverter.checkConversionRequirement(
field.getFieldCount == 1, s"Invalid list type $field")
+ ParquetSchemaConverter.checkConversionRequirement(
+ sparkReadType.forall(_.isInstanceOf[ArrayType]),
+ s"Invalid Spark read type: expected $field to be list type but found
$sparkReadType")
- val repeatedType = field.getType(0)
+ val repeated = groupColumn.getChild(0)
+ val repeatedType = repeated.getType
ParquetSchemaConverter.checkConversionRequirement(
repeatedType.isRepetition(REPEATED), s"Invalid list type $field")
+ val sparkReadElementType = sparkReadType.map(_.asInstanceOf[ArrayType].elementType)
if (isElementType(repeatedType, field.getName)) {
- ArrayType(convertField(repeatedType), containsNull = false)
+ var converted = convertField(repeated, sparkReadElementType)
+ val convertedType = sparkReadElementType.getOrElse(converted.sparkType)
+
+ // legacy format such as:
+ // optional group my_list (LIST) {
+ // repeated int32 element;
+ // }
+ // we should mark the primitive field as required
+ if (repeatedType.isPrimitive) converted = converted.copy(required = true)
+
+ ParquetColumn(ArrayType(convertedType, containsNull = false),
+ groupColumn, Seq(converted))
} else {
- val elementType = repeatedType.asGroupType().getType(0)
+ val element = repeated.asInstanceOf[GroupColumnIO].getChild(0)
+ val elementType = element.getType
val optional = elementType.isRepetition(OPTIONAL)
Review comment:
nit.
```scala
- val elementType = element.getType
- val optional = elementType.isRepetition(OPTIONAL)
+ val optional = element.getType.isRepetition(OPTIONAL)
```
##########
File path:
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
##########
@@ -114,7 +130,66 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSparkSession {
sqlSchema,
parquetSchema,
binaryAsString,
- int96AsTimestamp)
+ int96AsTimestamp,
+ expectedParquetColumn = expectedParquetColumn)
+ }
+
+ protected def compareParquetColumn(actual: ParquetColumn, expected: ParquetColumn): Unit = {
+ assert(actual.sparkType == expected.sparkType, "sparkType mismatch: " +
+ s"actual = ${actual.sparkType}, expected = ${expected.sparkType}")
+ assert(actual.descriptor === expected.descriptor, "column descriptor mismatch: " +
+ s"actual = ${actual.descriptor}, expected = ${expected.descriptor})")
+ // Parquet ColumnDescriptor equals only compare path so we'll need to compare other fields
Review comment:
Shall we codify `equals` and `path`? And, maybe `compares` instead of
`compare`?
```scala
- Parquet ColumnDescriptor equals only compare path so we'll need ...
+ Since Parquet ColumnDescriptor `equals` only compares `path`, we need ...
```
##########
File path:
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
##########
@@ -114,7 +130,66 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSparkSession {
sqlSchema,
parquetSchema,
binaryAsString,
- int96AsTimestamp)
+ int96AsTimestamp,
+ expectedParquetColumn = expectedParquetColumn)
+ }
+
+ protected def compareParquetColumn(actual: ParquetColumn, expected: ParquetColumn): Unit = {
+ assert(actual.sparkType == expected.sparkType, "sparkType mismatch: " +
+ s"actual = ${actual.sparkType}, expected = ${expected.sparkType}")
+ assert(actual.descriptor === expected.descriptor, "column descriptor mismatch: " +
+ s"actual = ${actual.descriptor}, expected = ${expected.descriptor})")
+ // Parquet ColumnDescriptor equals only compare path so we'll need to compare other fields
+ // explicitly here
+ if (actual.descriptor.isDefined && expected.descriptor.isDefined) {
+ val actualDesc = actual.descriptor.get
+ val expectedDesc = expected.descriptor.get
+ assert(actualDesc.getMaxRepetitionLevel == expectedDesc.getMaxRepetitionLevel)
Review comment:
BTW, just a question. Is this a bug of Apache Parquet? I'm curious why
Apache Parquet only compares `path`.
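For reference, the behavior being discussed is roughly path-only equality, along these lines (an illustrative sketch, not a quote of Parquet's source), which is why the other fields have to be compared explicitly as the diff above does:
```scala
// Illustrative sketch of path-only equality semantics; levels and primitive type do not
// participate in equals, so they must be compared separately.
final class PathOnlyDescriptor(
    val path: Seq[String],
    val maxRepetitionLevel: Int,
    val maxDefinitionLevel: Int) {
  override def equals(other: Any): Boolean = other match {
    case that: PathOnlyDescriptor => this.path == that.path // only the path is considered
    case _ => false
  }
  override def hashCode(): Int = path.hashCode()
}
```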
##########
File path:
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
##########
@@ -114,7 +130,66 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSparkSession {
sqlSchema,
parquetSchema,
binaryAsString,
- int96AsTimestamp)
+ int96AsTimestamp,
+ expectedParquetColumn = expectedParquetColumn)
+ }
+
+ protected def compareParquetColumn(actual: ParquetColumn, expected: ParquetColumn): Unit = {
+ assert(actual.sparkType == expected.sparkType, "sparkType mismatch: " +
+ s"actual = ${actual.sparkType}, expected = ${expected.sparkType}")
+ assert(actual.descriptor === expected.descriptor, "column descriptor mismatch: " +
+ s"actual = ${actual.descriptor}, expected = ${expected.descriptor})")
+ // Parquet ColumnDescriptor equals only compare path so we'll need to compare other fields
+ // explicitly here
+ if (actual.descriptor.isDefined && expected.descriptor.isDefined) {
+ val actualDesc = actual.descriptor.get
+ val expectedDesc = expected.descriptor.get
+ assert(actualDesc.getMaxRepetitionLevel == expectedDesc.getMaxRepetitionLevel)
+ assert(actualDesc.getMaxRepetitionLevel == expectedDesc.getMaxRepetitionLevel)
+ assert(actualDesc.getPrimitiveType === expectedDesc.getPrimitiveType)
+ }
+
+ assert(actual.repetitionLevel == expected.repetitionLevel, "repetition level mismatch: " +
+ s"actual = ${actual.repetitionLevel}, expected = ${expected.repetitionLevel}")
+ assert(actual.definitionLevel == expected.definitionLevel, "definition level mismatch: " +
+ s"actual = ${actual.definitionLevel}, expected = ${expected.definitionLevel}")
+ assert(actual.required == expected.required, "required mismatch: " +
+ s"actual = ${actual.required}, expected = ${expected.required}")
+ assert(actual.path == expected.path, "path mismatch: " +
+ s"actual = $actual.path, expected = ${expected.path}")
Review comment:
We need `{}`.
```scala
- s"actual = $actual.path, expected = ${expected.path}")
+ s"actual = ${actual.path}, expected = ${expected.path}")
```
##########
File path:
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
##########
@@ -114,7 +130,66 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSparkSession {
sqlSchema,
parquetSchema,
binaryAsString,
- int96AsTimestamp)
+ int96AsTimestamp,
+ expectedParquetColumn = expectedParquetColumn)
+ }
+
+ protected def compareParquetColumn(actual: ParquetColumn, expected: ParquetColumn): Unit = {
+ assert(actual.sparkType == expected.sparkType, "sparkType mismatch: " +
+ s"actual = ${actual.sparkType}, expected = ${expected.sparkType}")
+ assert(actual.descriptor === expected.descriptor, "column descriptor mismatch: " +
+ s"actual = ${actual.descriptor}, expected = ${expected.descriptor})")
+ // Parquet ColumnDescriptor equals only compare path so we'll need to compare other fields
+ // explicitly here
+ if (actual.descriptor.isDefined && expected.descriptor.isDefined) {
+ val actualDesc = actual.descriptor.get
+ val expectedDesc = expected.descriptor.get
+ assert(actualDesc.getMaxRepetitionLevel == expectedDesc.getMaxRepetitionLevel)
+ assert(actualDesc.getMaxRepetitionLevel == expectedDesc.getMaxRepetitionLevel)
+ assert(actualDesc.getPrimitiveType === expectedDesc.getPrimitiveType)
+ }
+
+ assert(actual.repetitionLevel == expected.repetitionLevel, "repetition level mismatch: " +
+ s"actual = ${actual.repetitionLevel}, expected = ${expected.repetitionLevel}")
+ assert(actual.definitionLevel == expected.definitionLevel, "definition level mismatch: " +
+ s"actual = ${actual.definitionLevel}, expected = ${expected.definitionLevel}")
+ assert(actual.required == expected.required, "required mismatch: " +
+ s"actual = ${actual.required}, expected = ${expected.required}")
+ assert(actual.path == expected.path, "path mismatch: " +
+ s"actual = $actual.path, expected = ${expected.path}")
+
+ assert(actual.children.size == expected.children.size, "number of children mismatch: " +
Review comment:
`number of children` -> `size of children`
##########
File path:
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
##########
@@ -114,7 +130,66 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSparkSession {
sqlSchema,
parquetSchema,
binaryAsString,
- int96AsTimestamp)
+ int96AsTimestamp,
+ expectedParquetColumn = expectedParquetColumn)
+ }
+
+ protected def compareParquetColumn(actual: ParquetColumn, expected: ParquetColumn): Unit = {
+ assert(actual.sparkType == expected.sparkType, "sparkType mismatch: " +
+ s"actual = ${actual.sparkType}, expected = ${expected.sparkType}")
+ assert(actual.descriptor === expected.descriptor, "column descriptor mismatch: " +
+ s"actual = ${actual.descriptor}, expected = ${expected.descriptor})")
+ // Parquet ColumnDescriptor equals only compare path so we'll need to compare other fields
+ // explicitly here
+ if (actual.descriptor.isDefined && expected.descriptor.isDefined) {
+ val actualDesc = actual.descriptor.get
+ val expectedDesc = expected.descriptor.get
+ assert(actualDesc.getMaxRepetitionLevel == expectedDesc.getMaxRepetitionLevel)
+ assert(actualDesc.getMaxRepetitionLevel == expectedDesc.getMaxRepetitionLevel)
+ assert(actualDesc.getPrimitiveType === expectedDesc.getPrimitiveType)
+ }
+
+ assert(actual.repetitionLevel == expected.repetitionLevel, "repetition level mismatch: " +
+ s"actual = ${actual.repetitionLevel}, expected = ${expected.repetitionLevel}")
+ assert(actual.definitionLevel == expected.definitionLevel, "definition level mismatch: " +
+ s"actual = ${actual.definitionLevel}, expected = ${expected.definitionLevel}")
+ assert(actual.required == expected.required, "required mismatch: " +
+ s"actual = ${actual.required}, expected = ${expected.required}")
+ assert(actual.path == expected.path, "path mismatch: " +
+ s"actual = $actual.path, expected = ${expected.path}")
+
+ assert(actual.children.size == expected.children.size, "number of children mismatch: " +
+ s"actual = ${actual.children.size}, expected = ${expected.children.size}")
+ actual.children.zip(expected.children).foreach { case (actualChild, expectedChild) =>
+ compareParquetColumn(actualChild, expectedChild)
+ }
+ }
+
+ protected def primitiveParquetColumn(
+ sparkType: DataType,
+ parquetTypeName: PrimitiveTypeName,
+ repetition: Repetition,
+ repLevel: Int,
Review comment:
If you don't mind, let's use `repetitionLevel` consistently.
##########
File path:
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
##########
@@ -114,7 +130,66 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSparkSession {
sqlSchema,
parquetSchema,
binaryAsString,
- int96AsTimestamp)
+ int96AsTimestamp,
+ expectedParquetColumn = expectedParquetColumn)
+ }
+
+ protected def compareParquetColumn(actual: ParquetColumn, expected: ParquetColumn): Unit = {
+ assert(actual.sparkType == expected.sparkType, "sparkType mismatch: " +
+ s"actual = ${actual.sparkType}, expected = ${expected.sparkType}")
+ assert(actual.descriptor === expected.descriptor, "column descriptor mismatch: " +
+ s"actual = ${actual.descriptor}, expected = ${expected.descriptor})")
+ // Parquet ColumnDescriptor equals only compare path so we'll need to compare other fields
+ // explicitly here
+ if (actual.descriptor.isDefined && expected.descriptor.isDefined) {
+ val actualDesc = actual.descriptor.get
+ val expectedDesc = expected.descriptor.get
+ assert(actualDesc.getMaxRepetitionLevel == expectedDesc.getMaxRepetitionLevel)
+ assert(actualDesc.getMaxRepetitionLevel == expectedDesc.getMaxRepetitionLevel)
+ assert(actualDesc.getPrimitiveType === expectedDesc.getPrimitiveType)
+ }
+
+ assert(actual.repetitionLevel == expected.repetitionLevel, "repetition level mismatch: " +
+ s"actual = ${actual.repetitionLevel}, expected = ${expected.repetitionLevel}")
+ assert(actual.definitionLevel == expected.definitionLevel, "definition level mismatch: " +
+ s"actual = ${actual.definitionLevel}, expected = ${expected.definitionLevel}")
+ assert(actual.required == expected.required, "required mismatch: " +
+ s"actual = ${actual.required}, expected = ${expected.required}")
+ assert(actual.path == expected.path, "path mismatch: " +
+ s"actual = $actual.path, expected = ${expected.path}")
+
+ assert(actual.children.size == expected.children.size, "number of children mismatch: " +
+ s"actual = ${actual.children.size}, expected = ${expected.children.size}")
+ actual.children.zip(expected.children).foreach { case (actualChild, expectedChild) =>
+ compareParquetColumn(actualChild, expectedChild)
+ }
+ }
+
+ protected def primitiveParquetColumn(
+ sparkType: DataType,
+ parquetTypeName: PrimitiveTypeName,
+ repetition: Repetition,
+ repLevel: Int,
+ defLevel: Int,
Review comment:
`defLevel` -> `definitionLevel`?
##########
File path:
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
##########
@@ -902,6 +1890,181 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
""".stripMargin,
writeLegacyParquetFormat = true)
+ testParquetToCatalyst(
+ "SPARK-36935: test case insensitive when converting Parquet schema",
Review comment:
Thank you!
##########
File path:
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
##########
@@ -902,6 +1890,181 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
""".stripMargin,
writeLegacyParquetFormat = true)
+ testParquetToCatalyst(
+ "SPARK-36935: test case insensitive when converting Parquet schema",
+ StructType(Seq(StructField("F1", ShortType))),
+ """message root {
+ | optional int32 f1;
+ |}
+ |""".stripMargin,
Review comment:
indentation?
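For example, something like this alignment (only the literal is shown; the variable name is illustrative):
```scala
// Sketch of a conventional stripMargin alignment for the schema literal above.
val parquetSchema: String =
  """message root {
    |  optional int32 f1;
    |}
    |""".stripMargin
```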
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]