[GitHub] spark pull request #14926: [SPARK-17365][Core] Remove/Kill multiple executor...
Github user dhruve closed the pull request at: https://github.com/apache/spark/pull/14926 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14926: [SPARK-17365][Core] Remove/Kill multiple executor...
Github user dhruve commented on a diff in the pull request: https://github.com/apache/spark/pull/14926#discussion_r79198768 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala --- @@ -392,10 +397,70 @@ private[spark] class ExecutorAllocationManager( } /** + * Request the cluster manager to remove the given executors. + * Return whether the request is acknowledged. Ideally we should be returning the list of + * executors which were removed as the requested executors and the one's actually removed can be + * different (CoarseGrainedSchedulerBackend can filter some executors). To avoid breaking the API + * we continue to return a Boolean. + */ + private def removeExecutors(executors: Seq[String]): Boolean = synchronized { + +val executorIdsToBeRemoved = new ArrayBuffer[String] + +logInfo("Request to remove executorIds: " + executors.mkString(", ")) +val numExistingExecutors = executorIds.size - executorsPendingToRemove.size +for(executorId <- executors) { + // Do not kill the executor if we have already reached the lower bound + val newExecutorTotal = numExistingExecutors - executorIdsToBeRemoved.size + if (newExecutorTotal - 1 < minNumExecutors) { +logDebug(s"Not removing idle executor $executorId because there are only " + + s"$numExistingExecutors executor(s) left (limit $minNumExecutors)") + } else if (canBeKilled(executorId)) { +executorIdsToBeRemoved += executorId + } +} + +if (executorIdsToBeRemoved.isEmpty) { + return false +} + +// Send a request to the backend to kill this executor(s) +val executorsRemoved = if (testing) { + executorIdsToBeRemoved +} else { + client.killExecutors(executorIdsToBeRemoved) +} + +if (testing || executorsRemoved.nonEmpty) { + val numExistingExecutors = allocationManager.executorIds.size - executorsPendingToRemove.size + var index = 0 + for(index <- 0 until executorsRemoved.size) { +val removedExecutorId = executorsRemoved(index) +val newExecutorTotal = numExistingExecutors - (index + 1) 
+logInfo(s"Removing executor $removedExecutorId because it has been idle for " + + s"$executorIdleTimeoutS seconds (new desired total will be $newExecutorTotal)") +executorsPendingToRemove.add(removedExecutorId) + } + true +} else { + logWarning(s"Unable to reach the cluster manager to kill executor/s " + +executorIdsToBeRemoved.mkString(",") + "or no executor eligible to kill!") --- End diff -- Yeah. That space is important.
[GitHub] spark pull request #14926: [SPARK-17365][Core] Remove/Kill multiple executor...
Github user dhruve commented on a diff in the pull request: https://github.com/apache/spark/pull/14926#discussion_r79198414 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala --- @@ -564,6 +564,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp .filter { id => force || !scheduler.isExecutorBusy(id) } executorsToKill.foreach { id => executorsPendingToRemove(id) = !replace } + logInfo(s"Requesting to kill filtered executor(s) ${executorsToKill.mkString(", ")}") --- End diff -- It was added to differentiate between what was requested and what is actually being sent. If we drop it, this message will read the same as the log statement above. How about "idle" instead?
[GitHub] spark pull request #14926: [SPARK-17365][Core] Remove/Kill multiple executor...
Github user dhruve commented on a diff in the pull request: https://github.com/apache/spark/pull/14926#discussion_r79217027 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala --- @@ -392,10 +397,70 @@ private[spark] class ExecutorAllocationManager( } /** + * Request the cluster manager to remove the given executors. --- End diff -- okay.
[GitHub] spark pull request #14926: [SPARK-17365][Core] Remove/Kill multiple executor...
Github user dhruve commented on a diff in the pull request: https://github.com/apache/spark/pull/14926#discussion_r79196391 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala --- @@ -583,7 +585,17 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp _ => Future.successful(false) } - adjustTotalExecutors.flatMap(killExecutors)(ThreadUtils.sameThread) + val killResponse = adjustTotalExecutors.flatMap(killExecutors)(ThreadUtils.sameThread) + + def execKilled(killStatus: Boolean): Future[Seq[String]] = Future.successful( +if (killStatus) { + executorsToKill +} else { + Seq.empty[String] +} + ) + + killResponse.flatMap(flag => execKilled(flag))(ThreadUtils.sameThread) --- End diff -- Agreed.
[GitHub] spark pull request #14926: [SPARK-17365][Core] Remove/Kill multiple executor...
Github user dhruve commented on a diff in the pull request: https://github.com/apache/spark/pull/14926#discussion_r79199040 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala --- @@ -392,10 +397,70 @@ private[spark] class ExecutorAllocationManager( } /** + * Request the cluster manager to remove the given executors. + * Return whether the request is acknowledged. Ideally we should be returning the list of + * executors which were removed as the requested executors and the one's actually removed can be + * different (CoarseGrainedSchedulerBackend can filter some executors). To avoid breaking the API + * we continue to return a Boolean. + */ + private def removeExecutors(executors: Seq[String]): Boolean = synchronized { + +val executorIdsToBeRemoved = new ArrayBuffer[String] + +logInfo("Request to remove executorIds: " + executors.mkString(", ")) +val numExistingExecutors = executorIds.size - executorsPendingToRemove.size +for(executorId <- executors) { + // Do not kill the executor if we have already reached the lower bound + val newExecutorTotal = numExistingExecutors - executorIdsToBeRemoved.size + if (newExecutorTotal - 1 < minNumExecutors) { +logDebug(s"Not removing idle executor $executorId because there are only " + + s"$numExistingExecutors executor(s) left (limit $minNumExecutors)") + } else if (canBeKilled(executorId)) { +executorIdsToBeRemoved += executorId + } +} + +if (executorIdsToBeRemoved.isEmpty) { + return false +} + +// Send a request to the backend to kill this executor(s) +val executorsRemoved = if (testing) { + executorIdsToBeRemoved +} else { + client.killExecutors(executorIdsToBeRemoved) +} + +if (testing || executorsRemoved.nonEmpty) { + val numExistingExecutors = allocationManager.executorIds.size - executorsPendingToRemove.size + var index = 0 + for(index <- 0 until executorsRemoved.size) { --- End diff -- I would like to keep `numExistingExecutors`; it makes it easier to skim through the logs when validating the sequential release of executors with dynamic allocation enabled.
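For readers following the thread, the lower-bound check in the quoted loop can be sketched in isolation. This is only an illustration under stated assumptions: `LowerBound`, `selectForRemoval`, and its parameters are hypothetical names, and the fold stands in for the patch's `ArrayBuffer` loop rather than reproducing it.

```scala
object LowerBound {
  // Hypothetical helper, not part of the patch: selects the executors that may
  // be removed without dropping below `minNumExecutors`.
  def selectForRemoval(
      candidates: Seq[String],
      numExisting: Int,
      minNumExecutors: Int,
      canBeKilled: String => Boolean): Seq[String] =
    candidates.foldLeft(Vector.empty[String]) { (selected, id) =>
      // Executors already selected in this pass count as removed.
      val newTotal = numExisting - selected.size
      if (newTotal - 1 >= minNumExecutors && canBeKilled(id)) selected :+ id
      else selected
    }
}
```

With three existing executors and a minimum of two, only the first eligible candidate is selected; the rest are skipped because removing them would cross the lower bound.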
[GitHub] spark pull request #14926: [SPARK-17365][Core] Remove/Kill multiple executor...
Github user dhruve commented on a diff in the pull request: https://github.com/apache/spark/pull/14926#discussion_r79196128 --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala --- @@ -83,8 +84,14 @@ class JobScheduler(val ssc: StreamingContext) extends Logging { listenerBus.start() receiverTracker = new ReceiverTracker(ssc) inputInfoTracker = new InputInfoTracker(ssc) + +val executorAllocClient: ExecutorAllocationClient = ssc.sparkContext.schedulerBackend match { + case b: ExecutorAllocationClient => b.asInstanceOf[ExecutorAllocationClient] + case _ => null +} + executorAllocationManager = ExecutorAllocationManager.createIfEnabled( - ssc.sparkContext, + executorAllocClient, --- End diff -- We are checking that in createIfEnabled.
[GitHub] spark pull request #14926: [SPARK-17365][Core] Remove/Kill multiple executor...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/14926#discussion_r79036171 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala --- @@ -392,10 +397,70 @@ private[spark] class ExecutorAllocationManager( } /** + * Request the cluster manager to remove the given executors. + * Return whether the request is acknowledged. Ideally we should be returning the list of + * executors which were removed as the requested executors and the one's actually removed can be + * different (CoarseGrainedSchedulerBackend can filter some executors). To avoid breaking the API + * we continue to return a Boolean. + */ + private def removeExecutors(executors: Seq[String]): Boolean = synchronized { + +val executorIdsToBeRemoved = new ArrayBuffer[String] + +logInfo("Request to remove executorIds: " + executors.mkString(", ")) +val numExistingExecutors = executorIds.size - executorsPendingToRemove.size +for(executorId <- executors) { + // Do not kill the executor if we have already reached the lower bound + val newExecutorTotal = numExistingExecutors - executorIdsToBeRemoved.size + if (newExecutorTotal - 1 < minNumExecutors) { +logDebug(s"Not removing idle executor $executorId because there are only " + + s"$numExistingExecutors executor(s) left (limit $minNumExecutors)") + } else if (canBeKilled(executorId)) { +executorIdsToBeRemoved += executorId + } +} + +if (executorIdsToBeRemoved.isEmpty) { + return false +} + +// Send a request to the backend to kill this executor(s) +val executorsRemoved = if (testing) { + executorIdsToBeRemoved +} else { + client.killExecutors(executorIdsToBeRemoved) +} + +if (testing || executorsRemoved.nonEmpty) { + val numExistingExecutors = allocationManager.executorIds.size - executorsPendingToRemove.size + var index = 0 + for(index <- 0 until executorsRemoved.size) { +val removedExecutorId = executorsRemoved(index) +val newExecutorTotal = numExistingExecutors - (index + 1) 
+logInfo(s"Removing executor $removedExecutorId because it has been idle for " + + s"$executorIdleTimeoutS seconds (new desired total will be $newExecutorTotal)") +executorsPendingToRemove.add(removedExecutorId) + } + true +} else { + logWarning(s"Unable to reach the cluster manager to kill executor/s " + +executorIdsToBeRemoved.mkString(",") + "or no executor eligible to kill!") --- End diff -- You can use interpolation here too; at the very least you need a space before "or", otherwise the message will read funny.
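A minimal sketch of the interpolated form of that warning, assuming nothing beyond the Scala standard library. `KillWarning.message` is a hypothetical helper used only to make the string testable; in the patch the text would be passed straight to `logWarning`.

```scala
object KillWarning {
  // Hypothetical helper: builds the warning text with interpolation and keeps
  // the space before "or" that the concatenated version was missing.
  def message(executorIds: Seq[String]): String =
    "Unable to reach the cluster manager to kill executor(s) " +
      s"${executorIds.mkString(", ")} or no executor eligible to kill!"
}
```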
[GitHub] spark pull request #14926: [SPARK-17365][Core] Remove/Kill multiple executor...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/14926#discussion_r79038316 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala --- @@ -583,7 +585,17 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp _ => Future.successful(false) } - adjustTotalExecutors.flatMap(killExecutors)(ThreadUtils.sameThread) + val killResponse = adjustTotalExecutors.flatMap(killExecutors)(ThreadUtils.sameThread) + + def execKilled(killStatus: Boolean): Future[Seq[String]] = Future.successful( +if (killStatus) { + executorsToKill +} else { + Seq.empty[String] +} + ) + + killResponse.flatMap(flag => execKilled(flag))(ThreadUtils.sameThread) --- End diff -- It would be clearer if `execKilled` was inlined here.
```
killResponse.flatMap { success => if (success) list else Nil }(ThreadUtils.sameThread)
```
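The inlining suggested above can be sketched as a standalone example. One caveat, stated as an observation rather than as what the patch ended up doing: when the branches yield a plain `Seq` instead of a `Future`, the combinator has to be `map` rather than `flatMap`. The `sameThread` context below is a hand-written stand-in for Spark's internal `ThreadUtils.sameThread`, and `InlineKillResult` and `killedExecutors` are hypothetical names.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object InlineKillResult {
  // Stand-in (assumption) for ThreadUtils.sameThread: runs callbacks on the
  // thread that submits them.
  implicit val sameThread: ExecutionContext = new ExecutionContext {
    def execute(runnable: Runnable): Unit = runnable.run()
    def reportFailure(cause: Throwable): Unit = throw cause
  }

  // Turns the Boolean acknowledgement into the list of executors that were
  // requested to be killed, or an empty list if the request failed.
  def killedExecutors(
      killResponse: Future[Boolean],
      executorsToKill: Seq[String]): Future[Seq[String]] =
    killResponse.map { success =>
      if (success) executorsToKill else Seq.empty[String]
    }
}
```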
[GitHub] spark pull request #14926: [SPARK-17365][Core] Remove/Kill multiple executor...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/14926#discussion_r79037842 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala --- @@ -564,6 +564,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp .filter { id => force || !scheduler.isExecutorBusy(id) } executorsToKill.foreach { id => executorsPendingToRemove(id) = !replace } + logInfo(s"Requesting to kill filtered executor(s) ${executorsToKill.mkString(", ")}") --- End diff -- "filtered" sounds weird... just drop it.
[GitHub] spark pull request #14926: [SPARK-17365][Core] Remove/Kill multiple executor...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/14926#discussion_r79035363 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala --- @@ -392,10 +397,70 @@ private[spark] class ExecutorAllocationManager( } /** + * Request the cluster manager to remove the given executors. + * Return whether the request is acknowledged. Ideally we should be returning the list of + * executors which were removed as the requested executors and the one's actually removed can be + * different (CoarseGrainedSchedulerBackend can filter some executors). To avoid breaking the API + * we continue to return a Boolean. + */ + private def removeExecutors(executors: Seq[String]): Boolean = synchronized { + +val executorIdsToBeRemoved = new ArrayBuffer[String] + +logInfo("Request to remove executorIds: " + executors.mkString(", ")) +val numExistingExecutors = executorIds.size - executorsPendingToRemove.size +for(executorId <- executors) { --- End diff -- Please add spaces after all these keywords. You do this in many places in the patch.
[GitHub] spark pull request #14926: [SPARK-17365][Core] Remove/Kill multiple executor...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/14926#discussion_r79035412 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala --- @@ -392,10 +397,70 @@ private[spark] class ExecutorAllocationManager( } /** + * Request the cluster manager to remove the given executors. + * Return whether the request is acknowledged. Ideally we should be returning the list of + * executors which were removed as the requested executors and the one's actually removed can be + * different (CoarseGrainedSchedulerBackend can filter some executors). To avoid breaking the API + * we continue to return a Boolean. + */ + private def removeExecutors(executors: Seq[String]): Boolean = synchronized { + --- End diff -- nit: no need for this empty line.
[GitHub] spark pull request #14926: [SPARK-17365][Core] Remove/Kill multiple executor...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/14926#discussion_r79037417 --- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala --- @@ -1575,13 +1581,14 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * @return whether the request is received. */ private[spark] def killAndReplaceExecutor(executorId: String): Boolean = { -schedulerBackend match { +val killResponse = schedulerBackend match { case b: CoarseGrainedSchedulerBackend => b.killExecutors(Seq(executorId), replace = true, force = true) case _ => logWarning("Killing executors is only supported in coarse-grained mode") -false +Seq.empty[String] } +killResponse.nonEmpty --- End diff -- You could move the `nonEmpty` to the `killExecutors` call above and not have to change anything else in this method.
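A rough sketch of what moving `nonEmpty` onto the call looks like, so the match keeps yielding a `Boolean` and the fallback branch stays `false`. The backend types here are simplified, hypothetical stand-ins, not Spark's classes, and `killExecutors` is assumed to return the ids it asked the cluster manager to kill.

```scala
object KillAndReplace {
  // Hypothetical stand-ins for Spark's scheduler backends.
  trait SchedulerBackend
  class CoarseGrainedBackend extends SchedulerBackend {
    // Assumption: returns the executor ids that were requested to be killed.
    def killExecutors(ids: Seq[String], replace: Boolean, force: Boolean): Seq[String] = ids
  }
  class OtherBackend extends SchedulerBackend

  def killAndReplaceExecutor(backend: SchedulerBackend, executorId: String): Boolean =
    backend match {
      case b: CoarseGrainedBackend =>
        // The conversion to Boolean happens right at the call site.
        b.killExecutors(Seq(executorId), replace = true, force = true).nonEmpty
      case _ =>
        // The real method logs a warning here; omitted in this sketch.
        false
    }
}
```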
[GitHub] spark pull request #14926: [SPARK-17365][Core] Remove/Kill multiple executor...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/14926#discussion_r79036915 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala --- @@ -409,26 +474,7 @@ private[spark] class ExecutorAllocationManager( return false --- End diff -- Drop `return`, use `if () ... else ...` instead.
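The `if () ... else ...` shape requested above, sketched on a simplified method. `NoEarlyReturn` and the `kill` function parameter are hypothetical; `kill` stands in for the backend call and is not the patch's API.

```scala
object NoEarlyReturn {
  // The whole body is one expression, so no `return` is needed: the value of
  // the chosen branch is the value of the method.
  def removeExecutors(toRemove: Seq[String], kill: Seq[String] => Seq[String]): Boolean =
    if (toRemove.isEmpty) {
      false
    } else {
      kill(toRemove).nonEmpty
    }
}
```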
[GitHub] spark pull request #14926: [SPARK-17365][Core] Remove/Kill multiple executor...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/14926#discussion_r79038521 --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala --- @@ -83,8 +84,14 @@ class JobScheduler(val ssc: StreamingContext) extends Logging { listenerBus.start() receiverTracker = new ReceiverTracker(ssc) inputInfoTracker = new InputInfoTracker(ssc) + +val executorAllocClient: ExecutorAllocationClient = ssc.sparkContext.schedulerBackend match { + case b: ExecutorAllocationClient => b.asInstanceOf[ExecutorAllocationClient] + case _ => null +} + executorAllocationManager = ExecutorAllocationManager.createIfEnabled( - ssc.sparkContext, + executorAllocClient, --- End diff -- It feels like there should be an assertion somewhere that the client is not `null`. Otherwise `ExecutorAllocationManager` can't work, right?
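One way to make the missing-client case explicit, offered as an alternative sketch and not as what the patch or the reviewer proposed: return an `Option` instead of `null`, so the caller has to handle the absent client. The traits below are simplified, hypothetical stand-ins for the Spark types. The type pattern already binds a value of the matched type, so no `asInstanceOf` cast is needed.

```scala
object ClientLookup {
  // Hypothetical stand-ins for the Spark types discussed above.
  trait SchedulerBackend
  trait ExecutorAllocationClient
  class CoarseBackend extends SchedulerBackend with ExecutorAllocationClient
  class LocalBackend extends SchedulerBackend

  // Returns the backend as an allocation client when it is one, None otherwise.
  def allocationClient(backend: SchedulerBackend): Option[ExecutorAllocationClient] =
    backend match {
      case client: ExecutorAllocationClient => Some(client)
      case _ => None
    }
}
```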
[GitHub] spark pull request #14926: [SPARK-17365][Core] Remove/Kill multiple executor...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/14926#discussion_r79036795 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala --- @@ -392,10 +397,70 @@ private[spark] class ExecutorAllocationManager( } /** + * Request the cluster manager to remove the given executors. + * Return whether the request is acknowledged. Ideally we should be returning the list of + * executors which were removed as the requested executors and the one's actually removed can be + * different (CoarseGrainedSchedulerBackend can filter some executors). To avoid breaking the API + * we continue to return a Boolean. + */ + private def removeExecutors(executors: Seq[String]): Boolean = synchronized { + +val executorIdsToBeRemoved = new ArrayBuffer[String] + +logInfo("Request to remove executorIds: " + executors.mkString(", ")) +val numExistingExecutors = executorIds.size - executorsPendingToRemove.size +for(executorId <- executors) { + // Do not kill the executor if we have already reached the lower bound + val newExecutorTotal = numExistingExecutors - executorIdsToBeRemoved.size + if (newExecutorTotal - 1 < minNumExecutors) { +logDebug(s"Not removing idle executor $executorId because there are only " + + s"$numExistingExecutors executor(s) left (limit $minNumExecutors)") + } else if (canBeKilled(executorId)) { +executorIdsToBeRemoved += executorId + } +} + +if (executorIdsToBeRemoved.isEmpty) { + return false +} + +// Send a request to the backend to kill this executor(s) +val executorsRemoved = if (testing) { + executorIdsToBeRemoved +} else { + client.killExecutors(executorIdsToBeRemoved) +} + +if (testing || executorsRemoved.nonEmpty) { + val numExistingExecutors = allocationManager.executorIds.size - executorsPendingToRemove.size + var index = 0 + for(index <- 0 until executorsRemoved.size) { --- End diff -- I kinda dislike `for` in Scala... this looks cleaner to me:
```
var numExistingExecutors = ...
executorsRemoved.foreach { id =>
  numExistingExecutors -= 1
  logInfo(...)
  executorsPendingToRemove.add(id)
}
```
You could also avoid the `numExistingExecutors` in every log message and just print a separate log message with the final count.
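A self-contained version of the `foreach` shape suggested above, filled in under assumptions: `PendingRemoval.markRemoved`, its parameters, and the shortened message text are hypothetical, and a `log` function replaces Spark's `logInfo` so the sketch runs outside of Spark.

```scala
import scala.collection.mutable

object PendingRemoval {
  // Marks each removed executor as pending and logs the running total,
  // counting down from the number of executors that existed beforehand.
  def markRemoved(
      executorsRemoved: Seq[String],
      existing: Int,
      pending: mutable.Set[String])(log: String => Unit): Unit = {
    var remaining = existing
    executorsRemoved.foreach { id =>
      remaining -= 1
      log(s"Removing executor $id (new desired total will be $remaining)")
      pending.add(id)
    }
  }
}
```

Keeping the running total in every message preserves the per-executor countdown in the logs; printing only a final count, as the reviewer also suggests, would drop the `var` entirely.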
[GitHub] spark pull request #14926: [SPARK-17365][Core] Remove/Kill multiple executor...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/14926#discussion_r79035297 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala --- @@ -392,10 +397,70 @@ private[spark] class ExecutorAllocationManager( } /** + * Request the cluster manager to remove the given executors. --- End diff -- This comment is weird. It's a private method, so there's no API to maintain. The actual API that you're trying to maintain is `removeExecutor` below. You could implement it by returning the list here and returning whether it's empty from that method.
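The split described above, sketched on a toy class: the private batch method returns the list of executors it acted on, and the single-executor method derives its `Boolean` from that list. `RemovalApi.Manager` and the `canBeKilled` constructor parameter are hypothetical simplifications, not the real `ExecutorAllocationManager`.

```scala
object RemovalApi {
  class Manager(canBeKilled: String => Boolean) {
    // Private, so its return type can change freely: it reports which
    // executors were actually accepted for removal.
    private def removeExecutors(ids: Seq[String]): Seq[String] =
      ids.filter(canBeKilled)

    // Keeps the existing Boolean contract by checking the returned list.
    def removeExecutor(id: String): Boolean =
      removeExecutors(Seq(id)).nonEmpty
  }
}
```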
[GitHub] spark pull request #14926: [SPARK-17365][Core] Remove/Kill multiple executor...
Github user tgravescs commented on a diff in the pull request: https://github.com/apache/spark/pull/14926#discussion_r78772541 --- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala --- @@ -2535,7 +2539,7 @@ object SparkContext extends Logging { private def getClusterManager(url: String): Option[ExternalClusterManager] = { val loader = Utils.getContextOrSparkClassLoader -val serviceLoaders = +var serviceLoaders = --- End diff -- change back to val
[GitHub] spark pull request #14926: [SPARK-17365][Core] Remove/Kill multiple executor...
Github user tgravescs commented on a diff in the pull request: https://github.com/apache/spark/pull/14926#discussion_r78772038 --- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala --- @@ -36,18 +36,15 @@ import com.google.common.collect.MapMaker import org.apache.commons.lang3.SerializationUtils import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} -import org.apache.hadoop.io.{ArrayWritable, BooleanWritable, BytesWritable, DoubleWritable, - FloatWritable, IntWritable, LongWritable, NullWritable, Text, Writable} -import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf, SequenceFileInputFormat, - TextInputFormat} +import org.apache.hadoop.io.{ArrayWritable, BooleanWritable, BytesWritable, DoubleWritable, FloatWritable, IntWritable, LongWritable, NullWritable, Text, Writable} --- End diff -- wrap line
[GitHub] spark pull request #14926: [SPARK-17365][Core] Remove/Kill multiple executor...
Github user tgravescs commented on a diff in the pull request: https://github.com/apache/spark/pull/14926#discussion_r78771555 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala --- @@ -29,6 +29,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.internal.config.{DYN_ALLOCATION_MAX_EXECUTORS, DYN_ALLOCATION_MIN_EXECUTORS} import org.apache.spark.metrics.source.Source import org.apache.spark.scheduler._ +import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend --- End diff -- remove
[GitHub] spark pull request #14926: [SPARK-17365][Core] Remove/Kill multiple executor...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/14926#discussion_r78106170 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala --- @@ -392,10 +397,36 @@ private[spark] class ExecutorAllocationManager( } /** - * Request the cluster manager to remove the given executor. + * Request the cluster manager to remove the given executors. * Return whether the request is received. */ - private def removeExecutor(executorId: String): Boolean = synchronized { + private def removeExecutors(executorIds: Seq[String]): Boolean = synchronized { + +val executorIdsToBeRemoved = executorIds.filter(canBeKilled) + +// Send a request to the backend to kill this executor +val removeRequestAcknowledged = testing || client.killExecutors(executorIdsToBeRemoved) --- End diff -- > Killing any one of them should return true That's the current behavior, so it should be preserved. Yeah it's sketchy but there really isn't a good solution, so go with the current state of things. > Can we change the api to return the list of killed executors It's marked as `@DeveloperApi`, but even then, it's frowned upon to break compatibility for these APIs.
[GitHub] spark pull request #14926: [SPARK-17365][Core] Remove/Kill multiple executor...
Github user dhruve commented on a diff in the pull request: https://github.com/apache/spark/pull/14926#discussion_r78092320 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala --- @@ -392,10 +397,36 @@ private[spark] class ExecutorAllocationManager( } /** - * Request the cluster manager to remove the given executor. + * Request the cluster manager to remove the given executors. * Return whether the request is received. */ - private def removeExecutor(executorId: String): Boolean = synchronized { + private def removeExecutors(executorIds: Seq[String]): Boolean = synchronized { + +val executorIdsToBeRemoved = executorIds.filter(canBeKilled) + +// Send a request to the backend to kill this executor +val removeRequestAcknowledged = testing || client.killExecutors(executorIdsToBeRemoved) --- End diff --
@vanzin We can do the following:
* Decouple ```SparkContext``` from ```ExecutorAllocationClient```, so that it no longer extends it.
* ```SparkContext``` still exposes killExecutor(s).
* Change ```ExecutorAllocationClient``` to return the filtered list of executors that were requested to be killed.
The current dev API for ```killExecutors``` returns a boolean. Because the ```CoarseGrainedSchedulerBackend``` can decide to kill only a subset of the requested executors, how do we want to interpret that boolean: should it be true if any one of them is killed, or only if all of them are? The semantics are hazy here. Can we change the API to return the list of killed executors, making it consistent throughout? Or do we keep the API the same and live with the unclear semantics?
[GitHub] spark pull request #14926: [SPARK-17365][Core] Remove/Kill multiple executor...
Github user dhruve commented on a diff in the pull request: https://github.com/apache/spark/pull/14926#discussion_r77724521 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala --- @@ -392,10 +397,36 @@ private[spark] class ExecutorAllocationManager( } /** - * Request the cluster manager to remove the given executor. + * Request the cluster manager to remove the given executors. * Return whether the request is received. */ - private def removeExecutor(executorId: String): Boolean = synchronized { + private def removeExecutors(executorIds: Seq[String]): Boolean = synchronized { + +val executorIdsToBeRemoved = executorIds.filter(canBeKilled) + +// Send a request to the backend to kill this executor +val removeRequestAcknowledged = testing || client.killExecutors(executorIdsToBeRemoved) --- End diff -- Was missing the context of SPARK-9552.
[GitHub] spark pull request #14926: [SPARK-17365][Core] Remove/Kill multiple executor...
Github user tgravescs commented on a diff in the pull request: https://github.com/apache/spark/pull/14926#discussion_r77723977 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala --- @@ -392,10 +397,36 @@ private[spark] class ExecutorAllocationManager( } /** - * Request the cluster manager to remove the given executor. + * Request the cluster manager to remove the given executors. * Return whether the request is received. */ - private def removeExecutor(executorId: String): Boolean = synchronized { + private def removeExecutors(executorIds: Seq[String]): Boolean = synchronized { + +val executorIdsToBeRemoved = executorIds.filter(canBeKilled) + +// Send a request to the backend to kill this executor +val removeRequestAcknowledged = testing || client.killExecutors(executorIdsToBeRemoved) --- End diff -- ah ok, I see it now, yeah we should fix the api then.
[GitHub] spark pull request #14926: [SPARK-17365][Core] Remove/Kill multiple executor...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/14926#discussion_r77720376 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala --- @@ -392,10 +397,36 @@ private[spark] class ExecutorAllocationManager( } /** - * Request the cluster manager to remove the given executor. + * Request the cluster manager to remove the given executors. * Return whether the request is received. */ - private def removeExecutor(executorId: String): Boolean = synchronized { + private def removeExecutors(executorIds: Seq[String]): Boolean = synchronized { + +val executorIdsToBeRemoved = executorIds.filter(canBeKilled) + +// Send a request to the backend to kill this executor +val removeRequestAcknowledged = testing || client.killExecutors(executorIdsToBeRemoved) --- End diff -- The problem is not the backend. The problem is `CoarseGrainedSchedulerBackend.killExecutors` which can decide to kill just a subset of the executors asked to be killed. I'm wary of @dhruve's assessment of the problem because that code was added *exactly* because the code was killing busy executors that had for some reason become "idle". See SPARK-9552.
[GitHub] spark pull request #14926: [SPARK-17365][Core] Remove/Kill multiple executor...
Github user tgravescs commented on a diff in the pull request: https://github.com/apache/spark/pull/14926#discussion_r77719621 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala --- @@ -392,10 +397,36 @@ private[spark] class ExecutorAllocationManager( } /** - * Request the cluster manager to remove the given executor. + * Request the cluster manager to remove the given executors. * Return whether the request is received. */ - private def removeExecutor(executorId: String): Boolean = synchronized { + private def removeExecutors(executorIds: Seq[String]): Boolean = synchronized { + +val executorIdsToBeRemoved = executorIds.filter(canBeKilled) + +// Send a request to the backend to kill this executor +val removeRequestAcknowledged = testing || client.killExecutors(executorIdsToBeRemoved) --- End diff -- So I took a quick look at this and I agree with you that ideally it's returning the ones actually removed, but right now I don't see any of the backends actually returning anything other than true unless it's something catastrophic, where it wouldn't matter if the executor list is 1 or many. Yarn mode right now always returns true (ApplicationMaster KillExecutors endpoint). Standalone mode only returns false if the app doesn't exist, so whether we pass 1 or many executors doesn't matter. Mesos mode only returns false if the mesosdriver is null. So changing the interface to do the right thing seems like it can be a separate issue from this one. Or perhaps I missed something? But it looks like there are a few cases where mesos and standalone mode do return false.
[GitHub] spark pull request #14926: [SPARK-17365][Core] Remove/Kill multiple executor...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/14926#discussion_r77684425 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala --- @@ -392,10 +397,36 @@ private[spark] class ExecutorAllocationManager( } /** - * Request the cluster manager to remove the given executor. + * Request the cluster manager to remove the given executors. * Return whether the request is received. */ - private def removeExecutor(executorId: String): Boolean = synchronized { + private def removeExecutors(executorIds: Seq[String]): Boolean = synchronized { + +val executorIdsToBeRemoved = executorIds.filter(canBeKilled) + +// Send a request to the backend to kill this executor +val removeRequestAcknowledged = testing || client.killExecutors(executorIdsToBeRemoved) --- End diff -- Sorry, I'm not comfortable with your assumption. Without changing the interface to properly return the executors that were actually removed, this change is too risky.
[GitHub] spark pull request #14926: [SPARK-17365][Core] Remove/Kill multiple executor...
Github user dhruve commented on a diff in the pull request: https://github.com/apache/spark/pull/14926#discussion_r77684087 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala --- @@ -392,10 +397,36 @@ private[spark] class ExecutorAllocationManager( } /** - * Request the cluster manager to remove the given executor. + * Request the cluster manager to remove the given executors. * Return whether the request is received. */ - private def removeExecutor(executorId: String): Boolean = synchronized { + private def removeExecutors(executorIds: Seq[String]): Boolean = synchronized { + +val executorIdsToBeRemoved = executorIds.filter(canBeKilled) + +// Send a request to the backend to kill this executor +val removeRequestAcknowledged = testing || client.killExecutors(executorIdsToBeRemoved) --- End diff -- Redefining the ```ExecutorAllocationClient``` interface would definitely be a more meaningful change where we return the executors which were actually removed.
[GitHub] spark pull request #14926: [SPARK-17365][Core] Remove/Kill multiple executor...
Github user dhruve commented on a diff in the pull request: https://github.com/apache/spark/pull/14926#discussion_r77683485 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala --- @@ -392,10 +397,36 @@ private[spark] class ExecutorAllocationManager( } /** - * Request the cluster manager to remove the given executor. + * Request the cluster manager to remove the given executors. * Return whether the request is received. */ - private def removeExecutor(executorId: String): Boolean = synchronized { + private def removeExecutors(executorIds: Seq[String]): Boolean = synchronized { + +val executorIdsToBeRemoved = executorIds.filter(canBeKilled) + +// Send a request to the backend to kill this executor +val removeRequestAcknowledged = testing || client.killExecutors(executorIdsToBeRemoved) --- End diff -- As @vanzin mentioned, the ```CoarseGrainedSchedulerBackend``` filters out executors based on whether we want to force kill the executors which are already running, and it also filters out the ones which are already pending to be removed. In our case, we are requesting to kill the executors which have remained idle for the configured timeout duration. AFAIK this does not lead to filtering out any executors unless they are already marked for removal, which is harmless.
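The filtering described in this comment can be pictured with a small standalone function. This is a sketch under assumptions, not the backend's code: `executorsToKill`, `pendingToRemove`, `busy` and `force` are illustrative names, and the two conditions are taken only from the description above (skip executors already pending removal; skip busy ones unless a forced kill is requested).

```scala
// Hypothetical sketch of the backend-side filtering described in the thread.
object BackendFilter {

  /**
   * Returns the subset of `requested` that would actually be killed.
   *
   * @param pendingToRemove executors already marked for removal (skipped)
   * @param busy            executors currently running tasks
   * @param force           if true, busy executors are killed as well
   */
  def executorsToKill(
      requested: Seq[String],
      pendingToRemove: Set[String],
      busy: Set[String],
      force: Boolean): Seq[String] =
    requested
      .filter(id => !pendingToRemove.contains(id))
      .filter(id => force || !busy.contains(id))
}
```

Because the result can be a strict subset of the request, a caller that assumes all requested executors were removed would over-count removals, which is the accounting concern raised in the review.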
[GitHub] spark pull request #14926: [SPARK-17365][Core] Remove/Kill multiple executor...
Github user dhruve commented on a diff in the pull request: https://github.com/apache/spark/pull/14926#discussion_r77680444 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala --- @@ -392,10 +397,36 @@ private[spark] class ExecutorAllocationManager( } /** - * Request the cluster manager to remove the given executor. + * Request the cluster manager to remove the given executors. * Return whether the request is received. */ - private def removeExecutor(executorId: String): Boolean = synchronized { + private def removeExecutors(executorIds: Seq[String]): Boolean = synchronized { + +val executorIdsToBeRemoved = executorIds.filter(canBeKilled) + +// Send a request to the backend to kill this executor +val removeRequestAcknowledged = testing || client.killExecutors(executorIdsToBeRemoved) +if (removeRequestAcknowledged) { + val numExistingExecutors = allocationManager.executorIds.size - executorsPendingToRemove.size + var index = 0 --- End diff -- This was solely to maintain consistency with the earlier log message, which I think is useful and more readable than scanning the logs separately for the removed-executor message and the new desired total.
[GitHub] spark pull request #14926: [SPARK-17365][Core] Remove/Kill multiple executor...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/14926#discussion_r77426133 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala --- @@ -392,10 +397,36 @@ private[spark] class ExecutorAllocationManager( } /** - * Request the cluster manager to remove the given executor. + * Request the cluster manager to remove the given executors. * Return whether the request is received. */ - private def removeExecutor(executorId: String): Boolean = synchronized { + private def removeExecutors(executorIds: Seq[String]): Boolean = synchronized { + +val executorIdsToBeRemoved = executorIds.filter(canBeKilled) + +// Send a request to the backend to kill this executor +val removeRequestAcknowledged = testing || client.killExecutors(executorIdsToBeRemoved) --- End diff --
Ah, this is something I've already asked for in the past... `killExecutors` should really return something more interesting than a boolean. Because `CoarseGrainedSchedulerBackend` *will* ignore executors it doesn't want to kill. The problem with that is that through some unfortunate inheritance chain, `SparkContext` actually extends `ExecutorAllocationClient` and thus, `killExecutors` is a public API. I think we should fix this so that:
- `SparkContext` still exposes the existing `killExecutors`, but doesn't override `ExecutorAllocationClient`
- `ExecutorAllocationClient` defines the proper interface, which optimally would return the list of executors actually removed so that the rest of the code here can do the right thing.
Otherwise, there's no way to do what this patch proposes without breaking all the accounting, unfortunately.
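To make the accounting concern concrete, here is a minimal sketch of a batch removal that records only the executors the client reports as removed. Everything in it is an assumption for illustration: the `Client` trait returning `Seq[String]` is the interface shape suggested in the comment above, not an existing API, and `Manager`, `canBeKilled` and `executorsPendingToRemove` only loosely mirror the names visible in the diff.

```scala
import scala.collection.mutable

// Hypothetical sketch; names loosely follow the diff but this is not Spark's code.
object BatchRemoval {

  /** Assumed interface: returns the ids of executors actually accepted for removal. */
  trait Client {
    def killExecutors(ids: Seq[String]): Seq[String]
  }

  class Manager(client: Client, canBeKilled: String => Boolean) {
    val executorsPendingToRemove: mutable.Set[String] = mutable.Set.empty[String]

    /** Sends one batched request and returns the executors actually removed. */
    def removeExecutors(ids: Seq[String]): Seq[String] = synchronized {
      val candidates = ids.filter(canBeKilled)
      if (candidates.isEmpty) {
        Seq.empty[String]
      } else {
        // A single call replaces one call per executor.
        val removed = client.killExecutors(candidates)
        // Track only what the client reports, not everything that was requested.
        executorsPendingToRemove ++= removed
        removed
      }
    }
  }
}
```

If the client silently drops an executor from the request, the pending set stays accurate because it is built from the returned list rather than from the candidates.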
[GitHub] spark pull request #14926: [SPARK-17365][Core] Remove/Kill multiple executor...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/14926#discussion_r77309831 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala --- @@ -392,10 +397,36 @@ private[spark] class ExecutorAllocationManager( } /** - * Request the cluster manager to remove the given executor. + * Request the cluster manager to remove the given executors. * Return whether the request is received. */ - private def removeExecutor(executorId: String): Boolean = synchronized { + private def removeExecutors(executorIds: Seq[String]): Boolean = synchronized { + +val executorIdsToBeRemoved = executorIds.filter(canBeKilled) + +// Send a request to the backend to kill this executor +val removeRequestAcknowledged = testing || client.killExecutors(executorIdsToBeRemoved) --- End diff -- Is there a risk that somehow _some_ of the executors are killed and not others? The code kinda assumes now that all or none succeed.
[GitHub] spark pull request #14926: [SPARK-17365][Core] Remove/Kill multiple executor...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/14926#discussion_r77309507 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala --- @@ -392,10 +397,36 @@ private[spark] class ExecutorAllocationManager( } /** - * Request the cluster manager to remove the given executor. + * Request the cluster manager to remove the given executors. * Return whether the request is received. */ - private def removeExecutor(executorId: String): Boolean = synchronized { + private def removeExecutors(executorIds: Seq[String]): Boolean = synchronized { + +val executorIdsToBeRemoved = executorIds.filter(canBeKilled) + +// Send a request to the backend to kill this executor +val removeRequestAcknowledged = testing || client.killExecutors(executorIdsToBeRemoved) +if (removeRequestAcknowledged) { + val numExistingExecutors = allocationManager.executorIds.size - executorsPendingToRemove.size + var index = 0 + for(index <- 0 until executorIdsToBeRemoved.size) { +logInfo(s"Removing executor " + executorIdsToBeRemoved(index) + " because it has been " + --- End diff -- This mixes string interpolation and concatenation. You can rewrite it with interpolation, possibly defining extra vals for clarity.
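As an illustration of the suggestion, a log message of this kind could be built purely with `s"..."` interpolation, with the values pulled into named parameters first. The diff cuts the original message off after "because it has been", so the wording after that point, along with the names `RemovalLog`, `idleTimeoutS` and `newTotal`, is made up for this sketch.

```scala
// Hypothetical sketch: the message text after "because it has been" is invented.
object RemovalLog {

  /** Builds the removal message using interpolation only, no `+` between values. */
  def message(executorId: String, idleTimeoutS: Long, newTotal: Int): String =
    s"Removing executor $executorId because it has been idle for " +
      s"$idleTimeoutS seconds (new desired total will be $newTotal)"
}
```

The remaining `+` only joins two interpolated literals to keep the line short; every value is substituted through `$name` rather than concatenated.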
[GitHub] spark pull request #14926: [SPARK-17365][Core] Remove/Kill multiple executor...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/14926#discussion_r77309440 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala --- @@ -392,10 +397,36 @@ private[spark] class ExecutorAllocationManager( } /** - * Request the cluster manager to remove the given executor. + * Request the cluster manager to remove the given executors. * Return whether the request is received. */ - private def removeExecutor(executorId: String): Boolean = synchronized { + private def removeExecutors(executorIds: Seq[String]): Boolean = synchronized { + +val executorIdsToBeRemoved = executorIds.filter(canBeKilled) + +// Send a request to the backend to kill this executor +val removeRequestAcknowledged = testing || client.killExecutors(executorIdsToBeRemoved) +if (removeRequestAcknowledged) { + val numExistingExecutors = allocationManager.executorIds.size - executorsPendingToRemove.size + var index = 0 --- End diff -- why this? the `for` already defines it for you
[GitHub] spark pull request #14926: [SPARK-17365][Core] Remove/Kill multiple executor...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/14926#discussion_r77309343 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala --- @@ -279,14 +280,18 @@ private[spark] class ExecutorAllocationManager( updateAndSyncNumExecutorsTarget(now) +val executorIdsToBeRemoved = new ArrayBuffer[String] removeTimes.retain { case (executorId, expireTime) => val expired = now >= expireTime if (expired) { initializing = false -removeExecutor(executorId) +executorIdsToBeRemoved += executorId } !expired } +if(executorIdsToBeRemoved.size != 0) { --- End diff -- `if (executorIdsToBeRemoved.nonEmpty) {`
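The pattern in this hunk (collect the expired executor ids first, then issue one batched removal only if the collection is non-empty) can be sketched on its own. This version is an assumption-laden simplification: it uses an immutable `Map` and `partition` instead of the mutable `removeTimes.retain` in the diff, and `ExpiredBatch`, `removeExpired` and `killBatch` are hypothetical names.

```scala
// Hypothetical sketch of "collect expired ids, then remove them in one batch".
object ExpiredBatch {

  /**
   * Calls `killBatch` once with every executor whose expiry time has passed,
   * and returns the entries that have not expired yet.
   */
  def removeExpired(
      removeTimes: Map[String, Long],
      now: Long,
      killBatch: Seq[String] => Unit): Map[String, Long] = {
    val (expired, remaining) =
      removeTimes.partition { case (_, expireTime) => now >= expireTime }
    // Sorted only to make the batch order deterministic for this example.
    val expiredIds = expired.keys.toSeq.sorted
    if (expiredIds.nonEmpty) {
      killBatch(expiredIds)
    }
    remaining
  }
}
```

The `nonEmpty` guard is what avoids sending an empty request when nothing has timed out.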
[GitHub] spark pull request #14926: [SPARK-17365][Core] Remove/Kill multiple executor...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/14926#discussion_r77309320 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala --- @@ -279,14 +280,18 @@ private[spark] class ExecutorAllocationManager( updateAndSyncNumExecutorsTarget(now) +val executorIdsToBeRemoved = new ArrayBuffer[String] --- End diff -- Probably trivial, but I think just `ArrayBuffer[String]()` is more idiomatic
[GitHub] spark pull request #14926: [SPARK-17365][Core] Remove/Kill multiple executor...
GitHub user dhruve opened a pull request: https://github.com/apache/spark/pull/14926 [SPARK-17365][Core] Remove/Kill multiple executors together to reduce...
## What changes were proposed in this pull request?
We are killing multiple executors together instead of iterating over expensive RPC calls to kill a single executor at a time.
## How was this patch tested?
Executed a sample Spark job to observe executors being killed/removed with dynamic allocation enabled.
You can merge this pull request into a Git repository by running: $ git pull https://github.com/dhruve/spark impr/SPARK-17365
Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/14926.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #14926
commit f058ad299fd9a4430da296f637aa539979d6368c Author: Dhruve Ashar Date: 2016-09-01T20:19:08Z [SPARK-17365][Core] Remove/Kill multiple executors together to reduce contention