GitHub user vanzin commented on a diff in the pull request:
https://github.com/apache/spark/pull/19041#discussion_r143816598
--- Diff:
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -416,63 +423,52 @@ private[spark] class ExecutorAllocationManager(
* Request the cluster manager to remove the given executors.
* Returns the list of executors which are removed.
*/
- private def removeExecutors(executors: Seq[String]): Seq[String] =
synchronized {
- val executorIdsToBeRemoved = new ArrayBuffer[String]
-
+ private def removeExecutors(executors: Seq[String]): Unit = synchronized
{
logInfo("Request to remove executorIds: " + executors.mkString(", "))
- val numExistingExecutors = allocationManager.executorIds.size -
executorsPendingToRemove.size
-
- var newExecutorTotal = numExistingExecutors
- executors.foreach { executorIdToBeRemoved =>
- if (newExecutorTotal - 1 < minNumExecutors) {
- logDebug(s"Not removing idle executor $executorIdToBeRemoved
because there are only " +
- s"$newExecutorTotal executor(s) left (minimum number of executor
limit $minNumExecutors)")
- } else if (newExecutorTotal - 1 < numExecutorsTarget) {
- logDebug(s"Not removing idle executor $executorIdToBeRemoved
because there are only " +
- s"$newExecutorTotal executor(s) left (number of executor target
$numExecutorsTarget)")
- } else if (canBeKilled(executorIdToBeRemoved)) {
- executorIdsToBeRemoved += executorIdToBeRemoved
- newExecutorTotal -= 1
- }
- }
+ val numExistingExecs = allocationManager.executorIds.size -
executorsPendingToRemove.size
+ val execCountFloor = math.max(minNumExecutors, numExecutorsTarget)
+ val (executorIdsToBeRemoved, dontRemove) = executors
+ .filter(canBeKilled)
+ .splitAt(numExistingExecs - execCountFloor)
- if (executorIdsToBeRemoved.isEmpty) {
- return Seq.empty[String]
+ dontRemove.foreach { execId =>
+ logDebug(s"Not removing idle executor $execId because it " +
+ s"would put us below the minimum limit of $minNumExecutors
executors" +
+ s"or number of target executors $numExecutorsTarget")
}
- // Send a request to the backend to kill this executor(s)
- val executorsRemoved = if (testing) {
- executorIdsToBeRemoved
+ if (executorIdsToBeRemoved.isEmpty) {
+ Seq.empty[String]
+ } else if (testing) {
+ recordExecutorKill(executorIdsToBeRemoved)
+ } else if (recoverCachedData) {
+ logDebug(s"Starting replicate process for $executorIdsToBeRemoved")
+ client.markPendingToRemove(executorIdsToBeRemoved)
+ recordExecutorKill(executorIdsToBeRemoved)
+ cacheRecoveryManager.startExecutorKill(executorIdsToBeRemoved)
} else {
- client.killExecutors(executorIdsToBeRemoved)
+ val killed = killExecutors(executorIdsToBeRemoved)
+ recordExecutorKill(killed)
}
- // [SPARK-21834] killExecutors api reduces the target number of
executors.
- // So we need to update the target with desired value.
- client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks,
hostToLocalTaskCount)
- // reset the newExecutorTotal to the existing number of executors
- newExecutorTotal = numExistingExecutors
- if (testing || executorsRemoved.nonEmpty) {
- executorsRemoved.foreach { removedExecutorId =>
- newExecutorTotal -= 1
- logInfo(s"Removing executor $removedExecutorId because it has been
idle for " +
- s"$executorIdleTimeoutS seconds (new desired total will be
$newExecutorTotal)")
- executorsPendingToRemove.add(removedExecutorId)
- }
- executorsRemoved
- } else {
+ }
+
+ def killExecutors(executorIds: Seq[String], forceIfPending: Boolean =
false): Seq[String] = {
+ logDebug(s"Starting kill process for $executorIds")
+ val result = client.killExecutors(executorIds, forceIfPending =
forceIfPending)
+ if (result.isEmpty) {
logWarning(s"Unable to reach the cluster manager to kill executor/s
" +
- s"${executorIdsToBeRemoved.mkString(",")} or no executor eligible
to kill!")
- Seq.empty[String]
+ s"${executorIds.mkString(",")} or no executor eligible to kill!")
}
+ result
}
- /**
- * Request the cluster manager to remove the given executor.
- * Return whether the request is acknowledged.
- */
- private def removeExecutor(executorId: String): Boolean = synchronized {
- val executorsRemoved = removeExecutors(Seq(executorId))
- executorsRemoved.nonEmpty && executorsRemoved(0) == executorId
+ private def recordExecutorKill(executorsRemoved: Seq[String]): Unit =
synchronized {
+ // [SPARK-21834] killExecutors api reduces the target number of
executors.
+ // So we need to update the target with desired value.
+ client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks,
hostToLocalTaskCount)
+ executorsPendingToRemove ++= executorsRemoved
+ logInfo(s"Removing executor $executorsRemoved because it has been idle
for " +
--- End diff --
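This log message looks off when there are multiple executors: `$executorsRemoved` is a `Seq[String]`, but the text still reads "Removing executor ... because it has been idle", as if only a single executor were being removed.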
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]