attilapiros commented on a change in pull request #28848:
URL: https://github.com/apache/spark/pull/28848#discussion_r446034258
##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -170,13 +170,29 @@ private[spark] class DAGScheduler(
*/
private val cacheLocs = new HashMap[Int, IndexedSeq[Seq[TaskLocation]]]
- // For tracking failed nodes, we use the MapOutputTracker's epoch number, which is sent with
- // every task. When we detect a node failing, we note the current epoch number and failed
- // executor, increment it for new tasks, and use this to ignore stray ShuffleMapTask results.
+ // Tracks the latest epoch of a fully processed error related to the given executor. (We use
+ // the MapOutputTracker's epoch number, which is sent with every task.)
//
- // TODO: Garbage collect information about failure epochs when we know there are no more
- // stray messages to detect.
- private val failedEpoch = new HashMap[String, Long]
+ // When an executor fails, it can affect the results of many tasks, and we have to deal with
+ // all of them consistently. We don't simply ignore all future results from that executor,
+ // as the failures may have been transient; but we also don't want to "overreact" to follow-
+ // on errors we receive. Furthermore, we might receive notification of a task success, after
+ // we find out the executor has actually failed; we'll assume those successes are, in fact,
+ // simply delayed notifications and the results have been lost, if they come from the same
+ // epoch. In particular, we use this to control when we tell the BlockManagerMaster that the
+ // BlockManager has been lost.
+ private val executorFailureEpoch = new HashMap[String, Long]
+ // Tracks the latest epoch of a fully processed error where shuffle files have been lost from
Review comment:
Nit: add an empty line to separate these two.
```suggestion

// Tracks the latest epoch of a fully processed error where shuffle files have been lost from
```
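For readers skimming the thread, here is a minimal, self-contained sketch of the epoch-filtering idea the new comment describes. The names `noteExecutorFailure` and `isStray` are hypothetical; this is not the DAGScheduler's actual code:

```scala
import scala.collection.mutable

object EpochFilterSketch {
  // Executor id -> epoch of the last fully processed failure for that executor.
  private val executorFailureEpoch = new mutable.HashMap[String, Long]

  // Record a failure observed at the given epoch, keeping only the latest one.
  def noteExecutorFailure(execId: String, epoch: Long): Unit = {
    if (executorFailureEpoch.getOrElse(execId, Long.MinValue) < epoch) {
      executorFailureEpoch(execId) = epoch
    }
  }

  // A result is treated as a stray/delayed notification if its task started
  // at or before the recorded failure epoch for that executor.
  def isStray(execId: String, taskEpoch: Long): Boolean =
    executorFailureEpoch.get(execId).exists(taskEpoch <= _)

  def main(args: Array[String]): Unit = {
    noteExecutorFailure("exec-1", epoch = 5L)
    assert(isStray("exec-1", taskEpoch = 5L))  // same epoch: assume the result was lost
    assert(!isStray("exec-1", taskEpoch = 6L)) // started after the failure: keep it
  }
}
```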
##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -170,13 +170,29 @@ private[spark] class DAGScheduler(
*/
private val cacheLocs = new HashMap[Int, IndexedSeq[Seq[TaskLocation]]]
- // For tracking failed nodes, we use the MapOutputTracker's epoch number, which is sent with
- // every task. When we detect a node failing, we note the current epoch number and failed
- // executor, increment it for new tasks, and use this to ignore stray ShuffleMapTask results.
+ // Tracks the latest epoch of a fully processed error related to the given executor. (We use
+ // the MapOutputTracker's epoch number, which is sent with every task.)
//
- // TODO: Garbage collect information about failure epochs when we know there are no more
- // stray messages to detect.
- private val failedEpoch = new HashMap[String, Long]
+ // When an executor fails, it can affect the results of many tasks, and we have to deal with
+ // all of them consistently. We don't simply ignore all future results from that executor,
+ // as the failures may have been transient; but we also don't want to "overreact" to follow-
+ // on errors we receive. Furthermore, we might receive notification of a task success, after
+ // we find out the executor has actually failed; we'll assume those successes are, in fact,
+ // simply delayed notifications and the results have been lost, if they come from the same
+ // epoch. In particular, we use this to control when we tell the BlockManagerMaster that the
+ // BlockManager has been lost.
Review comment:
Since tasks started in an earlier epoch are also considered to be lost:
https://github.com/apache/spark/blob/d09ef9335e5d3657b830497155abb7a0c2bb0cde/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1585-L1586
please mention that case too, for example:
```suggestion
// simply delayed notifications and the results have been lost, if those tasks started for an
// earlier or the same epoch. In particular, we use this to control when we tell the
// BlockManagerMaster that the BlockManager has been lost.
```
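To make the "earlier or the same epoch" rule concrete, here is a tiny standalone illustration with hypothetical values; the real check lives in DAGScheduler.scala at the lines linked above:

```scala
object EarlierOrSameEpochSketch extends App {
  // Suppose the executor's failure was fully processed at epoch 7.
  val failureEpoch = 7L

  // The "<=" comparison treats results of tasks started at an earlier epoch
  // or at the same epoch as lost; only tasks from newer epochs survive.
  def resultLost(taskEpoch: Long): Boolean = taskEpoch <= failureEpoch

  assert(resultLost(6L))  // earlier epoch: lost
  assert(resultLost(7L))  // same epoch: lost
  assert(!resultLost(8L)) // started after the failure was processed: kept
}
```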
##########
File path:
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
##########
@@ -764,6 +825,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
(FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0L, 0, 0, "ignored"), null)))
// this will get called
// blockManagerMaster.removeExecutor("hostA-exec")
Review comment:
This comment is not needed, as this is already ensured by the `verify`.
```suggestion
```
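For context, the kind of Mockito check the reviewer means looks roughly like this. This is a standalone sketch with a hypothetical `BlockManagerMasterLike` trait, not the suite's actual setup:

```scala
import org.mockito.Mockito.{mock, times, verify}

trait BlockManagerMasterLike {
  def removeExecutor(execId: String): Unit
}

object VerifySketch extends App {
  val blockManagerMaster = mock(classOf[BlockManagerMasterLike])

  // Stand-in for the scheduler code under test, which removes the executor.
  blockManagerMaster.removeExecutor("hostA-exec")

  // The verify makes the test fail if removeExecutor was never invoked,
  // so a "this will get called" comment adds no extra guarantee.
  verify(blockManagerMaster, times(1)).removeExecutor("hostA-exec")
}
```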
##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -1933,29 +1946,43 @@ private[spark] class DAGScheduler(
maybeEpoch = None)
}
+ /**
+ * Handles removing an executor from the BlockManagerMaster as well as unregistering shuffle
+ * outputs for the executor or optionally its host.
+ *
+ * The fileLost parameter being true indicates that we assume we've lost all shuffle blocks
Review comment:
Nit: what about documenting the params (at least `fileLost` and `maybeEpoch`) with a `@param` scaladoc annotation?