This is an automated email from the ASF dual-hosted git repository.

irashid pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 39d31dc  [SPARK-32003][CORE][2.4] When external shuffle service is used, unregister outputs for executor on fetch failure after executor is lost
39d31dc is described below

commit 39d31dcba362c1c34995d15495d6b3753f8355b5
Author: Wing Yew Poon <wyp...@cloudera.com>
AuthorDate: Tue Aug 4 14:36:59 2020 -0500

    [SPARK-32003][CORE][2.4] When external shuffle service is used, unregister outputs for executor on fetch failure after executor is lost
    
    ### What changes were proposed in this pull request?
    
    If an executor is lost, the `DAGScheduler` handles the executor loss by removing the executor, but it does not unregister the executor's outputs if the external shuffle service is used. However, if the node on which the executor runs is lost, the shuffle service may not be able to serve the shuffle files.
    In such a case, when fetches from the executor's outputs fail in the same stage, the `DAGScheduler` again removes the executor and, by rights, should unregister its outputs. It does not, because the epoch used to track the executor failure has not increased.
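    
    To make the failure mode concrete, here is a simplified sketch of the pre-patch bookkeeping, condensed from the removed lines in the diff below (it is not the full `removeExecutorAndUnregisterOutputs` method). Because `failedEpoch(execId)` is already set to the current epoch when the executor loss is handled, a later fetch failure in the same epoch never reaches the unregister branch:
    
    ```scala
    // Pre-patch logic (simplified): one epoch map guards both actions.
    if (!failedEpoch.contains(execId) || failedEpoch(execId) < currentEpoch) {
      failedEpoch(execId) = currentEpoch
      blockManagerMaster.removeExecutor(execId)
      if (fileLost) {
        // With an external shuffle service, fileLost is false on the initial
        // executor loss; by the time a FetchFailed arrives with fileLost = true,
        // the epoch check above already short-circuits, so this never runs.
        mapOutputTracker.removeOutputsOnExecutor(execId)
      }
    }
    ```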
    
    We track the epoch for failed executors that result in lost file output separately, so we can unregister the outputs in this scenario. The idea to track a second epoch is due to Attila Zsolt Piros.
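    
    A condensed sketch of the post-patch bookkeeping (see `DAGScheduler.removeExecutorAndUnregisterOutputs` in the diff below for the full change): executor removal and shuffle-file unregistration are guarded by separate epoch maps, so a fetch failure that follows the executor loss in the same epoch still unregisters the outputs, and repeated fetch failures do so only once.
    
    ```scala
    // Post-patch logic (simplified): two epoch maps, two independent guards.
    if (!executorFailureEpoch.contains(execId) || executorFailureEpoch(execId) < currentEpoch) {
      executorFailureEpoch(execId) = currentEpoch
      blockManagerMaster.removeExecutor(execId)
    }
    if (fileLost &&
        (!shuffleFileLostEpoch.contains(execId) || shuffleFileLostEpoch(execId) < currentEpoch)) {
      shuffleFileLostEpoch(execId) = currentEpoch
      mapOutputTracker.removeOutputsOnExecutor(execId)
    }
    ```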
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    New unit test. This test fails without the change and passes with it.
    
    Closes #29182 from wypoon/SPARK-32003-2.4.
    
    Authored-by: Wing Yew Poon <wyp...@cloudera.com>
    Signed-off-by: Imran Rashid <iras...@cloudera.com>
---
 .../org/apache/spark/scheduler/DAGScheduler.scala  | 100 +++++++++++++-------
 .../apache/spark/scheduler/DAGSchedulerSuite.scala | 101 +++++++++++++++++----
 2 files changed, 151 insertions(+), 50 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index d0d12d8..18baa0b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -171,13 +171,34 @@ private[spark] class DAGScheduler(
    */
   private val cacheLocs = new HashMap[Int, IndexedSeq[Seq[TaskLocation]]]
 
-  // For tracking failed nodes, we use the MapOutputTracker's epoch number, which is sent with
-  // every task. When we detect a node failing, we note the current epoch number and failed
-  // executor, increment it for new tasks, and use this to ignore stray ShuffleMapTask results.
-  //
-  // TODO: Garbage collect information about failure epochs when we know there are no more
-  //       stray messages to detect.
-  private val failedEpoch = new HashMap[String, Long]
+  /**
+   * Tracks the latest epoch of a fully processed error related to the given executor. (We use
+   * the MapOutputTracker's epoch number, which is sent with every task.)
+   *
+   * When an executor fails, it can affect the results of many tasks, and we have to deal with
+   * all of them consistently. We don't simply ignore all future results from that executor,
+   * as the failures may have been transient; but we also don't want to "overreact" to follow-
+   * on errors we receive. Furthermore, we might receive notification of a task success, after
+   * we find out the executor has actually failed; we'll assume those successes are, in fact,
+   * simply delayed notifications and the results have been lost, if the tasks started in the
+   * same or an earlier epoch. In particular, we use this to control when we tell the
+   * BlockManagerMaster that the BlockManager has been lost.
+   */
+  private val executorFailureEpoch = new HashMap[String, Long]
+
+  /**
+   * Tracks the latest epoch of a fully processed error where shuffle files have been lost from
+   * the given executor.
+   *
+   * This is closely related to executorFailureEpoch. They only differ for the executor when
+   * there is an external shuffle service serving shuffle files and we haven't been notified that
+   * the entire worker has been lost. In that case, when an executor is lost, we do not update
+   * the shuffleFileLostEpoch; we wait for a fetch failure. This way, if only the executor
+   * fails, we do not unregister the shuffle data as it can still be served; but if there is
+   * a failure in the shuffle service (resulting in fetch failure), we unregister the shuffle
+   * data only once, even if we get many fetch failures.
+   */
+  private val shuffleFileLostEpoch = new HashMap[String, Long]
 
   private [scheduler] val outputCommitCoordinator = env.outputCommitCoordinator
 
@@ -1389,7 +1410,8 @@ private[spark] class DAGScheduler(
             val status = event.result.asInstanceOf[MapStatus]
             val execId = status.location.executorId
             logDebug("ShuffleMapTask finished on " + execId)
-            if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) {
+            if (executorFailureEpoch.contains(execId) &&
+                smt.epoch <= executorFailureEpoch(execId)) {
               logInfo(s"Ignoring possibly bogus $smt completion from executor 
$execId")
             } else {
               // The epoch of the task is acceptable (i.e., the task was launched after the most
@@ -1725,12 +1747,8 @@ private[spark] class DAGScheduler(
   * modify the scheduler's internal state. Use executorLost() to post a loss event from outside.
   *
   * We will also assume that we've lost all shuffle blocks associated with the executor if the
-   * executor serves its own blocks (i.e., we're not using external shuffle), the entire slave
-   * is lost (likely including the shuffle service), or a FetchFailed occurred, in which case we
-   * presume all shuffle data related to this executor to be lost.
-   *
-   * Optionally the epoch during which the failure was caught can be passed to avoid allowing
-   * stray fetch failures from possibly retriggering the detection of a node as lost.
+   * executor serves its own blocks (i.e., we're not using an external shuffle service), or the
+   * entire Standalone worker is lost.
    */
   private[scheduler] def handleExecutorLost(
       execId: String,
@@ -1746,29 +1764,44 @@ private[spark] class DAGScheduler(
       maybeEpoch = None)
   }
 
+  /**
+   * Handles removing an executor from the BlockManagerMaster as well as unregistering shuffle
+   * outputs for the executor or optionally its host.
+   *
+   * @param execId executor to be removed
+   * @param fileLost If true, indicates that we assume we've lost all shuffle blocks associated
+   *   with the executor; this happens if the executor serves its own blocks (i.e., we're not
+   *   using an external shuffle service), the entire Standalone worker is lost, or a FetchFailed
+   *   occurred (in which case we presume all shuffle data related to this executor to be lost).
+   * @param hostToUnregisterOutputs (optional) executor host if we're unregistering all the
+   *   outputs on the host
+   * @param maybeEpoch (optional) the epoch during which the failure was caught (this prevents
+   *   reprocessing for follow-on fetch failures)
+   */
   private def removeExecutorAndUnregisterOutputs(
       execId: String,
       fileLost: Boolean,
       hostToUnregisterOutputs: Option[String],
       maybeEpoch: Option[Long] = None): Unit = {
     val currentEpoch = maybeEpoch.getOrElse(mapOutputTracker.getEpoch)
-    if (!failedEpoch.contains(execId) || failedEpoch(execId) < currentEpoch) {
-      failedEpoch(execId) = currentEpoch
-      logInfo("Executor lost: %s (epoch %d)".format(execId, currentEpoch))
+    logDebug(s"Considering removal of executor $execId; " +
+      s"fileLost: $fileLost, currentEpoch: $currentEpoch")
+    if (!executorFailureEpoch.contains(execId) || executorFailureEpoch(execId) < currentEpoch) {
+      executorFailureEpoch(execId) = currentEpoch
+      logInfo(s"Executor lost: $execId (epoch $currentEpoch)")
       blockManagerMaster.removeExecutor(execId)
-      if (fileLost) {
-        hostToUnregisterOutputs match {
-          case Some(host) =>
-            logInfo("Shuffle files lost for host: %s (epoch %d)".format(host, 
currentEpoch))
-            mapOutputTracker.removeOutputsOnHost(host)
-          case None =>
-            logInfo("Shuffle files lost for executor: %s (epoch 
%d)".format(execId, currentEpoch))
-            mapOutputTracker.removeOutputsOnExecutor(execId)
-        }
-        clearCacheLocs()
-
-      } else {
-        logDebug("Additional executor lost message for %s (epoch 
%d)".format(execId, currentEpoch))
+      clearCacheLocs()
+    }
+    if (fileLost &&
+        (!shuffleFileLostEpoch.contains(execId) || shuffleFileLostEpoch(execId) < currentEpoch)) {
+      shuffleFileLostEpoch(execId) = currentEpoch
+      hostToUnregisterOutputs match {
+        case Some(host) =>
+          logInfo(s"Shuffle files lost for host: $host (epoch $currentEpoch)")
+          mapOutputTracker.removeOutputsOnHost(host)
+        case None =>
+          logInfo(s"Shuffle files lost for executor: $execId (epoch 
$currentEpoch)")
+          mapOutputTracker.removeOutputsOnExecutor(execId)
       }
     }
   }
@@ -1794,11 +1827,12 @@ private[spark] class DAGScheduler(
   }
 
   private[scheduler] def handleExecutorAdded(execId: String, host: String) {
-    // remove from failedEpoch(execId) ?
-    if (failedEpoch.contains(execId)) {
+    // remove from executorFailureEpoch(execId) ?
+    if (executorFailureEpoch.contains(execId)) {
       logInfo("Host added was in lost list earlier: " + host)
-      failedEpoch -= execId
+      executorFailureEpoch -= execId
     }
+    shuffleFileLostEpoch -= execId
   }
 
   private[scheduler] def handleStageCancellation(stageId: Int, reason: Option[String]) {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 7732329..562853f 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -25,6 +25,9 @@ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map}
 import scala.language.reflectiveCalls
 import scala.util.control.NonFatal
 
+import org.mockito.Mockito.spy
+import org.mockito.Mockito.times
+import org.mockito.Mockito.verify
 import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits}
 import org.scalatest.time.SpanSugar._
 
@@ -223,6 +226,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
 
   var sparkListener: EventInfoRecordingListener = null
 
+  var blockManagerMaster: BlockManagerMaster = null
   var mapOutputTracker: MapOutputTrackerMaster = null
   var broadcastManager: BroadcastManager = null
   var securityMgr: SecurityManager = null
@@ -236,17 +240,18 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
    */
   val cacheLocations = new HashMap[(Int, Int), Seq[BlockManagerId]]
   // stub out BlockManagerMaster.getLocations to use our cacheLocations
-  val blockManagerMaster = new BlockManagerMaster(null, conf, true) {
-      override def getLocations(blockIds: Array[BlockId]): IndexedSeq[Seq[BlockManagerId]] = {
-        blockIds.map {
-          _.asRDDId.map(id => (id.rddId -> id.splitIndex)).flatMap(key => cacheLocations.get(key)).
-            getOrElse(Seq())
-        }.toIndexedSeq
-      }
-      override def removeExecutor(execId: String) {
-        // don't need to propagate to the driver, which we don't have
-      }
+  class MyBlockManagerMaster(conf: SparkConf) extends BlockManagerMaster(null, conf, true) {
+    override def getLocations(blockIds: Array[BlockId]): IndexedSeq[Seq[BlockManagerId]] = {
+      blockIds.map {
+        _.asRDDId.map { id => (id.rddId -> id.splitIndex)
+        }.flatMap { key => cacheLocations.get(key)
+        }.getOrElse(Seq())
+      }.toIndexedSeq
     }
+    override def removeExecutor(execId: String): Unit = {
+      // don't need to propagate to the driver, which we don't have
+    }
+  }
 
   /** The list of results that DAGScheduler has collected. */
   val results = new HashMap[Int, Any]()
@@ -264,6 +269,16 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     override def jobFailed(exception: Exception): Unit = { failure = exception }
   }
 
+  class MyMapOutputTrackerMaster(
+      conf: SparkConf,
+      broadcastManager: BroadcastManager)
+    extends MapOutputTrackerMaster(conf, broadcastManager, true) {
+
+    override def sendTracker(message: Any): Unit = {
+      // no-op, just so we can stop this to avoid leaking threads
+    }
+  }
+
   override def beforeEach(): Unit = {
     super.beforeEach()
     init(new SparkConf())
@@ -280,11 +295,8 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     results.clear()
     securityMgr = new SecurityManager(conf)
     broadcastManager = new BroadcastManager(true, conf, securityMgr)
-    mapOutputTracker = new MapOutputTrackerMaster(conf, broadcastManager, true) {
-      override def sendTracker(message: Any): Unit = {
-        // no-op, just so we can stop this to avoid leaking threads
-      }
-    }
+    mapOutputTracker = spy(new MyMapOutputTrackerMaster(conf, broadcastManager))
+    blockManagerMaster = spy(new MyBlockManagerMaster(conf))
     scheduler = new DAGScheduler(
       sc,
       taskScheduler,
@@ -520,6 +532,59 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     assert(mapStatus2(2).location.host === "hostB")
   }
 
+  test("SPARK-32003: All shuffle files for executor should be cleaned up on 
fetch failure") {
+    // reset the test context with the right shuffle service config
+    afterEach()
+    val conf = new SparkConf()
+    conf.set(config.SHUFFLE_SERVICE_ENABLED.key, "true")
+    init(conf)
+
+    val shuffleMapRdd = new MyRDD(sc, 3, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(3))
+    val shuffleId = shuffleDep.shuffleId
+    val reduceRdd = new MyRDD(sc, 3, List(shuffleDep), tracker = mapOutputTracker)
+
+    submit(reduceRdd, Array(0, 1, 2))
+    // Map stage completes successfully,
+    // two tasks are run on an executor on hostA and one on an executor on hostB
+    complete(taskSets(0), Seq(
+      (Success, makeMapStatus("hostA", 3)),
+      (Success, makeMapStatus("hostA", 3)),
+      (Success, makeMapStatus("hostB", 3))))
+    // Now the executor on hostA is lost
+    runEvent(ExecutorLost("exec-hostA", ExecutorExited(-100, false, "Container 
marked as failed")))
+    // Executor is removed but shuffle files are not unregistered
+    verify(blockManagerMaster, times(1)).removeExecutor("exec-hostA")
+    verify(mapOutputTracker, times(0)).removeOutputsOnExecutor("exec-hostA")
+
+    // The MapOutputTracker has all the shuffle files
+    val mapStatuses = mapOutputTracker.shuffleStatuses(shuffleId).mapStatuses
+    assert(mapStatuses.count(_ != null) === 3)
+    assert(mapStatuses.count(s => s != null && s.location.executorId == "exec-hostA") === 2)
+    assert(mapStatuses.count(s => s != null && s.location.executorId == "exec-hostB") === 1)
+
+    // Now a fetch failure from the lost executor occurs
+    complete(taskSets(1), Seq(
+      (FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored"), null)
+    ))
+    // blockManagerMaster.removeExecutor is not called again
+    // but shuffle files are unregistered
+    verify(blockManagerMaster, times(1)).removeExecutor("exec-hostA")
+    verify(mapOutputTracker, times(1)).removeOutputsOnExecutor("exec-hostA")
+
+    // Shuffle files for exec-hostA should be lost
+    assert(mapStatuses.count(_ != null) === 1)
+    assert(mapStatuses.count(s => s != null && s.location.executorId == "exec-hostA") === 0)
+    assert(mapStatuses.count(s => s != null && s.location.executorId == "exec-hostB") === 1)
+
+    // Additional fetch failure from the executor does not result in further call to
+    // mapOutputTracker.removeOutputsOnExecutor
+    complete(taskSets(1), Seq(
+      (FetchFailed(makeBlockManagerId("hostA"), shuffleId, 1, 0, "ignored"), null)
+    ))
+    verify(mapOutputTracker, times(1)).removeOutputsOnExecutor("exec-hostA")
+  }
+
   test("zero split job") {
     var numResults = 0
     var failureReason: Option[Exception] = None
@@ -741,8 +806,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     complete(taskSets(1), Seq(
       (Success, 42),
       (FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored"), null)))
-    // this will get called
-    // blockManagerMaster.removeExecutor("exec-hostA")
+    verify(blockManagerMaster, times(1)).removeExecutor("exec-hostA")
     // ask the scheduler to try it again
     scheduler.resubmitFailedStages()
     // have the 2nd attempt pass
@@ -785,11 +849,14 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
         (Success, makeMapStatus("hostA", 1)),
         (Success, makeMapStatus("hostB", 1))))
       runEvent(ExecutorLost("exec-hostA", event))
+      verify(blockManagerMaster, times(1)).removeExecutor("exec-hostA")
       if (expectFileLoss) {
+        verify(mapOutputTracker, times(1)).removeOutputsOnExecutor("exec-hostA")
         intercept[MetadataFetchFailedException] {
           mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0)
         }
       } else {
+        verify(mapOutputTracker, times(0)).removeOutputsOnExecutor("exec-hostA")
         assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet ===
           HashSet(makeBlockManagerId("hostA"), makeBlockManagerId("hostB")))
       }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
