maropu commented on a change in pull request #31413:
URL: https://github.com/apache/spark/pull/31413#discussion_r568252913



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
##########
@@ -591,20 +590,34 @@ case class FileSourceScanExec(
     logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, 
" +
       s"open cost is considered as scanning $openCostInBytes bytes.")
 
+    // Filter files with bucket pruning if possible
+    val filePruning: Path => Boolean = optionalBucketSet match {
+      case Some(bucketSet) =>
+        filePath => bucketSet.get(BucketingUtils.getBucketId(filePath.getName)
+          .getOrElse(sys.error(s"Invalid bucket file $filePath")))

Review comment:
       Nice!
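
       For readers skimming the thread, a minimal, self-contained sketch of
       what this predicate does. The regex-based parser below is only an
       illustrative stand-in for `BucketingUtils.getBucketId`, not its actual
       implementation:

           import scala.collection.immutable.BitSet

           object BucketPruningSketch {
             // Bucketed file names carry the bucket id as a "_<id>" suffix before
             // the extension, e.g. "part-00000_00003.snappy.parquet" -> bucket 3.
             private val Bucketed = """.*_(\d+)(?:\..*)?$""".r

             def getBucketId(fileName: String): Option[Int] = fileName match {
               case Bucketed(id) => Some(id.toInt)
               case _ => None
             }

             def main(args: Array[String]): Unit = {
               val wantedBuckets = BitSet(1, 3) // buckets selected by pruning
               val files = Seq(
                 "part-0_00001.parquet", "part-0_00002.parquet", "part-0_00003.parquet")
               // Keep only files whose bucket id survives the filter.
               val kept = files.filter(f => getBucketId(f).exists(wantedBuckets.contains))
               println(kept) // List(part-0_00001.parquet, part-0_00003.parquet)
             }
           }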

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
##########
@@ -591,20 +590,48 @@ case class FileSourceScanExec(
     logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, 
" +
       s"open cost is considered as scanning $openCostInBytes bytes.")
 
+    // Filter files with bucket pruning if possible
+    val ignoreCorruptFiles = fsRelation.sparkSession.sessionState.conf.ignoreCorruptFiles

Review comment:
       nit: `lazy val`?
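
       As a toy sketch (nothing Spark-specific) of what `lazy val` buys here,
       assuming the conf value is only needed on the miss path of the match:

           object LazyValSketch {
             def main(args: Array[String]): Unit = {
               // Evaluated at most once, and only if the miss path is taken.
               lazy val ignoreCorruptFiles: Boolean = { println("conf read"); true }
               val canKeep: Option[Int] => Boolean = {
                 case Some(_) => true
                 case None => ignoreCorruptFiles // first hit forces the lazy val
               }
               println(canKeep(Some(3))) // prints "true"; the conf is never read
               println(canKeep(None))    // prints "conf read", then "true"
             }
           }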

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
##########
@@ -591,20 +590,48 @@ case class FileSourceScanExec(
     logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, 
" +
       s"open cost is considered as scanning $openCostInBytes bytes.")
 
+    // Filter files with bucket pruning if possible
+    val ignoreCorruptFiles = fsRelation.sparkSession.sessionState.conf.ignoreCorruptFiles
+    val canPrune: Path => Boolean = optionalBucketSet match {
+      case Some(bucketSet) =>
+        filePath => {
+          BucketingUtils.getBucketId(filePath.getName) match {
+            case Some(id) => bucketSet.get(id)
+            case None =>
+              if (ignoreCorruptFiles) {
+                // If ignoring corrupt file, do not prune when bucket file name is invalid
+                true
+              } else {
+                throw new IllegalStateException(
+                  s"Invalid bucket file $filePath when doing bucket pruning. " 
+
+                  s"Enable ${SQLConf.IGNORE_CORRUPT_FILES.key} to disable 
exception " +
+                    "and read the file.")

Review comment:
       nit: wrong indent?
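       i.e., the continuation lines of the concatenated string should
       presumably align consistently, for example:

           throw new IllegalStateException(
             s"Invalid bucket file $filePath when doing bucket pruning. " +
               s"Enable ${SQLConf.IGNORE_CORRUPT_FILES.key} to disable exception " +
               "and read the file.")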

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
##########
@@ -591,20 +590,48 @@ case class FileSourceScanExec(
     logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, 
" +
       s"open cost is considered as scanning $openCostInBytes bytes.")
 
+    // Filter files with bucket pruning if possible
+    val ignoreCorruptFiles = fsRelation.sparkSession.sessionState.conf.ignoreCorruptFiles
+    val canPrune: Path => Boolean = optionalBucketSet match {
+      case Some(bucketSet) =>
+        filePath => {
+          BucketingUtils.getBucketId(filePath.getName) match {
+            case Some(id) => bucketSet.get(id)
+            case None =>
+              if (ignoreCorruptFiles) {
+                // If ignoring corrupt file, do not prune when bucket file name is invalid
+                true
+              } else {
+                throw new IllegalStateException(
+                  s"Invalid bucket file $filePath when doing bucket pruning. " 
+
+                  s"Enable ${SQLConf.IGNORE_CORRUPT_FILES.key} to disable 
exception " +

Review comment:
       nit: `disable` -> `ignore`

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
##########
@@ -591,20 +590,48 @@ case class FileSourceScanExec(
     logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, 
" +
       s"open cost is considered as scanning $openCostInBytes bytes.")
 
+    // Filter files with bucket pruning if possible
+    lazy val ignoreCorruptFiles = fsRelation.sparkSession.sessionState.conf.ignoreCorruptFiles
+    val canPrune: Path => Boolean = optionalBucketSet match {

Review comment:
       yea, okay.
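
       Since the thread settles here, a standalone sketch of the predicate's
       final shape. A by-name parameter stands in for the `lazy val`, and the
       regex and names are illustrative rather than Spark's actual API:

           import scala.collection.immutable.BitSet

           object CanPruneSketch {
             private val Bucketed = """.*_(\d+)(?:\..*)?$""".r

             def canPrune(
                 optionalBucketSet: Option[BitSet],
                 ignoreCorruptFiles: => Boolean): String => Boolean = {
               optionalBucketSet match {
                 case Some(bucketSet) =>
                   fileName => fileName match {
                     case Bucketed(id) => bucketSet.contains(id.toInt)
                     case _ if ignoreCorruptFiles => true // unparsable name: read it anyway
                     case _ => throw new IllegalStateException(s"Invalid bucket file $fileName")
                   }
                 case None => _ => true // no bucket filter: keep every file
               }
             }

             def main(args: Array[String]): Unit = {
               val pred = canPrune(Some(BitSet(2)), ignoreCorruptFiles = true)
               println(pred("part-0_00002.parquet")) // true: bucket 2 survives pruning
               println(pred("not-a-bucket-file"))    // true: kept under ignoreCorruptFiles
             }
           }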



