squito commented on a change in pull request #28848:
URL: https://github.com/apache/spark/pull/28848#discussion_r445681220



##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -177,6 +177,8 @@ private[spark] class DAGScheduler(
   // TODO: Garbage collect information about failure epochs when we know there are no more
   //       stray messages to detect.
   private val failedEpoch = new HashMap[String, Long]

Review comment:
       I suggest we rename this to `blockManagerFailureEpoch`, or perhaps 
`executorFailureEpoch`, and change the comment significantly, to something like:
   
   Tracks the latest epoch of a fully processed error related to the given 
executor.
   
   When an executor fails, it can affect the results of many tasks, and we 
have to deal with all of them consistently.  We don't simply ignore all future 
results from that executor, as the failures may have been transient; but we 
also don't want to "overreact" to the many follow-on errors we'll receive.  
Furthermore, we might receive notification of a task success after we find out 
the executor has actually failed; if that success comes from the same epoch, 
we'll assume it is simply a delayed notification and the results have been 
lost.  In particular, we use this epoch to control when we tell the 
blockManagerMaster that the blockManager has been lost.
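   
   To make the intended bookkeeping concrete, here is a minimal sketch of the single-map case. `FailureEpochSketch`, `recordFailure`, and `isStaleSuccess` are hypothetical names used only for illustration, not existing `DAGScheduler` members, and treating a success as stale when its epoch is at or below the recorded failure epoch is an assumption about how "same epoch" would be checked.

```scala
import scala.collection.mutable.HashMap

// Hypothetical stand-in for the scheduler's per-executor epoch bookkeeping.
object FailureEpochSketch {
  // Latest epoch of a fully processed failure, keyed by executor id.
  private val executorFailureEpoch = new HashMap[String, Long]

  // Returns true only when this failure is newer than anything already
  // processed for the executor; follow-on errors from the same epoch
  // return false so the caller can skip the repeated cleanup.
  def recordFailure(execId: String, currentEpoch: Long): Boolean = {
    val isNew = executorFailureEpoch.get(execId).forall(_ < currentEpoch)
    if (isNew) executorFailureEpoch(execId) = currentEpoch
    isNew
  }

  // Assumption: a task success whose epoch is not newer than the recorded
  // failure epoch is treated as a delayed notification with lost results.
  def isStaleSuccess(execId: String, taskEpoch: Long): Boolean =
    executorFailureEpoch.get(execId).exists(taskEpoch <= _)
}
```

   With this shape, the first `recordFailure("exec-1", 5L)` returns true, a repeat at the same epoch returns false, and a success reported at epoch 6 is no longer considered stale.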

##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -1939,24 +1941,24 @@ private[spark] class DAGScheduler(
       hostToUnregisterOutputs: Option[String],
       maybeEpoch: Option[Long] = None): Unit = {
     val currentEpoch = maybeEpoch.getOrElse(mapOutputTracker.getEpoch)
+    logDebug(s"Considering removal of executor $execId; " +
+      s"fileLost: $fileLost, currentEpoch: $currentEpoch")
     if (!failedEpoch.contains(execId) || failedEpoch(execId) < currentEpoch) {
       failedEpoch(execId) = currentEpoch
-      logInfo("Executor lost: %s (epoch %d)".format(execId, currentEpoch))
+      logInfo(s"Executor lost: $execId (epoch $currentEpoch)")
       blockManagerMaster.removeExecutor(execId)
-      if (fileLost) {
-        hostToUnregisterOutputs match {
-          case Some(host) =>
-            logInfo("Shuffle files lost for host: %s (epoch %d)".format(host, currentEpoch))
-            mapOutputTracker.removeOutputsOnHost(host)
-          case None =>
-            logInfo("Shuffle files lost for executor: %s (epoch %d)".format(execId, currentEpoch))
-            mapOutputTracker.removeOutputsOnExecutor(execId)
-        }
-        clearCacheLocs()
-
-      } else {
-        logDebug("Additional executor lost message for %s (epoch %d)".format(execId, currentEpoch))
+    }
+    if (fileLost && (!fileLostEpoch.contains(execId) || fileLostEpoch(execId) < currentEpoch)) {
+      fileLostEpoch(execId) = currentEpoch
+      hostToUnregisterOutputs match {
+        case Some(host) =>
+          logInfo(s"Shuffle files lost for host: $host (epoch $currentEpoch)")
+          mapOutputTracker.removeOutputsOnHost(host)
+        case None =>
+          logInfo(s"Shuffle files lost for executor: $execId (epoch $currentEpoch)")
+          mapOutputTracker.removeOutputsOnExecutor(execId)
       }
+      clearCacheLocs()

Review comment:
       Shouldn't this be within the `failedEpoch(execId) < currentEpoch` block 
above?  `clearCacheLocs()` is about cached RDD blocks, which have to do with the 
blockManager, not the shuffle files.
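   
   A rough sketch of the placement being asked about follows. `RemovalSketch` and its `actions` log are hypothetical stand-ins for the real side effects, and the `hostToUnregisterOutputs` match is collapsed into a single entry; this only illustrates where the call would sit and is not a proposed patch.

```scala
import scala.collection.mutable.{HashMap, ListBuffer}

// Sketch only: `actions` records what would have been called, in order.
class RemovalSketch {
  val actions = new ListBuffer[String]
  private val failedEpoch = new HashMap[String, Long]
  private val fileLostEpoch = new HashMap[String, Long]

  def removeExecutor(execId: String, fileLost: Boolean, currentEpoch: Long): Unit = {
    if (!failedEpoch.contains(execId) || failedEpoch(execId) < currentEpoch) {
      failedEpoch(execId) = currentEpoch
      actions += "blockManagerMaster.removeExecutor"
      // Cached RDD block locations belong to the block manager, so they are
      // cleared together with the block manager removal.
      actions += "clearCacheLocs"
    }
    if (fileLost &&
        (!fileLostEpoch.contains(execId) || fileLostEpoch(execId) < currentEpoch)) {
      fileLostEpoch(execId) = currentEpoch
      // Shuffle output cleanup stays behind its own epoch guard.
      actions += "mapOutputTracker.removeOutputs"
    }
  }
}
```

   In this arrangement an executor loss with `fileLost = false` still clears the cache locations, and a later `fileLost = true` event at the same epoch only unregisters the shuffle outputs.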

##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -177,6 +177,8 @@ private[spark] class DAGScheduler(
   // TODO: Garbage collect information about failure epochs when we know there are no more
   //       stray messages to detect.
   private val failedEpoch = new HashMap[String, Long]
+  // In addition, track epoch for failed executors that result in lost file output

Review comment:
       How about `shuffleFileLostEpoch` here, and a comment building on the 
suggested comment above:
   
   Tracks the latest epoch of a fully processed error which indicated that 
shuffle files have been lost from the given executor.
   
   This is closely related to `executorFailureEpoch`.  In fact, any time an 
entry is added here, it must also be added to `executorFailureEpoch`, so 
`shuffleFileLostEpoch(exec) <= executorFailureEpoch(exec)`.  The two differ only 
when the external shuffle service is enabled.  In that case, when an executor 
is lost, we do *not* update the epoch here; we wait for a fetch failure.  This 
way, if only the executor fails, we do not remove the shuffle data, as it can 
still be served by the shuffle service; but if there is a failure in the 
shuffle service, we remove the shuffle data only once, even though we are 
likely to get many fetch failures.
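   
   The relationship between the two maps could be checked in a unit test with something like the sketch below. `EpochInvariant.holds` is a hypothetical helper, not part of Spark; it takes both maps as plain arguments so it does not depend on any scheduler internals.

```scala
// Hypothetical test helper: every executor with a shuffle-file-lost epoch
// must also have an executor-failure epoch that is at least as large.
object EpochInvariant {
  def holds(
      executorFailureEpoch: collection.Map[String, Long],
      shuffleFileLostEpoch: collection.Map[String, Long]): Boolean =
    shuffleFileLostEpoch.forall { case (execId, lostEpoch) =>
      executorFailureEpoch.get(execId).exists(lostEpoch <= _)
    }
}
```

   An executor that appears only in `executorFailureEpoch` satisfies the check, which matches the external shuffle service case where the executor is lost but its shuffle files are still served.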



