This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new f4291e3  [SPARK-36228][SQL] Skip splitting a skewed partition when 
some map outputs are removed
f4291e3 is described below

commit f4291e373ee6e80456a42711072a75659bf1e2b5
Author: Wenchen Fan <wenc...@databricks.com>
AuthorDate: Wed Jul 21 22:17:56 2021 +0800

    [SPARK-36228][SQL] Skip splitting a skewed partition when some map outputs 
are removed
    
    ### What changes were proposed in this pull request?
    
    Sometimes, AQE skew join optimization can fail with NPE. This is because
    AQE tries to get the shuffle block sizes, but some map outputs are missing
    due to executor loss or other failures.
    
    This PR fixes this bug by skipping skew join handling if some map outputs 
are missing in the `MapOutputTracker`.
    
    ### Why are the changes needed?
    
    bug fix
    
    ### Does this PR introduce _any_ user-facing change?
    
    no
    
    ### How was this patch tested?
    
    a new UT
    
    Closes #33445 from cloud-fan/bug.
    
    Authored-by: Wenchen Fan <wenc...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
    (cherry picked from commit 9c8a3d3975fab1e21d9482ed327919f9904e25df)
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../execution/adaptive/ShufflePartitionsUtil.scala |  9 ++++--
 .../sql/execution/ShufflePartitionsUtilSuite.scala | 33 ++++++++++++++++++++--
 2 files changed, 38 insertions(+), 4 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
index 837764b..4ef7d33 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
@@ -362,11 +362,15 @@ object ShufflePartitionsUtil extends Logging {
   }
 
   /**
-   * Get the map size of the specific reduce shuffle Id.
+   * Get the map size of the specific shuffle and reduce ID. Note that, some 
map outputs can be
+   * missing due to issues like executor lost. The size will be -1 for missing 
map outputs and the
+   * caller side should take care of it.
    */
   private def getMapSizesForReduceId(shuffleId: Int, partitionId: Int): 
Array[Long] = {
     val mapOutputTracker = 
SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
-    
mapOutputTracker.shuffleStatuses(shuffleId).mapStatuses.map{_.getSizeForBlock(partitionId)}
+    mapOutputTracker.shuffleStatuses(shuffleId).withMapStatuses(_.map { stat =>
+      if (stat == null) -1 else stat.getSizeForBlock(partitionId)
+    })
   }
 
   /**
@@ -378,6 +382,7 @@ object ShufflePartitionsUtil extends Logging {
       reducerId: Int,
       targetSize: Long): Option[Seq[PartialReducerPartitionSpec]] = {
     val mapPartitionSizes = getMapSizesForReduceId(shuffleId, reducerId)
+    if (mapPartitionSizes.exists(_ < 0)) return None
     val mapStartIndices = splitSizeListByTargetSize(mapPartitionSizes, 
targetSize)
     if (mapStartIndices.length > 1) {
       Some(mapStartIndices.indices.map { i =>
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/ShufflePartitionsUtilSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/ShufflePartitionsUtilSuite.scala
index a38caa7..9f70c8a 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/ShufflePartitionsUtilSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/ShufflePartitionsUtilSuite.scala
@@ -17,10 +17,12 @@
 
 package org.apache.spark.sql.execution
 
-import org.apache.spark.{MapOutputStatistics, SparkFunSuite}
+import org.apache.spark.{LocalSparkContext, MapOutputStatistics, 
MapOutputTrackerMaster, SparkConf, SparkContext, SparkEnv, SparkFunSuite}
+import org.apache.spark.scheduler.MapStatus
 import org.apache.spark.sql.execution.adaptive.ShufflePartitionsUtil
+import org.apache.spark.storage.BlockManagerId
 
-class ShufflePartitionsUtilSuite extends SparkFunSuite {
+class ShufflePartitionsUtilSuite extends SparkFunSuite with LocalSparkContext {
 
   private def checkEstimation(
       bytesByPartitionIdArray: Array[Array[Long]],
@@ -765,4 +767,31 @@ class ShufflePartitionsUtilSuite extends SparkFunSuite {
       targetSize, 1, 0)
     assert(coalesced == Seq(expected1, expected2))
   }
+
+  test("SPARK-36228: Skip splitting a skewed partition when some map outputs 
are removed") {
+    sc = new SparkContext(new 
SparkConf().setAppName("test").setMaster("local[2]"))
+    val mapOutputTracker = 
SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
+    mapOutputTracker.registerShuffle(shuffleId = 10, numMaps = 2, numReduces = 
1)
+    mapOutputTracker.registerMapOutput(shuffleId = 10, mapIndex = 0, MapStatus(
+      BlockManagerId("a", "hostA", port = 1000),
+      Array(MapStatus.compressSize(10)),
+      mapTaskId = 5))
+    mapOutputTracker.registerMapOutput(shuffleId = 10, mapIndex = 1, MapStatus(
+      BlockManagerId("b", "hostB", port = 1000),
+      Array(MapStatus.compressSize(20)),
+      mapTaskId = 6))
+
+    val skewPartitionSpecs = ShufflePartitionsUtil.createSkewPartitionSpecs(
+      shuffleId = 10, reducerId = 0, targetSize = 2)
+    assert(skewPartitionSpecs.isDefined)
+    // Returns 2 partition specs because there are 2 mappers.
+    assert(skewPartitionSpecs.get.size == 2)
+
+    // As if one map output is removed
+    mapOutputTracker.unregisterMapOutput(
+      shuffleId = 10, mapIndex = 0, BlockManagerId("a", "hostA", port = 1000))
+    val skewPartitionSpecsAfterRemoval = 
ShufflePartitionsUtil.createSkewPartitionSpecs(
+      shuffleId = 10, reducerId = 0, targetSize = 2)
+    assert(skewPartitionSpecsAfterRemoval.isEmpty)
+  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to