tgravescs commented on a change in pull request #29906:
URL: https://github.com/apache/spark/pull/29906#discussion_r508525385
##########
File path: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
##########
@@ -284,80 +284,127 @@ private[spark] class AppStatusListener(
}
override def onExecutorBlacklisted(event: SparkListenerExecutorBlacklisted):
Unit = {
- updateBlackListStatus(event.executorId, true)
+ updateExclusionStatus(event.executorId, true)
+ }
+
+ override def onExecutorExcluded(event: SparkListenerExecutorExcluded): Unit
= {
+ updateExclusionStatus(event.executorId, true)
}
override def onExecutorBlacklistedForStage(
event: SparkListenerExecutorBlacklistedForStage): Unit = {
- val now = System.nanoTime()
+ updateExclusionStatusForStage(event.stageId, event.stageAttemptId,
event.executorId)
+ }
- Option(liveStages.get((event.stageId, event.stageAttemptId))).foreach {
stage =>
- setStageBlackListStatus(stage, now, event.executorId)
- }
- liveExecutors.get(event.executorId).foreach { exec =>
- addBlackListedStageTo(exec, event.stageId, now)
- }
+ override def onExecutorExcludedForStage(
+ event: SparkListenerExecutorExcludedForStage): Unit = {
+ updateExclusionStatusForStage(event.stageId, event.stageAttemptId,
event.executorId)
}
override def onNodeBlacklistedForStage(event:
SparkListenerNodeBlacklistedForStage): Unit = {
- val now = System.nanoTime()
+ updateNodeExclusionStatusForStage(event.stageId, event.stageAttemptId,
event.hostId)
+ }
- // Implicitly blacklist every available executor for the stage associated
with this node
- Option(liveStages.get((event.stageId, event.stageAttemptId))).foreach {
stage =>
- val executorIds = liveExecutors.values.filter(_.host ==
event.hostId).map(_.executorId).toSeq
- setStageBlackListStatus(stage, now, executorIds: _*)
- }
- liveExecutors.values.filter(_.hostname == event.hostId).foreach { exec =>
- addBlackListedStageTo(exec, event.stageId, now)
- }
+ override def onNodeExcludedForStage(event:
SparkListenerNodeExcludedForStage): Unit = {
+ updateNodeExclusionStatusForStage(event.stageId, event.stageAttemptId,
event.hostId)
}
- private def addBlackListedStageTo(exec: LiveExecutor, stageId: Int, now:
Long): Unit = {
- exec.blacklistedInStages += stageId
+ private def addExcludedStageTo(exec: LiveExecutor, stageId: Int, now: Long):
Unit = {
+ exec.excludedInStages += stageId
liveUpdate(exec, now)
}
private def setStageBlackListStatus(stage: LiveStage, now: Long,
executorIds: String*): Unit = {
executorIds.foreach { executorId =>
val executorStageSummary = stage.executorSummary(executorId)
- executorStageSummary.isBlacklisted = true
+ executorStageSummary.isExcluded = true
+ maybeUpdate(executorStageSummary, now)
+ }
+ stage.excludedExecutors ++= executorIds
+ maybeUpdate(stage, now)
+ }
+
+ private def setStageExcludedStatus(stage: LiveStage, now: Long, executorIds:
String*): Unit = {
+ executorIds.foreach { executorId =>
+ val executorStageSummary = stage.executorSummary(executorId)
+ executorStageSummary.isExcluded = true
maybeUpdate(executorStageSummary, now)
}
- stage.blackListedExecutors ++= executorIds
+ stage.excludedExecutors ++= executorIds
maybeUpdate(stage, now)
}
override def onExecutorUnblacklisted(event:
SparkListenerExecutorUnblacklisted): Unit = {
- updateBlackListStatus(event.executorId, false)
+ updateExclusionStatus(event.executorId, false)
+ }
+
+ override def onExecutorUnexcluded(event: SparkListenerExecutorUnexcluded):
Unit = {
+ updateExclusionStatus(event.executorId, false)
}
override def onNodeBlacklisted(event: SparkListenerNodeBlacklisted): Unit = {
Review comment:
I left them to be completely backwards compatible. This way they are
still written to event logs which means older history servers should work with
newer files and if people have parsers for those it still works. I don't think
we say the event log is compatible in that way but this gives more
compatibility.
I'm fine with removing them if everyone else thinks it's ok.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]