This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new cd6a463 [SPARK-35888][SQL][FOLLOWUP] Return partition specs for all the shuffles cd6a463 is described below commit cd6a4638110ef3f0db8b6366be680870dfb0bcad Author: Wenchen Fan <wenc...@databricks.com> AuthorDate: Thu Jul 1 01:43:11 2021 +0000 [SPARK-35888][SQL][FOLLOWUP] Return partition specs for all the shuffles ### What changes were proposed in this pull request? This is a followup of https://github.com/apache/spark/pull/33079, to fix a bug in corner cases: `ShufflePartitionsUtil.coalescePartitions` should return shuffle specs for either all the shuffles or none. If an input RDD has no partitions, its `mapOutputStatistics` entry is None, and we should still return shuffle specs with size 0 for it. ### Why are the changes needed? bug fix ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? a new test Closes #33158 from cloud-fan/bug. Authored-by: Wenchen Fan <wenc...@databricks.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../execution/adaptive/ShufflePartitionsUtil.scala | 43 ++++++++++++---------- .../adaptive/AdaptiveQueryExecSuite.scala | 24 ++++++++++++ 2 files changed, 48 insertions(+), 19 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala index a1f2d91..1353dc9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala @@ -96,9 +96,8 @@ object ShufflePartitionsUtil extends Logging { val numPartitions = validMetrics.head.bytesByPartitionId.length val newPartitionSpecs = coalescePartitions(0, numPartitions, validMetrics, targetSize) - assert(newPartitionSpecs.length == validMetrics.length) - if (newPartitionSpecs.head.length < numPartitions) { - 
newPartitionSpecs + if (newPartitionSpecs.length < numPartitions) { + attachDataSize(mapOutputStatistics, newPartitionSpecs) } else { Seq.empty } @@ -148,7 +147,8 @@ object ShufflePartitionsUtil extends Logging { if (i - 1 > start) { val partitionSpecs = coalescePartitions( partitionIndices(start), repeatValue, validMetrics, targetSize, true) - newPartitionSpecsSeq.zip(partitionSpecs).foreach(spec => spec._1 ++= spec._2) + newPartitionSpecsSeq.zip(attachDataSize(mapOutputStatistics, partitionSpecs)) + .foreach(spec => spec._1 ++= spec._2) } // find the end of this skew section, skipping partition(i - 1) and partition(i). var repeatIndex = i + 1 @@ -173,7 +173,8 @@ object ShufflePartitionsUtil extends Logging { if (numPartitions > start) { val partitionSpecs = coalescePartitions( partitionIndices(start), partitionIndices.last + 1, validMetrics, targetSize, true) - newPartitionSpecsSeq.zip(partitionSpecs).foreach(spec => spec._1 ++= spec._2) + newPartitionSpecsSeq.zip(attachDataSize(mapOutputStatistics, partitionSpecs)) + .foreach(spec => spec._1 ++= spec._2) } // only return coalesced result if any coalescing has happened. if (newPartitionSpecsSeq.head.length < numPartitions) { @@ -204,19 +205,17 @@ object ShufflePartitionsUtil extends Logging { * - coalesced partition 2: shuffle partition 2 (size 170 MiB) * - coalesced partition 3: shuffle partition 3 and 4 (size 50 MiB) * - * @return A sequence of sequence of [[CoalescedPartitionSpec]]s. which each inner sequence as - * the new partition specs for its corresponding shuffle after coalescing. For example, - * if partitions [0, 1, 2, 3, 4] and partition bytes [10, 10, 100, 10, 20] with - * targetSize 100, split at indices [0, 2, 3], the returned partition specs will be: - * CoalescedPartitionSpec(0, 2, 20), CoalescedPartitionSpec(2, 3, 100) and - * CoalescedPartitionSpec(3, 5, 30). + * @return A sequence of [[CoalescedPartitionSpec]]s. 
For example, if partitions [0, 1, 2, 3, 4] are + * split at indices [0, 2, 3], the returned partition specs will be: + * CoalescedPartitionSpec(0, 2), CoalescedPartitionSpec(2, 3) and + * CoalescedPartitionSpec(3, 5). */ private def coalescePartitions( start: Int, end: Int, mapOutputStatistics: Seq[MapOutputStatistics], targetSize: Long, - allowReturnEmpty: Boolean = false): Seq[Seq[CoalescedPartitionSpec]] = { + allowReturnEmpty: Boolean = false): Seq[CoalescedPartitionSpec] = { val partitionSpecs = ArrayBuffer.empty[CoalescedPartitionSpec] var coalescedSize = 0L var i = start @@ -252,14 +251,20 @@ object ShufflePartitionsUtil extends Logging { } // If do not allowReturnEmpty, create at least one partition if all partitions are empty. createPartitionSpec(!allowReturnEmpty && partitionSpecs.isEmpty) + partitionSpecs.toSeq + } - // add data size for each partitionSpecs - mapOutputStatistics.map { mapStats => - partitionSpecs.map { spec => - val dataSize = spec.startReducerIndex.until(spec.endReducerIndex) - .map(mapStats.bytesByPartitionId).sum - spec.copy(dataSize = Some(dataSize)) - }.toSeq + private def attachDataSize( + mapOutputStatistics: Seq[Option[MapOutputStatistics]], + partitionSpecs: Seq[CoalescedPartitionSpec]): Seq[Seq[CoalescedPartitionSpec]] = { + mapOutputStatistics.map { + case Some(mapStats) => + partitionSpecs.map { spec => + val dataSize = spec.startReducerIndex.until(spec.endReducerIndex) + .map(mapStats.bytesByPartitionId).sum + spec.copy(dataSize = Some(dataSize)) + }.toSeq + case None => partitionSpecs.map(_.copy(dataSize = Some(0))).toSeq }.toSeq } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala index e1fa855..2343a92 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -1861,4 +1861,28 @@ class AdaptiveQueryExecSuite } } } + + test("SPARK-35888: join with a 0-partition table") { + withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", + SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1", + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", + SQLConf.ADAPTIVE_OPTIMIZER_EXCLUDED_RULES.key -> AQEPropagateEmptyRelation.ruleName) { + withTempView("t2") { + // create a temp view with 0 partition + spark.createDataFrame(sparkContext.emptyRDD[Row], new StructType().add("b", IntegerType)) + .createOrReplaceTempView("t2") + val (_, adaptive) = + runAdaptiveAndVerifyResult("SELECT * FROM testData2 t1 left semi join t2 ON t1.a=t2.b") + val customReaders = collect(adaptive) { + case c: CustomShuffleReaderExec => c + } + assert(customReaders.length == 2) + customReaders.foreach { c => + val stats = c.child.asInstanceOf[QueryStageExec].getRuntimeStatistics + assert(stats.sizeInBytes >= 0) + assert(stats.rowCount.get >= 0) + } + } + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org