[GitHub] spark pull request #14926: [SPARK-17365][Core] Remove/Kill multiple executor...

2016-09-19 Thread dhruve
Github user dhruve closed the pull request at:

https://github.com/apache/spark/pull/14926


[GitHub] spark pull request #14926: [SPARK-17365][Core] Remove/Kill multiple executor...

2016-09-16 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/14926#discussion_r79198768
  
--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -392,10 +397,70 @@ private[spark] class ExecutorAllocationManager(
   }
 
   /**
+   * Request the cluster manager to remove the given executors.
+   * Return whether the request is acknowledged. Ideally we should be returning the list of
+   * executors which were removed as the requested executors and the one's actually removed can be
+   * different (CoarseGrainedSchedulerBackend can filter some executors). To avoid breaking the API
+   * we continue to return a Boolean.
+   */
+  private def removeExecutors(executors: Seq[String]): Boolean = synchronized {
+
+    val executorIdsToBeRemoved = new ArrayBuffer[String]
+
+    logInfo("Request to remove executorIds: " + executors.mkString(", "))
+    val numExistingExecutors = executorIds.size - executorsPendingToRemove.size
+    for(executorId <- executors) {
+      // Do not kill the executor if we have already reached the lower bound
+      val newExecutorTotal = numExistingExecutors - executorIdsToBeRemoved.size
+      if (newExecutorTotal - 1 < minNumExecutors) {
+        logDebug(s"Not removing idle executor $executorId because there are only " +
+          s"$numExistingExecutors executor(s) left (limit $minNumExecutors)")
+      } else if (canBeKilled(executorId)) {
+        executorIdsToBeRemoved += executorId
+      }
+    }
+
+    if (executorIdsToBeRemoved.isEmpty) {
+      return false
+    }
+
+    // Send a request to the backend to kill this executor(s)
+    val executorsRemoved = if (testing) {
+      executorIdsToBeRemoved
+    } else {
+      client.killExecutors(executorIdsToBeRemoved)
+    }
+
+    if (testing || executorsRemoved.nonEmpty) {
+      val numExistingExecutors = allocationManager.executorIds.size - executorsPendingToRemove.size
+      var index = 0
+      for(index <- 0 until executorsRemoved.size) {
+        val removedExecutorId = executorsRemoved(index)
+        val newExecutorTotal = numExistingExecutors - (index + 1)
+        logInfo(s"Removing executor $removedExecutorId because it has been idle for " +
+          s"$executorIdleTimeoutS seconds (new desired total will be $newExecutorTotal)")
+        executorsPendingToRemove.add(removedExecutorId)
+      }
+      true
+    } else {
+      logWarning(s"Unable to reach the cluster manager to kill executor/s " +
+        executorIdsToBeRemoved.mkString(",") + "or no executor eligible to kill!")
--- End diff --

Yeah. That space is important.


[GitHub] spark pull request #14926: [SPARK-17365][Core] Remove/Kill multiple executor...

2016-09-16 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/14926#discussion_r79198414
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ---
@@ -564,6 +564,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
         .filter { id => force || !scheduler.isExecutorBusy(id) }
       executorsToKill.foreach { id => executorsPendingToRemove(id) = !replace }
 
+      logInfo(s"Requesting to kill filtered executor(s) ${executorsToKill.mkString(", ")}")
--- End diff --

It's been added to differentiate between what was requested and what's actually being sent. If we drop it, the log statement above will read the same. How about "idle"?
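
For example, with the suggested wording (an illustration only, not a final change):

```
logInfo(s"Requesting to kill idle executor(s) ${executorsToKill.mkString(", ")}")
```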


[GitHub] spark pull request #14926: [SPARK-17365][Core] Remove/Kill multiple executor...

2016-09-16 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/14926#discussion_r79217027
  
--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -392,10 +397,70 @@ private[spark] class ExecutorAllocationManager(
   }
 
   /**
+   * Request the cluster manager to remove the given executors.
--- End diff --

okay. 


[GitHub] spark pull request #14926: [SPARK-17365][Core] Remove/Kill multiple executor...

2016-09-16 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/14926#discussion_r79196391
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ---
@@ -583,7 +585,17 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
           _ => Future.successful(false)
         }
 
-      adjustTotalExecutors.flatMap(killExecutors)(ThreadUtils.sameThread)
+      val killResponse = adjustTotalExecutors.flatMap(killExecutors)(ThreadUtils.sameThread)
+
+      def execKilled(killStatus: Boolean): Future[Seq[String]] = Future.successful(
+        if (killStatus) {
+          executorsToKill
+        } else {
+          Seq.empty[String]
+        }
+      )
+
+      killResponse.flatMap(flag => execKilled(flag))(ThreadUtils.sameThread)
--- End diff --

Agreed. 


[GitHub] spark pull request #14926: [SPARK-17365][Core] Remove/Kill multiple executor...

2016-09-16 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/14926#discussion_r79199040
  
--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -392,10 +397,70 @@ private[spark] class ExecutorAllocationManager(
   }
 
   /**
+   * Request the cluster manager to remove the given executors.
+   * Return whether the request is acknowledged. Ideally we should be returning the list of
+   * executors which were removed as the requested executors and the one's actually removed can be
+   * different (CoarseGrainedSchedulerBackend can filter some executors). To avoid breaking the API
+   * we continue to return a Boolean.
+   */
+  private def removeExecutors(executors: Seq[String]): Boolean = synchronized {
+
+    val executorIdsToBeRemoved = new ArrayBuffer[String]
+
+    logInfo("Request to remove executorIds: " + executors.mkString(", "))
+    val numExistingExecutors = executorIds.size - executorsPendingToRemove.size
+    for(executorId <- executors) {
+      // Do not kill the executor if we have already reached the lower bound
+      val newExecutorTotal = numExistingExecutors - executorIdsToBeRemoved.size
+      if (newExecutorTotal - 1 < minNumExecutors) {
+        logDebug(s"Not removing idle executor $executorId because there are only " +
+          s"$numExistingExecutors executor(s) left (limit $minNumExecutors)")
+      } else if (canBeKilled(executorId)) {
+        executorIdsToBeRemoved += executorId
+      }
+    }
+
+    if (executorIdsToBeRemoved.isEmpty) {
+      return false
+    }
+
+    // Send a request to the backend to kill this executor(s)
+    val executorsRemoved = if (testing) {
+      executorIdsToBeRemoved
+    } else {
+      client.killExecutors(executorIdsToBeRemoved)
+    }
+
+    if (testing || executorsRemoved.nonEmpty) {
+      val numExistingExecutors = allocationManager.executorIds.size - executorsPendingToRemove.size
+      var index = 0
+      for(index <- 0 until executorsRemoved.size) {
--- End diff --

I would like to keep the numExistingExecutors; it's easier to skim through the logs to validate the sequential release of executors with dynamic allocation enabled.


[GitHub] spark pull request #14926: [SPARK-17365][Core] Remove/Kill multiple executor...

2016-09-16 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/14926#discussion_r79196128
  
--- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala ---
@@ -83,8 +84,14 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
     listenerBus.start()
     receiverTracker = new ReceiverTracker(ssc)
     inputInfoTracker = new InputInfoTracker(ssc)
+
+    val executorAllocClient: ExecutorAllocationClient = ssc.sparkContext.schedulerBackend match {
+      case b: ExecutorAllocationClient => b.asInstanceOf[ExecutorAllocationClient]
+      case _ => null
+    }
+
     executorAllocationManager = ExecutorAllocationManager.createIfEnabled(
-      ssc.sparkContext,
+      executorAllocClient,
--- End diff --

We are checking that in createIfEnabled.


[GitHub] spark pull request #14926: [SPARK-17365][Core] Remove/Kill multiple executor...

2016-09-15 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/14926#discussion_r79036171
  
--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -392,10 +397,70 @@ private[spark] class ExecutorAllocationManager(
   }
 
   /**
+   * Request the cluster manager to remove the given executors.
+   * Return whether the request is acknowledged. Ideally we should be returning the list of
+   * executors which were removed as the requested executors and the one's actually removed can be
+   * different (CoarseGrainedSchedulerBackend can filter some executors). To avoid breaking the API
+   * we continue to return a Boolean.
+   */
+  private def removeExecutors(executors: Seq[String]): Boolean = synchronized {
+
+    val executorIdsToBeRemoved = new ArrayBuffer[String]
+
+    logInfo("Request to remove executorIds: " + executors.mkString(", "))
+    val numExistingExecutors = executorIds.size - executorsPendingToRemove.size
+    for(executorId <- executors) {
+      // Do not kill the executor if we have already reached the lower bound
+      val newExecutorTotal = numExistingExecutors - executorIdsToBeRemoved.size
+      if (newExecutorTotal - 1 < minNumExecutors) {
+        logDebug(s"Not removing idle executor $executorId because there are only " +
+          s"$numExistingExecutors executor(s) left (limit $minNumExecutors)")
+      } else if (canBeKilled(executorId)) {
+        executorIdsToBeRemoved += executorId
+      }
+    }
+
+    if (executorIdsToBeRemoved.isEmpty) {
+      return false
+    }
+
+    // Send a request to the backend to kill this executor(s)
+    val executorsRemoved = if (testing) {
+      executorIdsToBeRemoved
+    } else {
+      client.killExecutors(executorIdsToBeRemoved)
+    }
+
+    if (testing || executorsRemoved.nonEmpty) {
+      val numExistingExecutors = allocationManager.executorIds.size - executorsPendingToRemove.size
+      var index = 0
+      for(index <- 0 until executorsRemoved.size) {
+        val removedExecutorId = executorsRemoved(index)
+        val newExecutorTotal = numExistingExecutors - (index + 1)
+        logInfo(s"Removing executor $removedExecutorId because it has been idle for " +
+          s"$executorIdleTimeoutS seconds (new desired total will be $newExecutorTotal)")
+        executorsPendingToRemove.add(removedExecutorId)
+      }
+      true
+    } else {
+      logWarning(s"Unable to reach the cluster manager to kill executor/s " +
+        executorIdsToBeRemoved.mkString(",") + "or no executor eligible to kill!")
--- End diff --

You can use interpolation here too; at the very least you need a space before "or", otherwise the message will read funny.
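
Roughly, with interpolation and the missing space (a sketch reusing the names from the diff above):

```
logWarning(s"Unable to reach the cluster manager to kill executor/s " +
  s"${executorIdsToBeRemoved.mkString(",")} or no executor eligible to kill!")
```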


[GitHub] spark pull request #14926: [SPARK-17365][Core] Remove/Kill multiple executor...

2016-09-15 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/14926#discussion_r79038316
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ---
@@ -583,7 +585,17 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
           _ => Future.successful(false)
         }
 
-      adjustTotalExecutors.flatMap(killExecutors)(ThreadUtils.sameThread)
+      val killResponse = adjustTotalExecutors.flatMap(killExecutors)(ThreadUtils.sameThread)
+
+      def execKilled(killStatus: Boolean): Future[Seq[String]] = Future.successful(
+        if (killStatus) {
+          executorsToKill
+        } else {
+          Seq.empty[String]
+        }
+      )
+
+      killResponse.flatMap(flag => execKilled(flag))(ThreadUtils.sameThread)
--- End diff --

It would be clearer if `execKilled` were inlined here.

```
killResponse.flatMap { success => if (success) list else Nil }(ThreadUtils.sameThread)
```



[GitHub] spark pull request #14926: [SPARK-17365][Core] Remove/Kill multiple executor...

2016-09-15 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/14926#discussion_r79037842
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ---
@@ -564,6 +564,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
         .filter { id => force || !scheduler.isExecutorBusy(id) }
       executorsToKill.foreach { id => executorsPendingToRemove(id) = !replace }
 
+      logInfo(s"Requesting to kill filtered executor(s) ${executorsToKill.mkString(", ")}")
--- End diff --

"filtered" sounds weird... just drop it.


[GitHub] spark pull request #14926: [SPARK-17365][Core] Remove/Kill multiple executor...

2016-09-15 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/14926#discussion_r79035363
  
--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -392,10 +397,70 @@ private[spark] class ExecutorAllocationManager(
   }
 
   /**
+   * Request the cluster manager to remove the given executors.
+   * Return whether the request is acknowledged. Ideally we should be returning the list of
+   * executors which were removed as the requested executors and the one's actually removed can be
+   * different (CoarseGrainedSchedulerBackend can filter some executors). To avoid breaking the API
+   * we continue to return a Boolean.
+   */
+  private def removeExecutors(executors: Seq[String]): Boolean = synchronized {
+
+    val executorIdsToBeRemoved = new ArrayBuffer[String]
+
+    logInfo("Request to remove executorIds: " + executors.mkString(", "))
+    val numExistingExecutors = executorIds.size - executorsPendingToRemove.size
+    for(executorId <- executors) {
--- End diff --

Please add spaces after all these keywords. You do this in many places in 
the patch.


[GitHub] spark pull request #14926: [SPARK-17365][Core] Remove/Kill multiple executor...

2016-09-15 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/14926#discussion_r79035412
  
--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -392,10 +397,70 @@ private[spark] class ExecutorAllocationManager(
   }
 
   /**
+   * Request the cluster manager to remove the given executors.
+   * Return whether the request is acknowledged. Ideally we should be returning the list of
+   * executors which were removed as the requested executors and the one's actually removed can be
+   * different (CoarseGrainedSchedulerBackend can filter some executors). To avoid breaking the API
+   * we continue to return a Boolean.
+   */
+  private def removeExecutors(executors: Seq[String]): Boolean = synchronized {
+
--- End diff --

nit: no need for this empty line.


[GitHub] spark pull request #14926: [SPARK-17365][Core] Remove/Kill multiple executor...

2016-09-15 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/14926#discussion_r79037417
  
--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -1575,13 +1581,14 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * @return whether the request is received.
    */
   private[spark] def killAndReplaceExecutor(executorId: String): Boolean = {
-    schedulerBackend match {
+    val killResponse = schedulerBackend match {
       case b: CoarseGrainedSchedulerBackend =>
         b.killExecutors(Seq(executorId), replace = true, force = true)
       case _ =>
         logWarning("Killing executors is only supported in coarse-grained mode")
-        false
+        Seq.empty[String]
     }
+    killResponse.nonEmpty
--- End diff --

You could move the `nonEmpty` to the `killExecutors` call above and not 
have to change anything else in this method.
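
For illustration, roughly this shape (a sketch that assumes the backend's `killExecutors` now returns the killed executors, as in this patch):

```
private[spark] def killAndReplaceExecutor(executorId: String): Boolean = {
  schedulerBackend match {
    case b: CoarseGrainedSchedulerBackend =>
      b.killExecutors(Seq(executorId), replace = true, force = true).nonEmpty
    case _ =>
      logWarning("Killing executors is only supported in coarse-grained mode")
      false
  }
}
```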


[GitHub] spark pull request #14926: [SPARK-17365][Core] Remove/Kill multiple executor...

2016-09-15 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/14926#discussion_r79036915
  
--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -409,26 +474,7 @@ private[spark] class ExecutorAllocationManager(
   return false
--- End diff --

Drop `return`, use `if () ... else ...` instead.
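
i.e. something along these lines (just a sketch of the shape, reusing names from the surrounding diff):

```
val executorIdsToBeRemoved = executors.filter(canBeKilled)
if (executorIdsToBeRemoved.isEmpty) {
  false
} else {
  // ask the backend to kill the batch and record what was accepted
  val executorsRemoved = client.killExecutors(executorIdsToBeRemoved)
  executorsRemoved.foreach(executorsPendingToRemove.add)
  executorsRemoved.nonEmpty
}
```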


[GitHub] spark pull request #14926: [SPARK-17365][Core] Remove/Kill multiple executor...

2016-09-15 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/14926#discussion_r79038521
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
 ---
@@ -83,8 +84,14 @@ class JobScheduler(val ssc: StreamingContext) extends 
Logging {
 listenerBus.start()
 receiverTracker = new ReceiverTracker(ssc)
 inputInfoTracker = new InputInfoTracker(ssc)
+
+val executorAllocClient: ExecutorAllocationClient = 
ssc.sparkContext.schedulerBackend match {
+  case b: ExecutorAllocationClient => 
b.asInstanceOf[ExecutorAllocationClient]
+  case _ => null
+}
+
 executorAllocationManager = ExecutorAllocationManager.createIfEnabled(
-  ssc.sparkContext,
+  executorAllocClient,
--- End diff --

It feels like there should be an assertion somewhere that the client is not 
`null`. Otherwise `ExecutorAllocationManager` can't work, right?
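
For example (a sketch; the check could equally live inside `ExecutorAllocationManager.createIfEnabled`):

```
require(executorAllocClient != null,
  "Streaming dynamic allocation requires a scheduler backend that implements " +
    "ExecutorAllocationClient")
```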


[GitHub] spark pull request #14926: [SPARK-17365][Core] Remove/Kill multiple executor...

2016-09-15 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/14926#discussion_r79036795
  
--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -392,10 +397,70 @@ private[spark] class ExecutorAllocationManager(
   }
 
   /**
+   * Request the cluster manager to remove the given executors.
+   * Return whether the request is acknowledged. Ideally we should be returning the list of
+   * executors which were removed as the requested executors and the one's actually removed can be
+   * different (CoarseGrainedSchedulerBackend can filter some executors). To avoid breaking the API
+   * we continue to return a Boolean.
+   */
+  private def removeExecutors(executors: Seq[String]): Boolean = synchronized {
+
+    val executorIdsToBeRemoved = new ArrayBuffer[String]
+
+    logInfo("Request to remove executorIds: " + executors.mkString(", "))
+    val numExistingExecutors = executorIds.size - executorsPendingToRemove.size
+    for(executorId <- executors) {
+      // Do not kill the executor if we have already reached the lower bound
+      val newExecutorTotal = numExistingExecutors - executorIdsToBeRemoved.size
+      if (newExecutorTotal - 1 < minNumExecutors) {
+        logDebug(s"Not removing idle executor $executorId because there are only " +
+          s"$numExistingExecutors executor(s) left (limit $minNumExecutors)")
+      } else if (canBeKilled(executorId)) {
+        executorIdsToBeRemoved += executorId
+      }
+    }
+
+    if (executorIdsToBeRemoved.isEmpty) {
+      return false
+    }
+
+    // Send a request to the backend to kill this executor(s)
+    val executorsRemoved = if (testing) {
+      executorIdsToBeRemoved
+    } else {
+      client.killExecutors(executorIdsToBeRemoved)
+    }
+
+    if (testing || executorsRemoved.nonEmpty) {
+      val numExistingExecutors = allocationManager.executorIds.size - executorsPendingToRemove.size
+      var index = 0
+      for(index <- 0 until executorsRemoved.size) {
--- End diff --

I kinda dislike `for` in Scala... this looks cleaner to me:

```
var numExistingExecutors = ...
executorsRemoved.foreach { id =>
  numExistingExecutors -= 1
  logInfo(...)  
  executorsPendingToRemove.add(id)
}
```

You could also avoid the `numExistingExecutors` in every log message and 
just print a separate log message with the final count.
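
Putting both suggestions together would look roughly like this (a sketch only, reusing names from the diff):

```
var numExistingExecutors = allocationManager.executorIds.size - executorsPendingToRemove.size
executorsRemoved.foreach { id =>
  numExistingExecutors -= 1
  logInfo(s"Removing executor $id because it has been idle for $executorIdleTimeoutS seconds")
  executorsPendingToRemove.add(id)
}
logInfo(s"New desired total will be $numExistingExecutors")
```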


[GitHub] spark pull request #14926: [SPARK-17365][Core] Remove/Kill multiple executor...

2016-09-15 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/14926#discussion_r79035297
  
--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -392,10 +397,70 @@ private[spark] class ExecutorAllocationManager(
   }
 
   /**
+   * Request the cluster manager to remove the given executors.
--- End diff --

This comment is weird. It's a private method, so there's no API to maintain.

The actual API that you're trying to maintain is `removeExecutor` below. 
You could implement it by returning the list here and returning whether it's 
empty from that method.
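
In other words, something like this (a sketch of the suggested split; signatures and bodies are assumptions, not the final patch):

```
// Returns the executors that were actually marked for removal.
private def removeExecutors(executors: Seq[String]): Seq[String] = synchronized {
  val executorIdsToBeRemoved = executors.filter(canBeKilled)
  val executorsRemoved = client.killExecutors(executorIdsToBeRemoved)
  executorsRemoved.foreach(executorsPendingToRemove.add)
  executorsRemoved
}

// The existing single-executor API keeps its Boolean contract.
private def removeExecutor(executorId: String): Boolean = {
  removeExecutors(Seq(executorId)).nonEmpty
}
```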


[GitHub] spark pull request #14926: [SPARK-17365][Core] Remove/Kill multiple executor...

2016-09-14 Thread tgravescs
Github user tgravescs commented on a diff in the pull request:

https://github.com/apache/spark/pull/14926#discussion_r78772541
  
--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -2535,7 +2539,7 @@ object SparkContext extends Logging {
 
   private def getClusterManager(url: String): Option[ExternalClusterManager] = {
     val loader = Utils.getContextOrSparkClassLoader
-    val serviceLoaders =
+    var serviceLoaders =
--- End diff --

change back to val


[GitHub] spark pull request #14926: [SPARK-17365][Core] Remove/Kill multiple executor...

2016-09-14 Thread tgravescs
Github user tgravescs commented on a diff in the pull request:

https://github.com/apache/spark/pull/14926#discussion_r78772038
  
--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -36,18 +36,15 @@ import com.google.common.collect.MapMaker
 import org.apache.commons.lang3.SerializationUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.io.{ArrayWritable, BooleanWritable, BytesWritable, DoubleWritable,
-  FloatWritable, IntWritable, LongWritable, NullWritable, Text, Writable}
-import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf, SequenceFileInputFormat,
-  TextInputFormat}
+import org.apache.hadoop.io.{ArrayWritable, BooleanWritable, BytesWritable, DoubleWritable, FloatWritable, IntWritable, LongWritable, NullWritable, Text, Writable}
--- End diff --

wrap line


[GitHub] spark pull request #14926: [SPARK-17365][Core] Remove/Kill multiple executor...

2016-09-14 Thread tgravescs
Github user tgravescs commented on a diff in the pull request:

https://github.com/apache/spark/pull/14926#discussion_r78771555
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -29,6 +29,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.{DYN_ALLOCATION_MAX_EXECUTORS, 
DYN_ALLOCATION_MIN_EXECUTORS}
 import org.apache.spark.metrics.source.Source
 import org.apache.spark.scheduler._
+import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
--- End diff --

remove


[GitHub] spark pull request #14926: [SPARK-17365][Core] Remove/Kill multiple executor...

2016-09-08 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/14926#discussion_r78106170
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -392,10 +397,36 @@ private[spark] class ExecutorAllocationManager(
   }
 
   /**
-   * Request the cluster manager to remove the given executor.
+   * Request the cluster manager to remove the given executors.
* Return whether the request is received.
*/
-  private def removeExecutor(executorId: String): Boolean = synchronized {
+  private def removeExecutors(executorIds: Seq[String]): Boolean = 
synchronized {
+
+val executorIdsToBeRemoved = executorIds.filter(canBeKilled)
+
+// Send a request to the backend to kill this executor
+val removeRequestAcknowledged = testing || 
client.killExecutors(executorIdsToBeRemoved)
--- End diff --

> Killing any one of them should return true

That's the current behavior, so it should be preserved. Yeah it's sketchy 
but there really isn't a good solution, so go with the current state of things.

> Can we change the api to return the list of killed executors

It's marked as `@DeveloperApi`, but even then, it's frowned upon to break 
compatibility for these APIs.


[GitHub] spark pull request #14926: [SPARK-17365][Core] Remove/Kill multiple executor...

2016-09-08 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/14926#discussion_r78092320
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -392,10 +397,36 @@ private[spark] class ExecutorAllocationManager(
   }
 
   /**
-   * Request the cluster manager to remove the given executor.
+   * Request the cluster manager to remove the given executors.
* Return whether the request is received.
*/
-  private def removeExecutor(executorId: String): Boolean = synchronized {
+  private def removeExecutors(executorIds: Seq[String]): Boolean = 
synchronized {
+
+val executorIdsToBeRemoved = executorIds.filter(canBeKilled)
+
+// Send a request to the backend to kill this executor
+val removeRequestAcknowledged = testing || 
client.killExecutors(executorIdsToBeRemoved)
--- End diff --

@vanzin 
We can do the following: 
* Decouple ```SparkContext``` from the ```ExecutorAllocationClient```.
* ```SparkContext``` still exposes killExecutor(s).
* Change ```ExecutorAllocationClient``` to return the list of filtered executors which were actually requested to be killed.

The current dev API for ```killExecutors``` returns a boolean. How do we want to interpret this? Since the ```CoarseGrainedSchedulerBackend``` can decide to kill only a subset of the requested executors, should killing any one of them return true, or only killing all of them? The semantics are kind of hazy here.

Can we change the API to return the list of killed executors, making it consistent throughout? Or do we keep the API the same and accept the unclear semantics?



[GitHub] spark pull request #14926: [SPARK-17365][Core] Remove/Kill multiple executor...

2016-09-06 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/14926#discussion_r77724521
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -392,10 +397,36 @@ private[spark] class ExecutorAllocationManager(
   }
 
   /**
-   * Request the cluster manager to remove the given executor.
+   * Request the cluster manager to remove the given executors.
* Return whether the request is received.
*/
-  private def removeExecutor(executorId: String): Boolean = synchronized {
+  private def removeExecutors(executorIds: Seq[String]): Boolean = 
synchronized {
+
+val executorIdsToBeRemoved = executorIds.filter(canBeKilled)
+
+// Send a request to the backend to kill this executor
+val removeRequestAcknowledged = testing || 
client.killExecutors(executorIdsToBeRemoved)
--- End diff --

I was missing the context of SPARK-9552.


[GitHub] spark pull request #14926: [SPARK-17365][Core] Remove/Kill multiple executor...

2016-09-06 Thread tgravescs
Github user tgravescs commented on a diff in the pull request:

https://github.com/apache/spark/pull/14926#discussion_r77723977
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -392,10 +397,36 @@ private[spark] class ExecutorAllocationManager(
   }
 
   /**
-   * Request the cluster manager to remove the given executor.
+   * Request the cluster manager to remove the given executors.
* Return whether the request is received.
*/
-  private def removeExecutor(executorId: String): Boolean = synchronized {
+  private def removeExecutors(executorIds: Seq[String]): Boolean = 
synchronized {
+
+val executorIdsToBeRemoved = executorIds.filter(canBeKilled)
+
+// Send a request to the backend to kill this executor
+val removeRequestAcknowledged = testing || 
client.killExecutors(executorIdsToBeRemoved)
--- End diff --

Ah ok, I see it now. Yeah, we should fix the API then.


[GitHub] spark pull request #14926: [SPARK-17365][Core] Remove/Kill multiple executor...

2016-09-06 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/14926#discussion_r77720376
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -392,10 +397,36 @@ private[spark] class ExecutorAllocationManager(
   }
 
   /**
-   * Request the cluster manager to remove the given executor.
+   * Request the cluster manager to remove the given executors.
* Return whether the request is received.
*/
-  private def removeExecutor(executorId: String): Boolean = synchronized {
+  private def removeExecutors(executorIds: Seq[String]): Boolean = 
synchronized {
+
+val executorIdsToBeRemoved = executorIds.filter(canBeKilled)
+
+// Send a request to the backend to kill this executor
+val removeRequestAcknowledged = testing || 
client.killExecutors(executorIdsToBeRemoved)
--- End diff --

The problem is not the backend. The problem is 
`CoarseGrainedSchedulerBackend.killExecutors` which can decide to kill just a 
subset of the executors asked to be killed.

I'm wary of @dhruve's assessment of the problem because that code was added 
*exactly* because the code was killing busy executors that had for some reason 
become "idle".  See SPARK-9552.


[GitHub] spark pull request #14926: [SPARK-17365][Core] Remove/Kill multiple executor...

2016-09-06 Thread tgravescs
Github user tgravescs commented on a diff in the pull request:

https://github.com/apache/spark/pull/14926#discussion_r77719621
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -392,10 +397,36 @@ private[spark] class ExecutorAllocationManager(
   }
 
   /**
-   * Request the cluster manager to remove the given executor.
+   * Request the cluster manager to remove the given executors.
* Return whether the request is received.
*/
-  private def removeExecutor(executorId: String): Boolean = synchronized {
+  private def removeExecutors(executorIds: Seq[String]): Boolean = 
synchronized {
+
+val executorIdsToBeRemoved = executorIds.filter(canBeKilled)
+
+// Send a request to the backend to kill this executor
+val removeRequestAcknowledged = testing || 
client.killExecutors(executorIdsToBeRemoved)
--- End diff --

So I took a quick look at this and I agree with you that ideally it should return the ones actually removed, but right now I don't see any of the backends actually returning anything other than true, unless it's something catastrophic where it wouldn't matter if the executor list is one or many.
Yarn mode right now always returns true (ApplicationMaster KillExecutors endpoint). Standalone mode only returns false if the app doesn't exist, so whether we pass one or many executors doesn't matter; mesos mode only returns false if the mesos driver is null.

So changing the interface to do the right thing seems like it can be a separate issue from this one. Or perhaps I missed something?


But it looks like there are a few cases where mesos and standalone mode do return false.


[GitHub] spark pull request #14926: [SPARK-17365][Core] Remove/Kill multiple executor...

2016-09-06 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/14926#discussion_r77684425
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -392,10 +397,36 @@ private[spark] class ExecutorAllocationManager(
   }
 
   /**
-   * Request the cluster manager to remove the given executor.
+   * Request the cluster manager to remove the given executors.
* Return whether the request is received.
*/
-  private def removeExecutor(executorId: String): Boolean = synchronized {
+  private def removeExecutors(executorIds: Seq[String]): Boolean = 
synchronized {
+
+val executorIdsToBeRemoved = executorIds.filter(canBeKilled)
+
+// Send a request to the backend to kill this executor
+val removeRequestAcknowledged = testing || 
client.killExecutors(executorIdsToBeRemoved)
--- End diff --

Sorry, I'm not comfortable with your assumption. Without changing the 
interface to properly return the executors that were actually removed, this 
change is too risky.


[GitHub] spark pull request #14926: [SPARK-17365][Core] Remove/Kill multiple executor...

2016-09-06 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/14926#discussion_r77684087
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -392,10 +397,36 @@ private[spark] class ExecutorAllocationManager(
   }
 
   /**
-   * Request the cluster manager to remove the given executor.
+   * Request the cluster manager to remove the given executors.
* Return whether the request is received.
*/
-  private def removeExecutor(executorId: String): Boolean = synchronized {
+  private def removeExecutors(executorIds: Seq[String]): Boolean = 
synchronized {
+
+val executorIdsToBeRemoved = executorIds.filter(canBeKilled)
+
+// Send a request to the backend to kill this executor
+val removeRequestAcknowledged = testing || 
client.killExecutors(executorIdsToBeRemoved)
--- End diff --

Redefining the ```ExecutorAllocationClient``` interface would definitely be 
a more meaningful change where we return the executors which were actually 
removed.


[GitHub] spark pull request #14926: [SPARK-17365][Core] Remove/Kill multiple executor...

2016-09-06 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/14926#discussion_r77683485
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -392,10 +397,36 @@ private[spark] class ExecutorAllocationManager(
   }
 
   /**
-   * Request the cluster manager to remove the given executor.
+   * Request the cluster manager to remove the given executors.
* Return whether the request is received.
*/
-  private def removeExecutor(executorId: String): Boolean = synchronized {
+  private def removeExecutors(executorIds: Seq[String]): Boolean = 
synchronized {
+
+val executorIdsToBeRemoved = executorIds.filter(canBeKilled)
+
+// Send a request to the backend to kill this executor
+val removeRequestAcknowledged = testing || 
client.killExecutors(executorIdsToBeRemoved)
--- End diff --

As @vanzin mentioned, the ```CoarseGrainedSchedulerBackend``` filters out executors based on whether we want to force kill executors which are still running, and it also filters out the ones which are already pending to be removed.

In our case, we are requesting to kill the executors which have remained idle for the configured timeout duration. AFAIK this does not lead to filtering out any executors unless they are already marked for removal, which is harmless.


[GitHub] spark pull request #14926: [SPARK-17365][Core] Remove/Kill multiple executor...

2016-09-06 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/14926#discussion_r77680444
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -392,10 +397,36 @@ private[spark] class ExecutorAllocationManager(
   }
 
   /**
-   * Request the cluster manager to remove the given executor.
+   * Request the cluster manager to remove the given executors.
* Return whether the request is received.
*/
-  private def removeExecutor(executorId: String): Boolean = synchronized {
+  private def removeExecutors(executorIds: Seq[String]): Boolean = 
synchronized {
+
+val executorIdsToBeRemoved = executorIds.filter(canBeKilled)
+
+// Send a request to the backend to kill this executor
+val removeRequestAcknowledged = testing || 
client.killExecutors(executorIdsToBeRemoved)
+if (removeRequestAcknowledged) {
+  val numExistingExecutors = allocationManager.executorIds.size - 
executorsPendingToRemove.size
+  var index = 0
--- End diff --

This was solely to maintain consistency with the earlier log message, which I think is useful and more readable than having to scan the logs separately to check for the removed-executor entry and the new desired total.


[GitHub] spark pull request #14926: [SPARK-17365][Core] Remove/Kill multiple executor...

2016-09-02 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/14926#discussion_r77426133
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -392,10 +397,36 @@ private[spark] class ExecutorAllocationManager(
   }
 
   /**
-   * Request the cluster manager to remove the given executor.
+   * Request the cluster manager to remove the given executors.
* Return whether the request is received.
*/
-  private def removeExecutor(executorId: String): Boolean = synchronized {
+  private def removeExecutors(executorIds: Seq[String]): Boolean = 
synchronized {
+
+val executorIdsToBeRemoved = executorIds.filter(canBeKilled)
+
+// Send a request to the backend to kill this executor
+val removeRequestAcknowledged = testing || 
client.killExecutors(executorIdsToBeRemoved)
--- End diff --

Ah, this is something I've already asked for in the past... `killExecutors` 
should really return something more interesting than a boolean. Because 
`CoarseGrainedSchedulerBackend` *will* ignore executors it doesn't want to kill.

The problem with that is that through some unfortunate inheritance chain, 
`SparkContext` actually extends `ExecutorAllocationClient` and thus, 
`killExecutors` is a public API.

I think we should fix this so that:
- `SparkContext` still exposes the existing `killExecutors`, but doesn't 
override `ExecutorAllocationClient`
- `ExecutorAllocationClient` defines the proper interface, which optimally 
would return the list of executors actually removed so that the rest of the 
code here can do the right thing.

Otherwise, there's no way to do what this patch proposes without breaking 
all the accounting, unfortunately.
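
A rough sketch of the interface shape being suggested (an illustration, not the merged API):

```
private[spark] trait ExecutorAllocationClient {
  /**
   * Request that the cluster manager kill the specified executors.
   * Returns the executors that were actually requested to be killed.
   */
  def killExecutors(executorIds: Seq[String]): Seq[String]
}
```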


[GitHub] spark pull request #14926: [SPARK-17365][Core] Remove/Kill multiple executor...

2016-09-02 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/14926#discussion_r77309831
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -392,10 +397,36 @@ private[spark] class ExecutorAllocationManager(
   }
 
   /**
-   * Request the cluster manager to remove the given executor.
+   * Request the cluster manager to remove the given executors.
* Return whether the request is received.
*/
-  private def removeExecutor(executorId: String): Boolean = synchronized {
+  private def removeExecutors(executorIds: Seq[String]): Boolean = 
synchronized {
+
+val executorIdsToBeRemoved = executorIds.filter(canBeKilled)
+
+// Send a request to the backend to kill this executor
+val removeRequestAcknowledged = testing || 
client.killExecutors(executorIdsToBeRemoved)
--- End diff --

Is there a risk that somehow _some_ of the executors are killed and not 
others? The code kinda assumes now that all or none succeed.


[GitHub] spark pull request #14926: [SPARK-17365][Core] Remove/Kill multiple executor...

2016-09-02 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/14926#discussion_r77309507
  
--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -392,10 +397,36 @@ private[spark] class ExecutorAllocationManager(
   }
 
   /**
-   * Request the cluster manager to remove the given executor.
+   * Request the cluster manager to remove the given executors.
    * Return whether the request is received.
    */
-  private def removeExecutor(executorId: String): Boolean = synchronized {
+  private def removeExecutors(executorIds: Seq[String]): Boolean = synchronized {
+
+    val executorIdsToBeRemoved = executorIds.filter(canBeKilled)
+
+    // Send a request to the backend to kill this executor
+    val removeRequestAcknowledged = testing || client.killExecutors(executorIdsToBeRemoved)
+    if (removeRequestAcknowledged) {
+      val numExistingExecutors = allocationManager.executorIds.size - executorsPendingToRemove.size
+      var index = 0
+      for(index <- 0 until executorIdsToBeRemoved.size) {
+        logInfo(s"Removing executor " + executorIdsToBeRemoved(index) + " because it has been " +
--- End diff --

This mixes string interpolation and concatenation. You can rewrite it with 
interpolation, possibly defining extra vals for clarity.
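
For instance (a sketch only):

```
val removedExecutorId = executorIdsToBeRemoved(index)
logInfo(s"Removing executor $removedExecutorId because it has been idle for " +
  s"$executorIdleTimeoutS seconds (new desired total will be $newExecutorTotal)")
```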


[GitHub] spark pull request #14926: [SPARK-17365][Core] Remove/Kill multiple executor...

2016-09-02 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/14926#discussion_r77309440
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -392,10 +397,36 @@ private[spark] class ExecutorAllocationManager(
   }
 
   /**
-   * Request the cluster manager to remove the given executor.
+   * Request the cluster manager to remove the given executors.
* Return whether the request is received.
*/
-  private def removeExecutor(executorId: String): Boolean = synchronized {
+  private def removeExecutors(executorIds: Seq[String]): Boolean = 
synchronized {
+
+val executorIdsToBeRemoved = executorIds.filter(canBeKilled)
+
+// Send a request to the backend to kill this executor
+val removeRequestAcknowledged = testing || 
client.killExecutors(executorIdsToBeRemoved)
+if (removeRequestAcknowledged) {
+  val numExistingExecutors = allocationManager.executorIds.size - 
executorsPendingToRemove.size
+  var index = 0
--- End diff --

Why this? The `for` already defines it for you.


[GitHub] spark pull request #14926: [SPARK-17365][Core] Remove/Kill multiple executor...

2016-09-02 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/14926#discussion_r77309343
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -279,14 +280,18 @@ private[spark] class ExecutorAllocationManager(
 
 updateAndSyncNumExecutorsTarget(now)
 
+val executorIdsToBeRemoved = new ArrayBuffer[String]
 removeTimes.retain { case (executorId, expireTime) =>
   val expired = now >= expireTime
   if (expired) {
 initializing = false
-removeExecutor(executorId)
+executorIdsToBeRemoved += executorId
   }
   !expired
 }
+if(executorIdsToBeRemoved.size != 0) {
--- End diff --

`if (executorIdsToBeRemoved.nonEmpty) {`


[GitHub] spark pull request #14926: [SPARK-17365][Core] Remove/Kill multiple executor...

2016-09-02 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/14926#discussion_r77309320
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -279,14 +280,18 @@ private[spark] class ExecutorAllocationManager(
 
 updateAndSyncNumExecutorsTarget(now)
 
+val executorIdsToBeRemoved = new ArrayBuffer[String]
--- End diff --

Probably trivial, but I think just `ArrayBuffer[String]()` is more idiomatic


[GitHub] spark pull request #14926: [SPARK-17365][Core] Remove/Kill multiple executor...

2016-09-01 Thread dhruve
GitHub user dhruve opened a pull request:

https://github.com/apache/spark/pull/14926

[SPARK-17365][Core] Remove/Kill multiple executors together to reduce…

## What changes were proposed in this pull request?
We kill multiple executors together instead of iterating over expensive RPC calls to kill a single executor at a time.
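
Conceptually, the change batches the kill request instead of issuing one RPC per executor (a simplified sketch, not the actual diff; `idleExecutorIds` is just a placeholder name):

```
// before: one RPC round trip per idle executor
idleExecutorIds.foreach(id => client.killExecutor(id))

// after: a single batched request for all idle executors
client.killExecutors(idleExecutorIds)
```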

## How was this patch tested?
Executed a sample Spark job to observe executors being killed/removed with dynamic allocation enabled.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dhruve/spark impr/SPARK-17365

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/14926.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #14926


commit f058ad299fd9a4430da296f637aa539979d6368c
Author: Dhruve Ashar 
Date:   2016-09-01T20:19:08Z

[SPARK-17365][Core] Remove/Kill multiple executors together to reduce contention



