sunchao commented on a change in pull request #35385:
URL: https://github.com/apache/spark/pull/35385#discussion_r802937583
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
##########
@@ -354,6 +358,7 @@ class ParquetFileFormat
}
} else {
logDebug(s"Falling back to parquet-mr")
+
Review comment:
nit: unrelated change
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
##########
@@ -242,29 +297,43 @@ object ParquetReadSupport {
// "_tuple" appended then the repeated type is the element type and
elements are required.
// Build a new LIST-annotated group with clipped `repeatedGroup` as
element type and the
// only field.
- if (
+ val newParquetList = if (
repeatedGroup.getFieldCount > 1 ||
repeatedGroup.getName == "array" ||
repeatedGroup.getName == parquetList.getName + "_tuple"
) {
Types
.buildGroup(parquetList.getRepetition)
.as(LogicalTypeAnnotation.listType())
- .addField(clipParquetType(repeatedGroup, elementType, caseSensitive))
+ .addField(clipParquetType(repeatedGroup, elementType, caseSensitive,
useFieldId))
.named(parquetList.getName)
} else {
+ val newRepeatedGroup = Types
+ .repeatedGroup()
+ .addField(
+ clipParquetType(
+ repeatedGroup.getType(0), elementType, caseSensitive,
useFieldId))
+ .named(repeatedGroup.getName)
+
+ val newElementType = if (useFieldId && repeatedGroup.getId() != null) {
Review comment:
nit: I think we can use `getId` (without the parentheses)
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
##########
@@ -349,10 +446,49 @@ object ParquetReadSupport {
throw
QueryExecutionErrors.foundDuplicateFieldInCaseInsensitiveModeError(
f.name, parquetTypesString)
} else {
- clipParquetType(parquetTypes.head, f.dataType, caseSensitive)
+ clipParquetType(parquetTypes.head, f.dataType, caseSensitive,
useFieldId)
}
}.getOrElse(toParquet.convertField(f))
+ }
+
+ def matchIdField(f: StructField): Type = {
+ val fieldId = ParquetUtils.getFieldId(f)
+ idToParquetFieldMap
+ .get(fieldId)
+ .map { parquetTypes =>
+ if (parquetTypes.size > 1) {
+ // Need to fail if there is ambiguity, i.e. more than one field is
matched
+ val parquetTypesString = parquetTypes.map(_.getName).mkString("[",
", ", "]")
+ throw
QueryExecutionErrors.foundDuplicateFieldInFieldIdLookupModeError(
+ fieldId, parquetTypesString)
+ } else {
+ clipParquetType(parquetTypes.head, f.dataType, caseSensitive,
useFieldId)
+ }
+ }.getOrElse {
+ // When there is no ID match, we use a fake name to avoid a name
match by accident
+ // We need this name to be unique as well, otherwise there will be
type conflicts
+ toParquet.convertField(f.copy(name = generateFakeColumnName))
+ }
+ }
+
+ if (useFieldId && ParquetUtils.hasFieldIds(structType)) {
Review comment:
I think this can be simplified:
```scala
val shouldMatchById = useFieldId && ParquetUtils.hasFieldIds(structType)
structType.map { f =>
if (shouldMatchById && ParquetUtils.hasFieldId(f)) {
matchIdField(f)
} else if (caseSensitive) {
matchCaseSensitiveField(f)
} else {
matchCaseInsensitiveField(f)
}
}
```
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
##########
@@ -144,6 +144,42 @@ object ParquetUtils {
file.getName == ParquetFileWriter.PARQUET_METADATA_FILE
}
+ /**
+ * A StructField metadata key used to set the field id of a column in the
Parquet schema.
+ */
+ val FIELD_ID_METADATA_KEY = "parquet.field.id"
+
+ /**
+ * Whether there exists a field in the schema, whether inner or leaf, has
the parquet field
+ * ID metadata.
+ */
+ def hasFieldIds(schema: StructType): Boolean = {
+ def recursiveCheck(schema: DataType): Boolean = {
+ schema match {
+ case st: StructType =>
+ st.exists(field => hasFieldId(field) ||
recursiveCheck(field.dataType))
+
+ case at: ArrayType => recursiveCheck(at.elementType)
+
+ case mt: MapType => recursiveCheck(mt.keyType) ||
recursiveCheck(mt.valueType)
+
+ case _ =>
+ // No need to really check primitive types, just to terminate the
recursion
+ false
+ }
+ }
+ if (schema.isEmpty) false else recursiveCheck(schema)
+ }
+
+ def hasFieldId(field: StructField): Boolean =
+ field.metadata.contains(FIELD_ID_METADATA_KEY)
+
+ def getFieldId(field: StructField): Int = {
Review comment:
maybe we can consider combining `getFieldId` and `hasFieldId` into a
single method:
```
def getFieldId(field: StructField): Option[Int]
```
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
##########
@@ -203,16 +203,39 @@ private[parquet] class ParquetRowConverter(
private[this] val fieldConverters: Array[Converter with
HasParentContainerUpdater] = {
// (SPARK-31116) Use case insensitive map if spark.sql.caseSensitive is
false
// to prevent throwing IllegalArgumentException when searching catalyst
type's field index
- val catalystFieldNameToIndex = if (SQLConf.get.caseSensitiveAnalysis) {
- catalystType.fieldNames.zipWithIndex.toMap
+ def nameToIndex: Map[String, Int] =
catalystType.fieldNames.zipWithIndex.toMap
+
+ val catalystFieldIdxByName = if (SQLConf.get.caseSensitiveAnalysis) {
+ nameToIndex
} else {
- CaseInsensitiveMap(catalystType.fieldNames.zipWithIndex.toMap)
+ CaseInsensitiveMap(nameToIndex)
}
+
+ // (SPARK-38094) parquet field ids, if exist, should be prioritized for
matching
+ val catalystFieldIdxByFieldId =
+ if (SQLConf.get.parquetFieldIdReadEnabled &&
ParquetUtils.hasFieldIds(catalystType)) {
+ catalystType.fields
+ .zipWithIndex
+ .filter { case (f, _) => ParquetUtils.hasFieldId(f) }
+ .map { case (f, idx) => (ParquetUtils.getFieldId(f), idx) }
+ .toMap
+ } else {
+ Map.empty[Int, Int]
+ }
+
parquetType.getFields.asScala.map { parquetField =>
- val fieldIndex = catalystFieldNameToIndex(parquetField.getName)
- val catalystField = catalystType(fieldIndex)
+ val catalystFieldIndex = Option(parquetField.getId).map { fieldId =>
Review comment:
nit: I think we can use `flatMap` here:
```scala
Option(parquetField.getId).flatMap { fieldId =>
// field has id, try to match by id first before falling back to
match by name
catalystFieldIdxByFieldId.get(fieldId.intValue())
}.getOrElse {
// field doesn't have id, or no catalyst field has that id: match by name
catalystFieldIdxByName(parquetField.getName)
}
```
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
##########
@@ -438,16 +438,19 @@ class ParquetToSparkSchemaConverter(
class SparkToParquetSchemaConverter(
writeLegacyParquetFormat: Boolean =
SQLConf.PARQUET_WRITE_LEGACY_FORMAT.defaultValue.get,
outputTimestampType: SQLConf.ParquetOutputTimestampType.Value =
- SQLConf.ParquetOutputTimestampType.INT96) {
+ SQLConf.ParquetOutputTimestampType.INT96,
+ useFieldId: Boolean =
SQLConf.PARQUET_FIELD_ID_WRITE_ENABLED.defaultValue.get) {
Review comment:
can we add some doc above describing how this parameter is used?
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -934,6 +934,33 @@ object SQLConf {
.intConf
.createWithDefault(4096)
+ val PARQUET_FIELD_ID_WRITE_ENABLED =
+ buildConf("spark.sql.parquet.fieldId.write.enabled")
+ .doc("Field ID is a native field of the Parquet schema spec. When
enabled, " +
+ "Parquet writers will populate the field Id " +
+ "metadata (if present) in the Spark schema to the Parquet schema.")
+ .version("3.3.0")
+ .booleanConf
+ .createWithDefault(true)
+
+ val PARQUET_FIELD_ID_READ_ENABLED =
+ buildConf("spark.sql.parquet.fieldId.read.enabled")
+ .doc("Field ID is a native field of the Parquet schema spec. When
enabled, Parquet readers " +
+ "will use field IDs (if present) in the requested Spark schema to look
up Parquet " +
+ "fields instead of using column names")
+ .version("3.3.0")
+ .booleanConf
+ .createWithDefault(false)
+
+ val IGNORE_MISSING_PARQUET_FIELD_ID =
+ buildConf("spark.sql.parquet.fieldId.ignoreMissing")
Review comment:
should this be `spark.sql.parquet.fieldId.read.ignoreMissing`? I wonder
whether it will ever be used on the write path.
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
##########
@@ -277,27 +346,40 @@ object ParquetReadSupport {
parquetMap: GroupType,
keyType: DataType,
valueType: DataType,
- caseSensitive: Boolean): GroupType = {
+ caseSensitive: Boolean,
+ useFieldId: Boolean): GroupType = {
// Precondition of this method, only handles maps with nested key types or
value types.
assert(!isPrimitiveCatalystType(keyType) ||
!isPrimitiveCatalystType(valueType))
val repeatedGroup = parquetMap.getType(0).asGroupType()
val parquetKeyType = repeatedGroup.getType(0)
val parquetValueType = repeatedGroup.getType(1)
- val clippedRepeatedGroup =
- Types
+ val clippedRepeatedGroup = {
+ val newRepeatedGroup = Types
.repeatedGroup()
.as(repeatedGroup.getLogicalTypeAnnotation)
- .addField(clipParquetType(parquetKeyType, keyType, caseSensitive))
- .addField(clipParquetType(parquetValueType, valueType, caseSensitive))
+ .addField(clipParquetType(parquetKeyType, keyType, caseSensitive,
useFieldId))
+ .addField(clipParquetType(parquetValueType, valueType, caseSensitive,
useFieldId))
.named(repeatedGroup.getName)
+ if (useFieldId && repeatedGroup.getId != null) {
+ newRepeatedGroup.withId(repeatedGroup.getId.intValue())
+ } else {
+ newRepeatedGroup
+ }
+ }
- Types
+ val newMap = Types
.buildGroup(parquetMap.getRepetition)
.as(parquetMap.getLogicalTypeAnnotation)
.addField(clippedRepeatedGroup)
.named(parquetMap.getName)
+
+ if (useFieldId && parquetMap.getId() != null) {
Review comment:
can we move this into `clipParquetType`?
```scala
private def clipParquetType(
parquetType: Type,
catalystType: DataType,
caseSensitive: Boolean,
useFieldId: Boolean): Type = {
val newParquetType = catalystType match {
...
}
if (useFieldId && parquetType.getId != null) {
newParquetType.withId(parquetType.getId.intValue())
} else {
newParquetType
}
}
```
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
##########
@@ -109,6 +169,7 @@ class ParquetReadSupport(
// in parquetRequestedSchema which are not present in the file.
parquetClippedSchema
}
+
Review comment:
ditto (nit: unrelated change)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]