attilapiros commented on a change in pull request #28848:
URL: https://github.com/apache/spark/pull/28848#discussion_r446034258



##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -170,13 +170,29 @@ private[spark] class DAGScheduler(
    */
   private val cacheLocs = new HashMap[Int, IndexedSeq[Seq[TaskLocation]]]
 
-  // For tracking failed nodes, we use the MapOutputTracker's epoch number, 
which is sent with
-  // every task. When we detect a node failing, we note the current epoch 
number and failed
-  // executor, increment it for new tasks, and use this to ignore stray 
ShuffleMapTask results.
+  // Tracks the latest epoch of a fully processed error related to the given 
executor. (We use
+  // the MapOutputTracker's epoch number, which is sent with every task.)
   //
-  // TODO: Garbage collect information about failure epochs when we know there 
are no more
-  //       stray messages to detect.
-  private val failedEpoch = new HashMap[String, Long]
+  // When an executor fails, it can affect the results of many tasks, and we 
have to deal with
+  // all of them consistently. We don't simply ignore all future results from 
that executor,
+  // as the failures may have been transient; but we also don't want to 
"overreact" to follow-
+  // on errors we receive. Furthermore, we might receive notification of a 
task success, after
+  // we find out the executor has actually failed; we'll assume those 
successes are, in fact,
+  // simply delayed notifications and the results have been lost, if they come 
from the same
+  // epoch. In particular, we use this to control when we tell the 
BlockManagerMaster that the
+  // BlockManager has been lost.
+  private val executorFailureEpoch = new HashMap[String, Long]
+  // Tracks the latest epoch of a fully processed error where shuffle files 
have been lost from

Review comment:
       Nit: add empty line to separate this two
   ```suggestion
   
     // Tracks the latest epoch of a fully processed error where shuffle files 
have been lost from
   ```

##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -170,13 +170,29 @@ private[spark] class DAGScheduler(
    */
   private val cacheLocs = new HashMap[Int, IndexedSeq[Seq[TaskLocation]]]
 
-  // For tracking failed nodes, we use the MapOutputTracker's epoch number, 
which is sent with
-  // every task. When we detect a node failing, we note the current epoch 
number and failed
-  // executor, increment it for new tasks, and use this to ignore stray 
ShuffleMapTask results.
+  // Tracks the latest epoch of a fully processed error related to the given 
executor. (We use
+  // the MapOutputTracker's epoch number, which is sent with every task.)
   //
-  // TODO: Garbage collect information about failure epochs when we know there 
are no more
-  //       stray messages to detect.
-  private val failedEpoch = new HashMap[String, Long]
+  // When an executor fails, it can affect the results of many tasks, and we 
have to deal with
+  // all of them consistently. We don't simply ignore all future results from 
that executor,
+  // as the failures may have been transient; but we also don't want to 
"overreact" to follow-
+  // on errors we receive. Furthermore, we might receive notification of a 
task success, after
+  // we find out the executor has actually failed; we'll assume those 
successes are, in fact,
+  // simply delayed notifications and the results have been lost, if they come 
from the same
+  // epoch. In particular, we use this to control when we tell the 
BlockManagerMaster that the
+  // BlockManager has been lost.

Review comment:
       As tasks started for an earlier epoch are also considered to be lost:
   
   
https://github.com/apache/spark/blob/d09ef9335e5d3657b830497155abb7a0c2bb0cde/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1585-L1586
   
   Please mention that case too, like:
   
   ```suggestion
     // simply delayed notifications and the results have been lost, if those 
tasks started for an
     // earlier or the same epoch. In particular, we use this to control when 
we tell the 
     // BlockManagerMaster that the BlockManager has been lost.
   ```

##########
File path: 
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
##########
@@ -764,6 +825,7 @@ class DAGSchedulerSuite extends SparkFunSuite with 
LocalSparkContext with TimeLi
       (FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0L, 0, 0, 
"ignored"), null)))
     // this will get called
     // blockManagerMaster.removeExecutor("hostA-exec")

Review comment:
       This comment is not needed as ensured by the `verify`.
   ```suggestion
   ```

##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -1933,29 +1946,43 @@ private[spark] class DAGScheduler(
       maybeEpoch = None)
   }
 
+  /**
+   * Handles removing an executor from the BlockManagerMaster as well as 
unregistering shuffle
+   * outputs for the executor or optionally its host.
+   *
+   * The fileLost parameter being true indicates that we assume we've lost all 
shuffle blocks

Review comment:
       Nit: what about documenting the params (at least the `fileLost` and 
`maybeEpoch`) with a "@param" scaladoc annotation?
   
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to