This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push: new f4291e3 [SPARK-36228][SQL] Skip splitting a skewed partition when some map outputs are removed f4291e3 is described below commit f4291e373ee6e80456a42711072a75659bf1e2b5 Author: Wenchen Fan <wenc...@databricks.com> AuthorDate: Wed Jul 21 22:17:56 2021 +0800 [SPARK-36228][SQL] Skip splitting a skewed partition when some map outputs are removed ### What changes were proposed in this pull request? Sometimes, AQE skew join optimization can fail with a NullPointerException (NPE). This is because AQE tries to get the shuffle block sizes, but some map outputs may be missing, for example because an executor was lost. This PR fixes this bug by skipping skew join handling if some map outputs are missing in the `MapOutputTracker`. ### Why are the changes needed? bug fix ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? a new unit test Closes #33445 from cloud-fan/bug. Authored-by: Wenchen Fan <wenc...@databricks.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> (cherry picked from commit 9c8a3d3975fab1e21d9482ed327919f9904e25df) Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../execution/adaptive/ShufflePartitionsUtil.scala | 9 ++++-- .../sql/execution/ShufflePartitionsUtilSuite.scala | 33 ++++++++++++++++++++-- 2 files changed, 38 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala index 837764b..4ef7d33 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala @@ -362,11 +362,15 @@ object ShufflePartitionsUtil extends Logging { } /** - * Get the map size of the specific reduce shuffle Id. + * Get the map size of the specific shuffle and reduce ID.
Note that, some map outputs can be + * missing due to issues like executor lost. The size will be -1 for missing map outputs and the + * caller side should take care of it. */ private def getMapSizesForReduceId(shuffleId: Int, partitionId: Int): Array[Long] = { val mapOutputTracker = SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster] - mapOutputTracker.shuffleStatuses(shuffleId).mapStatuses.map{_.getSizeForBlock(partitionId)} + mapOutputTracker.shuffleStatuses(shuffleId).withMapStatuses(_.map { stat => + if (stat == null) -1 else stat.getSizeForBlock(partitionId) + }) } /** @@ -378,6 +382,7 @@ object ShufflePartitionsUtil extends Logging { reducerId: Int, targetSize: Long): Option[Seq[PartialReducerPartitionSpec]] = { val mapPartitionSizes = getMapSizesForReduceId(shuffleId, reducerId) + if (mapPartitionSizes.exists(_ < 0)) return None val mapStartIndices = splitSizeListByTargetSize(mapPartitionSizes, targetSize) if (mapStartIndices.length > 1) { Some(mapStartIndices.indices.map { i => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ShufflePartitionsUtilSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ShufflePartitionsUtilSuite.scala index a38caa7..9f70c8a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ShufflePartitionsUtilSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ShufflePartitionsUtilSuite.scala @@ -17,10 +17,12 @@ package org.apache.spark.sql.execution -import org.apache.spark.{MapOutputStatistics, SparkFunSuite} +import org.apache.spark.{LocalSparkContext, MapOutputStatistics, MapOutputTrackerMaster, SparkConf, SparkContext, SparkEnv, SparkFunSuite} +import org.apache.spark.scheduler.MapStatus import org.apache.spark.sql.execution.adaptive.ShufflePartitionsUtil +import org.apache.spark.storage.BlockManagerId -class ShufflePartitionsUtilSuite extends SparkFunSuite { +class ShufflePartitionsUtilSuite extends SparkFunSuite with LocalSparkContext { private def 
checkEstimation( bytesByPartitionIdArray: Array[Array[Long]], @@ -765,4 +767,31 @@ class ShufflePartitionsUtilSuite extends SparkFunSuite { targetSize, 1, 0) assert(coalesced == Seq(expected1, expected2)) } + + test("SPARK-36228: Skip splitting a skewed partition when some map outputs are removed") { + sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local[2]")) + val mapOutputTracker = SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster] + mapOutputTracker.registerShuffle(shuffleId = 10, numMaps = 2, numReduces = 1) + mapOutputTracker.registerMapOutput(shuffleId = 10, mapIndex = 0, MapStatus( + BlockManagerId("a", "hostA", port = 1000), + Array(MapStatus.compressSize(10)), + mapTaskId = 5)) + mapOutputTracker.registerMapOutput(shuffleId = 10, mapIndex = 1, MapStatus( + BlockManagerId("b", "hostB", port = 1000), + Array(MapStatus.compressSize(20)), + mapTaskId = 6)) + + val skewPartitionSpecs = ShufflePartitionsUtil.createSkewPartitionSpecs( + shuffleId = 10, reducerId = 0, targetSize = 2) + assert(skewPartitionSpecs.isDefined) + // Returns 2 partition specs because there are 2 mappers. + assert(skewPartitionSpecs.get.size == 2) + + // As if one map output is removed + mapOutputTracker.unregisterMapOutput( + shuffleId = 10, mapIndex = 0, BlockManagerId("a", "hostA", port = 1000)) + val skewPartitionSpecsAfterRemoval = ShufflePartitionsUtil.createSkewPartitionSpecs( + shuffleId = 10, reducerId = 0, targetSize = 2) + assert(skewPartitionSpecsAfterRemoval.isEmpty) + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org