sunchao commented on a change in pull request #35385:
URL: https://github.com/apache/spark/pull/35385#discussion_r802937583



##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
##########
@@ -354,6 +358,7 @@ class ParquetFileFormat
         }
       } else {
         logDebug(s"Falling back to parquet-mr")
+

Review comment:
       nit: unrelated change 

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
##########
@@ -242,29 +297,43 @@ object ParquetReadSupport {
       // "_tuple" appended then the repeated type is the element type and 
elements are required.
       // Build a new LIST-annotated group with clipped `repeatedGroup` as 
element type and the
       // only field.
-      if (
+      val newParquetList = if (
         repeatedGroup.getFieldCount > 1 ||
         repeatedGroup.getName == "array" ||
         repeatedGroup.getName == parquetList.getName + "_tuple"
       ) {
         Types
           .buildGroup(parquetList.getRepetition)
           .as(LogicalTypeAnnotation.listType())
-          .addField(clipParquetType(repeatedGroup, elementType, caseSensitive))
+          .addField(clipParquetType(repeatedGroup, elementType, caseSensitive, 
useFieldId))
           .named(parquetList.getName)
       } else {
+        val newRepeatedGroup = Types
+          .repeatedGroup()
+          .addField(
+            clipParquetType(
+              repeatedGroup.getType(0), elementType, caseSensitive, 
useFieldId))
+          .named(repeatedGroup.getName)
+
+        val newElementType = if (useFieldId && repeatedGroup.getId() != null) {

Review comment:
       nit: I think we can use `getId` here (without the parentheses)

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
##########
@@ -349,10 +446,49 @@ object ParquetReadSupport {
               throw 
QueryExecutionErrors.foundDuplicateFieldInCaseInsensitiveModeError(
                 f.name, parquetTypesString)
             } else {
-              clipParquetType(parquetTypes.head, f.dataType, caseSensitive)
+              clipParquetType(parquetTypes.head, f.dataType, caseSensitive, 
useFieldId)
             }
           }.getOrElse(toParquet.convertField(f))
+    }
+
+    def matchIdField(f: StructField): Type = {
+      val fieldId = ParquetUtils.getFieldId(f)
+      idToParquetFieldMap
+        .get(fieldId)
+        .map { parquetTypes =>
+          if (parquetTypes.size > 1) {
+            // Need to fail if there is ambiguity, i.e. more than one field is 
matched
+            val parquetTypesString = parquetTypes.map(_.getName).mkString("[", 
", ", "]")
+            throw 
QueryExecutionErrors.foundDuplicateFieldInFieldIdLookupModeError(
+              fieldId, parquetTypesString)
+          } else {
+            clipParquetType(parquetTypes.head, f.dataType, caseSensitive, 
useFieldId)
+          }
+        }.getOrElse {
+          // When there is no ID match, we use a fake name to avoid a name 
match by accident
+          // We need this name to be unique as well, otherwise there will be 
type conflicts
+          toParquet.convertField(f.copy(name = generateFakeColumnName))
+        }
+    }
+
+    if (useFieldId && ParquetUtils.hasFieldIds(structType)) {

Review comment:
       I think this can be simplified:
   ```scala
       val shouldMatchById = useFieldId && ParquetUtils.hasFieldIds(structType)
       structType.map { f =>
         if (shouldMatchById && ParquetUtils.hasFieldId(f)) {
           matchIdField(f)
         } else if (caseSensitive) {
           matchCaseSensitiveField(f)
         } else {
           matchCaseInsensitiveField(f)
         }
       }
   ```

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
##########
@@ -144,6 +144,42 @@ object ParquetUtils {
       file.getName == ParquetFileWriter.PARQUET_METADATA_FILE
   }
 
+  /**
+   * A StructField metadata key used to set the field id of a column in the 
Parquet schema.
+   */
+  val FIELD_ID_METADATA_KEY = "parquet.field.id"
+
+  /**
+   * Whether there exists a field in the schema, whether inner or leaf, has 
the parquet field
+   * ID metadata.
+   */
+  def hasFieldIds(schema: StructType): Boolean = {
+    def recursiveCheck(schema: DataType): Boolean = {
+      schema match {
+        case st: StructType =>
+          st.exists(field => hasFieldId(field) || 
recursiveCheck(field.dataType))
+
+        case at: ArrayType => recursiveCheck(at.elementType)
+
+        case mt: MapType => recursiveCheck(mt.keyType) || 
recursiveCheck(mt.valueType)
+
+        case _ =>
+          // No need to really check primitive types, just to terminate the 
recursion
+          false
+      }
+    }
+    if (schema.isEmpty) false else recursiveCheck(schema)
+  }
+
+  def hasFieldId(field: StructField): Boolean =
+    field.metadata.contains(FIELD_ID_METADATA_KEY)
+
+  def getFieldId(field: StructField): Int = {

Review comment:
       Maybe we can consider combining `getFieldId` and `hasFieldId` into a 
single method:
   ```
   def getFieldId(field: StructField): Option[Int]
   ```

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
##########
@@ -203,16 +203,39 @@ private[parquet] class ParquetRowConverter(
   private[this] val fieldConverters: Array[Converter with 
HasParentContainerUpdater] = {
     // (SPARK-31116) Use case insensitive map if spark.sql.caseSensitive is 
false
     // to prevent throwing IllegalArgumentException when searching catalyst 
type's field index
-    val catalystFieldNameToIndex = if (SQLConf.get.caseSensitiveAnalysis) {
-      catalystType.fieldNames.zipWithIndex.toMap
+    def nameToIndex: Map[String, Int] = 
catalystType.fieldNames.zipWithIndex.toMap
+
+    val catalystFieldIdxByName = if (SQLConf.get.caseSensitiveAnalysis) {
+      nameToIndex
     } else {
-      CaseInsensitiveMap(catalystType.fieldNames.zipWithIndex.toMap)
+      CaseInsensitiveMap(nameToIndex)
     }
+
+    // (SPARK-38094) parquet field ids, if exist, should be prioritized for 
matching
+    val catalystFieldIdxByFieldId =
+      if (SQLConf.get.parquetFieldIdReadEnabled && 
ParquetUtils.hasFieldIds(catalystType)) {
+        catalystType.fields
+          .zipWithIndex
+          .filter { case (f, _) => ParquetUtils.hasFieldId(f) }
+          .map { case (f, idx) => (ParquetUtils.getFieldId(f), idx) }
+          .toMap
+      } else {
+        Map.empty[Int, Int]
+      }
+
     parquetType.getFields.asScala.map { parquetField =>
-      val fieldIndex = catalystFieldNameToIndex(parquetField.getName)
-      val catalystField = catalystType(fieldIndex)
+      val catalystFieldIndex = Option(parquetField.getId).map { fieldId =>

Review comment:
       nit: I think we can use `flatMap` here:
   ```scala
   Option(parquetField.getId).flatMap { fieldId =>
           // field has id, try to match by id first before falling back to 
match by name
           catalystFieldIdxByFieldId.get(fieldId.intValue())
         }.getOrElse {
           // field doesn't have id, just match by name
           catalystFieldIdxByName(parquetField.getName)
         }
   ```
   

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
##########
@@ -438,16 +438,19 @@ class ParquetToSparkSchemaConverter(
 class SparkToParquetSchemaConverter(
     writeLegacyParquetFormat: Boolean = 
SQLConf.PARQUET_WRITE_LEGACY_FORMAT.defaultValue.get,
     outputTimestampType: SQLConf.ParquetOutputTimestampType.Value =
-      SQLConf.ParquetOutputTimestampType.INT96) {
+      SQLConf.ParquetOutputTimestampType.INT96,
+    useFieldId: Boolean = 
SQLConf.PARQUET_FIELD_ID_WRITE_ENABLED.defaultValue.get) {

Review comment:
       can we add some doc above describing how this parameter is used?

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -934,6 +934,33 @@ object SQLConf {
     .intConf
     .createWithDefault(4096)
 
+   val PARQUET_FIELD_ID_WRITE_ENABLED =
+    buildConf("spark.sql.parquet.fieldId.write.enabled")
+      .doc("Field ID is a native field of the Parquet schema spec. When 
enabled, " +
+        "Parquet writers will populate the field Id " +
+        "metadata (if present) in the Spark schema to the Parquet schema.")
+      .version("3.3.0")
+      .booleanConf
+      .createWithDefault(true)
+
+  val PARQUET_FIELD_ID_READ_ENABLED =
+    buildConf("spark.sql.parquet.fieldId.read.enabled")
+      .doc("Field ID is a native field of the Parquet schema spec. When 
enabled, Parquet readers " +
+        "will use field IDs (if present) in the requested Spark schema to look 
up Parquet " +
+        "fields instead of using column names")
+      .version("3.3.0")
+      .booleanConf
+      .createWithDefault(false)
+
+  val IGNORE_MISSING_PARQUET_FIELD_ID =
+    buildConf("spark.sql.parquet.fieldId.ignoreMissing")

Review comment:
       Should this be `spark.sql.parquet.fieldId.read.ignoreMissing`? I wonder 
whether it will ever be used on the write path; if not, the name should say it is read-only.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
##########
@@ -277,27 +346,40 @@ object ParquetReadSupport {
       parquetMap: GroupType,
       keyType: DataType,
       valueType: DataType,
-      caseSensitive: Boolean): GroupType = {
+      caseSensitive: Boolean,
+      useFieldId: Boolean): GroupType = {
     // Precondition of this method, only handles maps with nested key types or 
value types.
     assert(!isPrimitiveCatalystType(keyType) || 
!isPrimitiveCatalystType(valueType))
 
     val repeatedGroup = parquetMap.getType(0).asGroupType()
     val parquetKeyType = repeatedGroup.getType(0)
     val parquetValueType = repeatedGroup.getType(1)
 
-    val clippedRepeatedGroup =
-      Types
+    val clippedRepeatedGroup = {
+      val newRepeatedGroup = Types
         .repeatedGroup()
         .as(repeatedGroup.getLogicalTypeAnnotation)
-        .addField(clipParquetType(parquetKeyType, keyType, caseSensitive))
-        .addField(clipParquetType(parquetValueType, valueType, caseSensitive))
+        .addField(clipParquetType(parquetKeyType, keyType, caseSensitive, 
useFieldId))
+        .addField(clipParquetType(parquetValueType, valueType, caseSensitive, 
useFieldId))
         .named(repeatedGroup.getName)
+      if (useFieldId && repeatedGroup.getId != null) {
+        newRepeatedGroup.withId(repeatedGroup.getId.intValue())
+      } else {
+        newRepeatedGroup
+      }
+    }
 
-    Types
+    val newMap = Types
       .buildGroup(parquetMap.getRepetition)
       .as(parquetMap.getLogicalTypeAnnotation)
       .addField(clippedRepeatedGroup)
       .named(parquetMap.getName)
+
+    if (useFieldId && parquetMap.getId() != null) {

Review comment:
       can we move this into `clipParquetType`? 
   ```scala
     private def clipParquetType(
         parquetType: Type,
         catalystType: DataType,
         caseSensitive: Boolean,
         useFieldId: Boolean): Type = {
       val newParquetType = catalystType match {
         ...
       }
       if (useFieldId && newParquetType.getId != null) {
         newParquetType.withId(parquetType.getId.intValue())
       } else {
         newParquetType
       }
     }
   ```
   

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
##########
@@ -109,6 +169,7 @@ class ParquetReadSupport(
       // in parquetRequestedSchema which are not present in the file.
       parquetClippedSchema
     }
+

Review comment:
       ditto (nit: unrelated change)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to