Github user dragos commented on a diff in the pull request:
https://github.com/apache/spark/pull/4984#discussion_r33795284
--- Diff:
core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
---
@@ -286,4 +324,40 @@ private[spark] class CoarseMesosSchedulerBackend(
super.applicationId
}
+  override def doRequestTotalExecutors(requestedTotal: Int): Boolean = {
+    // We don't truly know if we can fulfill the full amount of executors
+    // since at coarse grain it depends on the amount of slaves available.
+    if (executorLimitOption.map(_ != requestedTotal).getOrElse(true)) {
+      logInfo("Capping the total amount of executors to " + requestedTotal)
+    }
+    executorLimitOption = Option(requestedTotal)
+    true
+  }
+
+  override def doKillExecutors(executorIds: Seq[String]): Boolean = {
+    if (mesosDriver == null) {
+      logWarning("Asked to kill executors before the executor was started.")
+      return false
+    }
+
+    val slaveIdToTaskId = taskIdToSlaveId.inverse()
+    for (executorId <- executorIds) {
+      val slaveId = executorId.split("/")(0)
+      if (slaveIdToTaskId.contains(slaveId)) {
+        mesosDriver.killTask(
+          TaskID.newBuilder().setValue(slaveIdToTaskId.get(slaveId).toString).build)
+        pendingRemovedSlaveIds += slaveId
+      } else {
+        logWarning("Unable to find executor Id '" + executorId + "' in Mesos scheduler")
+      }
+    }
+
+    // We cannot simply decrement from the existing executor limit, as we may not be able to
+    // launch as many executors as the limit specifies. But we assume that if we are notified
+    // to kill executors, the scheduler wants to set a limit that is less than the number of
+    // executors that have been launched. Therefore, we take the existing number of executors
+    // launched and deduct the executors killed as the new limit.
+    executorLimitOption = Option(Math.max(0, taskIdToSlaveId.size - pendingRemovedSlaveIds.size))
--- End diff --
The current assumption is that we never launch more than one executor per slave id. That's enforced in `resourceOffer`; the last condition is `!slaveIdsWithExecutors.contains(slaveId)`. I agree it's worthwhile to relax that limitation, but let's leave that for another PR and review this one within the bounds of the current Spark state. I'm the first to acknowledge that this code needs a serious refactoring, and I'm willing to do it. But let's take some baby steps first and get this in. :)
I believe the current logic (again, coming from @tnachen) is making too many assumptions. In fact, I would prefer to **not** modify `executorLimit` at all at this point. That's entirely the responsibility of the parent class, which implements the logic for the whole feature (and works the same in YARN, standalone, and soon Mesos).
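Concretely, the shape I have in mind is roughly the following (an illustrative sketch only, reusing the fields from the diff above; the point is that `executorLimitOption` is never touched here):

```scala
// Sketch: kill the tasks backing the requested executors and report whether
// the request could be forwarded, but leave all executor-limit bookkeeping
// to the parent class.
override def doKillExecutors(executorIds: Seq[String]): Boolean = {
  if (mesosDriver == null) {
    logWarning("Asked to kill executors before the Mesos driver was started.")
    return false
  }
  val slaveIdToTaskId = taskIdToSlaveId.inverse()
  for (executorId <- executorIds) {
    val slaveId = executorId.split("/")(0)
    if (slaveIdToTaskId.contains(slaveId)) {
      mesosDriver.killTask(
        TaskID.newBuilder().setValue(slaveIdToTaskId.get(slaveId).toString).build)
      pendingRemovedSlaveIds += slaveId
    } else {
      logWarning("Unable to find executor Id '" + executorId + "' in Mesos scheduler")
    }
  }
  true // no update to executorLimitOption here
}
```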
As a side note, my take is that this implementation should definitely not rely on any more knowledge of the superclass. It should work entirely within the defined interface, which is `doRequestTotalExecutors` and `doKillExecutors`. They are the only (properly documented) hooks for implementors.
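If it helps to visualize the split, here's a hypothetical sketch of the division of labor (the class name `SketchedParentBackend` is made up; this is not the actual `CoarseGrainedSchedulerBackend` source):

```scala
// Hypothetical sketch: the parent class owns the allocation/limit logic shared
// across cluster managers; a backend only implements the two documented hooks.
abstract class SketchedParentBackend {
  // Parent-side entry points, identical across YARN, standalone and Mesos.
  final def requestTotalExecutors(requestedTotal: Int): Boolean = {
    // ...shared bookkeeping of the desired total would live here...
    doRequestTotalExecutors(requestedTotal)
  }

  final def killExecutors(executorIds: Seq[String]): Boolean = {
    // ...shared bookkeeping of pending removals would live here...
    doKillExecutors(executorIds)
  }

  // The only hooks a cluster-manager backend needs to implement.
  protected def doRequestTotalExecutors(requestedTotal: Int): Boolean
  protected def doKillExecutors(executorIds: Seq[String]): Boolean
}
```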