sunchao commented on a change in pull request #33888:
URL: https://github.com/apache/spark/pull/33888#discussion_r702020262



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
##########
@@ -323,15 +344,27 @@ object ParquetReadSupport {
    * @return A list of clipped [[GroupType]] fields, which can be empty.
    */
   private def clipParquetGroupFields(
-      parquetRecord: GroupType, structType: StructType, caseSensitive: Boolean): Seq[Type] = {
+      parquetRecord: GroupType,
+      structType: StructType,
+      caseSensitive: Boolean,
+      accessByColumnOrdinal: Boolean): Seq[Type] = {
     val toParquet = new SparkToParquetSchemaConverter(writeLegacyParquetFormat = false)
-    if (caseSensitive) {
+    if (accessByColumnOrdinal) {
+      structType.zipWithIndex.map { case (f, i) =>
+        if (i < parquetRecord.getFieldCount) {
+          clipParquetType(
+            parquetRecord.getType(i), f.dataType, caseSensitive, accessByColumnOrdinal)
+        } else {
+          toParquet.convertField(f)
+        }
+      }
+    } else if (caseSensitive) {
       val caseSensitiveParquetFieldMap =
         parquetRecord.getFields.asScala.map(f => f.getName -> f).toMap
       structType.map { f =>
         caseSensitiveParquetFieldMap
           .get(f.name)
-          .map(clipParquetType(_, f.dataType, caseSensitive))
+          .map(clipParquetType(_, f.dataType, caseSensitive, accessByColumnOrdinal))

Review comment:
       why do we need to pass down `accessByColumnOrdinal`? it is always false here, right? also, it seems we can't do this on nested columns?
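
       For illustration, a minimal plain-Scala sketch of the nested-column concern (`Field` is a made-up stand-in for a schema node, not a Spark type): pairing fields by ordinal silently mismatches nested fields once the file schema and the requested schema order them differently.

       ```scala
       // Hypothetical schema node, for illustration only.
       case class Field(name: String, children: Seq[Field] = Nil)

       object NestedOrdinalDemo extends App {
         val fileSchema      = Field("s", Seq(Field("x"), Field("y")))
         val requestedSchema = Field("s", Seq(Field("y"), Field("x")))

         // Pair the nested children by ordinal, the way top-level ordinal
         // access would: requested "y" is silently read from file field "x".
         requestedSchema.children.zip(fileSchema.children).foreach {
           case (req, file) => println(s"${req.name} <- ${file.name}")
         }
         // prints:
         //   y <- x
         //   x <- y
       }
       ```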

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
##########
@@ -200,19 +200,28 @@ private[parquet] class ParquetRowConverter(
 
   // Converters for each field.
   private[this] val fieldConverters: Array[Converter with HasParentContainerUpdater] = {
-    // (SPARK-31116) Use case insensitive map if spark.sql.caseSensitive is false
-    // to prevent throwing IllegalArgumentException when searching catalyst type's field index
-    val catalystFieldNameToIndex = if (SQLConf.get.caseSensitiveAnalysis) {
-      catalystType.fieldNames.zipWithIndex.toMap
+    if (SQLConf.get.parquetAccessByIndex) {
+      // SPARK-36634: When accessing a Parquet file by column index, we cannot
+      // ensure that the two types match
+      parquetType.getFields.asScala.zip(catalystType).zipWithIndex.map {

Review comment:
       what if `len(parquetType.getFields) != len(catalystType)`? I think equal lengths are not always guaranteed, because of `ParquetReadSupport.intersectParquetGroups`: e.g., some fields in the catalyst requested schema may not appear in the Parquet file schema.
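
       To make the length concern concrete, a minimal plain-Scala sketch (field names invented): `zip` truncates to the shorter collection, so a requested field missing from the file schema would silently get no converter rather than a null fallback or an error.

       ```scala
       object ZipTruncationDemo extends App {
         val parquetFields  = Seq("a", "b")       // fields present in the file schema
         val catalystFields = Seq("a", "b", "c")  // fields in the requested schema

         // zip stops at the shorter side: "c" is dropped without any error.
         val paired = parquetFields.zip(catalystFields)
         println(paired)             // List((a,a), (b,b))
         assert(paired.length == 2)  // requested field "c" got no converter
       }
       ```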

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
##########
@@ -139,7 +139,8 @@ private[parquet] class ParquetRowConverter(
     convertTz: Option[ZoneId],
     datetimeRebaseMode: LegacyBehaviorPolicy.Value,
     int96RebaseMode: LegacyBehaviorPolicy.Value,
-    updater: ParentContainerUpdater)
+    updater: ParentContainerUpdater,
+    parquetAccessByOrdinal: Boolean)

Review comment:
       nit: `parquetAccessByOrdinal` -> `accessByOrdinal`? "parquet" seems redundant in this context. Also, please update the method comments with the new parameter.
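
       A rough sketch of the rename plus the requested doc update (the scaladoc wording is invented, the other constructor parameters are omitted, and the trait below is a stub standing in for Spark's `ParentContainerUpdater`):

       ```scala
       // Stub for Spark's ParentContainerUpdater, for illustration only.
       trait ParentContainerUpdater

       /**
        * @param updater         an updater which propagates converted field values
        *                        to the parent container
        * @param accessByOrdinal whether Parquet fields are matched to Catalyst
        *                        fields by their column ordinal instead of by name
        */
       class RowConverterSignatureSketch(
           updater: ParentContainerUpdater,
           accessByOrdinal: Boolean)
       ```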




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


