Ngone51 commented on a change in pull request #29906:
URL: https://github.com/apache/spark/pull/29906#discussion_r509853462



##########
File path: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
##########
@@ -284,80 +284,127 @@ private[spark] class AppStatusListener(
   }
 
   override def onExecutorBlacklisted(event: SparkListenerExecutorBlacklisted): 
Unit = {
-    updateBlackListStatus(event.executorId, true)
+    updateExclusionStatus(event.executorId, true)
+  }
+
+  override def onExecutorExcluded(event: SparkListenerExecutorExcluded): Unit 
= {
+    updateExclusionStatus(event.executorId, true)
   }
 
   override def onExecutorBlacklistedForStage(
       event: SparkListenerExecutorBlacklistedForStage): Unit = {
-    val now = System.nanoTime()
+    updateExclusionStatusForStage(event.stageId, event.stageAttemptId, 
event.executorId)
+  }
 
-    Option(liveStages.get((event.stageId, event.stageAttemptId))).foreach { 
stage =>
-      setStageBlackListStatus(stage, now, event.executorId)
-    }
-    liveExecutors.get(event.executorId).foreach { exec =>
-      addBlackListedStageTo(exec, event.stageId, now)
-    }
+  override def onExecutorExcludedForStage(
+      event: SparkListenerExecutorExcludedForStage): Unit = {
+    updateExclusionStatusForStage(event.stageId, event.stageAttemptId, 
event.executorId)
   }
 
   override def onNodeBlacklistedForStage(event: 
SparkListenerNodeBlacklistedForStage): Unit = {
-    val now = System.nanoTime()
+    updateNodeExclusionStatusForStage(event.stageId, event.stageAttemptId, 
event.hostId)
+  }
 
-    // Implicitly blacklist every available executor for the stage associated 
with this node
-    Option(liveStages.get((event.stageId, event.stageAttemptId))).foreach { 
stage =>
-      val executorIds = liveExecutors.values.filter(_.host == 
event.hostId).map(_.executorId).toSeq
-      setStageBlackListStatus(stage, now, executorIds: _*)
-    }
-    liveExecutors.values.filter(_.hostname == event.hostId).foreach { exec =>
-      addBlackListedStageTo(exec, event.stageId, now)
-    }
+  override def onNodeExcludedForStage(event: 
SparkListenerNodeExcludedForStage): Unit = {
+    updateNodeExclusionStatusForStage(event.stageId, event.stageAttemptId, 
event.hostId)
   }
 
-  private def addBlackListedStageTo(exec: LiveExecutor, stageId: Int, now: 
Long): Unit = {
-    exec.blacklistedInStages += stageId
+  private def addExcludedStageTo(exec: LiveExecutor, stageId: Int, now: Long): 
Unit = {
+    exec.excludedInStages += stageId
     liveUpdate(exec, now)
   }
 
   private def setStageBlackListStatus(stage: LiveStage, now: Long, 
executorIds: String*): Unit = {
     executorIds.foreach { executorId =>
       val executorStageSummary = stage.executorSummary(executorId)
-      executorStageSummary.isBlacklisted = true
+      executorStageSummary.isExcluded = true
+      maybeUpdate(executorStageSummary, now)
+    }
+    stage.excludedExecutors ++= executorIds
+    maybeUpdate(stage, now)
+  }
+
+  private def setStageExcludedStatus(stage: LiveStage, now: Long, executorIds: 
String*): Unit = {
+    executorIds.foreach { executorId =>
+      val executorStageSummary = stage.executorSummary(executorId)
+      executorStageSummary.isExcluded = true
       maybeUpdate(executorStageSummary, now)
     }
-    stage.blackListedExecutors ++= executorIds
+    stage.excludedExecutors ++= executorIds
     maybeUpdate(stage, now)
   }
 
   override def onExecutorUnblacklisted(event: 
SparkListenerExecutorUnblacklisted): Unit = {
-    updateBlackListStatus(event.executorId, false)
+    updateExclusionStatus(event.executorId, false)
+  }
+
+  override def onExecutorUnexcluded(event: SparkListenerExecutorUnexcluded): 
Unit = {
+    updateExclusionStatus(event.executorId, false)
   }
 
   override def onNodeBlacklisted(event: SparkListenerNodeBlacklisted): Unit = {

Review comment:
       IIUC, this is for new HS (history server) + old App compatibility? When a 
new HS reads the old blacklist events of an old App, it can thus call these 
deprecated methods.
   
   For the old HS + new App case, I think it's fine since we always post both 
the new and the old blacklist events, and thus EventLoggingListener writes both 
events as well. Therefore, an old HS can parse the old blacklist events and 
ignore the new ones.
   
   
   I'm fine to leave them as they are. Maybe add one more comment to explain 
why we still need them?
   
   Another thought is, we can remove all those deprecated methods both in 
`AppStatusListener` and `EventLoggingListener` but keep compatibility by 
handling them in `onOtherEvent`. It's up to you. I'm fine either way.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to