c21 commented on a change in pull request #31413:
URL: https://github.com/apache/spark/pull/31413#discussion_r568318526
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
##########
@@ -591,20 +590,48 @@ case class FileSourceScanExec(
     logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
       s"open cost is considered as scanning $openCostInBytes bytes.")
+    // Filter files with bucket pruning if possible
+    val ignoreCorruptFiles = fsRelation.sparkSession.sessionState.conf.ignoreCorruptFiles
Review comment:
@maropu - updated.
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
##########
@@ -591,20 +590,48 @@ case class FileSourceScanExec(
     logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
       s"open cost is considered as scanning $openCostInBytes bytes.")
+    // Filter files with bucket pruning if possible
+    val ignoreCorruptFiles = fsRelation.sparkSession.sessionState.conf.ignoreCorruptFiles
+    val canPrune: Path => Boolean = optionalBucketSet match {
+      case Some(bucketSet) =>
+        filePath => {
+          BucketingUtils.getBucketId(filePath.getName) match {
+            case Some(id) => bucketSet.get(id)
+            case None =>
+              if (ignoreCorruptFiles) {
+                // If ignoring corrupt file, do not prune when bucket file name is invalid
+                true
+              } else {
+                throw new IllegalStateException(
+                  s"Invalid bucket file $filePath when doing bucket pruning. " +
+                  s"Enable ${SQLConf.IGNORE_CORRUPT_FILES.key} to disable exception " +
+                  "and read the file.")
Review comment:
@maropu - updated.
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
##########
@@ -591,20 +590,48 @@ case class FileSourceScanExec(
     logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
       s"open cost is considered as scanning $openCostInBytes bytes.")
+    // Filter files with bucket pruning if possible
+    val ignoreCorruptFiles = fsRelation.sparkSession.sessionState.conf.ignoreCorruptFiles
+    val canPrune: Path => Boolean = optionalBucketSet match {
+      case Some(bucketSet) =>
+        filePath => {
+          BucketingUtils.getBucketId(filePath.getName) match {
+            case Some(id) => bucketSet.get(id)
+            case None =>
+              if (ignoreCorruptFiles) {
+                // If ignoring corrupt file, do not prune when bucket file name is invalid
+                true
+              } else {
+                throw new IllegalStateException(
+                  s"Invalid bucket file $filePath when doing bucket pruning. " +
+                  s"Enable ${SQLConf.IGNORE_CORRUPT_FILES.key} to disable exception " +
Review comment:
@maropu - updated.
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
##########
@@ -591,20 +590,48 @@ case class FileSourceScanExec(
     logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
       s"open cost is considered as scanning $openCostInBytes bytes.")
+    // Filter files with bucket pruning if possible
+    lazy val ignoreCorruptFiles = fsRelation.sparkSession.sessionState.conf.ignoreCorruptFiles
+    val canPrune: Path => Boolean = optionalBucketSet match {
Review comment:
I am very bad at naming :) This was suggested in
https://github.com/apache/spark/pull/31413#discussion_r567614728. Shall I
change it again? cc @maropu .
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
##########
@@ -591,20 +590,48 @@ case class FileSourceScanExec(
     logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
       s"open cost is considered as scanning $openCostInBytes bytes.")
+    // Filter files with bucket pruning if possible
+    lazy val ignoreCorruptFiles = fsRelation.sparkSession.sessionState.conf.ignoreCorruptFiles
+    val canPrune: Path => Boolean = optionalBucketSet match {
+      case Some(bucketSet) =>
+        filePath => {
+          BucketingUtils.getBucketId(filePath.getName) match {
+            case Some(id) => bucketSet.get(id)
+            case None =>
+              if (ignoreCorruptFiles) {
+                // If ignoring corrupt file, do not prune when bucket file name is invalid
Review comment:
@sunchao - this is newly introduced. Updated the PR description.
> Also I'm not sure if this is the best choice: if a bucketed table is corrupted, should we read the corrupt file? it will likely lead to incorrect results. On the other hand we can choose to ignore the file which seems to be more aligned with the name of the config, although result could still be incorrect.

Note that by default the exception is thrown here and the query fails loudly. We allow a config to help existing users work around it if they want. See the relevant discussion in https://github.com/apache/spark/pull/31413#discussion_r567623746.
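For reference, a minimal usage sketch of that workaround (the table and column names are hypothetical; `spark.sql.files.ignoreCorruptFiles` is the key behind `SQLConf.IGNORE_CORRUPT_FILES`, and `spark` is assumed to be an active `SparkSession`):

```scala
import org.apache.spark.sql.functions.col

// Default behavior: an invalid bucket file name fails the scan with
// IllegalStateException during bucket pruning. Opting in keeps (does not
// prune) such files instead of failing the query.
spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")
spark.table("bucketed_tbl")          // hypothetical bucketed table
  .where(col("bucket_col") === 1)    // predicate that enables bucket pruning
  .show()
```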
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
##########
@@ -591,20 +590,48 @@ case class FileSourceScanExec(
     logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
       s"open cost is considered as scanning $openCostInBytes bytes.")
+    // Filter files with bucket pruning if possible
+    lazy val ignoreCorruptFiles = fsRelation.sparkSession.sessionState.conf.ignoreCorruptFiles
+    val canPrune: Path => Boolean = optionalBucketSet match {
Review comment:
Changed to `shouldProcess` as I feel `shouldNotPrune` is hard to reason
about.
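For illustration, a minimal sketch of the positively-named predicate (it assumes the surrounding `FileSourceScanExec` context — `optionalBucketSet`, `BucketingUtils`, and Hadoop's `Path` — and elides the invalid-name/exception branch from the diff for brevity):

```scala
// Sketch only: `shouldProcess` returns true when a file must be kept, which
// reads more directly than a negated name like `shouldNotPrune`.
val shouldProcess: Path => Boolean = optionalBucketSet match {
  case Some(bucketSet) =>
    // Keep the file when its bucket id is selected; files whose names carry
    // no valid bucket id are kept here (the exception branch is elided).
    filePath => BucketingUtils.getBucketId(filePath.getName).forall(bucketSet.get)
  case None =>
    _ => true // no bucket pruning requested: process every file
}
```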
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
##########
@@ -591,20 +590,48 @@ case class FileSourceScanExec(
     logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
       s"open cost is considered as scanning $openCostInBytes bytes.")
+    // Filter files with bucket pruning if possible
+    lazy val ignoreCorruptFiles = fsRelation.sparkSession.sessionState.conf.ignoreCorruptFiles
+    val canPrune: Path => Boolean = optionalBucketSet match {
+      case Some(bucketSet) =>
+        filePath => {
+          BucketingUtils.getBucketId(filePath.getName) match {
+            case Some(id) => bucketSet.get(id)
+            case None =>
+              if (ignoreCorruptFiles) {
+                // If ignoring corrupt file, do not prune when bucket file name is invalid
Review comment:
I feel neither skipping nor processing the file is perfect. There can be other corruption cases, e.g. a table declared with 1024 buckets that only has 500 files underneath. This could happen when other compute engines, or users, accidentally dump data there without respecting Spark's bucketing metadata. We have no efficient way to handle the case where the number of files is fewer than the number of buckets.
The existing usage of `ignoreCorruptFiles` [skips reading some of the content of a file](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala#L160), so it's not completely ignoring the file either. But I am fine if we think we need another config name for this.
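For context, a paraphrased sketch of the `FileScanRDD` behavior linked above (not the verbatim upstream code; `currentIterator`, `currentFile`, `finished`, and `logWarning` are assumed from that class's context): the error is caught mid-iteration, so only the remainder of the file is skipped.

```scala
// Paraphrased: when a read error surfaces while iterating a file and
// ignoreCorruptFiles is set, log and stop reading that file; rows already
// produced before the error are kept, so the file is only partially skipped.
try {
  currentIterator.hasNext
} catch {
  case e @ (_: RuntimeException | _: java.io.IOException) if ignoreCorruptFiles =>
    logWarning(s"Skipped the rest of the content in the corrupted file: $currentFile", e)
    finished = true
    false
}
```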
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
##########
@@ -591,20 +590,48 @@ case class FileSourceScanExec(
     logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
       s"open cost is considered as scanning $openCostInBytes bytes.")
+    // Filter files with bucket pruning if possible
+    lazy val ignoreCorruptFiles = fsRelation.sparkSession.sessionState.conf.ignoreCorruptFiles
+    val canPrune: Path => Boolean = optionalBucketSet match {
+      case Some(bucketSet) =>
+        filePath => {
+          BucketingUtils.getBucketId(filePath.getName) match {
+            case Some(id) => bucketSet.get(id)
+            case None =>
+              if (ignoreCorruptFiles) {
+                // If ignoring corrupt file, do not prune when bucket file name is invalid
Review comment:
Given that users explicitly disable bucketing here when reading the table, I would assume they want to read it as a non-bucketed table, and so would like to read all of the input files, no? cc @viirya what's the use case you are thinking of here? Thanks.
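A short sketch of the scenario I mean (`spark.sql.sources.bucketing.enabled` is the existing config; the table and column names are hypothetical):

```scala
// With bucketing disabled for reads, the table is planned as a plain
// non-bucketed scan: no bucket pruning applies and all input files are read.
spark.conf.set("spark.sql.sources.bucketing.enabled", "false")
val df = spark.table("bucketed_tbl")     // hypothetical bucketed table
df.where(df("bucket_col") === 1).count() // scans every input file
```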
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
##########
@@ -591,20 +590,48 @@ case class FileSourceScanExec(
     logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
       s"open cost is considered as scanning $openCostInBytes bytes.")
+    // Filter files with bucket pruning if possible
+    lazy val ignoreCorruptFiles = fsRelation.sparkSession.sessionState.conf.ignoreCorruptFiles
+    val canPrune: Path => Boolean = optionalBucketSet match {
+      case Some(bucketSet) =>
+        filePath => {
+          BucketingUtils.getBucketId(filePath.getName) match {
+            case Some(id) => bucketSet.get(id)
+            case None =>
+              if (ignoreCorruptFiles) {
+                // If ignoring corrupt file, do not prune when bucket file name is invalid
Review comment:
@cloud-fan - sorry, which part are you suggesting we do in a follow-up PR? Here we need to decide anyway how to handle a file name that is not a valid bucket file name (process the file or not) during pruning. Did I miss anything?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]