GitHub user tnachen commented on a diff in the pull request:
https://github.com/apache/spark/pull/4984#discussion_r30004702
--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala ---
@@ -281,4 +319,40 @@ private[spark] class CoarseMesosSchedulerBackend(
     super.applicationId
   }
+  override def doRequestTotalExecutors(requestedTotal: Int): Boolean = {
+    // We don't truly know if we can fulfill the full amount of executors
+    // since at coarse grain it depends on the amount of slaves available.
+    logInfo("Capping the total amount of executors to " + requestedTotal)
+    executorLimitOption = Option(requestedTotal)
+    true
+  }
+
+  override def doKillExecutors(executorIds: Seq[String]): Boolean = {
+    if (mesosDriver == null) {
+      logWarning("Asked to kill executors before the executor was started.")
+      return false
+    }
+
+    val slaveIdToTaskId = taskIdToSlaveId.inverse()
+    for (executorId <- executorIds) {
+      val slaveId = executorId.split("/")(0)
+      if (slaveIdToTaskId.contains(slaveId)) {
+        mesosDriver.killTask(
+          TaskID.newBuilder().setValue(slaveIdToTaskId.get(slaveId).toString).build)
+        pendingRemovedSlaveIds += slaveId
+      } else {
+        logWarning("Unable to find executor Id '" + executorId + "' in Mesos scheduler")
+      }
+    }
+
+    assert(pendingRemovedSlaveIds.size <= taskIdToSlaveId.size)
--- End diff --
This assert fails for me when I let the ExecutorAllocationManager start killing idle executors.
After adding some print statements, I realized that the assumption behind it, that pendingRemovedSlaveIds can never be larger than taskIdToSlaveId, is wrong:
15/05/10 18:36:11 WARN CoarseMesosSchedulerBackend: pending: Set(20150509-065852-3381634314-5050-1960-S2, 20150509-065852-3381634314-5050-1960-S0, 20150509-065852-3381634314-5050-1960-S1)
15/05/10 18:36:11 WARN CoarseMesosSchedulerBackend: current: {2=20150509-065852-3381634314-5050-1960-S2}
When we kill a Mesos task we add its slave ID to the pending set, but the task stays in the current map until its status update comes back. The two collections are therefore updated at different times, so we can easily end up with pending > current. We should remove this assert, and also set executorLimitOption to:
Option(Math.max(0, taskIdToSlaveId.size - pendingRemovedSlaveIds.size))
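A rough, self-contained Scala sketch of what I mean, under a few assumptions: `ExecutorKillTracker`, `launch` and `taskFinished` are hypothetical stand-ins for the backend's bookkeeping (there is no Mesos driver here), executor IDs are assumed to look like `<slaveId>/<taskId>` (matching the `split("/")(0)` in the diff), and a finished task is assumed to leave its slave ID in `pendingRemovedSlaveIds`, which is consistent with the logged state. Replaying three one-at-a-time kills reaches the same state as the log (pending = 3, current = 1), where the original assert would fail while the clamped limit is simply 0.

```scala
import scala.collection.mutable

// Hypothetical, simplified model of the backend's bookkeeping; NOT the real
// CoarseMesosSchedulerBackend (no Mesos driver, no status-update plumbing).
class ExecutorKillTracker {
  // Running Mesos task ID -> slave ID (mirrors taskIdToSlaveId).
  val taskIdToSlaveId = mutable.Map.empty[Int, String]
  // Slaves whose task we asked Mesos to kill (mirrors pendingRemovedSlaveIds).
  val pendingRemovedSlaveIds = mutable.Set.empty[String]
  var executorLimitOption: Option[Int] = None

  def launch(taskId: Int, slaveId: String): Unit =
    taskIdToSlaveId(taskId) = slaveId

  // Assumes executor IDs of the form "<slaveId>/<taskId>".
  def killExecutors(executorIds: Seq[String]): Boolean = {
    val slaveIdToTaskId = taskIdToSlaveId.map(_.swap)
    for (executorId <- executorIds) {
      val slaveId = executorId.split("/")(0)
      if (slaveIdToTaskId.contains(slaveId)) {
        // The real backend would call mesosDriver.killTask(...) here.
        pendingRemovedSlaveIds += slaveId
      } else {
        println(s"Unable to find executor Id '$executorId'")
      }
    }
    // No assert(pendingRemovedSlaveIds.size <= taskIdToSlaveId.size): the two
    // collections change at different times, so pending can exceed current.
    executorLimitOption =
      Option(math.max(0, taskIdToSlaveId.size - pendingRemovedSlaveIds.size))
    true
  }

  // Assumption: a terminal status update removes the task from
  // taskIdToSlaveId but leaves its slave ID in pendingRemovedSlaveIds.
  def taskFinished(taskId: Int): Unit =
    taskIdToSlaveId -= taskId
}

object KillTrackerDemo {
  def main(args: Array[String]): Unit = {
    val prefix = "20150509-065852-3381634314-5050-1960-" // slave ID prefix from the log
    val t = new ExecutorKillTracker
    (0 to 2).foreach(i => t.launch(i, s"${prefix}S$i"))
    t.killExecutors(Seq(s"${prefix}S0/0")); t.taskFinished(0)
    t.killExecutors(Seq(s"${prefix}S1/1")); t.taskFinished(1)
    t.killExecutors(Seq(s"${prefix}S2/2"))
    // Same shape as the log: three pending slaves, one running task.
    println(s"pending=${t.pendingRemovedSlaveIds.size} " +
      s"current=${t.taskIdToSlaveId.size} limit=${t.executorLimitOption}")
  }
}
```

Running `KillTrackerDemo` should print `pending=3 current=1 limit=Some(0)`. Clamping with `max(0, ...)` keeps the executor limit non-negative even while kills and status updates are out of step.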