This is an automated email from the ASF dual-hosted git repository.

zhenhuawang pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 63baffd  [SPARK-31164][SQL] Inconsistent rdd and output partitioning for bucket table when output doesn't contain all bucket columns
63baffd is described below

commit 63baffdd255d0c1558ce91c1528b774ec1d35f41
Author: Zhenhua Wang <wzh_...@163.com>
AuthorDate: Tue Mar 17 14:20:16 2020 +0800

    [SPARK-31164][SQL] Inconsistent rdd and output partitioning for bucket table when output doesn't contain all bucket columns

    ### What changes were proposed in this pull request?

    For a bucketed table, when deciding output partitioning, if the output doesn't contain all
    bucket columns, the result is `UnknownPartitioning`. But when generating rdd, current Spark
    uses `createBucketedReadRDD` because it doesn't check if the output contains all bucket
    columns. So the rdd and its output partitioning are inconsistent.

    ### Why are the changes needed?

    To fix a bug.

    ### Does this PR introduce any user-facing change?

    No.

    ### How was this patch tested?

    Modified existing tests.

    Closes #27924 from wzhfy/inconsistent_rdd_partitioning.

    Authored-by: Zhenhua Wang <wzh_...@163.com>
    Signed-off-by: Zhenhua Wang <wzh_...@163.com>
    (cherry picked from commit 1369a973cdefbab177871124d5ceb2ef55ac136d)
    Signed-off-by: Zhenhua Wang <wzh_...@163.com>
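
To make the mismatch concrete, here is a minimal spark-shell sketch (not part of the commit message or the patch below). It assumes a SparkSession named `spark` with bucketing enabled (the default) and reuses the scratch table name `bucketed_table` from the modified tests; the exact partition count after the fix depends on file splits.

  // Illustrative sketch only: a scan whose projection drops the bucket column.
  import org.apache.spark.sql.execution.FileSourceScanExec

  val df = spark.range(10).selectExpr("id AS i", "id AS j", "id AS k")
  df.write.format("parquet").bucketBy(8, "i").saveAsTable("bucketed_table")

  // The projection drops bucket column "i", so outputPartitioning falls back to
  // UnknownPartitioning, yet before this fix the scan still built the bucketed read RDD.
  val plan = spark.table("bucketed_table").select("j").queryExecution.executedPlan
  val scan = plan.collect { case f: FileSourceScanExec => f }.head
  println(scan.outputPartitioning)         // UnknownPartitioning(0)
  println(scan.execute().getNumPartitions) // 8 (one per bucket) before the fix; file-split based after
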
---
 .../spark/sql/execution/DataSourceScanExec.scala | 139 +++++++++++----------
 .../spark/sql/sources/BucketedReadSuite.scala    |  27 ++--
 2 files changed, 87 insertions(+), 79 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index c1f7f0a..8d488d4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -253,74 +253,75 @@ case class FileSourceScanExec(
     partitionFilters.exists(ExecSubqueryExpression.hasSubquery)
   }
 
-  override lazy val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) = {
-    val bucketSpec = if (relation.sparkSession.sessionState.conf.bucketingEnabled) {
-      relation.bucketSpec
+  private def toAttribute(colName: String): Option[Attribute] =
+    output.find(_.name == colName)
+
+  // exposed for testing
+  lazy val bucketedScan: Boolean = {
+    if (relation.sparkSession.sessionState.conf.bucketingEnabled && relation.bucketSpec.isDefined) {
+      val spec = relation.bucketSpec.get
+      val bucketColumns = spec.bucketColumnNames.flatMap(n => toAttribute(n))
+      bucketColumns.size == spec.bucketColumnNames.size
     } else {
-      None
+      false
     }
-    bucketSpec match {
-      case Some(spec) =>
-        // For bucketed columns:
-        // -----------------------
-        // `HashPartitioning` would be used only when:
-        //   1. ALL the bucketing columns are being read from the table
-        //
-        // For sorted columns:
-        // ---------------------
-        // Sort ordering should be used when ALL these criteria's match:
-        //   1. `HashPartitioning` is being used
-        //   2. A prefix (or all) of the sort columns are being read from the table.
-        //
-        // Sort ordering would be over the prefix subset of `sort columns` being read
-        // from the table.
-        // eg.
-        // Assume (col0, col2, col3) are the columns read from the table
-        // If sort columns are (col0, col1), then sort ordering would be considered as (col0)
-        // If sort columns are (col1, col0), then sort ordering would be empty as per rule #2
-        // above
-
-        def toAttribute(colName: String): Option[Attribute] =
-          output.find(_.name == colName)
-
-        val bucketColumns = spec.bucketColumnNames.flatMap(n => toAttribute(n))
-        if (bucketColumns.size == spec.bucketColumnNames.size) {
-          val partitioning = HashPartitioning(bucketColumns, spec.numBuckets)
-          val sortColumns =
-            spec.sortColumnNames.map(x => toAttribute(x)).takeWhile(x => x.isDefined).map(_.get)
-          val shouldCalculateSortOrder =
-            conf.getConf(SQLConf.LEGACY_BUCKETED_TABLE_SCAN_OUTPUT_ORDERING) &&
-              sortColumns.nonEmpty &&
-              !hasPartitionsAvailableAtRunTime
-
-          val sortOrder = if (shouldCalculateSortOrder) {
-            // In case of bucketing, its possible to have multiple files belonging to the
-            // same bucket in a given relation. Each of these files are locally sorted
-            // but those files combined together are not globally sorted. Given that,
-            // the RDD partition will not be sorted even if the relation has sort columns set
-            // Current solution is to check if all the buckets have a single file in it
-
-            val files = selectedPartitions.flatMap(partition => partition.files)
-            val bucketToFilesGrouping =
-              files.map(_.getPath.getName).groupBy(file => BucketingUtils.getBucketId(file))
-            val singleFilePartitions = bucketToFilesGrouping.forall(p => p._2.length <= 1)
-
-            if (singleFilePartitions) {
-              // TODO Currently Spark does not support writing columns sorting in descending order
-              // so using Ascending order. This can be fixed in future
-              sortColumns.map(attribute => SortOrder(attribute, Ascending))
-            } else {
-              Nil
-            }
-          } else {
-            Nil
-          }
-          (partitioning, sortOrder)
+  }
+
+  override lazy val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) = {
+    if (bucketedScan) {
+      // For bucketed columns:
+      // -----------------------
+      // `HashPartitioning` would be used only when:
+      //   1. ALL the bucketing columns are being read from the table
+      //
+      // For sorted columns:
+      // ---------------------
+      // Sort ordering should be used when ALL these criteria's match:
+      //   1. `HashPartitioning` is being used
+      //   2. A prefix (or all) of the sort columns are being read from the table.
+      //
+      // Sort ordering would be over the prefix subset of `sort columns` being read
+      // from the table.
+      // eg.
+      // Assume (col0, col2, col3) are the columns read from the table
+      // If sort columns are (col0, col1), then sort ordering would be considered as (col0)
+      // If sort columns are (col1, col0), then sort ordering would be empty as per rule #2
+      // above
+      val spec = relation.bucketSpec.get
+      val bucketColumns = spec.bucketColumnNames.flatMap(n => toAttribute(n))
+      val partitioning = HashPartitioning(bucketColumns, spec.numBuckets)
+      val sortColumns =
+        spec.sortColumnNames.map(x => toAttribute(x)).takeWhile(x => x.isDefined).map(_.get)
+      val shouldCalculateSortOrder =
+        conf.getConf(SQLConf.LEGACY_BUCKETED_TABLE_SCAN_OUTPUT_ORDERING) &&
+          sortColumns.nonEmpty &&
+          !hasPartitionsAvailableAtRunTime
+
+      val sortOrder = if (shouldCalculateSortOrder) {
+        // In case of bucketing, its possible to have multiple files belonging to the
+        // same bucket in a given relation. Each of these files are locally sorted
+        // but those files combined together are not globally sorted. Given that,
+        // the RDD partition will not be sorted even if the relation has sort columns set
+        // Current solution is to check if all the buckets have a single file in it
+
+        val files = selectedPartitions.flatMap(partition => partition.files)
+        val bucketToFilesGrouping =
+          files.map(_.getPath.getName).groupBy(file => BucketingUtils.getBucketId(file))
+        val singleFilePartitions = bucketToFilesGrouping.forall(p => p._2.length <= 1)
+
+        if (singleFilePartitions) {
+          // TODO Currently Spark does not support writing columns sorting in descending order
+          // so using Ascending order. This can be fixed in future
+          sortColumns.map(attribute => SortOrder(attribute, Ascending))
         } else {
-          (UnknownPartitioning(0), Nil)
+          Nil
         }
-      case _ =>
-        (UnknownPartitioning(0), Nil)
+      } else {
+        Nil
+      }
+      (partitioning, sortOrder)
+    } else {
+      (UnknownPartitioning(0), Nil)
     }
   }
@@ -393,11 +394,11 @@ case class FileSourceScanExec(
       options = relation.options,
       hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))
 
-    val readRDD = relation.bucketSpec match {
-      case Some(bucketing) if relation.sparkSession.sessionState.conf.bucketingEnabled =>
-        createBucketedReadRDD(bucketing, readFile, dynamicallySelectedPartitions, relation)
-      case _ =>
-        createNonBucketedReadRDD(readFile, dynamicallySelectedPartitions, relation)
+    val readRDD = if (bucketedScan) {
+      createBucketedReadRDD(relation.bucketSpec.get, readFile, dynamicallySelectedPartitions,
+        relation)
+    } else {
+      createNonBucketedReadRDD(readFile, dynamicallySelectedPartitions, relation)
     }
     sendDriverMetrics()
     readRDD
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
index 57bbf20..14ba008 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
-import org.apache.spark.sql.execution.{DataSourceScanExec, SortExec}
+import org.apache.spark.sql.execution.{DataSourceScanExec, FileSourceScanExec, SortExec, SparkPlan}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
 import org.apache.spark.sql.execution.datasources.BucketingUtils
 import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
@@ -100,6 +100,12 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
     }
   }
 
+  private def getFileScan(plan: SparkPlan): FileSourceScanExec = {
+    val fileScan = plan.collect { case f: FileSourceScanExec => f }
+    assert(fileScan.nonEmpty, plan)
+    fileScan.head
+  }
+
   // To verify if the bucket pruning works, this function checks two conditions:
   //   1) Check if the pruned buckets (before filtering) are empty.
   //   2) Verify the final result is the same as the expected one
@@ -119,8 +125,7 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
 
       // Filter could hide the bug in bucket pruning. Thus, skipping all the filters
       val plan = bucketedDataFrame.filter(filterCondition).queryExecution.executedPlan
-      val rdd = plan.find(_.isInstanceOf[DataSourceScanExec])
-      assert(rdd.isDefined, plan)
+      val fileScan = getFileScan(plan)
 
       // if nothing should be pruned, skip the pruning test
       if (bucketValues.nonEmpty) {
@@ -128,7 +133,7 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
         bucketValues.foreach { value =>
           matchedBuckets.set(BucketingUtils.getBucketIdFromValue(bucketColumn, numBuckets, value))
         }
-        val invalidBuckets = rdd.get.execute().mapPartitionsWithIndex { case (index, iter) =>
+        val invalidBuckets = fileScan.execute().mapPartitionsWithIndex { case (index, iter) =>
           // return indexes of partitions that should have been pruned and are not empty
           if (!matchedBuckets.get(index % numBuckets) && iter.nonEmpty) {
             Iterator(index)
@@ -297,10 +302,9 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
 
       val bucketedDataFrame = spark.table("bucketed_table").select("i", "j", "k")
       val plan = bucketedDataFrame.queryExecution.executedPlan
-      val rdd = plan.find(_.isInstanceOf[DataSourceScanExec])
-      assert(rdd.isDefined, plan)
+      val fileScan = getFileScan(plan)
 
-      val emptyBuckets = rdd.get.execute().mapPartitionsWithIndex { case (index, iter) =>
+      val emptyBuckets = fileScan.execute().mapPartitionsWithIndex { case (index, iter) =>
         // return indexes of empty partitions
         if (iter.isEmpty) {
           Iterator(index)
@@ -762,10 +766,13 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
 
     withTable("bucketed_table") {
       df1.write.format("parquet").bucketBy(8, "i").saveAsTable("bucketed_table")
 
-      checkAnswer(spark.table("bucketed_table").select("j"), df1.select("j"))
+      val scanDF = spark.table("bucketed_table").select("j")
+      assert(!getFileScan(scanDF.queryExecution.executedPlan).bucketedScan)
+      checkAnswer(scanDF, df1.select("j"))
 
-      checkAnswer(spark.table("bucketed_table").groupBy("j").agg(max("k")),
-        df1.groupBy("j").agg(max("k")))
+      val aggDF = spark.table("bucketed_table").groupBy("j").agg(max("k"))
+      assert(!getFileScan(aggDF.queryExecution.executedPlan).bucketedScan)
+      checkAnswer(aggDF, df1.groupBy("j").agg(max("k")))
     }
   }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org