cloud-fan commented on a change in pull request #24296: [SPARK-27384][SQL] File 
source V2: Prune unnecessary partition columns
URL: https://github.com/apache/spark/pull/24296#discussion_r272484527
 
 

 ##########
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala
 ##########
 @@ -16,15 +16,43 @@
  */
 package org.apache.spark.sql.execution.datasources.v2
 
-import org.apache.spark.sql.sources.v2.reader.{ScanBuilder, 
SupportsPushDownFilters, SupportsPushDownRequiredColumns}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.datasources.{PartitioningAwareFileIndex, 
PartitioningUtils}
+import org.apache.spark.sql.sources.v2.reader.{ScanBuilder, 
SupportsPushDownRequiredColumns}
 import org.apache.spark.sql.types.StructType
 
-abstract class FileScanBuilder(schema: StructType)
-  extends ScanBuilder
-  with SupportsPushDownRequiredColumns {
-  protected var readSchema = schema
+abstract class FileScanBuilder(
+    sparkSession: SparkSession,
+    fileIndex: PartitioningAwareFileIndex,
+    dataSchema: StructType) extends ScanBuilder with 
SupportsPushDownRequiredColumns {
+  private val partitionSchema = fileIndex.partitionSchema
+  private val isCaseSensitive = 
sparkSession.sessionState.conf.caseSensitiveAnalysis
+  protected var requiredSchema = StructType(dataSchema.fields ++ 
partitionSchema.fields)
 
   override def pruneColumns(requiredSchema: StructType): Unit = {
-    this.readSchema = requiredSchema
+    this.requiredSchema = requiredSchema
   }
+
+  protected def readDataSchema: StructType = {
+    val fields = dataSchema.fields.filter { field =>
+      val colName = PartitioningUtils.getColName(field, isCaseSensitive)
+      requiredNameSet.contains(colName) && !partitionNameSet.contains(colName)
+    }
+    StructType(fields)
+  }
+
+  protected def readPartitionSchema: StructType = {
+    val fields = partitionSchema.fields.filter { field =>
+      val colName = PartitioningUtils.getColName(field, isCaseSensitive)
+      requiredNameSet.contains(colName)
+    }
+    StructType(fields)
+  }
+
+  // Define as method instead of value, since `requiredSchema` is mutable.
 
 Review comment:
  but we should not create a set inside the loop body. How about:
   ```
  def createRequiredNameSet ...
  ...
  val requiredNameSet = createRequiredNameSet ...
   val fields = partitionSchema.fields.filter { field => ... }
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to