spark git commit: [SPARK-23365][CORE] Do not adjust num executors when killing idle executors.

2018-02-27 Thread vanzin
Repository: spark
Updated Branches:
  refs/heads/branch-2.3 6eee545f9 -> 30242b664


[SPARK-23365][CORE] Do not adjust num executors when killing idle executors.

The ExecutorAllocationManager should not adjust the target number of
executors when killing idle executors, as it has already adjusted the
target number down based on the task backlog.

The name `replace` was misleading with DynamicAllocation on, as the target number
of executors is changed outside of the call to `killExecutors`, so I adjusted
that name.  Also separated out the logic of `countFailures`, as you don't always
want that tied to `replace`.

While I was there I made two changes that weren't directly related to this:
1) Fixed `countFailures` in a couple of cases where it was getting an incorrect
value, since it used to be tied to `replace`, e.g. when killing executors on a
blacklisted node.
2) Made it a hard error to call `sc.killExecutors` with dynamic allocation on,
since that's another way the ExecutorAllocationManager and the
CoarseGrainedSchedulerBackend could get out of sync.

Added a unit test case which verifies that the calls to
ExecutorAllocationClient do not adjust the number of executors.
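The double adjustment this fixes can be sketched with a toy model (all names here are illustrative, not Spark's actual internals): the manager has already lowered its target to match the backlog, so decrementing it again inside the kill call makes the target drift below what the backlog needs.

```scala
// Toy model of the double-adjustment bug: the allocation manager lowers
// its target for the task backlog, and (pre-fix) the kill call lowered
// it a second time. All names here are illustrative, not Spark's.
object TargetDrift {
  final case class State(target: Int)

  // The manager sets the target to what the backlog actually needs.
  def adjustForBacklog(s: State, needed: Int): State = s.copy(target = needed)

  // A kill of `killed` idle executors, optionally adjusting the target too.
  def kill(s: State, killed: Int, adjustTarget: Boolean): State =
    if (adjustTarget) s.copy(target = s.target - killed) else s

  val afterBacklog = adjustForBacklog(State(target = 10), needed = 6)
  // Pre-fix behavior: killing 4 idle executors adjusted again -> target 2.
  val buggy = kill(afterBacklog, killed = 4, adjustTarget = true)
  // Post-fix behavior: the target stays at what the backlog requires.
  val fixed = kill(afterBacklog, killed = 4, adjustTarget = false)

  def main(args: Array[String]): Unit =
    println(s"buggy=${buggy.target} fixed=${fixed.target}")
}
```

Run standalone, this prints `buggy=2 fixed=6`: the pre-fix path ends up asking for fewer executors than the backlog needs.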

Author: Imran Rashid 

Closes #20604 from squito/SPARK-23365.

(cherry picked from commit ecb8b383af1cf1b67f3111c148229e00c9c17c40)
Signed-off-by: Marcelo Vanzin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/30242b66
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/30242b66
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/30242b66

Branch: refs/heads/branch-2.3
Commit: 30242b664a05120c01b54ba9ac0ebed114f0d54e
Parents: 6eee545
Author: Imran Rashid 
Authored: Tue Feb 27 11:12:32 2018 -0800
Committer: Marcelo Vanzin 
Committed: Tue Feb 27 11:12:54 2018 -0800

--
 .../apache/spark/ExecutorAllocationClient.scala | 15 ++---
 .../spark/ExecutorAllocationManager.scala   | 20 --
 .../scala/org/apache/spark/SparkContext.scala   | 13 +++-
 .../spark/scheduler/BlacklistTracker.scala  |  3 +-
 .../cluster/CoarseGrainedSchedulerBackend.scala | 22 ---
 .../spark/ExecutorAllocationManagerSuite.scala  | 66 +++-
 .../StandaloneDynamicAllocationSuite.scala  |  3 +-
 .../spark/scheduler/BlacklistTrackerSuite.scala | 14 ++---
 8 files changed, 121 insertions(+), 35 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/30242b66/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
--
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
index 9112d93..63d87b4 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
@@ -55,18 +55,18 @@ private[spark] trait ExecutorAllocationClient {
   /**
    * Request that the cluster manager kill the specified executors.
    *
-   * When asking the executor to be replaced, the executor loss is considered a failure, and
-   * killed tasks that are running on the executor will count towards the failure limits. If no
-   * replacement is being requested, then the tasks will not count towards the limit.
-   *
    * @param executorIds identifiers of executors to kill
-   * @param replace whether to replace the killed executors with new ones, default false
+   * @param adjustTargetNumExecutors whether the target number of executors will be adjusted down
+   *                                 after these executors have been killed
+   * @param countFailures if there are tasks running on the executors when they are killed, whether
+   *                      to count those failures toward task failure limits
    * @param force whether to force kill busy executors, default false
    * @return the ids of the executors acknowledged by the cluster manager to be removed.
    */
   def killExecutors(
     executorIds: Seq[String],
-    replace: Boolean = false,
+    adjustTargetNumExecutors: Boolean,
+    countFailures: Boolean,
     force: Boolean = false): Seq[String]

   /**
@@ -81,7 +81,8 @@ private[spark] trait ExecutorAllocationClient {
    * @return whether the request is acknowledged by the cluster manager.
    */
   def killExecutor(executorId: String): Boolean = {
-    val killedExecutors = killExecutors(Seq(executorId))
+    val killedExecutors = killExecutors(Seq(executorId), adjustTargetNumExecutors = true,
+      countFailures = false)
     killedExecutors.nonEmpty && killedExecutors(0).equals(executorId)
   }
 }
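The diff above removes the default for `replace` and forces every caller to spell out both decisions. A minimal stand-in implementation shows the resulting call shape; the method and parameter names come from the diff, while the trait name, stub body, and executor ids are hypothetical:

```scala
// Stand-in for the trait in the diff above, with a stub implementation,
// to show the new named-argument call shape. Only killExecutors and its
// parameter names come from the diff; the rest is hypothetical.
trait AllocationClient {
  def killExecutors(
      executorIds: Seq[String],
      adjustTargetNumExecutors: Boolean,
      countFailures: Boolean,
      force: Boolean = false): Seq[String]
}

object StubClient extends AllocationClient {
  // Pretend the cluster manager acknowledges every kill request.
  override def killExecutors(
      executorIds: Seq[String],
      adjustTargetNumExecutors: Boolean,
      countFailures: Boolean,
      force: Boolean = false): Seq[String] = executorIds

  def main(args: Array[String]): Unit = {
    // Idle-executor kill: the manager has already lowered its target, so
    // the backend must not adjust it again; idle executors run no tasks,
    // so there are no failures to count.
    val acked = killExecutors(
      Seq("exec-1", "exec-2"),
      adjustTargetNumExecutors = false,
      countFailures = false)
    println(acked.mkString(","))
  }
}
```

Because both flags are now mandatory at every call site, a caller can no longer pick up a default that silently couples target adjustment to failure counting.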


spark git commit: [SPARK-23365][CORE] Do not adjust num executors when killing idle executors.

2018-02-27 Thread vanzin
Repository: spark
Updated Branches:
  refs/heads/master 414ee867b -> ecb8b383a


[SPARK-23365][CORE] Do not adjust num executors when killing idle executors.

(Commit message identical to the branch-2.3 cherry-pick above.)


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ecb8b383
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ecb8b383
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ecb8b383

Branch: refs/heads/master
Commit: ecb8b383af1cf1b67f3111c148229e00c9c17c40
Parents: 414ee86
Author: Imran Rashid 
Authored: Tue Feb 27 11:12:32 2018 -0800
Committer: Marcelo Vanzin 
Committed: Tue Feb 27 11:12:32 2018 -0800

--
 .../apache/spark/ExecutorAllocationClient.scala | 15 ++---
 .../spark/ExecutorAllocationManager.scala   | 20 --
 .../scala/org/apache/spark/SparkContext.scala   | 13 +++-
 .../spark/scheduler/BlacklistTracker.scala  |  3 +-
 .../cluster/CoarseGrainedSchedulerBackend.scala | 22 ---
 .../spark/ExecutorAllocationManagerSuite.scala  | 66 +++-
 .../StandaloneDynamicAllocationSuite.scala  |  3 +-
 .../spark/scheduler/BlacklistTrackerSuite.scala | 14 ++---
 8 files changed, 121 insertions(+), 35 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/ecb8b383/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala 
b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
index 9112d93..63d87b4 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
@@ -55,18 +55,18 @@ private[spark] trait ExecutorAllocationClient {
   /**
* Request that the cluster manager kill the specified executors.
*
-   * When asking the executor to be replaced, the executor loss is considered 
a failure, and
-   * killed tasks that are running on the executor will count towards the 
failure limits. If no
-   * replacement is being requested, then the tasks will not count towards the 
limit.
-   *
* @param executorIds identifiers of executors to kill
-   * @param replace whether to replace the killed executors with new ones, 
default false
+   * @param adjustTargetNumExecutors whether the target number of executors 
will be adjusted down
+   * after these executors have been killed
+   * @param countFailures if there are tasks running on the executors when 
they are killed, whether
+* to count those failures toward task failure limits
* @param force whether to force kill busy executors, default false
* @return the ids of the executors acknowledged by the cluster manager to 
be removed.
*/
   def killExecutors(
 executorIds: Seq[String],
-replace: Boolean = false,
+adjustTargetNumExecutors: Boolean,
+countFailures: Boolean,
 force: Boolean = false): Seq[String]
 
   /**
@@ -81,7 +81,8 @@ private[spark] trait ExecutorAllocationClient {
* @return whether the request is acknowledged by the cluster manager.
*/
   def killExecutor(executorId: String): Boolean = {
-val killedExecutors = killExecutors(Seq(executorId))
+val killedExecutors = killExecutors(Seq(executorId), 
adjustTargetNumExecutors = true,
+  countFailures = false)
 killedExecutors.nonEmpty && killedExecutors(0).equals(executorId)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/ecb8b383/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
--
diff --git