sunchao commented on a change in pull request #31413:
URL: https://github.com/apache/spark/pull/31413#discussion_r568340201



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
##########
@@ -591,20 +590,48 @@ case class FileSourceScanExec(
     logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, 
" +
       s"open cost is considered as scanning $openCostInBytes bytes.")
 
+    // Filter files with bucket pruning if possible
+    lazy val ignoreCorruptFiles = fsRelation.sparkSession.sessionState.conf.ignoreCorruptFiles
+    val canPrune: Path => Boolean = optionalBucketSet match {

Review comment:
       nit: perhaps rename this to `shouldNotPrune` or `shouldProcess`? `canPrune` sounds like the path should be ignored, whereas returning `true` here actually means the file is kept.
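
   For illustration, a minimal sketch of the suggested rename (the standalone helper shape is an assumption, and the `None` arm is simplified relative to the hunk above):

```scala
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.execution.datasources.BucketingUtils
import org.apache.spark.util.collection.BitSet

// Sketch of the rename: `shouldProcess(path) == true` plainly means
// "keep this file", which reads naturally at the filter call site.
def shouldProcess(optionalBucketSet: Option[BitSet])(filePath: Path): Boolean =
  optionalBucketSet match {
    case Some(bucketSet) =>
      BucketingUtils.getBucketId(filePath.getName) match {
        case Some(id) => bucketSet.get(id) // keep only buckets selected by pruning
        case None => true                  // unparsable name: see the discussion below
      }
    case None => true // no bucket pruning applicable; keep everything
  }
```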

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
##########
@@ -591,20 +590,48 @@ case class FileSourceScanExec(
     logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, 
" +
       s"open cost is considered as scanning $openCostInBytes bytes.")
 
+    // Filter files with bucket pruning if possible
+    lazy val ignoreCorruptFiles = fsRelation.sparkSession.sessionState.conf.ignoreCorruptFiles
+    val canPrune: Path => Boolean = optionalBucketSet match {
+      case Some(bucketSet) =>
+        filePath => {
+          BucketingUtils.getBucketId(filePath.getName) match {
+            case Some(id) => bucketSet.get(id)
+            case None =>
+              if (ignoreCorruptFiles) {
+                // If ignoring corrupt file, do not prune when bucket file name is invalid

Review comment:
       Curious: what was the previous behavior here, or is this newly introduced? We may need to add that info to the PR description (user-facing change).
   
   Also, I'm not sure this is the best choice: if a bucketed table is corrupted, should we read the corrupt file? That will likely lead to incorrect results. On the other hand, we could choose to ignore the file, which seems more aligned with the name of the config, although the result could still be incorrect.
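
   For illustration only, a sketch of the "ignore" alternative (the helper name and the thrown exception are assumptions, not the PR's code):

```scala
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.execution.datasources.BucketingUtils
import org.apache.spark.util.collection.BitSet

// Hypothetical variant: with ignoreCorruptFiles on, a file whose name does
// not parse as a bucket file is pruned (skipped) rather than scanned,
// matching the "ignore" reading of the config name.
def shouldProcess(bucketSet: BitSet, ignoreCorruptFiles: Boolean)(filePath: Path): Boolean =
  BucketingUtils.getBucketId(filePath.getName) match {
    case Some(id) => bucketSet.get(id)       // normal bucket pruning
    case None if ignoreCorruptFiles => false // skip the unparsable file entirely
    case None =>
      // Assumed failure mode when the flag is off; the PR may handle this differently.
      throw new IllegalStateException(s"Invalid bucket file: ${filePath.getName}")
  }
```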

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
##########
@@ -591,20 +590,48 @@ case class FileSourceScanExec(
     logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, 
" +
       s"open cost is considered as scanning $openCostInBytes bytes.")
 
+    // Filter files with bucket pruning if possible
+    lazy val ignoreCorruptFiles = fsRelation.sparkSession.sessionState.conf.ignoreCorruptFiles
+    val canPrune: Path => Boolean = optionalBucketSet match {
+      case Some(bucketSet) =>
+        filePath => {
+          BucketingUtils.getBucketId(filePath.getName) match {
+            case Some(id) => bucketSet.get(id)
+            case None =>
+              if (ignoreCorruptFiles) {
+                // If ignoring corrupt file, do not prune when bucket file name is invalid

Review comment:
       Cool, thanks for pointing to the discussion. I'm just not sure whether the corrupted file should be ignored or processed when the flag is turned on. `ignoreCorruptFiles` seems to indicate that the problematic file should be ignored, so it is a bit confusing that we still process it here. Also, IMO ignoring it seems slightly safer (imagine someone dumping garbage files into the bucketed partition dir).
   
   cc @maropu @viirya 
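
   For reference, a minimal usage sketch of the flag under discussion, the session conf `spark.sql.files.ignoreCorruptFiles` (the table path is a placeholder):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("ignore-corrupt-files-demo")
  // Session conf backing the `ignoreCorruptFiles` field in the hunk above.
  .config("spark.sql.files.ignoreCorruptFiles", "true")
  .getOrCreate()

// With the flag on, the open question is whether a bucket file with an
// unparsable name should be skipped or still be read.
spark.read.parquet("/tmp/bucketed_table_dir").show() // placeholder path
```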

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
##########
@@ -591,20 +590,48 @@ case class FileSourceScanExec(
     logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, 
" +
       s"open cost is considered as scanning $openCostInBytes bytes.")
 
+    // Filter files with bucket pruning if possible
+    lazy val ignoreCorruptFiles = fsRelation.sparkSession.sessionState.conf.ignoreCorruptFiles
+    val canPrune: Path => Boolean = optionalBucketSet match {
+      case Some(bucketSet) =>
+        filePath => {
+          BucketingUtils.getBucketId(filePath.getName) match {
+            case Some(id) => bucketSet.get(id)
+            case None =>
+              if (ignoreCorruptFiles) {
+                // If ignoring corrupt file, do not prune when bucket file name is invalid

Review comment:
       > I feel either skipping or processing the file is no way perfect.
   
   Yes, agreed.
   
   > Given users explicitly disable bucketing here for reading the table, I would assume they want to read the table as a non-bucketed table, so they would like to read all of input files
   
   Good point, although it seems a bit weird that someone would do this.
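
   For illustration, a sketch of the scenario quoted above, assuming a `spark` session as in the earlier sketch and a hypothetical table name:

```scala
// Explicitly disable bucketing so the bucketed table is read as a plain,
// non-bucketed table; in that mode every input file would be scanned,
// including ones whose bucket file names do not parse.
spark.conf.set("spark.sql.sources.bucketing.enabled", "false")
spark.table("db.bucketed_tbl").show() // hypothetical bucketed table
```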



