agrawaldevesh commented on a change in pull request #29468:
URL: https://github.com/apache/spark/pull/29468#discussion_r473172376
##########
File path: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
##########
@@ -1062,25 +1062,38 @@ private[spark] class TaskSchedulerImpl(
  }
  def getExecutorsAliveOnHost(host: String): Option[Set[String]] = synchronized {
-    hostToExecutors.get(host).map(_.toSet)
+    hostToExecutors.get(host).map(_.filterNot(isExecutorDecommissioned)).map(_.toSet)
  }
  def hasExecutorsAliveOnHost(host: String): Boolean = synchronized {
-    hostToExecutors.contains(host)
+    hostToExecutors.get(host)
+      .exists(executors => executors.exists(e => !isExecutorDecommissioned(e)))
  }
  def hasHostAliveOnRack(rack: String): Boolean = synchronized {
-    hostsByRack.contains(rack)
+    hostsByRack.get(rack)
+      .exists(hosts => hosts.exists(h => !isHostDecommissioned(h)))
  }
  def isExecutorAlive(execId: String): Boolean = synchronized {
-    executorIdToRunningTaskIds.contains(execId)
+    executorIdToRunningTaskIds.contains(execId) && !isExecutorDecommissioned(execId)
  }
  def isExecutorBusy(execId: String): Boolean = synchronized {
    executorIdToRunningTaskIds.get(execId).exists(_.nonEmpty)
  }
+  // exposed for test
Review comment:
nit: Should these two methods be declared `final`? We don't really want them to be
overridden; instead, subclasses should override the helper methods they call.
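For illustration only, a minimal self-contained sketch (toy class names, not the actual Spark code) of the pattern being suggested: the public query method is `final` and delegates to a protected helper that subclasses such as test fakes can override:

```scala
// Toy sketch, not Spark code: a final public query method that delegates to an
// overridable protected helper.
class SchedulerSketch {
  protected val runningExecutors: Set[String] = Set("exec0", "exec1")

  // Subclasses customize only this hook.
  protected def isExecutorDecommissioned(execId: String): Boolean = false

  // Final: the alive-check logic itself can no longer be overridden.
  final def isExecutorAlive(execId: String): Boolean =
    runningExecutors.contains(execId) && !isExecutorDecommissioned(execId)
}

// A test fake overrides the hook and reuses the final method's logic unchanged.
class FakeSchedulerSketch extends SchedulerSketch {
  override protected def isExecutorDecommissioned(execId: String): Boolean =
    execId == "exec0"
}
```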
##########
File path: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
##########
@@ -1062,25 +1062,38 @@ private[spark] class TaskSchedulerImpl(
  }
  def getExecutorsAliveOnHost(host: String): Option[Set[String]] = synchronized {
-    hostToExecutors.get(host).map(_.toSet)
+    hostToExecutors.get(host).map(_.filterNot(isExecutorDecommissioned)).map(_.toSet)
  }
  def hasExecutorsAliveOnHost(host: String): Boolean = synchronized {
-    hostToExecutors.contains(host)
+    hostToExecutors.get(host)
+      .exists(executors => executors.exists(e => !isExecutorDecommissioned(e)))
  }
  def hasHostAliveOnRack(rack: String): Boolean = synchronized {
-    hostsByRack.contains(rack)
+    hostsByRack.get(rack)
+      .exists(hosts => hosts.exists(h => !isHostDecommissioned(h)))
  }
  def isExecutorAlive(execId: String): Boolean = synchronized {
-    executorIdToRunningTaskIds.contains(execId)
+    executorIdToRunningTaskIds.contains(execId) && !isExecutorDecommissioned(execId)
  }
  def isExecutorBusy(execId: String): Boolean = synchronized {
    executorIdToRunningTaskIds.get(execId).exists(_.nonEmpty)
  }
+  // exposed for test
+  protected def isExecutorDecommissioned(execId: String): Boolean =
+    getExecutorDecommissionInfo(execId).nonEmpty || isHostDecommissioned(executorIdToHost(execId))
Review comment:
Please consider removing the `isHostDecommissioned` call here: when a host is
decommissioned, all executors on that host are marked decommissioned as well, so
`getExecutorDecommissionInfo(execId).nonEmpty` eventually becomes true for them anyway.
I would really like `TaskSchedulerImpl` not to make its own judgement about when an
executor is decommissioned, but rather let that be dictated by the
`CoarseGrainedSchedulerBackend`/cluster manager. Otherwise I am afraid the two can get
out of sync.
I understand the intention, but I think it is a bit overzealous: executors on
decommissioned hosts will eventually show up via `getExecutorDecommissionInfo` anyway.
I verified that your tests still pass after this call is removed.
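Concretely, the helper would then reduce to a single delegation. A sketch of the suggested shape only, not a tested patch against this PR:

```scala
// Sketch of the suggested simplification: trust only the decommission info
// propagated from the scheduler backend and drop the extra host-level check.
protected def isExecutorDecommissioned(execId: String): Boolean =
  getExecutorDecommissionInfo(execId).nonEmpty
```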
##########
File path: core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
##########
@@ -653,6 +652,33 @@ class TaskSetManagerSuite
    assert(manager.resourceOffer("execA", "host1", ANY)._1.isDefined)
  }
+  test("SPARK-32653: Decommissioned host/executor should not be used to calculate locality " +
+    "levels") {
+    sc = new SparkContext("local", "test")
+    sched = new FakeTaskScheduler(sc)
+    val backend = mock(classOf[SchedulerBackend])
+    doNothing().when(backend).reviveOffers()
+    sched.initialize(backend)
+
+    val exec0 = "exec0"
+    val exec1 = "exec1"
+    val host0 = "host0"
+    sched.addExecutor(exec0, host0)
+    sched.addExecutor(exec1, host0)
+
+    val taskSet = FakeTask.createTaskSet(2,
+      Seq(TaskLocation(host0, exec0)),
+      Seq(TaskLocation(host0, exec1)))
+    sched.submitTasks(taskSet)
+    val manager = sched.taskSetManagerForAttempt(0, 0).get
+
+    assert(manager.myLocalityLevels === Array(PROCESS_LOCAL, NODE_LOCAL, ANY))
+
+    sched.executorDecommission(exec0, ExecutorDecommissionInfo("test", true))
Review comment:
I think you should also call `sched.executorDecommission(exec1,
ExecutorDecommissionInfo("test", true))` (i.e. for exec1), to mimic the production
behavior, which always decommissions all executors on a decommissioned host.
In addition, should we also test decommissioning a single executor when the entire
host hasn't been taken down?
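A rough sketch of what those two additions could look like in this test, assuming the second argument of `ExecutorDecommissionInfo` indicates whether the executor's host is also decommissioned; the exact locality-level assertions would need to be checked against the scheduler's actual behavior:

```scala
// Mimic production: when a host is decommissioned, every executor on it is
// decommissioned, so decommission exec1 as well.
sched.executorDecommission(exec1, ExecutorDecommissionInfo("test", true))

// Second scenario (a separate test or case): only exec0 is decommissioned and the
// host stays up, assuming the boolean flag means "host is also decommissioned".
// sched.executorDecommission(exec0, ExecutorDecommissionInfo("test", false))
// NODE_LOCAL should then presumably remain a valid locality level for host0:
// assert(manager.myLocalityLevels.contains(NODE_LOCAL))
```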