[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...

2016-05-18 Thread squito
Github user squito commented on the pull request:

https://github.com/apache/spark/pull/8760#issuecomment-220069649
  
Hi everyone, sorry this has sat around for so long, but this is a super important feature, so I'd like to finish up the missing pieces and get this in (with credit to @mwws still, of course).  Other than just fixing merge conflicts, it seems there are two things which need to be addressed: (1) addressing the performance concerns with some profiling, and (2) investigating a lock-free implementation (I think this can be done, but I need to look more carefully).  I'll also build on the tests introduced in https://github.com/apache/spark/pull/8559.

I'll open a WIP PR in the next day or two at least, but please let me know if I'm missing any major concern.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...

2016-02-22 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/8760#issuecomment-187586804
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/51737/
Test PASSed.





[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...

2016-02-22 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/8760#issuecomment-187586803
  
Merged build finished. Test PASSed.





[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...

2016-02-22 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/8760#issuecomment-187586614
  
**[Test build #51737 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/51737/consoleFull)** for PR 8760 at commit [`ecad5ff`](https://github.com/apache/spark/commit/ecad5ffe2bcc02f9b181a209172f22294e6af4ef).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...

2016-02-22 Thread mwws
Github user mwws commented on the pull request:

https://github.com/apache/spark/pull/8760#issuecomment-187548596
  
Besides what @squito stated above, I have simplified the configuration to resolve @kayousterhout's concern. By default, exactly the same blacklist logic is applied as before. Users can enable the new blacklist logic by setting `spark.scheduler.blacklist.advancedStrategy=true`, as an experimental feature. Compared with the original logic, the "Advanced Strategy" enables node blacklisting: if more than one executor on the same node is blacklisted for a given stage, we put all executors on that node into the blacklist for that stage (sketched below). For more details, refer to the comments and the [unit test](https://github.com/mwws/spark/blob/SPARK-8426/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala).

And the code has been rebased to the latest master.
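A minimal sketch of that node-blacklist rule, for illustration only (the helper name `nodeBlacklistForStage` and the per-node threshold parameter are mine, not the PR's actual API):

```scala
// Illustrative only: given which executors a stage has blacklisted, and which
// host each executor runs on, blacklist every host that already has more than
// `maxBlacklistedExecutorsPerNode` blacklisted executors.
def nodeBlacklistForStage(
    executorToHost: Map[String, String],
    stageBlacklistedExecutors: Set[String],
    maxBlacklistedExecutorsPerNode: Int = 1): Set[String] = {
  executorToHost
    .filter { case (exec, _) => stageBlacklistedExecutors.contains(exec) }
    .groupBy { case (_, host) => host }
    .collect { case (host, execs) if execs.size > maxBlacklistedExecutorsPerNode => host }
    .toSet
}
```

Executors on a returned host would then be treated exactly like an executor-level blacklist hit, but scoped to that one stage.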





[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...

2016-02-22 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/8760#issuecomment-187547524
  
**[Test build #51737 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/51737/consoleFull)** for PR 8760 at commit [`ecad5ff`](https://github.com/apache/spark/commit/ecad5ffe2bcc02f9b181a209172f22294e6af4ef).





[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...

2016-02-18 Thread squito
Github user squito commented on the pull request:

https://github.com/apache/spark/pull/8760#issuecomment-185965488
  
@kayousterhout @markhamstra one other thing to follow up on was the earlier idea of using speculation to test whether blacklisted executors can be added back.  While I think it's a neat _idea_, I really do not think it needs to be included here.  First of all, speculation has always been somewhat poorly tested, and it's taken a while to root out some race conditions -- I don't think users would want speculation turned on without explicitly requesting it, and I think we can still do useful blacklisting without requiring users to turn it on.  Second, I think it could lead to lots of user confusion from the errors generated by these speculative tasks.  Third, it would add a lot of complexity, and it can always be added as an option later on.

I think the other concern is how this will affect scheduler throughput.  I ran the spark-perf scheduler throughput tests (just locally) and didn't see any significant performance difference.  However, those tests aren't really stressing this part of the code -- in particular, there are no failed executors -- so it's probably worth thinking of another test for this.  The per-task overhead introduced with the lock-less approach will just be one call to `clock.getTimeMillis()`, which is pretty negligible, plus an occasional "blip" in the scheduling when the blacklists are updated.  That would be non-zero, but I also doubt it would be a problem; still, some profiling is in order.





[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...

2016-02-18 Thread mwws
Github user mwws commented on the pull request:

https://github.com/apache/spark/pull/8760#issuecomment-185602366
  
@chenghao-intel Sure, I will update soon.





[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...

2016-02-17 Thread chenghao-intel
Github user chenghao-intel commented on the pull request:

https://github.com/apache/spark/pull/8760#issuecomment-185212009
  
@mwws can you update the code?





[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...

2016-02-17 Thread chenghao-intel
Github user chenghao-intel commented on a diff in the pull request:

https://github.com/apache/spark/pull/8760#discussion_r53165346
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/BlacklistStrategy.scala ---
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable
+
+import org.apache.spark.SparkConf
+import org.apache.spark.util.Clock
+
+/**
+ * The interface to determine executor blacklist and node blacklist.
+ */
+private[scheduler] trait BlacklistStrategy {
+  /** Time interval after which failure information for an executor expires */
+  val expireTimeInMilliseconds: Long
+
+  /** Return blacklisted executors which are related to the given stage and partition */
+  def getExecutorBlacklist(
+      executorIdToFailureStatus: mutable.HashMap[String, FailureStatus],
+      atomTask: StageAndPartition,
+      clock: Clock): Set[String]
+
+  /** Return all blacklisted nodes */
+  def getNodeBlacklist(
+      executorIdToFailureStatus: mutable.HashMap[String, FailureStatus],
+      clock: Clock): Set[String]
+
+  /** Return all blacklisted nodes for the specified stage. By default it returns the same
+   *  result as getNodeBlacklist. It can be overridden by strategy implementations.
+   */
+  def getNodeBlacklistForStage(
+      executorIdToFailureStatus: mutable.HashMap[String, FailureStatus],
+      stageId: Int,
+      clock: Clock): Set[String] = getNodeBlacklist(executorIdToFailureStatus, clock)
+
+  /**
+   * Choose which executors should be removed from the blacklist. Return true if any executors
+   * are removed from the blacklist, false otherwise. The default implementation removes
+   * executors from the blacklist after [[expireTimeInMilliseconds]].
+   */
+  def expireExecutorsInBlackList(
+      executorIdToFailureStatus: mutable.HashMap[String, FailureStatus],
+      clock: Clock): Boolean = {
+    val now = clock.getTimeMillis()
+    val expiredKeys = executorIdToFailureStatus.filter {
+      case (executorId, failureStatus) =>
+        (now - failureStatus.updatedTime) >= expireTimeInMilliseconds
+    }.keySet
+
+    if (expiredKeys.isEmpty) {
+      false
+    } else {
+      executorIdToFailureStatus --= expiredKeys
+      true
+    }
+  }
+}
+
+/**
+ * This strategy adds an executor to the blacklist for all tasks when the executor has too many
+ * task failures. An executor is placed in the blacklist when it has more than
+ * [[maxFailedTasks]] failed tasks. Furthermore, all executors on a node are put into the
+ * blacklist if there are more than [[maxBlacklistedExecutors]] blacklisted executors on that
+ * node. The benefit of this strategy is that different taskSets can learn from the experience
+ * of other taskSets, to avoid allocating tasks on problematic executors.
+ */
+private[scheduler] class ExecutorAndNodeStrategy(
+    maxFailedTasks: Int,
+    maxBlacklistedExecutors: Int,
+    val expireTimeInMilliseconds: Long
+  ) extends BlacklistStrategy {
+
+  private def getExecutorBlacklistInfo(
+      executorIdToFailureStatus: mutable.HashMap[String, FailureStatus]) = {
+    executorIdToFailureStatus.filter {
+      case (id, failureStatus) => failureStatus.totalNumFailures > maxFailedTasks
+    }
+  }
+
+  // As this is a task-unrelated strategy, the input StageAndPartition info is ignored
+  def getExecutorBlacklist(
+      executorIdToFailureStatus: mutable.HashMap[String, FailureStatus],
+      atomTask: StageAndPartition,
+      clock: Clock): Set[String] = {
+    getExecutorBlacklistInfo(executorIdToFailureStatus).keys.toSet
+  }
+
+  def getNodeBlacklist(
+      executorIdToFailureStatus: mutable.HashMap[String, FailureStatus],
+      clock: Clock): Set[String] = {
+

[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...

2016-02-11 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/8760#discussion_r52685053
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala ---
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.util.concurrent.TimeUnit
+
+import scala.collection.mutable
+
+import org.apache.spark.SparkConf
+import org.apache.spark.Success
+import org.apache.spark.TaskEndReason
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.util.SystemClock
+import org.apache.spark.util.ThreadUtils
+import org.apache.spark.util.Utils
+import org.apache.spark.util.Clock
+
+/**
+ * BlacklistTracker is designed to track problematic executors and nodes at the application
+ * level. It is shared by all TaskSets, so that once a new TaskSet comes in, it can benefit
+ * from the previous experience of other TaskSets.
+ *
+ * Once a task finishes, the callback method in TaskSetManager should update the
+ * executorIdToFailureStatus map.
+ */
+private[spark] class BlacklistTracker(
+    sparkConf: SparkConf,
+    clock: Clock = new SystemClock()) extends BlacklistCache {
+  // maintain an ExecutorId --> FailureStatus HashMap
+  private val executorIdToFailureStatus: mutable.HashMap[String, FailureStatus] =
+    mutable.HashMap()
+
+  // Apply the Strategy pattern here to swap in different blacklist detection logic
+  private val strategy = BlacklistStrategy(sparkConf)
+
+  // A daemon thread to expire blacklisted executors periodically
+  private val scheduler = ThreadUtils.newDaemonSingleThreadScheduledExecutor(
+    "spark-scheduler-blacklist-expire-timer")
+
+  private val recoverPeriod = sparkConf.getTimeAsSeconds(
+    "spark.scheduler.blacklist.recoverPeriod", "60s")
+
+  def start(): Unit = {
+    val scheduleTask = new Runnable() {
+      override def run(): Unit = {
+        Utils.logUncaughtExceptions(expireExecutorsInBlackList())
+      }
+    }
+    scheduler.scheduleAtFixedRate(scheduleTask, 0L, recoverPeriod, TimeUnit.SECONDS)
+  }
+
+  def stop(): Unit = {
+    scheduler.shutdown()
+    scheduler.awaitTermination(10, TimeUnit.SECONDS)
+  }
+
+  // The actual implementation is delegated to the strategy
+  /** VisibleForTesting */
+  private[scheduler] def expireExecutorsInBlackList(): Unit = synchronized {
+    val updated = strategy.expireExecutorsInBlackList(executorIdToFailureStatus, clock)
+    if (updated) {
+      invalidateCache()
+    }
+  }
--- End diff --

This is a good point.  Some effort was made to avoid doing too much work while holding this lock, by caching the set of blacklisted nodes and executors.  But maybe we can do a bit better.

The only reason we need to synchronize is because of the background thread that expires executors from the blacklist -- otherwise this is only called from a `TaskSetManager`, which in turn can only be called from threads holding a lock on the `TaskScheduler`.  So if, instead of updating the cache in a background thread, we just have each of the methods check for themselves whether the blacklist needs to be updated, I think we could completely eliminate the need for the lock.

You'd still occasionally be pausing scheduling to run updateFailedExecutors, but even with 1000s of executors this seems pretty minor, and it does not run very often (every 60s by default).  We would, however, avoid the overhead of synchronizing on every task being scheduled.
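To make the idea concrete, here is a minimal sketch of that check-on-access scheme (all names are illustrative, and it assumes every caller already holds the `TaskScheduler` lock, as described above):

```scala
import scala.collection.mutable

// Illustrative sketch: no background timer and no `synchronized`; each read
// first checks whether an expiry pass is due. This is only safe because all
// callers are assumed to be serialized by the TaskScheduler lock already.
class CheckOnAccessBlacklist(expiryIntervalMs: Long, clock: () => Long) {
  private val lastFailureTimeByExecutor = mutable.HashMap[String, Long]()
  private var nextExpiryCheck = 0L

  def recordFailure(executorId: String): Unit = {
    lastFailureTimeByExecutor(executorId) = clock()
  }

  def blacklistedExecutors(): Set[String] = {
    maybeExpire()  // per-call overhead: one clock read plus a comparison
    lastFailureTimeByExecutor.keySet.toSet
  }

  private def maybeExpire(): Unit = {
    val now = clock()
    if (now >= nextExpiryCheck) {
      lastFailureTimeByExecutor.retain { case (_, t) => now - t < expiryIntervalMs }
      nextExpiryCheck = now + expiryIntervalMs
    }
  }
}
```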



[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...

2016-02-11 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/8760#discussion_r52682469
  
--- Diff: yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala ---
@@ -21,6 +21,7 @@ import java.util.Collections
 import java.util.concurrent._
 import java.util.regex.Pattern
 
+import scala.collection.immutable.Set
--- End diff --

This is very subjective, but I find it really confusing to have fully imported both immutable and mutable collections in the same file.  (a) If you really want the immutable one, I'd just fully qualify it where you need it, as `collection.immutable.Set`.  Or maybe (b) you could just use a mutable Set here for consistency?  Or (c) if the code here doesn't care whether it's mutable or immutable, you could take a `collection.Set`, and just fully qualify it where you use it?





[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...

2016-02-11 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/8760#discussion_r52681082
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala ---
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.util.concurrent.TimeUnit
+
+import scala.collection.mutable
+
+import org.apache.spark.SparkConf
+import org.apache.spark.Success
+import org.apache.spark.TaskEndReason
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.util.SystemClock
+import org.apache.spark.util.ThreadUtils
+import org.apache.spark.util.Utils
+import org.apache.spark.util.Clock
+
+/**
+ * BlacklistTracker is designed to track problematic executors and nodes at the application
+ * level. It is shared by all TaskSets, so that once a new TaskSet comes in, it can benefit
+ * from the previous experience of other TaskSets.
+ *
+ * Once a task finishes, the callback method in TaskSetManager should update the
+ * executorIdToFailureStatus map.
+ */
+private[spark] class BlacklistTracker(
+    sparkConf: SparkConf,
+    clock: Clock = new SystemClock()) extends BlacklistCache {
+  // maintain an ExecutorId --> FailureStatus HashMap
+  private val executorIdToFailureStatus: mutable.HashMap[String, FailureStatus] =
+    mutable.HashMap()
+
+  // Apply the Strategy pattern here to swap in different blacklist detection logic
+  private val strategy = BlacklistStrategy(sparkConf)
+
+  // A daemon thread to expire blacklisted executors periodically
+  private val scheduler = ThreadUtils.newDaemonSingleThreadScheduledExecutor(
+    "spark-scheduler-blacklist-expire-timer")
+
+  private val recoverPeriod = sparkConf.getTimeAsSeconds(
+    "spark.scheduler.blacklist.recoverPeriod", "60s")
+
+  def start(): Unit = {
+    val scheduleTask = new Runnable() {
+      override def run(): Unit = {
+        Utils.logUncaughtExceptions(expireExecutorsInBlackList())
+      }
+    }
+    scheduler.scheduleAtFixedRate(scheduleTask, 0L, recoverPeriod, TimeUnit.SECONDS)
+  }
+
+  def stop(): Unit = {
+    scheduler.shutdown()
+    scheduler.awaitTermination(10, TimeUnit.SECONDS)
+  }
+
+  // The actual implementation is delegated to the strategy
+  /** VisibleForTesting */
+  private[scheduler] def expireExecutorsInBlackList(): Unit = synchronized {
+    val updated = strategy.expireExecutorsInBlackList(executorIdToFailureStatus, clock)
+    if (updated) {
+      invalidateCache()
+    }
+  }
+
+  def updateFailedExecutors(
+      stageId: Int,
+      partation: Int,
+      info: TaskInfo,
+      reason: TaskEndReason): Unit = synchronized {
+
+    val atomTask = StageAndPartition(stageId, partation)
+    reason match {
+      // If the task succeeded, remove the related record from executorIdToFailureStatus
+      case Success =>
+        removeFailedExecutorsForTaskId(info.executorId, stageId, partation)
+
+      // If the task failed, update the latest failure time and failedTaskIds
+      case _ =>
+        val executorId = info.executorId
+        executorIdToFailureStatus.get(executorId) match {
+          case Some(failureStatus) =>
+            failureStatus.updatedTime = clock.getTimeMillis()
+            val failedTimes = failureStatus.numFailuresPerTask.getOrElse(atomTask, 0) + 1
+            failureStatus.numFailuresPerTask(atomTask) = failedTimes
+          case None =>
+            val failedTasks = mutable.HashMap(atomTask -> 1)
+            val failureStatus = new FailureStatus(
+              clock.getTimeMillis(), info.host, failedTasks)
+            executorIdToFailureStatus(executorId) = failureStatus
+        }
+        invalidateCache()
+    }
+  }
+
+  /** remove the executorId from executorIdToFailureStatus */
+  def 

[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...

2016-02-11 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/8760#discussion_r52681156
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -270,19 +260,11 @@ private[spark] class TaskSetManager(
     taskAttempts(taskIndex).exists(_.host == host)
   }
 
-  /**
-   * Is this re-execution of a failed task on an executor it already failed in before
-   * EXECUTOR_TASK_BLACKLIST_TIMEOUT has elapsed ?
-   */
-  private def executorIsBlacklisted(execId: String, taskId: Int): Boolean = {
-    if (failedExecutors.contains(taskId)) {
-      val failed = failedExecutors.get(taskId).get
-
-      return failed.contains(execId) &&
-        clock.getTimeMillis() - failed.get(execId).get < EXECUTOR_TASK_BLACKLIST_TIMEOUT
-    }
+  var blacklistTracker = sched.sc.blacklistTracker
--- End diff --

can be `private` I think





[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...

2016-02-11 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/8760#discussion_r52681359
  
--- Diff: core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala ---
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import org.mockito.Mockito.when
+import org.scalatest.BeforeAndAfter
+import org.scalatest.mock.MockitoSugar
+
+import org.apache.spark.ExceptionFailure
+import org.apache.spark.SparkConf
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.Success
+import org.apache.spark.TaskEndReason
+import org.apache.spark.util.ManualClock
+
+class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfter with MockitoSugar {
+
+  val FAILURE: TaskEndReason = new ExceptionFailure(
+    "Fake",
+    "fake failure",
+    Array.empty[StackTraceElement],
+    "fake stack trace",
+    None,
+    None)
+
+  val stage1 = 1
+  val stage2 = 2
+
+  val partation1 = 1
+  val partation2 = 2
+  val partation3 = 3
--- End diff --

typo: partition

here and lots of other places





[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...

2016-01-26 Thread chenghao-intel
Github user chenghao-intel commented on the pull request:

https://github.com/apache/spark/pull/8760#issuecomment-175342880
  
@mwws can you rebase the code?
@kayousterhout any more comments on this?





[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...

2016-01-13 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/8760#issuecomment-171210808
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/49294/
Test PASSed.





[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...

2016-01-13 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/8760#issuecomment-171210609
  
**[Test build #49294 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/49294/consoleFull)** for PR 8760 at commit [`25a7c6f`](https://github.com/apache/spark/commit/25a7c6fc329687310f1ae88e56bcd72a5c259b61).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...

2016-01-13 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/8760#issuecomment-171210806
  
Merged build finished. Test PASSed.





[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...

2016-01-12 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/8760#issuecomment-171180163
  
**[Test build #49294 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/49294/consoleFull)** for PR 8760 at commit [`25a7c6f`](https://github.com/apache/spark/commit/25a7c6fc329687310f1ae88e56bcd72a5c259b61).





[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...

2016-01-10 Thread chenghao-intel
Github user chenghao-intel commented on the pull request:

https://github.com/apache/spark/pull/8760#issuecomment-170442019
  
@squito @kayousterhout @mridulm do you have any further comments on this?





[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...

2016-01-10 Thread mwws
Github user mwws commented on the pull request:

https://github.com/apache/spark/pull/8760#issuecomment-170444236
  
Jenkins, retest this please





[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...

2016-01-10 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/8760#issuecomment-170445768
  
**[Test build #49099 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/49099/consoleFull)** for PR 8760 at commit [`d94bdd7`](https://github.com/apache/spark/commit/d94bdd7d16b77317bdfe5851e9ba297a229b4113).





[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...

2016-01-10 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/8760#issuecomment-170446175
  
**[Test build #49099 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/49099/consoleFull)** for PR 8760 at commit [`d94bdd7`](https://github.com/apache/spark/commit/d94bdd7d16b77317bdfe5851e9ba297a229b4113).
 * This patch **fails Scala style tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...

2016-01-10 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/8760#issuecomment-170446186
  
Merged build finished. Test FAILed.





[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...

2016-01-10 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/8760#issuecomment-170446190
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/49099/
Test FAILed.





[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...

2015-12-28 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/8760#issuecomment-167728825
  
**[Test build #48401 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/48401/consoleFull)** for PR 8760 at commit [`d94bdd7`](https://github.com/apache/spark/commit/d94bdd7d16b77317bdfe5851e9ba297a229b4113).





[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...

2015-12-28 Thread mwws
Github user mwws commented on the pull request:

https://github.com/apache/spark/pull/8760#issuecomment-167728914
  
@mridulm I have changed the interface a little bit and created `AdvancedSingleTaskStrategy` to support the use case you described above. With this new strategy, we enable the node blacklist, and its scope of influence is limited to each stage. A unit test, *AdvancedSingleTask strategy works*, has been added to show how it works.





[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...

2015-12-28 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/8760#issuecomment-167741271
  
Merged build finished. Test FAILed.





[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...

2015-12-28 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/8760#issuecomment-167741274
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/48401/
Test FAILed.





[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...

2015-12-28 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/8760#issuecomment-167741137
  
**[Test build #48401 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/48401/consoleFull)** for PR 8760 at commit [`d94bdd7`](https://github.com/apache/spark/commit/d94bdd7d16b77317bdfe5851e9ba297a229b4113).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...

2015-12-23 Thread mwws
Github user mwws commented on the pull request:

https://github.com/apache/spark/pull/8760#issuecomment-167052293
  
@squito @mridulm I am afraid an additional state cannot help here; your use case requires a "TaskSet-related nodeBlacklist", which is not supported by the current implementation. I am making changes to support it.





[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...

2015-12-16 Thread squito
Github user squito commented on the pull request:

https://github.com/apache/spark/pull/8760#issuecomment-165280794
  
@mridulm going back to one of your previous comments:

> What would be more ideal is an implementation which allows for blacklisting single task (as is current), to multiple tasks for a taskset on a single executor (in case the taskset has issues on the executor - memory constraints perhaps) to blacklisting the node : time bound ofcourse, and for specific taskset and/or all taskset until timeout expires (directory cleanup catching up, etc).
> Did I miss this in there somewhere ?

This isn't specifically included, but I think it could be added as an alternate strategy without a huge change.  If I understand correctly, you're basically proposing adding another blacklist state, where an executor is completely blacklisted, but only for *one taskset*; after it's blacklisted for multiple tasksets, it's promoted to being blacklisted for all tasksets (still subject to some timeout or something)?  Something like the sketch below.  I don't think that would be too hard, but I'm also trying to find the balance against adding too many knobs -- is there a clear use case for it?
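A rough sketch of that extra state, purely illustrative (neither the class nor the promotion threshold below exists in the PR):

```scala
import scala.collection.mutable

// Illustrative sketch: an executor is first blacklisted per taskset; once
// enough distinct tasksets have blacklisted it, it is "promoted" to an
// application-wide blacklist (expiry/timeout handling omitted for brevity).
class PromotingBlacklist(promoteAfterTaskSets: Int) {
  private val taskSetsBlacklisting =
    mutable.HashMap[String, Set[Int]]().withDefaultValue(Set.empty)

  def blacklistForTaskSet(executorId: String, taskSetId: Int): Unit = {
    taskSetsBlacklisting(executorId) += taskSetId
  }

  def isBlacklisted(executorId: String, taskSetId: Int): Boolean = {
    val taskSets = taskSetsBlacklisting(executorId)
    // blacklisted for this particular taskset, or promoted to all tasksets
    taskSets.contains(taskSetId) || taskSets.size >= promoteAfterTaskSets
  }
}
```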





[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...

2015-12-16 Thread squito
Github user squito commented on the pull request:

https://github.com/apache/spark/pull/8760#issuecomment-165279831
  
@markhamstra I'm not entirely sure I understand the idea of de-blacklisting executors via speculation.  The two seem rather independent, and I'm actually pretty interested in this feature even with speculation turned off.  I don't really see how you'd leverage speculative tasks for this; it seems like it would complicate things a lot by tying them together, but maybe I just need the details spelled out more clearly?





[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...

2015-12-16 Thread mridulm
Github user mridulm commented on the pull request:

https://github.com/apache/spark/pull/8760#issuecomment-165304077
  

@squito Unless an executor or node is dying or has resource issues, in most cases the failure is not repeatable across tasksets, due to differing execution/resource characteristics. So a blanket blacklist can be extremely suboptimal.
For example: a taskset requiring disk usage (shuffle) vs. a taskset executing collect() -- the latter is not impacted by disk space being full or being reclaimed in the background, while the former will keep failing.

An additional state might well handle the requirement -- I will be able to comment more when I actually see the design, of course :-)





[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...

2015-12-03 Thread markhamstra
Github user markhamstra commented on the pull request:

https://github.com/apache/spark/pull/8760#issuecomment-161729752
  
I still like the idea of using speculative tasks to parole blacklisted 
executors and hosts, but that new role for speculative execution will 
complicate https://github.com/apache/spark/pull/8683. 





[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...

2015-12-01 Thread squito
Github user squito commented on the pull request:

https://github.com/apache/spark/pull/8760#issuecomment-161100737
  
@kayousterhout this is pretty important for users running larger clusters, e.g. a few hundred nodes.  We've seen cases where there is some weird misconfiguration on a node which leads to all tasks failing on that node.  The job will eventually succeed if you use the existing blacklist behavior, but with even one bad node you get so many task-failure messages that it's very hard to make any sense of what is going on -- apps with thousands of tasks per stage and tens of stages generate tons of task failures.  This leads to users complaining that things are broken, when things are actually "ok" in the sense that jobs eventually complete successfully, and it also leaves users unable to find the "real" failures hidden among all the bogus messages.

Cluster managers don't help here -- the problem is when there is a gap between the manager's notion of a "healthy" node and the particular Spark application's (e.g., one particular library fails to load, or there is bad memory or something).

There is also an efficiency advantage to blacklisting the entire executor, rather than just letting tasks fail on your bad executors.  In general the tasks will fail quickly and just get rescheduled (I've only actually seen one case where the tasks did not fail almost immediately), so maybe this is more minor, but it seems like a nice thing to do right.  But beyond the initial task scheduling, this PR adds the ability for the cluster manager to avoid giving out more executors on the bad node.  Plus this opens up the ability for Spark to actively request a new executor -- e.g., if you request 5 executors and one is bad, rather than running with 80% of your requested resources, you should just throw away that executor and get another one (if the cluster has capacity) -- though that is not directly included in this patch; in earlier discussion @mwws thought it best to push that off to follow-up work.

I also like the idea of minimizing user configuration, but I also recognize that we are working from a position of some ignorance about the failure modes people encounter and how they want them fixed.  E.g., I imagine that everyone would want to use the `ExecutorAndNode` strategy and not change any other configuration, but perhaps some users want the old behavior and more knobs.  That's also why I'm reluctant to change the default behavior (but not drawing a hard line either).  After providing some options, we could try to collect more feedback on what works.

You've got a good point about only blacklisting executors across jobs if the tasks succeed elsewhere.  I was initially only thinking about a linear Spark application -- in that case, if some tasks in a job fail everywhere, then the job fails and the Spark app is done in any case.  But of course apps may try to recover from failed jobs, or even have completely independent jobs, in a "job server" style deployment.

Another point you made on another PR is that we have to ensure we don't prevent any progress from being made if all executors get blacklisted and there is nowhere for any tasks to run.  We should make sure we have some sensible behavior there, e.g., fail the job.





[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...

2015-12-01 Thread kayousterhout
Github user kayousterhout commented on the pull request:

https://github.com/apache/spark/pull/8760#issuecomment-161132665
  
I had one other idea here: for blacklisted executors / hosts, we *could* periodically run a speculative copy of a task there.  If the task succeeds, we can un-blacklist the executor; and if the task fails, we haven't lost anything, because the task is just a speculative copy, so there's another copy of the task running elsewhere.  That would allow us to re-try the executor more aggressively, without sacrificing the performance of any running jobs.
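To make that concrete, a hypothetical sketch of the bookkeeping; `onSpeculativeTaskEnd` is an imagined hook, not a real Spark listener API:

```scala
import scala.collection.mutable

// Hypothetical sketch: a successful speculative run on a blacklisted executor
// "paroles" it; a failed probe costs nothing, since the original copy of the
// task keeps running elsewhere.
def onSpeculativeTaskEnd(
    executorId: String,
    succeeded: Boolean,
    blacklisted: mutable.Set[String]): Unit = {
  if (succeeded && blacklisted.contains(executorId)) {
    blacklisted -= executorId
  }
}
```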





[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...

2015-12-01 Thread markhamstra
Github user markhamstra commented on the pull request:

https://github.com/apache/spark/pull/8760#issuecomment-161133600
  
@kayousterhout Yeah, that's kind of interesting -- kind of like "arbeit 
macht frei", except it's not a lie.  Probably shouldn't have gone there





[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...

2015-12-01 Thread mwws
Github user mwws commented on the pull request:

https://github.com/apache/spark/pull/8760#issuecomment-161165662
  
@kayousterhout I believe in most cases users can just use the default configuration or set `spark.scheduler.blacklist.strategy` to `executorAndNode`. I also like the idea of minimizing user configuration, but at the same time it would be better to provide extensibility and flexibility, in case the "smart" system cannot satisfy all user requirements.

About "running a speculative copy of a task": it might be doable, but as I understand it, it adds a lot of complexity to maintaining the correct status of the TaskSet/Stage that the "speculative task" belongs to. What if all other tasks have finished and the "speculative task" is still running? Should the speculative task be retried like normal tasks? What if the "speculative task" succeeded but the original task failed? Should the next stage be aware of these "speculative tasks"? Etc. And from a system-monitoring point of view, additional duplicated tasks are created, which may confuse users.







[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...

2015-12-01 Thread kayousterhout
Github user kayousterhout commented on the pull request:

https://github.com/apache/spark/pull/8760#issuecomment-161070513
  
Before fixing style issues on this change, I think it's worth considering whether this is the right approach to blacklisting.  Based on the discussion here and in #3541 and #10045, it seems like there are a few high-level goals here:

(1) Minimize the required user configuration (ideally this should just work without any configuration)
(2) If an executor/host isn't working well, stop using it for the particular job
(3) If an executor/host is failing for multiple jobs, stop using it across all jobs
(4) Eventually re-try the executor/host, since the failure may be transient
(5) Don't overcorrect for bad tasks / jobs (that fail regardless of where they're run)

My concern with the approach in this PR is that it requires a lot of user configuration, and, as has been discussed in the various PRs, the appropriate blacklist timeout is very workload-dependent and type-of-failure-dependent.  I wonder if it would make sense to do something like an exponentially increasing timeout (where each consecutive failure triggers a longer timeout; see the sketch below) to lower the configuration overhead.  I pinged @mateiz to see if he has any other ideas about how to do this gracefully.

The other issue is (5). One way to handle that is to only "count" a task 
failure if the task fails on the executor *and* succeeds elsewhere.

It would be great if we could make blacklisting work well out-of-the-box, 
so I think it's worth putting some thought into the right approach here.  It 
would be useful to get other folks' feedback about whether these are the right 
goals and whether there are better ideas for how to achieve them.
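
To make the idea concrete, here is a rough sketch of such an exponentially 
increasing timeout, assuming a per-executor count of consecutive failures (all 
names and values are illustrative, not code from this PR):

```scala
import scala.collection.mutable

// Illustrative sketch: each consecutive failure doubles how long the executor
// stays blacklisted, so no single workload-dependent timeout must be configured.
class ExponentialBlacklist(baseTimeoutMs: Long = 10000L, maxTimeoutMs: Long = 3600000L) {
  private case class Entry(consecutiveFailures: Int, expiresAtMs: Long)
  private val entries = mutable.HashMap.empty[String, Entry]

  def recordFailure(executorId: String, nowMs: Long): Unit = {
    val failures = entries.get(executorId).map(_.consecutiveFailures).getOrElse(0) + 1
    // timeout = baseTimeout * 2^(failures - 1), capped so it cannot grow unbounded
    val timeoutMs = math.min(baseTimeoutMs * (1L << math.min(failures - 1, 20)), maxTimeoutMs)
    entries(executorId) = Entry(failures, nowMs + timeoutMs)
  }

  def isBlacklisted(executorId: String, nowMs: Long): Boolean =
    entries.get(executorId).exists(_.expiresAtMs > nowMs)
}
```

Goal (5) could then be layered on top by only feeding recordFailure those 
failures where the task later succeeded on a different executor.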





[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...

2015-12-01 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/8760#discussion_r46249345
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala ---
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.util.concurrent.TimeUnit
+
+import scala.collection.mutable
+
+import org.apache.spark.SparkConf
+import org.apache.spark.Success
+import org.apache.spark.TaskEndReason
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.util.SystemClock
+import org.apache.spark.util.ThreadUtils
+import org.apache.spark.util.Utils
+import org.apache.spark.util.Clock
+
+/**
+ * BlacklistTracker is design to track problematic executors and node on 
application level.
+ * It is shared by all TaskSet, so that once a new TaskSet coming, it 
could be benefit from
+ * previous experience of other TaskSet.
+ *
+ * Once task finished, the callback method in TaskSetManager should update
+ * executorIdToFailureStatus Map.
+ */
+private[spark] class BlacklistTracker(
+sparkConf: SparkConf,
+clock: Clock = new SystemClock()) extends BlacklistCache{
+  // maintain a ExecutorId --> FailureStatus HashMap
+  private val executorIdToFailureStatus: mutable.HashMap[String, 
FailureStatus] = mutable.HashMap()
+
+  // Apply Strategy pattern here to change different blacklist detection 
logic
+  private val strategy = BlacklistStrategy(sparkConf)
+
+  // A daemon thread to expire blacklist executor periodically
+  private val scheduler = 
ThreadUtils.newDaemonSingleThreadScheduledExecutor(
+  "spark-scheduler-blacklist-expire-timer")
+
+  private val recoverPeriod = sparkConf.getTimeAsSeconds(
+"spark.scheduler.blacklist.recoverPeriod", "60s")
+
+  def start(): Unit = {
+val scheduleTask = new Runnable() {
+  override def run(): Unit = {
+Utils.logUncaughtExceptions(expireExecutorsInBlackList())
+  }
+}
+scheduler.scheduleAtFixedRate(scheduleTask, 0L, recoverPeriod, 
TimeUnit.SECONDS)
+  }
+
+  def stop(): Unit = {
+scheduler.shutdown()
+scheduler.awaitTermination(10, TimeUnit.SECONDS)
+  }
+
+  // The actual implementation is delegated to strategy
+  /** VisibleForTesting */
+  private[scheduler] def expireExecutorsInBlackList(): Unit = synchronized 
{
+val updated = 
strategy.expireExecutorsInBlackList(executorIdToFailureStatus, clock)
+if (updated) {
+  invalidateCache()
+}
+  }
+
+  def updateFailedExecutors(
+  stageId: Int,
+  partation: Int,
+  info: TaskInfo,
+  reason: TaskEndReason) : Unit = synchronized {
+
+val atomTask = StageAndPartition(stageId, partation)
+reason match {
+  // If the task succeeded, remove related record from 
executorIdToFailureStatus
+  case Success =>
+removeFailedExecutorsForTaskId(info.executorId, stageId, partation)
+
+  // If the task failed, update latest failure time and failedTaskIds
+  case _ =>
+val executorId = info.executorId
+executorIdToFailureStatus.get(executorId) match {
+  case Some(failureStatus) =>
+failureStatus.updatedTime = clock.getTimeMillis()
+val failedTimes = 
failureStatus.numFailuresPerTask.getOrElse(atomTask, 0) + 1
+failureStatus.numFailuresPerTask(atomTask) = failedTimes
+  case None =>
+val failedTasks = mutable.HashMap(atomTask -> 1)
+val failureStatus = new FailureStatus(
+  clock.getTimeMillis(), info.host, failedTasks)
+executorIdToFailureStatus(executorId) = failureStatus
+}
+invalidateCache()
+}
+  }
+
+  /** remove the executorId from executorIdToFailureStatus */
+  def 

[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...

2015-12-01 Thread mridulm
Github user mridulm commented on the pull request:

https://github.com/apache/spark/pull/8760#issuecomment-160890985
  
Since I am no longer very familiar with the scheduler codebase, I am sort of 
giving up on my review halfway through, save the notes already added. 
Hopefully someone else more active can give a +1!





[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...

2015-11-30 Thread kayousterhout
Github user kayousterhout commented on the pull request:

https://github.com/apache/spark/pull/8760#issuecomment-160728616
  
@davies thanks -- part of what I was wondering is whether the existing 
blacklist mechanism is good enough, since many of the potential issues 
mentioned seem like things that would cause tasks to quickly fail, so I'm 
wondering how big the performance penalty is of making each task set learn on 
its own that a particular executor is bad. Curious to hear others weigh in here 
about whether this is a problem they're experiencing (@squito have you seen 
this issue in production?).





[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...

2015-11-30 Thread kayousterhout
Github user kayousterhout commented on the pull request:

https://github.com/apache/spark/pull/8760#issuecomment-160707369
  
@davies is this the issue you were running into recently (Patrick mentioned 
you'd been experiencing a similar issue)?





[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...

2015-11-30 Thread davies
Github user davies commented on the pull request:

https://github.com/apache/spark/pull/8760#issuecomment-160709640
  
@kayousterhout I ran into an issue with a bad host (a library couldn't be 
installed); it could be worked around by specifying 
`spark.scheduler.executorTaskBlacklistTime`, but unfortunately that's 0 by 
default (meaning disabled). I think we should increase the default value of 
`spark.scheduler.executorTaskBlacklistTime` in 1.6 at least.
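
For reference, the workaround amounts to something like this (the 60s value is 
illustrative; 0, the current default, disables the blacklist):

```scala
import org.apache.spark.SparkConf

// Time in ms for which an executor that failed a task is avoided for that task;
// 0 (the default) turns the per-taskset executor blacklist off entirely.
val conf = new SparkConf()
  .set("spark.scheduler.executorTaskBlacklistTime", "60000") // 60s, illustrative
```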





[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...

2015-11-30 Thread kayousterhout
Github user kayousterhout commented on the pull request:

https://github.com/apache/spark/pull/8760#issuecomment-160734431
  
I remembered why this discussion sounds so familiar: there was a long 
discussion about host-level blacklisting on this PR: 
https://github.com/apache/spark/pull/3541





[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...

2015-11-30 Thread mwws
Github user mwws commented on the pull request:

https://github.com/apache/spark/pull/8760#issuecomment-160814564
  
`spark.scheduler.executorTaskBlacklistTime` is renamed to 
`spark.scheduler.blacklist.timeout` in this PR so that blacklist-related 
configuration follows the unified pattern `spark.scheduler.blacklist.*`. By the 
way, the usage of this timeout configuration was missing from the configuration 
doc before; I have added it back under the new name.





[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...

2015-11-30 Thread mwws
Github user mwws commented on the pull request:

https://github.com/apache/spark/pull/8760#issuecomment-160864884
  
@kayousterhout It's an interesting discussion on PR #3541. As I 
understand it, this PR can satisfy both use cases. 

For @davies, you might want to use `ExecutorAndNodeStrategy` by changing the 
`spark.scheduler.blacklist.strategy` configuration to enable host-level 
blacklisting; see the sketch below. By default, if 3 executors on the same host 
are blacklisted, we consider the host blacklisted as well, and the thresholds 
are configurable.

For @mridulm, I tried to keep the same behavior as before in this PR; the 
default `SingleTaskStrategy` is what you want. It maintains an executor-level 
blacklist for every TaskSet, and the timeout mechanism is retained. (Although 
`spark.scheduler.executorTaskBlacklistTime` is renamed to 
`spark.scheduler.blacklist.timeout`, the old configuration name is still 
supported for backward compatibility, so nothing needs to change on 
your side.)
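
A minimal sketch of the resulting configuration (keys as described in this PR; 
values illustrative):

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  // renamed from spark.scheduler.executorTaskBlacklistTime; the old name is still honored
  .set("spark.scheduler.blacklist.timeout", "60000")
  // opt in to host-level blacklisting on top of the executor-level default
  .set("spark.scheduler.blacklist.strategy", "executorAndNode")
```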





[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...

2015-11-30 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/8760#discussion_r46247212
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/BlacklistStrategy.scala ---
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable
+
+import org.apache.spark.Logging
+import org.apache.spark.SparkConf
+import org.apache.spark.util.SystemClock
+import org.apache.spark.util.Clock
+
+/**
+ * The interface to determine executor blacklist and node blacklist.
+ */
+private [scheduler] trait BlacklistStrategy {
+  /** Define a time interval to expire failure information of executors */
+  val expireTimeInMilliseconds: Long
+
+  /** Return executors in blacklist which are related to given stage and 
partition */
+  def getExecutorBlacklist(
+  executorIdToFailureStatus: mutable.HashMap[String, FailureStatus],
+  atomTask: StageAndPartition,
+  clock: Clock): Set[String]
+
+  /** Return all nodes in blacklist */
+  def getNodeBlacklist(
+  executorIdToFailureStatus: mutable.HashMap[String, FailureStatus]): 
Set[String]
+
+  /**
+   * Choose which executors should be removed from blacklist. Return true 
if any executors are
+   * removed from the blacklist, false otherwise. The default 
implementation removes executors from
+   * the blacklist after [[expireTimeInMilliseconds]]
+   */
+  def expireExecutorsInBlackList(
+  executorIdToFailureStatus: mutable.HashMap[String, FailureStatus],
+  clock: Clock): Boolean = {
+val now = clock.getTimeMillis()
+val expiredKey = executorIdToFailureStatus.filter {
+  case (executorid, failureStatus) => {
+(now - failureStatus.updatedTime) >= expireTimeInMilliseconds
+  }
+}.keySet
+
+if (expiredKey.isEmpty) {
+  false
+} else {
+  executorIdToFailureStatus --= expiredKey
+  true
+}
+  }
+}
+
+/**
+ * This strategy adds an executor to the blacklist for all tasks when the 
executor has too many
+ * task failures. An executor is placed in the blacklist when there are 
more than
+ * [[maxFailedTasks]] failed tasks. Furthermore, all executors in one node 
are put into the
+ * blacklist if there are more than [[maxBlacklistedExecutors]] 
blacklisted executors on one node.
+ * The benefit of this strategy is that different taskSets can learn 
experience from other taskSet
+ * to avoid allocating tasks on problematic executors.
+ */
+private[scheduler] class ExecutorAndNodeStrategy(
+maxFailedTasks: Int,
+maxBlacklistedExecutors: Int,
+val expireTimeInMilliseconds: Long
+  ) extends BlacklistStrategy {
+
+  private def getSelectedExecutorMap(
--- End diff --

This needs to be renamed to something more appropriate - 
filterFailedExecutors or some such - so that it is clear what it does. Selected 
Executor does not really convey what it is supposed to do.





[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...

2015-11-30 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/8760#discussion_r46248240
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala ---
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.util.concurrent.TimeUnit
+
+import scala.collection.mutable
+
+import org.apache.spark.SparkConf
+import org.apache.spark.Success
+import org.apache.spark.TaskEndReason
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.util.SystemClock
+import org.apache.spark.util.ThreadUtils
+import org.apache.spark.util.Utils
+import org.apache.spark.util.Clock
+
+/**
+ * BlacklistTracker is design to track problematic executors and node on 
application level.
+ * It is shared by all TaskSet, so that once a new TaskSet coming, it 
could be benefit from
+ * previous experience of other TaskSet.
+ *
+ * Once task finished, the callback method in TaskSetManager should update
+ * executorIdToFailureStatus Map.
+ */
+private[spark] class BlacklistTracker(
+sparkConf: SparkConf,
+clock: Clock = new SystemClock()) extends BlacklistCache{
+  // maintain a ExecutorId --> FailureStatus HashMap
+  private val executorIdToFailureStatus: mutable.HashMap[String, 
FailureStatus] = mutable.HashMap()
+
+  // Apply Strategy pattern here to change different blacklist detection 
logic
+  private val strategy = BlacklistStrategy(sparkConf)
+
+  // A daemon thread to expire blacklist executor periodically
+  private val scheduler = 
ThreadUtils.newDaemonSingleThreadScheduledExecutor(
+  "spark-scheduler-blacklist-expire-timer")
+
+  private val recoverPeriod = sparkConf.getTimeAsSeconds(
+"spark.scheduler.blacklist.recoverPeriod", "60s")
+
+  def start(): Unit = {
+val scheduleTask = new Runnable() {
+  override def run(): Unit = {
+Utils.logUncaughtExceptions(expireExecutorsInBlackList())
+  }
+}
+scheduler.scheduleAtFixedRate(scheduleTask, 0L, recoverPeriod, 
TimeUnit.SECONDS)
+  }
+
+  def stop(): Unit = {
+scheduler.shutdown()
+scheduler.awaitTermination(10, TimeUnit.SECONDS)
+  }
+
+  // The actual implementation is delegated to strategy
+  /** VisibleForTesting */
+  private[scheduler] def expireExecutorsInBlackList(): Unit = synchronized 
{
+val updated = 
strategy.expireExecutorsInBlackList(executorIdToFailureStatus, clock)
+if (updated) {
+  invalidateCache()
+}
+  }
--- End diff --

@kayousterhout Would be good to have your thoughts here, since I am fairly 
out of touch with the scheduler now.

If I read it correctly, multiple methods in this class are called directly 
from the scheduler as part of identifying, starting, and updating tasks to be 
run from the TaskSetManager/scheduler, and they all use the same lock as 
updateFailedExecutors, which can actually run fairly non-trivial computation 
as the number of failures in the system increases. Is this a concern, or is it 
alleviated somehow and I missed it?
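
For illustration only, one shape a lower-contention design could take, 
assuming the hot read path only needs per-executor failure counts (this is not 
what the PR implements; the PR guards both paths with a single `synchronized` 
lock):

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong

// Illustrative sketch: failure counts live in concurrent structures, so checking
// whether an executor looks bad never contends with update-path writes.
class LockFreeFailureCounts {
  private val failures = new ConcurrentHashMap[String, AtomicLong]()

  def recordFailure(executorId: String): Long = {
    var counter = failures.get(executorId)
    if (counter == null) {
      // racy insert is fine: putIfAbsent keeps whichever counter got there first
      failures.putIfAbsent(executorId, new AtomicLong(0L))
      counter = failures.get(executorId)
    }
    counter.incrementAndGet()
  }

  def failureCount(executorId: String): Long = {
    val counter = failures.get(executorId)
    if (counter == null) 0L else counter.get()
  }
}
```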





[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...

2015-11-30 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/8760#discussion_r46248217
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala ---
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.util.concurrent.TimeUnit
+
+import scala.collection.mutable
+
+import org.apache.spark.SparkConf
+import org.apache.spark.Success
+import org.apache.spark.TaskEndReason
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.util.SystemClock
+import org.apache.spark.util.ThreadUtils
+import org.apache.spark.util.Utils
+import org.apache.spark.util.Clock
+
+/**
+ * BlacklistTracker is design to track problematic executors and node on 
application level.
+ * It is shared by all TaskSet, so that once a new TaskSet coming, it 
could be benefit from
+ * previous experience of other TaskSet.
+ *
+ * Once task finished, the callback method in TaskSetManager should update
+ * executorIdToFailureStatus Map.
+ */
+private[spark] class BlacklistTracker(
+sparkConf: SparkConf,
+clock: Clock = new SystemClock()) extends BlacklistCache{
+  // maintain a ExecutorId --> FailureStatus HashMap
+  private val executorIdToFailureStatus: mutable.HashMap[String, 
FailureStatus] = mutable.HashMap()
+
+  // Apply Strategy pattern here to change different blacklist detection 
logic
+  private val strategy = BlacklistStrategy(sparkConf)
+
+  // A daemon thread to expire blacklist executor periodically
+  private val scheduler = 
ThreadUtils.newDaemonSingleThreadScheduledExecutor(
+  "spark-scheduler-blacklist-expire-timer")
+
+  private val recoverPeriod = sparkConf.getTimeAsSeconds(
+"spark.scheduler.blacklist.recoverPeriod", "60s")
+
+  def start(): Unit = {
+val scheduleTask = new Runnable() {
+  override def run(): Unit = {
+Utils.logUncaughtExceptions(expireExecutorsInBlackList())
+  }
+}
+scheduler.scheduleAtFixedRate(scheduleTask, 0L, recoverPeriod, 
TimeUnit.SECONDS)
+  }
+
+  def stop(): Unit = {
+scheduler.shutdown()
+scheduler.awaitTermination(10, TimeUnit.SECONDS)
+  }
+
+  // The actual implementation is delegated to strategy
+  /** VisibleForTesting */
+  private[scheduler] def expireExecutorsInBlackList(): Unit = synchronized 
{
+val updated = 
strategy.expireExecutorsInBlackList(executorIdToFailureStatus, clock)
+if (updated) {
+  invalidateCache()
+}
+  }
+
+  def updateFailedExecutors(
+  stageId: Int,
+  partation: Int,
--- End diff --

sp





[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...

2015-11-30 Thread mridulm
Github user mridulm commented on the pull request:

https://github.com/apache/spark/pull/8760#issuecomment-160884866
  
What would be more ideal is an implementation which allows for blacklisting 
a single task (as is current), multiple tasks of a taskset on a single 
executor (in case the taskset has issues on that executor, memory constraints 
perhaps), or the whole node: time-bound of course, and for a specific taskset 
and/or all tasksets until the timeout expires (directory cleanup catching 
up, etc.).
Did I miss this in there somewhere?
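
Loosely, the granularities being asked for could be modeled like this (purely 
illustrative, not code from the PR):

```scala
// Illustrative model of the requested granularities, each entry time-bound.
sealed trait BlacklistScope { def expiresAtMs: Long }

// a single task avoiding one executor (the current behavior)
case class TaskOnExecutor(stageId: Int, partition: Int, executorId: String,
    expiresAtMs: Long) extends BlacklistScope

// a whole taskset avoiding one executor (e.g. memory constraints)
case class TaskSetOnExecutor(stageId: Int, executorId: String,
    expiresAtMs: Long) extends BlacklistScope

// a node, either for one taskset or for all tasksets (stageId = None)
case class NodeScope(host: String, stageId: Option[Int],
    expiresAtMs: Long) extends BlacklistScope
```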





[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...

2015-11-29 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/8760#issuecomment-160490927
  
**[Test build #46861 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/46861/consoleFull)**
 for PR 8760 at commit 
[`19e8cc0`](https://github.com/apache/spark/commit/19e8cc0fde56ab2055e44f5501534764d6dfcfe1).





[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...

2015-11-29 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/8760#issuecomment-160505192
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/46861/
Test PASSed.





[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...

2015-11-29 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/8760#issuecomment-160505191
  
Merged build finished. Test PASSed.





[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...

2015-11-29 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/8760#issuecomment-160505134
  
**[Test build #46861 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/46861/consoleFull)**
 for PR 8760 at commit 
[`19e8cc0`](https://github.com/apache/spark/commit/19e8cc0fde56ab2055e44f5501534764d6dfcfe1).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...

2015-11-29 Thread kayousterhout
Github user kayousterhout commented on the pull request:

https://github.com/apache/spark/pull/8760#issuecomment-160506398
  
This seems potentially useful, but adds a lot of complexity.  Can you 
elaborate on the use cases where this problem has been arising? My vague memory 
of previous discussions about adding similar functionality is that the various 
cluster managers have existing functionality to fail workers that are 
repeatedly failing?





[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...

2015-11-29 Thread mwws
Github user mwws commented on the pull request:

https://github.com/apache/spark/pull/8760#issuecomment-160544588
  
@kayousterhout For example, tasks can fail because some host can't reach a 
source/network, the local storage of some host is full, there is broken class 
linkage, or some other run-time exception occurs. In such cases, the worker 
will stay alive and the cluster manager can't help much, so further tasks will 
still be submitted to the problematic executors/nodes. The original code base 
already contains a similar blacklist mechanism, `executorIsBlacklisted`, but 
the blacklist is maintained per TaskSetManager (called `failedExecutors`) and 
not shared, which means that when a new TaskSet comes, it cannot benefit from 
the previous experience of other TaskSets.

This change adds complexity, but from a user perspective, they can just use 
the built-in `SingleNodeStrategy`, which behaves the same as the original 
logic, or the built-in `ExecutorAndNodeStrategy` by only changing 
configuration. At the same time, it provides flexibility for advanced users to 
customize it. A simplified contrast is sketched below.
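
The contrast can be pictured like this (names illustrative, not the PR's 
actual classes):

```scala
import scala.collection.mutable

// Before: each TaskSetManager keeps its own failedExecutors set, which dies
// with the taskset, so a brand-new taskset starts with no knowledge.
// After: one application-level tracker is consulted by every TaskSetManager,
// so a new taskset immediately avoids executors other tasksets saw fail.
object SharedBlacklist {
  private val badExecutors = mutable.HashSet.empty[String]

  def recordFailure(executorId: String): Unit = synchronized {
    badExecutors += executorId
  }

  // A new taskset can call this before scheduling its very first task.
  def isBlacklisted(executorId: String): Boolean = synchronized {
    badExecutors.contains(executorId)
  }
}
```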






[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...

2015-11-27 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/8760#issuecomment-160076073
  
**[Test build #46811 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/46811/consoleFull)**
 for PR 8760 at commit 
[`fc3d38e`](https://github.com/apache/spark/commit/fc3d38e04ef99006ee4c20d20d52d9e871c2dfb8).





[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...

2015-11-27 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/8760#issuecomment-160107616
  
Merged build finished. Test FAILed.





[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...

2015-11-27 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/8760#issuecomment-160107618
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/46811/
Test FAILed.





[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...

2015-11-27 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/8760#issuecomment-160107546
  
**[Test build #46811 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/46811/consoleFull)**
 for PR 8760 at commit 
[`fc3d38e`](https://github.com/apache/spark/commit/fc3d38e04ef99006ee4c20d20d52d9e871c2dfb8).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...

2015-11-26 Thread mwws
Github user mwws commented on a diff in the pull request:

https://github.com/apache/spark/pull/8760#discussion_r46021072
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -636,7 +622,8 @@ private[spark] class TaskSetManager(
   logInfo("Ignoring task-finished event for " + info.id + " in stage " 
+ taskSet.id +
 " because task " + index + " has already completed successfully")
 }
-failedExecutors.remove(index)
+
+blacklistTracker.foreach(_.updateFailureExecutors(stageId, info, 
Success))
--- End diff --

Thanks for pointing it out, I will change it.





[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...

2015-11-24 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/8760#discussion_r45793155
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala ---
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import org.apache.spark.SparkFunSuite
+import org.scalatest.BeforeAndAfter
+import org.apache.spark.SparkConf
+import org.apache.spark.TaskEndReason
+import org.apache.spark.Success
+import org.apache.spark.ExceptionFailure
+import org.apache.spark.LocalSparkContext
+import org.apache.spark.SparkContext
+import org.mockito.Mockito.{when, spy}
+import org.apache.spark.util.ManualClock
+
+class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfter with 
LocalSparkContext {
+
+  val FAILURE: TaskEndReason = new ExceptionFailure(
+  "Fake",
+  "fake failure",
+  Array.empty[StackTraceElement],
+  "fake stack trace",
+  None,
+  None)
+
+  val stage1 = 1
+  val stage2 = 2
+  // Variable name can indicate basic information of taskInfo
+  // The format is "taskInfo_executorId_taskIndex_hostName"
+  val taskInfo_1_1_hostA = new TaskInfo(1L, 1, 1, 0L, "1", "hostA", 
TaskLocality.ANY, false)
+  val taskInfo_1_2_hostA = new TaskInfo(2L, 2, 1, 0L, "1", "hostA", 
TaskLocality.ANY, false)
+  val taskInfo_2_1_hostA = new TaskInfo(3L, 1, 1, 0L, "2", "hostA", 
TaskLocality.ANY, false)
+  val taskInfo_2_2_hostA = new TaskInfo(4L, 2, 1, 0L, "2", "hostA", 
TaskLocality.ANY, false)
+  val taskInfo_3_3_hostB = new TaskInfo(5L, 3, 1, 0L, "3", "hostB", 
TaskLocality.ANY, false)
+
+  val clock = new ManualClock(0)
+
+  test ("expireExecutorsInBlacklist works") {
+// expire time is set to 6s
+val conf = new SparkConf().setAppName("test").setMaster("local")
+  .set("spark.ui.enabled", "false")
+  .set("spark.scheduler.executorTaskBlacklistTime", "6000")
+
+sc = new SparkContext(conf)
+val scheduler = new TaskSchedulerImpl(sc, 1)
+
+val tracker = new BlacklistTracker(conf, clock)
+// Executor 1 into blacklist at Time 00:00:00
+tracker.updateFailureExecutors(stage1, taskInfo_1_1_hostA, FAILURE)
+assert(tracker.executorBlacklist(scheduler, stage1, 1) === Set("1"))
+
+clock.setTime(2000)
+tracker.expireExecutorsInBlackList()
+assert(tracker.executorBlacklist(scheduler, stage1, 1) === Set("1"))
+// Executor 1 failed again at Time 00::00:02
+tracker.updateFailureExecutors(stage1, taskInfo_1_1_hostA, FAILURE)
+
+clock.setTime(3000)
+// Executor 2 failed at Time 00:00:03
+tracker.updateFailureExecutors(stage1, taskInfo_2_1_hostA, FAILURE)
+assert(tracker.executorBlacklist(scheduler, stage1, 1) === Set("1", 
"2"))
+
+clock.setTime(6000)
+tracker.expireExecutorsInBlackList()
+assert(tracker.executorBlacklist(scheduler, stage1, 1) === Set("1", 
"2"))
+
+clock.setTime(8000)
+tracker.expireExecutorsInBlackList()
+assert(tracker.executorBlacklist(scheduler, stage1, 1) === Set("2"))
+
+clock.setTime(1)
+tracker.expireExecutorsInBlackList()
+assert(tracker.executorBlacklist(scheduler, stage1, 1) === Set())
+  }
+
+  test("blacklist feature is off by default") {
+val conf = new SparkConf().setAppName("test").setMaster("local")
+  .set("spark.ui.enabled", "false")
+sc = new SparkContext(conf)
+
+val scheduler = new TaskSchedulerImpl(sc, 1)
+
+val tracker = new BlacklistTracker(conf, clock)
+tracker.updateFailureExecutors(stage1, taskInfo_1_1_hostA, FAILURE)
+tracker.updateFailureExecutors(stage1, taskInfo_2_1_hostA, FAILURE)
+assert(tracker.executorBlacklist(scheduler, stage1, 1) === Set())
+assert(tracker.executorBlacklist(scheduler, stage1, 1) === Set())
+
+

[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...

2015-11-24 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/8760#discussion_r45792679
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala ---
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import org.apache.spark.SparkFunSuite
+import org.scalatest.BeforeAndAfter
+import org.apache.spark.SparkConf
+import org.apache.spark.TaskEndReason
+import org.apache.spark.Success
+import org.apache.spark.ExceptionFailure
+import org.apache.spark.LocalSparkContext
+import org.apache.spark.SparkContext
+import org.mockito.Mockito.{when, spy}
+import org.apache.spark.util.ManualClock
+
+class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfter with 
LocalSparkContext {
+
+  val FAILURE: TaskEndReason = new ExceptionFailure(
+  "Fake",
+  "fake failure",
+  Array.empty[StackTraceElement],
+  "fake stack trace",
+  None,
+  None)
+
+  val stage1 = 1
+  val stage2 = 2
+  // Variable name can indicate basic information of taskInfo
+  // The format is "taskInfo_executorId_taskIndex_hostName"
+  val taskInfo_1_1_hostA = new TaskInfo(1L, 1, 1, 0L, "1", "hostA", 
TaskLocality.ANY, false)
+  val taskInfo_1_2_hostA = new TaskInfo(2L, 2, 1, 0L, "1", "hostA", 
TaskLocality.ANY, false)
+  val taskInfo_2_1_hostA = new TaskInfo(3L, 1, 1, 0L, "2", "hostA", 
TaskLocality.ANY, false)
+  val taskInfo_2_2_hostA = new TaskInfo(4L, 2, 1, 0L, "2", "hostA", 
TaskLocality.ANY, false)
+  val taskInfo_3_3_hostB = new TaskInfo(5L, 3, 1, 0L, "3", "hostB", 
TaskLocality.ANY, false)
+
+  val clock = new ManualClock(0)
+
+  test ("expireExecutorsInBlacklist works") {
+// expire time is set to 6s
+val conf = new SparkConf().setAppName("test").setMaster("local")
+  .set("spark.ui.enabled", "false")
+  .set("spark.scheduler.executorTaskBlacklistTime", "6000")
+
+sc = new SparkContext(conf)
+val scheduler = new TaskSchedulerImpl(sc, 1)
+
+val tracker = new BlacklistTracker(conf, clock)
+// Executor 1 into blacklist at Time 00:00:00
+tracker.updateFailureExecutors(stage1, taskInfo_1_1_hostA, FAILURE)
+assert(tracker.executorBlacklist(scheduler, stage1, 1) === Set("1"))
+
+clock.setTime(2000)
+tracker.expireExecutorsInBlackList()
+assert(tracker.executorBlacklist(scheduler, stage1, 1) === Set("1"))
+// Executor 1 failed again at Time 00::00:02
+tracker.updateFailureExecutors(stage1, taskInfo_1_1_hostA, FAILURE)
+
+clock.setTime(3000)
+// Executor 2 failed at Time 00:00:03
+tracker.updateFailureExecutors(stage1, taskInfo_2_1_hostA, FAILURE)
+assert(tracker.executorBlacklist(scheduler, stage1, 1) === Set("1", 
"2"))
+
+clock.setTime(6000)
+tracker.expireExecutorsInBlackList()
+assert(tracker.executorBlacklist(scheduler, stage1, 1) === Set("1", 
"2"))
+
+clock.setTime(8000)
+tracker.expireExecutorsInBlackList()
+assert(tracker.executorBlacklist(scheduler, stage1, 1) === Set("2"))
+
+clock.setTime(1)
+tracker.expireExecutorsInBlackList()
+assert(tracker.executorBlacklist(scheduler, stage1, 1) === Set())
+  }
+
+  test("blacklist feature is off by default") {
+val conf = new SparkConf().setAppName("test").setMaster("local")
+  .set("spark.ui.enabled", "false")
+sc = new SparkContext(conf)
+
+val scheduler = new TaskSchedulerImpl(sc, 1)
+
+val tracker = new BlacklistTracker(conf, clock)
+tracker.updateFailureExecutors(stage1, taskInfo_1_1_hostA, FAILURE)
+tracker.updateFailureExecutors(stage1, taskInfo_2_1_hostA, FAILURE)
+assert(tracker.executorBlacklist(scheduler, stage1, 1) === Set())
+assert(tracker.executorBlacklist(scheduler, stage1, 1) === Set())
+
+

[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...

2015-11-24 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/8760#discussion_r45790406
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala ---
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.util.concurrent.TimeUnit
+import scala.collection.mutable
+import org.apache.spark.SparkConf
+import org.apache.spark.Success
+import org.apache.spark.TaskEndReason
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.util.SystemClock
+import org.apache.spark.util.ThreadUtils
+import org.apache.spark.util.Utils
+import org.apache.spark.util.Clock
+
+import com.google.common.annotations.VisibleForTesting
+
+/**
+ * BlacklistTracker is design to track problematic executors and node on 
application level.
+ * It is shared by all TaskSet, so that once a new TaskSet coming, it 
could be benefit from
+ * previous experience of other TaskSet.
+ *
+ * Once task finished, the callback method in TaskSetManager should update
+ * executorIdToFailureStatus Map.
+ */
+private[spark] class BlacklistTracker(
+sparkConf: SparkConf,
+clock: Clock = new SystemClock()) extends BlacklistCache{
+  // maintain a ExecutorId --> FailureStatus HashMap
+  private val executorIdToFailureStatus: mutable.HashMap[String, 
FailureStatus] = mutable.HashMap()
+
+  // Apply Strategy pattern here to change different blacklist detection 
logic
+  private val strategy = BlacklistStrategy(sparkConf)
+
+  // A daemon thread to expire blacklist executor periodically
+  private val scheduler = 
ThreadUtils.newDaemonSingleThreadScheduledExecutor(
+  "spark-scheduler-blacklist-expire-timer")
+
+  private val recoverPeriod = sparkConf.getTimeAsSeconds(
+"spark.scheduler.blacklist.recoverPeriod", "60s")
+
+  def start(): Unit = {
+val scheduleTask = new Runnable() {
+  override def run(): Unit = {
+Utils.logUncaughtExceptions(expireExecutorsInBlackList())
+  }
+}
+scheduler.scheduleAtFixedRate(scheduleTask, 0L, recoverPeriod, 
TimeUnit.SECONDS)
+  }
+
+  def stop(): Unit = {
+scheduler.shutdown()
+scheduler.awaitTermination(10, TimeUnit.SECONDS)
+  }
+
+  // The actual implementation is delegated to strategy
+  /** VisibleForTesting */
+  private[scheduler] def expireExecutorsInBlackList(): Unit = synchronized 
{
+val updated = 
strategy.expireExecutorsInBlackList(executorIdToFailureStatus, clock)
+if (updated) {
+  invalidAllCache()
+}
+  }
+
+  def updateFailureExecutors(
+  stageId: Int, info: TaskInfo, reason: TaskEndReason) : Unit = 
synchronized {
+val index = info.index
+val atomTask = BlacklistAtomTask(stageId, index)
+reason match {
+  // If task succeeding, remove related record from 
executorIdToFailureStatus
+  case Success =>
+removeFailureExecutorsForTaskId(info.executorId, stageId, index)
+
+  // If task failing, update latest failure time and failedTaskIds
+  case _ =>
+val executorId = info.executorId
+executorIdToFailureStatus.get(executorId) match {
+  case Some(failureStatus) =>
+failureStatus.updatedTime = clock.getTimeMillis()
+val failedTimes = 
failureStatus.numFailuresPerTask.getOrElse(atomTask, 0) + 1
+failureStatus.numFailuresPerTask.update(atomTask, failedTimes)
+  case None =>
+val failedTasks = mutable.HashMap(atomTask -> 1)
+val failureStatus = new FailureStatus(
+  clock.getTimeMillis(), info.host, failedTasks)
+executorIdToFailureStatus.update(executorId, failureStatus)
+}
+invalidAllCache()
+}
+  }
+
+  /** remove the executorId from 

[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...

2015-11-24 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/8760#discussion_r45790885
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala ---
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.util.concurrent.TimeUnit
+import scala.collection.mutable
+import org.apache.spark.SparkConf
+import org.apache.spark.Success
+import org.apache.spark.TaskEndReason
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.util.SystemClock
+import org.apache.spark.util.ThreadUtils
+import org.apache.spark.util.Utils
+import org.apache.spark.util.Clock
+
+import com.google.common.annotations.VisibleForTesting
+
+/**
+ * BlacklistTracker is design to track problematic executors and node on 
application level.
+ * It is shared by all TaskSet, so that once a new TaskSet coming, it 
could be benefit from
+ * previous experience of other TaskSet.
+ *
+ * Once task finished, the callback method in TaskSetManager should update
+ * executorIdToFailureStatus Map.
+ */
+private[spark] class BlacklistTracker(
+sparkConf: SparkConf,
+clock: Clock = new SystemClock()) extends BlacklistCache{
+  // maintain a ExecutorId --> FailureStatus HashMap
+  private val executorIdToFailureStatus: mutable.HashMap[String, 
FailureStatus] = mutable.HashMap()
+
+  // Apply Strategy pattern here to change different blacklist detection 
logic
+  private val strategy = BlacklistStrategy(sparkConf)
+
+  // A daemon thread to expire blacklist executor periodically
+  private val scheduler = 
ThreadUtils.newDaemonSingleThreadScheduledExecutor(
+  "spark-scheduler-blacklist-expire-timer")
+
+  private val recoverPeriod = sparkConf.getTimeAsSeconds(
+"spark.scheduler.blacklist.recoverPeriod", "60s")
+
+  def start(): Unit = {
+val scheduleTask = new Runnable() {
+  override def run(): Unit = {
+Utils.logUncaughtExceptions(expireExecutorsInBlackList())
+  }
+}
+scheduler.scheduleAtFixedRate(scheduleTask, 0L, recoverPeriod, 
TimeUnit.SECONDS)
+  }
+
+  def stop(): Unit = {
+scheduler.shutdown()
+scheduler.awaitTermination(10, TimeUnit.SECONDS)
+  }
+
+  // The actual implementation is delegated to strategy
+  /** VisibleForTesting */
+  private[scheduler] def expireExecutorsInBlackList(): Unit = synchronized 
{
+val updated = 
strategy.expireExecutorsInBlackList(executorIdToFailureStatus, clock)
+if (updated) {
+  invalidAllCache()
+}
+  }
+
+  def updateFailureExecutors(
+  stageId: Int, info: TaskInfo, reason: TaskEndReason) : Unit = 
synchronized {
+val index = info.index
+val atomTask = BlacklistAtomTask(stageId, index)
+reason match {
+  // If task succeeding, remove related record from 
executorIdToFailureStatus
+  case Success =>
+removeFailureExecutorsForTaskId(info.executorId, stageId, index)
+
+  // If task failing, update latest failure time and failedTaskIds
+  case _ =>
+val executorId = info.executorId
+executorIdToFailureStatus.get(executorId) match {
+  case Some(failureStatus) =>
+failureStatus.updatedTime = clock.getTimeMillis()
+val failedTimes = 
failureStatus.numFailuresPerTask.getOrElse(atomTask, 0) + 1
+failureStatus.numFailuresPerTask.update(atomTask, failedTimes)
--- End diff --

nit: rather than calling `update`, its more idiomatic to write this as 
`failureStatus.numFailuresPerTask(atomTask) = failedTimes` (scala syntax-sugar 
for the `update` method)



[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...

2015-11-24 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/8760#discussion_r45790903
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala ---
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.util.concurrent.TimeUnit
+import scala.collection.mutable
+import org.apache.spark.SparkConf
+import org.apache.spark.Success
+import org.apache.spark.TaskEndReason
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.util.SystemClock
+import org.apache.spark.util.ThreadUtils
+import org.apache.spark.util.Utils
+import org.apache.spark.util.Clock
+
+import com.google.common.annotations.VisibleForTesting
+
+/**
+ * BlacklistTracker is design to track problematic executors and node on 
application level.
+ * It is shared by all TaskSet, so that once a new TaskSet coming, it 
could be benefit from
+ * previous experience of other TaskSet.
+ *
+ * Once task finished, the callback method in TaskSetManager should update
+ * executorIdToFailureStatus Map.
+ */
+private[spark] class BlacklistTracker(
+sparkConf: SparkConf,
+clock: Clock = new SystemClock()) extends BlacklistCache{
+  // maintain a ExecutorId --> FailureStatus HashMap
+  private val executorIdToFailureStatus: mutable.HashMap[String, 
FailureStatus] = mutable.HashMap()
+
+  // Apply Strategy pattern here to change different blacklist detection 
logic
+  private val strategy = BlacklistStrategy(sparkConf)
+
+  // A daemon thread to expire blacklist executor periodically
+  private val scheduler = 
ThreadUtils.newDaemonSingleThreadScheduledExecutor(
+  "spark-scheduler-blacklist-expire-timer")
+
+  private val recoverPeriod = sparkConf.getTimeAsSeconds(
+"spark.scheduler.blacklist.recoverPeriod", "60s")
+
+  def start(): Unit = {
+val scheduleTask = new Runnable() {
+  override def run(): Unit = {
+Utils.logUncaughtExceptions(expireExecutorsInBlackList())
+  }
+}
+scheduler.scheduleAtFixedRate(scheduleTask, 0L, recoverPeriod, 
TimeUnit.SECONDS)
+  }
+
+  def stop(): Unit = {
+scheduler.shutdown()
+scheduler.awaitTermination(10, TimeUnit.SECONDS)
+  }
+
+  // The actual implementation is delegated to strategy
+  /** VisibleForTesting */
+  private[scheduler] def expireExecutorsInBlackList(): Unit = synchronized 
{
+val updated = 
strategy.expireExecutorsInBlackList(executorIdToFailureStatus, clock)
+if (updated) {
+  invalidAllCache()
+}
+  }
+
+  def updateFailureExecutors(
+  stageId: Int, info: TaskInfo, reason: TaskEndReason) : Unit = 
synchronized {
+val index = info.index
+val atomTask = BlacklistAtomTask(stageId, index)
+reason match {
+  // If task succeeding, remove related record from 
executorIdToFailureStatus
+  case Success =>
+removeFailureExecutorsForTaskId(info.executorId, stageId, index)
+
+  // If task failing, update latest failure time and failedTaskIds
+  case _ =>
+val executorId = info.executorId
+executorIdToFailureStatus.get(executorId) match {
+  case Some(failureStatus) =>
+failureStatus.updatedTime = clock.getTimeMillis()
+val failedTimes = 
failureStatus.numFailuresPerTask.getOrElse(atomTask, 0) + 1
+failureStatus.numFailuresPerTask.update(atomTask, failedTimes)
+  case None =>
+val failedTasks = mutable.HashMap(atomTask -> 1)
+val failureStatus = new FailureStatus(
+  clock.getTimeMillis(), info.host, failedTasks)
+executorIdToFailureStatus.update(executorId, failureStatus)
--- End diff --

same here on `update`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...

2015-11-24 Thread squito
Github user squito commented on the pull request:

https://github.com/apache/spark/pull/8760#issuecomment-159402366
  
sorry for the delay again, @mwws, thanks for updating.  My comments are now
just cosmetic.  As it's a scheduler change, pinging the usual folks:
@markhamstra @kayousterhout @mateiz


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...

2015-11-24 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/8760#discussion_r45791721
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/BlacklistStrategy.scala ---
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable
+import org.apache.spark.Logging
+import org.apache.spark.SparkConf
+import org.apache.spark.util.SystemClock
+import org.apache.spark.util.Clock
+
+/**
+ * The interface to determine executor blacklist and node blacklist.
+ */
+private [scheduler] trait BlacklistStrategy {
+  /** Defines the time interval after which executor failure information expires. */
+  val expireTimeInMilliseconds: Long
+
+  /** Return executors in the blacklist that are related to the given taskIndex */
+  def getExecutorBlacklist(
+  executorIdToFailureStatus: mutable.HashMap[String, FailureStatus],
+  atomTask: BlacklistAtomTask, clock: Clock): Set[String]
+
+  /** Return all nodes in blacklist */
+  def getNodeBlacklist(
+  executorIdToFailureStatus: mutable.HashMap[String, FailureStatus]): 
Set[String]
+
+  /**
+   * Default implementation to remove failed executors from the HashMap after
+   * the given time period. The return value indicates whether anything was
+   * updated.
+   */
+  def expireExecutorsInBlackList(
+  executorIdToFailureStatus: mutable.HashMap[String, FailureStatus], 
clock: Clock): Boolean = {
+val now = clock.getTimeMillis()
+val expiredKey = executorIdToFailureStatus.filter {
+  case (executorid, failureStatus) => {
+(now - failureStatus.updatedTime) >= expireTimeInMilliseconds
+  }
+}.keySet
+
+if (expiredKey.isEmpty) {
+  false
+} else {
+  executorIdToFailureStatus --= expiredKey
+  true
+}
+  }
+}
+
+/**
+ * This strategy is simply based on given threshold and is taskId 
unrelated. An executor will be
+ * in blacklist, if it failed more than "maxFailureTaskNumber" times. A 
node will be in blacklist,
+ * if there are more than "maxBlackExecutorNumber" executors on it in 
executor blacklist.
+ *
+ * In this case, provided taskId will be ignored. The benefit for taskId 
unrelated strategy is that
+ * different taskSets can learn experience from other taskSet to avoid 
allocating tasks on
+ * problematic executors.
+ */
+private[scheduler] class SimpleStrategy(
+maxFailureTaskNumber: Int,
+maxBlackExecutorNumber: Int,
--- End diff --

can you change these params to `maxFailedTasks` and
`maxBlacklistedExecutors`, and change the comment to:

This strategy adds an executor to the blacklist for *all* tasks when the
executor has too many task failures.  An executor is placed in the blacklist
when there are more than [[maxFailedTasks]] failed tasks.  Furthermore, all
executors on one node are put into the blacklist if there are more than
[[maxBlacklistedExecutors]] blacklisted executors on one node.  The benefit of
this strategy is that different taskSets can learn from the experience of other
taskSets and avoid allocating tasks on problematic executors.
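
For concreteness, a sketch of how the renamed signature and scaladoc could
look, assuming only the renames suggested above (not the final code from the
PR):

    /**
     * This strategy adds an executor to the blacklist for *all* tasks when
     * the executor has too many task failures. An executor is placed in the
     * blacklist when there are more than [[maxFailedTasks]] failed tasks.
     * Furthermore, all executors on one node are put into the blacklist if
     * there are more than [[maxBlacklistedExecutors]] blacklisted executors
     * on that node.
     */
    private[scheduler] class SimpleStrategy(
        maxFailedTasks: Int,
        maxBlacklistedExecutors: Int,
        val expireTimeInMilliseconds: Long)
      extends BlacklistStrategy {
      // body as in the diff above, with the two parameters renamed throughout
    }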


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...

2015-11-24 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/8760#discussion_r45791494
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/BlacklistStrategy.scala ---
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable
+import org.apache.spark.Logging
+import org.apache.spark.SparkConf
+import org.apache.spark.util.SystemClock
+import org.apache.spark.util.Clock
+
+/**
+ * The interface to determine executor blacklist and node blacklist.
+ */
+private [scheduler] trait BlacklistStrategy {
+  /** Defines the time interval after which executor failure information expires. */
+  val expireTimeInMilliseconds: Long
+
+  /** Return executors in the blacklist that are related to the given taskIndex */
+  def getExecutorBlacklist(
+  executorIdToFailureStatus: mutable.HashMap[String, FailureStatus],
+  atomTask: BlacklistAtomTask, clock: Clock): Set[String]
+
+  /** Return all nodes in blacklist */
+  def getNodeBlacklist(
+  executorIdToFailureStatus: mutable.HashMap[String, FailureStatus]): 
Set[String]
+
+  /**
+   * Default implementation to remove failed executors from the HashMap after
+   * the given time period. The return value indicates whether anything was
+   * updated.
+   */
+  def expireExecutorsInBlackList(
+  executorIdToFailureStatus: mutable.HashMap[String, FailureStatus], 
clock: Clock): Boolean = {
+val now = clock.getTimeMillis()
+val expiredKey = executorIdToFailureStatus.filter {
+  case (executorid, failureStatus) => {
+(now - failureStatus.updatedTime) >= expireTimeInMilliseconds
+  }
+}.keySet
+
+if (expiredKey.isEmpty) {
+  false
+} else {
+  executorIdToFailureStatus --= expiredKey
+  true
+}
+  }
+}
+
+/**
+ * This strategy is simply based on given threshold and is taskId 
unrelated. An executor will be
+ * in blacklist, if it failed more than "maxFailureTaskNumber" times. A 
node will be in blacklist,
+ * if there are more than "maxBlackExecutorNumber" executors on it in 
executor blacklist.
+ *
+ * In this case, provided taskId will be ignored. The benefit for taskId 
unrelated strategy is that
+ * different taskSets can learn experience from other taskSet to avoid 
allocating tasks on
+ * problematic executors.
+ */
+private[scheduler] class SimpleStrategy(
+maxFailureTaskNumber: Int,
+maxBlackExecutorNumber: Int,
--- End diff --

can you change these params to `maxFailedTasks` and
`maxBlacklistedExecutors`, and change the comment to:

This strategy adds an executor to the blacklist for *all* tasks when the
executor has too many task failures.  An executor is placed in the blacklist
when there are more than [[maxFailedTasks]] failed tasks.  Furthermore, all
executors on one node are put into the blacklist if there are more than
[[maxBlacklistedExecutors]] blacklisted executors on one node.  The benefit of
this strategy is that different taskSets can learn from the experience of other
taskSets and avoid allocating tasks on problematic executors.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...

2015-11-24 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/8760#discussion_r45790035
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala ---
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.util.concurrent.TimeUnit
+import scala.collection.mutable
+import org.apache.spark.SparkConf
+import org.apache.spark.Success
+import org.apache.spark.TaskEndReason
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.util.SystemClock
+import org.apache.spark.util.ThreadUtils
+import org.apache.spark.util.Utils
+import org.apache.spark.util.Clock
+
+import com.google.common.annotations.VisibleForTesting
+
+/**
+ * BlacklistTracker is designed to track problematic executors and nodes at
+ * the application level. It is shared by all TaskSets, so that when a new
+ * TaskSet arrives, it can benefit from the experience of previous TaskSets.
+ *
+ * Once a task finishes, the callback method in TaskSetManager should update
+ * the executorIdToFailureStatus map.
+ */
+private[spark] class BlacklistTracker(
+sparkConf: SparkConf,
+clock: Clock = new SystemClock()) extends BlacklistCache{
+  // maintain a ExecutorId --> FailureStatus HashMap
+  private val executorIdToFailureStatus: mutable.HashMap[String, 
FailureStatus] = mutable.HashMap()
+
+  // Apply Strategy pattern here to change different blacklist detection 
logic
+  private val strategy = BlacklistStrategy(sparkConf)
+
+  // A daemon thread to expire blacklist executor periodically
+  private val scheduler = 
ThreadUtils.newDaemonSingleThreadScheduledExecutor(
+  "spark-scheduler-blacklist-expire-timer")
+
+  private val recoverPeriod = sparkConf.getTimeAsSeconds(
+"spark.scheduler.blacklist.recoverPeriod", "60s")
+
+  def start(): Unit = {
+val scheduleTask = new Runnable() {
+  override def run(): Unit = {
+Utils.logUncaughtExceptions(expireExecutorsInBlackList())
+  }
+}
+scheduler.scheduleAtFixedRate(scheduleTask, 0L, recoverPeriod, 
TimeUnit.SECONDS)
+  }
+
+  def stop(): Unit = {
+scheduler.shutdown()
+scheduler.awaitTermination(10, TimeUnit.SECONDS)
+  }
+
+  // The actual implementation is delegated to strategy
+  /** VisibleForTesting */
+  private[scheduler] def expireExecutorsInBlackList(): Unit = synchronized 
{
+val updated = 
strategy.expireExecutorsInBlackList(executorIdToFailureStatus, clock)
+if (updated) {
+  invalidAllCache()
+}
+  }
+
+  def updateFailureExecutors(
+  stageId: Int, info: TaskInfo, reason: TaskEndReason) : Unit = 
synchronized {
+val index = info.index
+val atomTask = BlacklistAtomTask(stageId, index)
+reason match {
+  // If task succeeding, remove related record from 
executorIdToFailureStatus
+  case Success =>
+removeFailureExecutorsForTaskId(info.executorId, stageId, index)
+
+  // If task failing, update latest failure time and failedTaskIds
+  case _ =>
+val executorId = info.executorId
+executorIdToFailureStatus.get(executorId) match {
+  case Some(failureStatus) =>
+failureStatus.updatedTime = clock.getTimeMillis()
+val failedTimes = 
failureStatus.numFailuresPerTask.getOrElse(atomTask, 0) + 1
+failureStatus.numFailuresPerTask.update(atomTask, failedTimes)
+  case None =>
+val failedTasks = mutable.HashMap(atomTask -> 1)
+val failureStatus = new FailureStatus(
+  clock.getTimeMillis(), info.host, failedTasks)
+executorIdToFailureStatus.update(executorId, failureStatus)
+}
+invalidAllCache()
+}
+  }
+
+  /** remove the executorId from 

[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...

2015-11-24 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/8760#discussion_r45788633
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/BlacklistStrategy.scala ---
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable
+import org.apache.spark.Logging
+import org.apache.spark.SparkConf
+import org.apache.spark.util.SystemClock
+import org.apache.spark.util.Clock
+
+/**
+ * The interface to determine executor blacklist and node blacklist.
+ */
+private [scheduler] trait BlacklistStrategy {
+  /** Defines the time interval after which executor failure information expires. */
+  val expireTimeInMilliseconds: Long
+
+  /** Return executors in the blacklist that are related to the given taskIndex */
+  def getExecutorBlacklist(
+  executorIdToFailureStatus: mutable.HashMap[String, FailureStatus],
+  atomTask: BlacklistAtomTask, clock: Clock): Set[String]
+
+  /** Return all nodes in blacklist */
+  def getNodeBlacklist(
+  executorIdToFailureStatus: mutable.HashMap[String, FailureStatus]): 
Set[String]
+
+  /**
+   * Default implementation to remove failed executors from the HashMap after
+   * the given time period. The return value indicates whether anything was
+   * updated.
+   */
+  def expireExecutorsInBlackList(
+  executorIdToFailureStatus: mutable.HashMap[String, FailureStatus], 
clock: Clock): Boolean = {
+val now = clock.getTimeMillis()
+val expiredKey = executorIdToFailureStatus.filter {
+  case (executorid, failureStatus) => {
+(now - failureStatus.updatedTime) >= expireTimeInMilliseconds
+  }
+}.keySet
+
+if (expiredKey.isEmpty) {
+  false
+} else {
+  executorIdToFailureStatus --= expiredKey
+  true
+}
+  }
+}
+
+/**
+ * This strategy is simply based on given threshold and is taskId 
unrelated. An executor will be
+ * in blacklist, if it failed more than "maxFailureTaskNumber" times. A 
node will be in blacklist,
+ * if there are more than "maxBlackExecutorNumber" executors on it in 
executor blacklist.
+ *
+ * In this case, provided taskId will be ignored. The benefit for taskId 
unrelated strategy is that
+ * different taskSets can learn experience from other taskSet to avoid 
allocating tasks on
+ * problematic executors.
+ */
+private[scheduler] class SimpleStrategy(
+maxFailureTaskNumber: Int,
+maxBlackExecutorNumber: Int,
+val expireTimeInMilliseconds: Long
+  )extends BlacklistStrategy {
+
+  private def getSelectedExecutorMap(
+  executorIdToFailureStatus: mutable.HashMap[String, FailureStatus]) = 
{
+executorIdToFailureStatus.filter{
+  case (id, failureStatus) => failureStatus.totalNumFailures > 
maxFailureTaskNumber
+}
+  }
+
+  // As this is a taskId unrelated strategy, the input taskId will be 
ignored
+  def getExecutorBlacklist(
+  executorIdToFailureStatus: mutable.HashMap[String, FailureStatus],
+  atomTask: BlacklistAtomTask, clock: Clock): Set[String] = {
+getSelectedExecutorMap(executorIdToFailureStatus).keys.toSet
+  }
+
+  def getNodeBlacklist(
+  executorIdToFailureStatus: mutable.HashMap[String, FailureStatus]): 
Set[String] = {
+getSelectedExecutorMap(executorIdToFailureStatus)
+  .groupBy{case (id, failureStatus) => failureStatus.host}
+  .filter {case (host, executorIdToFailureStatus) =>
+executorIdToFailureStatus.size > maxBlackExecutorNumber}
+  .keys.toSet
+  }
+}
+
+/**
+ * This strategy is applied as default to keep the same semantics as 
original. It's a taskId
+ * related strategy. If an executor failed running "task A", then we think 
this executor is
+ * blacked for "task A". And we think the executor is still healthy for 
other 

[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...

2015-11-24 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/8760#discussion_r45792961
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala ---
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import org.apache.spark.SparkFunSuite
+import org.scalatest.BeforeAndAfter
+import org.apache.spark.SparkConf
+import org.apache.spark.TaskEndReason
+import org.apache.spark.Success
+import org.apache.spark.ExceptionFailure
+import org.apache.spark.LocalSparkContext
+import org.apache.spark.SparkContext
+import org.mockito.Mockito.{when, spy}
+import org.apache.spark.util.ManualClock
+
+class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfter with 
LocalSparkContext {
+
+  val FAILURE: TaskEndReason = new ExceptionFailure(
+  "Fake",
+  "fake failure",
+  Array.empty[StackTraceElement],
+  "fake stack trace",
+  None,
+  None)
+
+  val stage1 = 1
+  val stage2 = 2
+  // Variable name can indicate basic information of taskInfo
+  // The format is "taskInfo_executorId_taskIndex_hostName"
+  val taskInfo_1_1_hostA = new TaskInfo(1L, 1, 1, 0L, "1", "hostA", 
TaskLocality.ANY, false)
+  val taskInfo_1_2_hostA = new TaskInfo(2L, 2, 1, 0L, "1", "hostA", 
TaskLocality.ANY, false)
+  val taskInfo_2_1_hostA = new TaskInfo(3L, 1, 1, 0L, "2", "hostA", 
TaskLocality.ANY, false)
+  val taskInfo_2_2_hostA = new TaskInfo(4L, 2, 1, 0L, "2", "hostA", 
TaskLocality.ANY, false)
+  val taskInfo_3_3_hostB = new TaskInfo(5L, 3, 1, 0L, "3", "hostB", 
TaskLocality.ANY, false)
+
+  val clock = new ManualClock(0)
+
+  test ("expireExecutorsInBlacklist works") {
+// expire time is set to 6s
+val conf = new SparkConf().setAppName("test").setMaster("local")
+  .set("spark.ui.enabled", "false")
+  .set("spark.scheduler.executorTaskBlacklistTime", "6000")
+
+sc = new SparkContext(conf)
+val scheduler = new TaskSchedulerImpl(sc, 1)
+
+val tracker = new BlacklistTracker(conf, clock)
+// Executor 1 into blacklist at Time 00:00:00
+tracker.updateFailureExecutors(stage1, taskInfo_1_1_hostA, FAILURE)
+assert(tracker.executorBlacklist(scheduler, stage1, 1) === Set("1"))
+
+clock.setTime(2000)
+tracker.expireExecutorsInBlackList()
+assert(tracker.executorBlacklist(scheduler, stage1, 1) === Set("1"))
+// Executor 1 failed again at Time 00::00:02
+tracker.updateFailureExecutors(stage1, taskInfo_1_1_hostA, FAILURE)
+
+clock.setTime(3000)
+// Executor 2 failed at Time 00:00:03
+tracker.updateFailureExecutors(stage1, taskInfo_2_1_hostA, FAILURE)
+assert(tracker.executorBlacklist(scheduler, stage1, 1) === Set("1", 
"2"))
+
+clock.setTime(6000)
+tracker.expireExecutorsInBlackList()
+assert(tracker.executorBlacklist(scheduler, stage1, 1) === Set("1", 
"2"))
+
+clock.setTime(8000)
+tracker.expireExecutorsInBlackList()
+assert(tracker.executorBlacklist(scheduler, stage1, 1) === Set("2"))
+
+clock.setTime(10000)
+tracker.expireExecutorsInBlackList()
+assert(tracker.executorBlacklist(scheduler, stage1, 1) === Set())
+  }
+
+  test("blacklist feature is off by default") {
+val conf = new SparkConf().setAppName("test").setMaster("local")
+  .set("spark.ui.enabled", "false")
+sc = new SparkContext(conf)
+
+val scheduler = new TaskSchedulerImpl(sc, 1)
+
+val tracker = new BlacklistTracker(conf, clock)
+tracker.updateFailureExecutors(stage1, taskInfo_1_1_hostA, FAILURE)
+tracker.updateFailureExecutors(stage1, taskInfo_2_1_hostA, FAILURE)
+assert(tracker.executorBlacklist(scheduler, stage1, 1) === Set())
+assert(tracker.executorBlacklist(scheduler, stage1, 1) === Set())
+
+

[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...

2015-11-24 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/8760#discussion_r45769593
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala ---
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.util.concurrent.TimeUnit
+import scala.collection.mutable
+import org.apache.spark.SparkConf
+import org.apache.spark.Success
+import org.apache.spark.TaskEndReason
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.util.SystemClock
+import org.apache.spark.util.ThreadUtils
+import org.apache.spark.util.Utils
+import org.apache.spark.util.Clock
+
+import com.google.common.annotations.VisibleForTesting
+
+/**
+ * BlacklistTracker is designed to track problematic executors and nodes at
+ * the application level. It is shared by all TaskSets, so that when a new
+ * TaskSet arrives, it can benefit from the experience of previous TaskSets.
+ *
+ * Once a task finishes, the callback method in TaskSetManager should update
+ * the executorIdToFailureStatus map.
+ */
+private[spark] class BlacklistTracker(
+sparkConf: SparkConf,
+clock: Clock = new SystemClock()) extends BlacklistCache{
+  // maintain a ExecutorId --> FailureStatus HashMap
+  private val executorIdToFailureStatus: mutable.HashMap[String, 
FailureStatus] = mutable.HashMap()
+
+  // Apply Strategy pattern here to change different blacklist detection 
logic
+  private val strategy = BlacklistStrategy(sparkConf)
+
+  // A daemon thread to expire blacklist executor periodically
+  private val scheduler = 
ThreadUtils.newDaemonSingleThreadScheduledExecutor(
+  "spark-scheduler-blacklist-expire-timer")
+
+  private val recoverPeriod = sparkConf.getTimeAsSeconds(
+"spark.scheduler.blacklist.recoverPeriod", "60s")
+
+  def start(): Unit = {
+val scheduleTask = new Runnable() {
+  override def run(): Unit = {
+Utils.logUncaughtExceptions(expireExecutorsInBlackList())
+  }
+}
+scheduler.scheduleAtFixedRate(scheduleTask, 0L, recoverPeriod, 
TimeUnit.SECONDS)
+  }
+
+  def stop(): Unit = {
+scheduler.shutdown()
+scheduler.awaitTermination(10, TimeUnit.SECONDS)
+  }
+
+  // The actual implementation is delegated to strategy
+  /** VisibleForTesting */
+  private[scheduler] def expireExecutorsInBlackList(): Unit = synchronized 
{
+val updated = 
strategy.expireExecutorsInBlackList(executorIdToFailureStatus, clock)
+if (updated) {
+  invalidAllCache()
+}
+  }
+
+  def updateFailureExecutors(
+  stageId: Int, info: TaskInfo, reason: TaskEndReason) : Unit = 
synchronized {
+val index = info.index
+val atomTask = BlacklistAtomTask(stageId, index)
+reason match {
+  // If task succeeding, remove related record from 
executorIdToFailureStatus
+  case Success =>
+removeFailureExecutorsForTaskId(info.executorId, stageId, index)
+
+  // If task failing, update latest failure time and failedTaskIds
--- End diff --

If the task failed


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...

2015-11-24 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/8760#discussion_r45769579
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala ---
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.util.concurrent.TimeUnit
+import scala.collection.mutable
+import org.apache.spark.SparkConf
+import org.apache.spark.Success
+import org.apache.spark.TaskEndReason
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.util.SystemClock
+import org.apache.spark.util.ThreadUtils
+import org.apache.spark.util.Utils
+import org.apache.spark.util.Clock
+
+import com.google.common.annotations.VisibleForTesting
+
+/**
+ * BlacklistTracker is designed to track problematic executors and nodes at
+ * the application level. It is shared by all TaskSets, so that when a new
+ * TaskSet arrives, it can benefit from the experience of previous TaskSets.
+ *
+ * Once a task finishes, the callback method in TaskSetManager should update
+ * the executorIdToFailureStatus map.
+ */
+private[spark] class BlacklistTracker(
+sparkConf: SparkConf,
+clock: Clock = new SystemClock()) extends BlacklistCache{
+  // maintain a ExecutorId --> FailureStatus HashMap
+  private val executorIdToFailureStatus: mutable.HashMap[String, 
FailureStatus] = mutable.HashMap()
+
+  // Apply Strategy pattern here to change different blacklist detection 
logic
+  private val strategy = BlacklistStrategy(sparkConf)
+
+  // A daemon thread to expire blacklist executor periodically
+  private val scheduler = 
ThreadUtils.newDaemonSingleThreadScheduledExecutor(
+  "spark-scheduler-blacklist-expire-timer")
+
+  private val recoverPeriod = sparkConf.getTimeAsSeconds(
+"spark.scheduler.blacklist.recoverPeriod", "60s")
+
+  def start(): Unit = {
+val scheduleTask = new Runnable() {
+  override def run(): Unit = {
+Utils.logUncaughtExceptions(expireExecutorsInBlackList())
+  }
+}
+scheduler.scheduleAtFixedRate(scheduleTask, 0L, recoverPeriod, 
TimeUnit.SECONDS)
+  }
+
+  def stop(): Unit = {
+scheduler.shutdown()
+scheduler.awaitTermination(10, TimeUnit.SECONDS)
+  }
+
+  // The actual implementation is delegated to strategy
+  /** VisibleForTesting */
+  private[scheduler] def expireExecutorsInBlackList(): Unit = synchronized 
{
+val updated = 
strategy.expireExecutorsInBlackList(executorIdToFailureStatus, clock)
+if (updated) {
+  invalidAllCache()
+}
+  }
+
+  def updateFailureExecutors(
+  stageId: Int, info: TaskInfo, reason: TaskEndReason) : Unit = 
synchronized {
+val index = info.index
+val atomTask = BlacklistAtomTask(stageId, index)
+reason match {
+  // If task succeeding, remove related record from 
executorIdToFailureStatus
--- End diff --

If the task succeeded


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...

2015-11-24 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/8760#discussion_r45755713
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala ---
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.util.concurrent.TimeUnit
+import scala.collection.mutable
+import org.apache.spark.SparkConf
+import org.apache.spark.Success
+import org.apache.spark.TaskEndReason
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.util.SystemClock
+import org.apache.spark.util.ThreadUtils
+import org.apache.spark.util.Utils
+import org.apache.spark.util.Clock
+
+import com.google.common.annotations.VisibleForTesting
+
+/**
+ * BlacklistTracker is designed to track problematic executors and nodes at
+ * the application level. It is shared by all TaskSets, so that when a new
+ * TaskSet arrives, it can benefit from the experience of previous TaskSets.
+ *
+ * Once a task finishes, the callback method in TaskSetManager should update
+ * the executorIdToFailureStatus map.
+ */
+private[spark] class BlacklistTracker(
+sparkConf: SparkConf,
+clock: Clock = new SystemClock()) extends BlacklistCache{
+  // maintain a ExecutorId --> FailureStatus HashMap
+  private val executorIdToFailureStatus: mutable.HashMap[String, 
FailureStatus] = mutable.HashMap()
+
+  // Apply Strategy pattern here to change different blacklist detection 
logic
+  private val strategy = BlacklistStrategy(sparkConf)
+
+  // A daemon thread to expire blacklist executor periodically
+  private val scheduler = 
ThreadUtils.newDaemonSingleThreadScheduledExecutor(
+  "spark-scheduler-blacklist-expire-timer")
+
+  private val recoverPeriod = sparkConf.getTimeAsSeconds(
+"spark.scheduler.blacklist.recoverPeriod", "60s")
+
+  def start(): Unit = {
+val scheduleTask = new Runnable() {
+  override def run(): Unit = {
+Utils.logUncaughtExceptions(expireExecutorsInBlackList())
+  }
+}
+scheduler.scheduleAtFixedRate(scheduleTask, 0L, recoverPeriod, 
TimeUnit.SECONDS)
+  }
+
+  def stop(): Unit = {
+scheduler.shutdown()
+scheduler.awaitTermination(10, TimeUnit.SECONDS)
+  }
+
+  // The actual implementation is delegated to strategy
+  /** VisibleForTesting */
+  private[scheduler] def expireExecutorsInBlackList(): Unit = synchronized 
{
+val updated = 
strategy.expireExecutorsInBlackList(executorIdToFailureStatus, clock)
+if (updated) {
+  invalidAllCache()
+}
+  }
+
+  def updateFailureExecutors(
+  stageId: Int, info: TaskInfo, reason: TaskEndReason) : Unit = 
synchronized {
+val index = info.index
+val atomTask = BlacklistAtomTask(stageId, index)
+reason match {
+  // If task succeeding, remove related record from 
executorIdToFailureStatus
+  case Success =>
+removeFailureExecutorsForTaskId(info.executorId, stageId, index)
+
+  // If task failing, update latest failure time and failedTaskIds
+  case _ =>
+val executorId = info.executorId
+executorIdToFailureStatus.get(executorId) match {
+  case Some(failureStatus) =>
+failureStatus.updatedTime = clock.getTimeMillis()
+val failedTimes = 
failureStatus.numFailuresPerTask.getOrElse(atomTask, 0) + 1
+failureStatus.numFailuresPerTask.update(atomTask, failedTimes)
+  case None =>
+val failedTasks = mutable.HashMap(atomTask -> 1)
+val failureStatus = new FailureStatus(
+  clock.getTimeMillis(), info.host, failedTasks)
+executorIdToFailureStatus.update(executorId, failureStatus)
+}
+invalidAllCache()
+}
+  }
+
+  /** remove the executorId from 

[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...

2015-11-24 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/8760#discussion_r45756928
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala ---
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.util.concurrent.TimeUnit
+import scala.collection.mutable
+import org.apache.spark.SparkConf
+import org.apache.spark.Success
+import org.apache.spark.TaskEndReason
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.util.SystemClock
+import org.apache.spark.util.ThreadUtils
+import org.apache.spark.util.Utils
+import org.apache.spark.util.Clock
+
+import com.google.common.annotations.VisibleForTesting
+
+/**
+ * BlacklistTracker is designed to track problematic executors and nodes at
+ * the application level. It is shared by all TaskSets, so that when a new
+ * TaskSet arrives, it can benefit from the experience of previous TaskSets.
+ *
+ * Once a task finishes, the callback method in TaskSetManager should update
+ * the executorIdToFailureStatus map.
+ */
+private[spark] class BlacklistTracker(
+sparkConf: SparkConf,
+clock: Clock = new SystemClock()) extends BlacklistCache{
+  // maintain a ExecutorId --> FailureStatus HashMap
+  private val executorIdToFailureStatus: mutable.HashMap[String, 
FailureStatus] = mutable.HashMap()
+
+  // Apply Strategy pattern here to change different blacklist detection 
logic
+  private val strategy = BlacklistStrategy(sparkConf)
+
+  // A daemon thread to expire blacklist executor periodically
+  private val scheduler = 
ThreadUtils.newDaemonSingleThreadScheduledExecutor(
+  "spark-scheduler-blacklist-expire-timer")
+
+  private val recoverPeriod = sparkConf.getTimeAsSeconds(
+"spark.scheduler.blacklist.recoverPeriod", "60s")
+
+  def start(): Unit = {
+val scheduleTask = new Runnable() {
+  override def run(): Unit = {
+Utils.logUncaughtExceptions(expireExecutorsInBlackList())
+  }
+}
+scheduler.scheduleAtFixedRate(scheduleTask, 0L, recoverPeriod, 
TimeUnit.SECONDS)
+  }
+
+  def stop(): Unit = {
+scheduler.shutdown()
+scheduler.awaitTermination(10, TimeUnit.SECONDS)
+  }
+
+  // The actual implementation is delegated to strategy
+  /** VisibleForTesting */
+  private[scheduler] def expireExecutorsInBlackList(): Unit = synchronized 
{
+val updated = 
strategy.expireExecutorsInBlackList(executorIdToFailureStatus, clock)
+if (updated) {
+  invalidAllCache()
+}
+  }
+
+  def updateFailureExecutors(
+  stageId: Int, info: TaskInfo, reason: TaskEndReason) : Unit = 
synchronized {
+val index = info.index
+val atomTask = BlacklistAtomTask(stageId, index)
+reason match {
+  // If task succeeding, remove related record from 
executorIdToFailureStatus
+  case Success =>
+removeFailureExecutorsForTaskId(info.executorId, stageId, index)
+
+  // If task failing, update latest failure time and failedTaskIds
+  case _ =>
+val executorId = info.executorId
+executorIdToFailureStatus.get(executorId) match {
+  case Some(failureStatus) =>
+failureStatus.updatedTime = clock.getTimeMillis()
+val failedTimes = 
failureStatus.numFailuresPerTask.getOrElse(atomTask, 0) + 1
+failureStatus.numFailuresPerTask.update(atomTask, failedTimes)
+  case None =>
+val failedTasks = mutable.HashMap(atomTask -> 1)
+val failureStatus = new FailureStatus(
+  clock.getTimeMillis(), info.host, failedTasks)
+executorIdToFailureStatus.update(executorId, failureStatus)
+}
+invalidAllCache()
+}
+  }
+
+  /** remove the executorId from 

[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...

2015-11-24 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/8760#discussion_r45756966
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala ---
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.util.concurrent.TimeUnit
+import scala.collection.mutable
+import org.apache.spark.SparkConf
+import org.apache.spark.Success
+import org.apache.spark.TaskEndReason
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.util.SystemClock
+import org.apache.spark.util.ThreadUtils
+import org.apache.spark.util.Utils
+import org.apache.spark.util.Clock
+
+import com.google.common.annotations.VisibleForTesting
--- End diff --

actually remove the google import, not used anymore
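
The import block would then shrink to something like this (a sketch assuming
only the Guava import is dropped and the rest are grouped per the Spark style
guide; whether `DeveloperApi` is still needed depends on the final code):

    import java.util.concurrent.TimeUnit

    import scala.collection.mutable

    import org.apache.spark.{SparkConf, Success, TaskEndReason}
    import org.apache.spark.annotation.DeveloperApi
    import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}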


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...

2015-11-24 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/8760#discussion_r45761465
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala ---
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.util.concurrent.TimeUnit
+import scala.collection.mutable
+import org.apache.spark.SparkConf
+import org.apache.spark.Success
+import org.apache.spark.TaskEndReason
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.util.SystemClock
+import org.apache.spark.util.ThreadUtils
+import org.apache.spark.util.Utils
+import org.apache.spark.util.Clock
+
+import com.google.common.annotations.VisibleForTesting
+
+/**
+ * BlacklistTracker is designed to track problematic executors and nodes at
+ * the application level. It is shared by all TaskSets, so that when a new
+ * TaskSet arrives, it can benefit from the experience of previous TaskSets.
+ *
+ * Once a task finishes, the callback method in TaskSetManager should update
+ * the executorIdToFailureStatus map.
+ */
+private[spark] class BlacklistTracker(
+sparkConf: SparkConf,
+clock: Clock = new SystemClock()) extends BlacklistCache{
+  // maintain a ExecutorId --> FailureStatus HashMap
+  private val executorIdToFailureStatus: mutable.HashMap[String, 
FailureStatus] = mutable.HashMap()
+
+  // Apply Strategy pattern here to change different blacklist detection 
logic
+  private val strategy = BlacklistStrategy(sparkConf)
+
+  // A daemon thread to expire blacklist executor periodically
+  private val scheduler = 
ThreadUtils.newDaemonSingleThreadScheduledExecutor(
+  "spark-scheduler-blacklist-expire-timer")
+
+  private val recoverPeriod = sparkConf.getTimeAsSeconds(
+"spark.scheduler.blacklist.recoverPeriod", "60s")
+
+  def start(): Unit = {
+val scheduleTask = new Runnable() {
+  override def run(): Unit = {
+Utils.logUncaughtExceptions(expireExecutorsInBlackList())
+  }
+}
+scheduler.scheduleAtFixedRate(scheduleTask, 0L, recoverPeriod, 
TimeUnit.SECONDS)
+  }
+
+  def stop(): Unit = {
+scheduler.shutdown()
+scheduler.awaitTermination(10, TimeUnit.SECONDS)
+  }
+
+  // The actual implementation is delegated to strategy
+  /** VisibleForTesting */
+  private[scheduler] def expireExecutorsInBlackList(): Unit = synchronized 
{
+val updated = 
strategy.expireExecutorsInBlackList(executorIdToFailureStatus, clock)
+if (updated) {
+  invalidAllCache()
+}
+  }
+
+  def updateFailureExecutors(
+  stageId: Int, info: TaskInfo, reason: TaskEndReason) : Unit = 
synchronized {
+val index = info.index
+val atomTask = BlacklistAtomTask(stageId, index)
+reason match {
+  // If task succeeding, remove related record from 
executorIdToFailureStatus
+  case Success =>
+removeFailureExecutorsForTaskId(info.executorId, stageId, index)
+
+  // If task failing, update latest failure time and failedTaskIds
+  case _ =>
+val executorId = info.executorId
+executorIdToFailureStatus.get(executorId) match {
+  case Some(failureStatus) =>
+failureStatus.updatedTime = clock.getTimeMillis()
+val failedTimes = 
failureStatus.numFailuresPerTask.getOrElse(atomTask, 0) + 1
+failureStatus.numFailuresPerTask.update(atomTask, failedTimes)
+  case None =>
+val failedTasks = mutable.HashMap(atomTask -> 1)
+val failureStatus = new FailureStatus(
+  clock.getTimeMillis(), info.host, failedTasks)
+executorIdToFailureStatus.update(executorId, failureStatus)
+}
+invalidAllCache()
+}
+  }
+
+  /** remove the executorId from 

[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...

2015-11-24 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/8760#discussion_r45761512
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala ---
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.util.concurrent.TimeUnit
+import scala.collection.mutable
+import org.apache.spark.SparkConf
+import org.apache.spark.Success
+import org.apache.spark.TaskEndReason
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.util.SystemClock
+import org.apache.spark.util.ThreadUtils
+import org.apache.spark.util.Utils
+import org.apache.spark.util.Clock
+
+import com.google.common.annotations.VisibleForTesting
+
+/**
+ * BlacklistTracker is designed to track problematic executors and nodes at
+ * the application level. It is shared by all TaskSets, so that when a new
+ * TaskSet arrives, it can benefit from the experience of previous TaskSets.
+ *
+ * Once a task finishes, the callback method in TaskSetManager should update
+ * the executorIdToFailureStatus map.
+ */
+private[spark] class BlacklistTracker(
+sparkConf: SparkConf,
+clock: Clock = new SystemClock()) extends BlacklistCache{
+  // maintain a ExecutorId --> FailureStatus HashMap
+  private val executorIdToFailureStatus: mutable.HashMap[String, 
FailureStatus] = mutable.HashMap()
+
+  // Apply Strategy pattern here to change different blacklist detection 
logic
+  private val strategy = BlacklistStrategy(sparkConf)
+
+  // A daemon thread to expire blacklist executor periodically
+  private val scheduler = 
ThreadUtils.newDaemonSingleThreadScheduledExecutor(
+  "spark-scheduler-blacklist-expire-timer")
+
+  private val recoverPeriod = sparkConf.getTimeAsSeconds(
+"spark.scheduler.blacklist.recoverPeriod", "60s")
+
+  def start(): Unit = {
+val scheduleTask = new Runnable() {
+  override def run(): Unit = {
+Utils.logUncaughtExceptions(expireExecutorsInBlackList())
+  }
+}
+scheduler.scheduleAtFixedRate(scheduleTask, 0L, recoverPeriod, 
TimeUnit.SECONDS)
+  }
+
+  def stop(): Unit = {
+scheduler.shutdown()
+scheduler.awaitTermination(10, TimeUnit.SECONDS)
+  }
+
+  // The actual implementation is delegated to strategy
+  /** VisibleForTesting */
+  private[scheduler] def expireExecutorsInBlackList(): Unit = synchronized 
{
+val updated = 
strategy.expireExecutorsInBlackList(executorIdToFailureStatus, clock)
+if (updated) {
+  invalidAllCache()
+}
+  }
+
+  def updateFailureExecutors(
--- End diff --

`updateFailedExecutors`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...

2015-11-24 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/8760#discussion_r45756368
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -636,7 +622,8 @@ private[spark] class TaskSetManager(
   logInfo("Ignoring task-finished event for " + info.id + " in stage " 
+ taskSet.id +
 " because task " + index + " has already completed successfully")
 }
-failedExecutors.remove(index)
+
+blacklistTracker.foreach(_.updateFailureExecutors(stageId, info, 
Success))
--- End diff --

there is one really annoying detail here.  If there are multiple attempts
for one stage (which happens on an executor failure), `info.index` is not
referencing the same thing between different `TaskSet`s.  Attempt 2 might only
need to run a subset of tasks, in which case index 7 could be partition 58.  I
think what you want here is actually `tasks(index).partition`, which will refer
to the same logical task across all attempts of a stage.
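
A sketch of that suggestion (hypothetical wiring, since the surrounding
TaskSetManager code is not quoted in full here):

    // `tasks` is the TaskSet's task array inside TaskSetManager. The task's
    // partition is stable across all attempts of a stage, whereas info.index
    // is only stable within a single TaskSet.
    val partition = tasks(info.index).partitionId
    // ...then key the FailureStatus record on (stageId, partition) rather
    // than (stageId, index).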


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...

2015-11-24 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/8760#discussion_r45756677
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala ---
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.util.concurrent.TimeUnit
+import scala.collection.mutable
+import org.apache.spark.SparkConf
+import org.apache.spark.Success
+import org.apache.spark.TaskEndReason
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.util.SystemClock
+import org.apache.spark.util.ThreadUtils
+import org.apache.spark.util.Utils
+import org.apache.spark.util.Clock
+
+import com.google.common.annotations.VisibleForTesting
+
+/**
+ * BlacklistTracker is designed to track problematic executors and nodes at
+ * the application level. It is shared by all TaskSets, so that when a new
+ * TaskSet arrives, it can benefit from the experience of previous TaskSets.
+ *
+ * Once a task finishes, the callback method in TaskSetManager should update
+ * the executorIdToFailureStatus map.
+ */
+private[spark] class BlacklistTracker(
+sparkConf: SparkConf,
+clock: Clock = new SystemClock()) extends BlacklistCache{
+  // maintain a ExecutorId --> FailureStatus HashMap
+  private val executorIdToFailureStatus: mutable.HashMap[String, 
FailureStatus] = mutable.HashMap()
+
+  // Apply Strategy pattern here to change different blacklist detection 
logic
+  private val strategy = BlacklistStrategy(sparkConf)
+
+  // A daemon thread to expire blacklist executor periodically
+  private val scheduler = 
ThreadUtils.newDaemonSingleThreadScheduledExecutor(
+  "spark-scheduler-blacklist-expire-timer")
+
+  private val recoverPeriod = sparkConf.getTimeAsSeconds(
+"spark.scheduler.blacklist.recoverPeriod", "60s")
+
+  def start(): Unit = {
+val scheduleTask = new Runnable() {
+  override def run(): Unit = {
+Utils.logUncaughtExceptions(expireExecutorsInBlackList())
+  }
+}
+scheduler.scheduleAtFixedRate(scheduleTask, 0L, recoverPeriod, 
TimeUnit.SECONDS)
+  }
+
+  def stop(): Unit = {
+scheduler.shutdown()
+scheduler.awaitTermination(10, TimeUnit.SECONDS)
+  }
+
+  // The actual implementation is delegated to strategy
+  /** VisibleForTesting */
+  private[scheduler] def expireExecutorsInBlackList(): Unit = synchronized 
{
+val updated = 
strategy.expireExecutorsInBlackList(executorIdToFailureStatus, clock)
+if (updated) {
+  invalidAllCache()
+}
+  }
+
+  def updateFailureExecutors(
+  stageId: Int, info: TaskInfo, reason: TaskEndReason) : Unit = 
synchronized {
+val index = info.index
+val atomTask = BlacklistAtomTask(stageId, index)
+reason match {
+  // If task succeeding, remove related record from 
executorIdToFailureStatus
+  case Success =>
+removeFailureExecutorsForTaskId(info.executorId, stageId, index)
+
+  // If task failing, update latest failure time and failedTaskIds
+  case _ =>
+val executorId = info.executorId
+executorIdToFailureStatus.get(executorId) match {
+  case Some(failureStatus) =>
+failureStatus.updatedTime = clock.getTimeMillis()
+val failedTimes = 
failureStatus.numFailuresPerTask.getOrElse(atomTask, 0) + 1
+failureStatus.numFailuresPerTask.update(atomTask, failedTimes)
+  case None =>
+val failedTasks = mutable.HashMap(atomTask -> 1)
+val failureStatus = new FailureStatus(
+  clock.getTimeMillis(), info.host, failedTasks)
+executorIdToFailureStatus.update(executorId, failureStatus)
+}
+invalidAllCache()
+}
+  }
+
+  /** remove the executorId from 


[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...

2015-11-24 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/8760#discussion_r45759735
  
--- Diff: core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala ---
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import org.apache.spark.SparkFunSuite
+import org.scalatest.BeforeAndAfter
+import org.apache.spark.SparkConf
+import org.apache.spark.TaskEndReason
+import org.apache.spark.Success
+import org.apache.spark.ExceptionFailure
+import org.apache.spark.LocalSparkContext
+import org.apache.spark.SparkContext
+import org.mockito.Mockito.{when, spy}
+import org.apache.spark.util.ManualClock
+
+class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfter with LocalSparkContext {
+
+  val FAILURE: TaskEndReason = new ExceptionFailure(
+    "Fake",
+    "fake failure",
+    Array.empty[StackTraceElement],
+    "fake stack trace",
+    None,
+    None)
+
+  val stage1 = 1
+  val stage2 = 2
+  // Variable name can indicate basic information of taskInfo
+  // The format is "taskInfo_executorId_taskIndex_hostName"
+  val taskInfo_1_1_hostA = new TaskInfo(1L, 1, 1, 0L, "1", "hostA", TaskLocality.ANY, false)
+  val taskInfo_1_2_hostA = new TaskInfo(2L, 2, 1, 0L, "1", "hostA", TaskLocality.ANY, false)
+  val taskInfo_2_1_hostA = new TaskInfo(3L, 1, 1, 0L, "2", "hostA", TaskLocality.ANY, false)
+  val taskInfo_2_2_hostA = new TaskInfo(4L, 2, 1, 0L, "2", "hostA", TaskLocality.ANY, false)
+  val taskInfo_3_3_hostB = new TaskInfo(5L, 3, 1, 0L, "3", "hostB", TaskLocality.ANY, false)
+
+  val clock = new ManualClock(0)
+
+  test ("expireExecutorsInBlacklist works") {
+    // expire time is set to 6s
+    val conf = new SparkConf().setAppName("test").setMaster("local")
+      .set("spark.ui.enabled", "false")
+      .set("spark.scheduler.executorTaskBlacklistTime", "6000")
+
+    sc = new SparkContext(conf)
+    val scheduler = new TaskSchedulerImpl(sc, 1)
--- End diff --

you can avoid creating a `SparkContext` in all of these tests with mockito.  add the `MockitoSugar` trait and then you can do:

```scala
val scheduler = mock[TaskSchedulerImpl]
when(scheduler.getExecutorsAliveOnHost(any[String])).thenReturn(Some(Set[String]()))
```
(or whatever else you want that method to return).  Then you don't need to mix in `LocalSparkContext`.  It'll speed up the tests a bit.
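
For context, a minimal sketch of how the suite setup might look with that change (assuming ScalaTest's `org.scalatest.mock.MockitoSugar` and Mockito's `any` matcher; the test body is elided):

```scala
import org.mockito.Matchers.any
import org.mockito.Mockito.when
import org.scalatest.mock.MockitoSugar

import org.apache.spark.{SparkConf, SparkFunSuite}

class BlacklistTrackerSuite extends SparkFunSuite with MockitoSugar {
  test("expireExecutorsInBlacklist works") {
    val conf = new SparkConf().setAppName("test")
      .set("spark.scheduler.executorTaskBlacklistTime", "6000")
    // No SparkContext needed: stub just the scheduler method the tracker consults.
    val scheduler = mock[TaskSchedulerImpl]
    when(scheduler.getExecutorsAliveOnHost(any[String])).thenReturn(Some(Set[String]()))
    // ... drive a BlacklistTracker against the mocked scheduler as before ...
  }
}
```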


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...

2015-11-24 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/8760#discussion_r45757164
  
--- Diff: core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala ---
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import org.apache.spark.SparkFunSuite
+import org.scalatest.BeforeAndAfter
+import org.apache.spark.SparkConf
+import org.apache.spark.TaskEndReason
+import org.apache.spark.Success
+import org.apache.spark.ExceptionFailure
+import org.apache.spark.LocalSparkContext
+import org.apache.spark.SparkContext
+import org.mockito.Mockito.{when, spy}
+import org.apache.spark.util.ManualClock
--- End diff --

nit: import grouping and ordering
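
For reference, Spark's convention groups imports as java, scala, third-party, then org.apache.spark, separated by blank lines and alphabetized within each group; a sketch of the quoted imports rearranged that way:

```scala
import org.mockito.Mockito.{spy, when}
import org.scalatest.BeforeAndAfter

import org.apache.spark.{ExceptionFailure, LocalSparkContext, SparkConf, SparkContext,
  SparkFunSuite, Success, TaskEndReason}
import org.apache.spark.util.ManualClock
```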


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...

2015-11-24 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/8760#discussion_r45771875
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/BlacklistStrategy.scala ---
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable
+import org.apache.spark.Logging
+import org.apache.spark.SparkConf
+import org.apache.spark.util.SystemClock
+import org.apache.spark.util.Clock
+
+/**
+ * The interface to determine executor blacklist and node blacklist.
+ */
+private [scheduler] trait BlacklistStrategy {
+  /** Defined a time interval to expire failure information of executors */
+  val expireTimeInMilliseconds: Long
+
+  /** Return executors in blacklist which are related to given taskIndex */
+  def getExecutorBlacklist(
+      executorIdToFailureStatus: mutable.HashMap[String, FailureStatus],
+      atomTask: BlacklistAtomTask, clock: Clock): Set[String]
+
+  /** Return all nodes in blacklist */
+  def getNodeBlacklist(
+      executorIdToFailureStatus: mutable.HashMap[String, FailureStatus]): Set[String]
+
+  /**
+   * Default implementation to remove failure executors from HashMap based on given time period.
+   * The return value identity whether or not it updated anything
+   */
+  def expireExecutorsInBlackList(
+      executorIdToFailureStatus: mutable.HashMap[String, FailureStatus], clock: Clock): Boolean = {
+    val now = clock.getTimeMillis()
+    val expiredKey = executorIdToFailureStatus.filter {
+      case (executorid, failureStatus) => {
+        (now - failureStatus.updatedTime) >= expireTimeInMilliseconds
+      }
+    }.keySet
+
+    if (expiredKey.isEmpty) {
+      false
+    } else {
+      executorIdToFailureStatus --= expiredKey
+      true
+    }
+  }
+}
+
+/**
+ * This strategy is simply based on given threshold and is taskId unrelated. An executor will be
+ * in blacklist, if it failed more than "maxFailureTaskNumber" times. A node will be in blacklist,
+ * if there are more than "maxBlackExecutorNumber" executors on it in executor blacklist.
+ *
+ * In this case, provided taskId will be ignored. The benefit for taskId unrelated strategy is that
+ * different taskSets can learn experience from other taskSet to avoid allocating tasks on
+ * problematic executors.
+ */
+private[scheduler] class SimpleStrategy(
+    maxFailureTaskNumber: Int,
+    maxBlackExecutorNumber: Int,
+    val expireTimeInMilliseconds: Long
+  )extends BlacklistStrategy {
+
+  private def getSelectedExecutorMap(
+      executorIdToFailureStatus: mutable.HashMap[String, FailureStatus]) = {
+    executorIdToFailureStatus.filter{
+      case (id, failureStatus) => failureStatus.totalNumFailures > maxFailureTaskNumber
+    }
+  }
+
+  // As this is a taskId unrelated strategy, the input taskId will be ignored
+  def getExecutorBlacklist(
+      executorIdToFailureStatus: mutable.HashMap[String, FailureStatus],
+      atomTask: BlacklistAtomTask, clock: Clock): Set[String] = {
+    getSelectedExecutorMap(executorIdToFailureStatus).keys.toSet
+  }
+
+  def getNodeBlacklist(
+      executorIdToFailureStatus: mutable.HashMap[String, FailureStatus]): Set[String] = {
+    getSelectedExecutorMap(executorIdToFailureStatus)
+      .groupBy{case (id, failureStatus) => failureStatus.host}
+      .filter {case (host, executorIdToFailureStatus) =>
+        executorIdToFailureStatus.size > maxBlackExecutorNumber}
+      .keys.toSet
+  }
+}
+
+/**
+ * This strategy is applied as default to keep the same semantics as original. It's an taskId
+ * related strategy. If an executor failed running "task A", then we think this executor is
+ * blacked for "task A". And we think the executor is still healthy for other 


[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...

2015-11-24 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/8760#discussion_r45756446
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -34,6 +34,8 @@ import org.apache.spark.scheduler.SchedulingMode._
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.util.{Clock, SystemClock, Utils}
 
+import com.google.common.annotations.VisibleForTesting
--- End diff --

nit: unused import now that we don't use this annotation anymore


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...

2015-11-24 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/8760#discussion_r45759807
  
--- Diff: core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala ---
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import org.apache.spark.SparkFunSuite
+import org.scalatest.BeforeAndAfter
+import org.apache.spark.SparkConf
+import org.apache.spark.TaskEndReason
+import org.apache.spark.Success
+import org.apache.spark.ExceptionFailure
+import org.apache.spark.LocalSparkContext
+import org.apache.spark.SparkContext
+import org.mockito.Mockito.{when, spy}
+import org.apache.spark.util.ManualClock
+
+class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfter with LocalSparkContext {
+
+  val FAILURE: TaskEndReason = new ExceptionFailure(
+    "Fake",
+    "fake failure",
+    Array.empty[StackTraceElement],
+    "fake stack trace",
+    None,
+    None)
+
+  val stage1 = 1
+  val stage2 = 2
+  // Variable name can indicate basic information of taskInfo
+  // The format is "taskInfo_executorId_taskIndex_hostName"
+  val taskInfo_1_1_hostA = new TaskInfo(1L, 1, 1, 0L, "1", "hostA", TaskLocality.ANY, false)
+  val taskInfo_1_2_hostA = new TaskInfo(2L, 2, 1, 0L, "1", "hostA", TaskLocality.ANY, false)
+  val taskInfo_2_1_hostA = new TaskInfo(3L, 1, 1, 0L, "2", "hostA", TaskLocality.ANY, false)
+  val taskInfo_2_2_hostA = new TaskInfo(4L, 2, 1, 0L, "2", "hostA", TaskLocality.ANY, false)
+  val taskInfo_3_3_hostB = new TaskInfo(5L, 3, 1, 0L, "3", "hostB", TaskLocality.ANY, false)
+
+  val clock = new ManualClock(0)
+
+  test ("expireExecutorsInBlacklist works") {
+    // expire time is set to 6s
+    val conf = new SparkConf().setAppName("test").setMaster("local")
+      .set("spark.ui.enabled", "false")
+      .set("spark.scheduler.executorTaskBlacklistTime", "6000")
+
+    sc = new SparkContext(conf)
+    val scheduler = new TaskSchedulerImpl(sc, 1)
+
+    val tracker = new BlacklistTracker(conf, clock)
+    // Executor 1 into blacklist at Time 00:00:00
+    tracker.updateFailureExecutors(stage1, taskInfo_1_1_hostA, FAILURE)
+    assert(tracker.executorBlacklist(scheduler, stage1, 1) === Set("1"))
+
+    clock.setTime(2000)
+    tracker.expireExecutorsInBlackList()
+    assert(tracker.executorBlacklist(scheduler, stage1, 1) === Set("1"))
+    // Executor 1 failed again at Time 00:00:02
+    tracker.updateFailureExecutors(stage1, taskInfo_1_1_hostA, FAILURE)
+
+    clock.setTime(3000)
+    // Executor 2 failed at Time 00:00:03
+    tracker.updateFailureExecutors(stage1, taskInfo_2_1_hostA, FAILURE)
+    assert(tracker.executorBlacklist(scheduler, stage1, 1) === Set("1", "2"))
+
+    clock.setTime(6000)
+    tracker.expireExecutorsInBlackList()
+    assert(tracker.executorBlacklist(scheduler, stage1, 1) === Set("1", "2"))
+
+    clock.setTime(8000)
+    tracker.expireExecutorsInBlackList()
+    assert(tracker.executorBlacklist(scheduler, stage1, 1) === Set("2"))
+
+    clock.setTime(10000)
+    tracker.expireExecutorsInBlackList()
+    assert(tracker.executorBlacklist(scheduler, stage1, 1) === Set())
+  }
+
+  test("blacklist feature is off by default") {
+    val conf = new SparkConf().setAppName("test").setMaster("local")
+      .set("spark.ui.enabled", "false")
+    sc = new SparkContext(conf)
+
+    val scheduler = new TaskSchedulerImpl(sc, 1)
+
+    val tracker = new BlacklistTracker(conf, clock)
+    tracker.updateFailureExecutors(stage1, taskInfo_1_1_hostA, FAILURE)
+    tracker.updateFailureExecutors(stage1, taskInfo_2_1_hostA, FAILURE)
+    assert(tracker.executorBlacklist(scheduler, stage1, 1) === Set())
+    assert(tracker.executorBlacklist(scheduler, stage1, 1) === Set())
+
+

[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...

2015-11-24 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/8760#discussion_r45760745
  
--- Diff: yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala ---
@@ -21,10 +21,12 @@ import java.util.Collections
 import java.util.concurrent._
 import java.util.regex.Pattern
 
+import scala.collection.immutable.Set
 import scala.collection.mutable
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
 import scala.collection.JavaConverters._
 
+
--- End diff --

nit: delete extra blank line


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...

2015-11-24 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/8760#discussion_r45751778
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/BlacklistStrategy.scala ---
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable
+import org.apache.spark.Logging
--- End diff --

nit: blank line between the `scala` and the `org.apache.spark` imports
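
i.e., just the blank line between the two groups, as sketched:

```scala
import scala.collection.mutable

import org.apache.spark.Logging
```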


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...

2015-11-24 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/8760#discussion_r45751946
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/BlacklistStrategy.scala ---
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable
+import org.apache.spark.Logging
+import org.apache.spark.SparkConf
+import org.apache.spark.util.SystemClock
+import org.apache.spark.util.Clock
+
+/**
+ * The interface to determine executor blacklist and node blacklist.
+ */
+private [scheduler] trait BlacklistStrategy {
+  /** Defined a time interval to expire failure information of executors */
+  val expireTimeInMilliseconds: Long
+
+  /** Return executors in blacklist which are related to given taskIndex */
+  def getExecutorBlacklist(
+      executorIdToFailureStatus: mutable.HashMap[String, FailureStatus],
+      atomTask: BlacklistAtomTask, clock: Clock): Set[String]
--- End diff --

nit: each arg on its own line
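
i.e., a sketch of the requested layout for the signature quoted above:

```scala
def getExecutorBlacklist(
    executorIdToFailureStatus: mutable.HashMap[String, FailureStatus],
    atomTask: BlacklistAtomTask,
    clock: Clock): Set[String]
```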


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...

2015-11-24 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/8760#discussion_r45752011
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/BlacklistStrategy.scala ---
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable
+import org.apache.spark.Logging
+import org.apache.spark.SparkConf
+import org.apache.spark.util.SystemClock
+import org.apache.spark.util.Clock
+
+/**
+ * The interface to determine executor blacklist and node blacklist.
+ */
+private [scheduler] trait BlacklistStrategy {
+  /** Defined a time interval to expire failure information of executors */
+  val expireTimeInMilliseconds: Long
+
+  /** Return executors in blacklist which are related to given taskIndex */
+  def getExecutorBlacklist(
+  executorIdToFailureStatus: mutable.HashMap[String, FailureStatus],
+  atomTask: BlacklistAtomTask, clock: Clock): Set[String]
+
+  /** Return all nodes in blacklist */
+  def getNodeBlacklist(
+      executorIdToFailureStatus: mutable.HashMap[String, FailureStatus]): Set[String]
+
+  /**
+   * Default implementation to remove failure executors from HashMap based on given time period.
+   * The return value identity whether or not it updated anything
+   */
+  def expireExecutorsInBlackList(
+      executorIdToFailureStatus: mutable.HashMap[String, FailureStatus], clock: Clock): Boolean = {
--- End diff --

each arg on its own line


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...

2015-11-24 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/8760#discussion_r45752836
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/BlacklistStrategy.scala ---
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable
+import org.apache.spark.Logging
+import org.apache.spark.SparkConf
+import org.apache.spark.util.SystemClock
+import org.apache.spark.util.Clock
+
+/**
+ * The interface to determine executor blacklist and node blacklist.
+ */
+private [scheduler] trait BlacklistStrategy {
+  /** Defined a time interval to expire failure information of executors */
+  val expireTimeInMilliseconds: Long
+
+  /** Return executors in blacklist which are related to given taskIndex */
+  def getExecutorBlacklist(
+  executorIdToFailureStatus: mutable.HashMap[String, FailureStatus],
+  atomTask: BlacklistAtomTask, clock: Clock): Set[String]
+
+  /** Return all nodes in blacklist */
+  def getNodeBlacklist(
+      executorIdToFailureStatus: mutable.HashMap[String, FailureStatus]): Set[String]
+
+  /**
+   * Default implementation to remove failure executors from HashMap based on given time period.
+   * The return value identity whether or not it updated anything
--- End diff --

Reword to:
Choose which executors (if any) should be removed from the blacklist.  Return true if any executors are removed from the blacklist, false otherwise.  The default implementation removes executors from the blacklist after [[expireTimeInMilliseconds]].
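
Applied to the quoted diff, a sketch of how the method might then read (the doc text is the rewording above; `expiredKeys` is just the diff's `expiredKey` renamed):

```scala
/**
 * Choose which executors (if any) should be removed from the blacklist.  Return true if any
 * executors are removed from the blacklist, false otherwise.  The default implementation
 * removes executors from the blacklist after [[expireTimeInMilliseconds]].
 */
def expireExecutorsInBlackList(
    executorIdToFailureStatus: mutable.HashMap[String, FailureStatus],
    clock: Clock): Boolean = {
  val now = clock.getTimeMillis()
  // Executors whose most recent failure is older than the expiry window are forgiven.
  val expiredKeys = executorIdToFailureStatus.filter { case (_, failureStatus) =>
    (now - failureStatus.updatedTime) >= expireTimeInMilliseconds
  }.keySet
  if (expiredKeys.isEmpty) {
    false
  } else {
    executorIdToFailureStatus --= expiredKeys
    true
  }
}
```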


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...

2015-11-24 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/8760#discussion_r45752965
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/BlacklistStrategy.scala ---
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable
+import org.apache.spark.Logging
+import org.apache.spark.SparkConf
+import org.apache.spark.util.SystemClock
+import org.apache.spark.util.Clock
+
+/**
+ * The interface to determine executor blacklist and node blacklist.
+ */
+private [scheduler] trait BlacklistStrategy {
+  /** Defined a time interval to expire failure information of executors */
+  val expireTimeInMilliseconds: Long
+
+  /** Return executors in blacklist which are related to given taskIndex */
+  def getExecutorBlacklist(
+  executorIdToFailureStatus: mutable.HashMap[String, FailureStatus],
+  atomTask: BlacklistAtomTask, clock: Clock): Set[String]
+
+  /** Return all nodes in blacklist */
+  def getNodeBlacklist(
+      executorIdToFailureStatus: mutable.HashMap[String, FailureStatus]): Set[String]
+
+  /**
+   * Default implementation to remove failure executors from HashMap based on given time period.
+   * The return value identity whether or not it updated anything
+   */
+  def expireExecutorsInBlackList(
+      executorIdToFailureStatus: mutable.HashMap[String, FailureStatus], clock: Clock): Boolean = {
+    val now = clock.getTimeMillis()
+    val expiredKey = executorIdToFailureStatus.filter {
+      case (executorid, failureStatus) => {
+        (now - failureStatus.updatedTime) >= expireTimeInMilliseconds
+      }
+    }.keySet
+
+    if (expiredKey.isEmpty) {
+      false
+    } else {
+      executorIdToFailureStatus --= expiredKey
+      true
+    }
+  }
+}
+
+/**
+ * This strategy is simply based on given threshold and is taskId unrelated. An executor will be
+ * in blacklist, if it failed more than "maxFailureTaskNumber" times. A node will be in blacklist,
+ * if there are more than "maxBlackExecutorNumber" executors on it in executor blacklist.
+ *
+ * In this case, provided taskId will be ignored. The benefit for taskId unrelated strategy is that
+ * different taskSets can learn experience from other taskSet to avoid allocating tasks on
+ * problematic executors.
+ */
+private[scheduler] class SimpleStrategy(
+    maxFailureTaskNumber: Int,
+    maxBlackExecutorNumber: Int,
+    val expireTimeInMilliseconds: Long
+  )extends BlacklistStrategy {
--- End diff --

nit: space after `)`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...

2015-11-24 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/8760#discussion_r45753251
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/BlacklistStrategy.scala ---
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable
+import org.apache.spark.Logging
+import org.apache.spark.SparkConf
+import org.apache.spark.util.SystemClock
+import org.apache.spark.util.Clock
+
+/**
+ * The interface to determine executor blacklist and node blacklist.
+ */
+private [scheduler] trait BlacklistStrategy {
+  /** Defined a time interval to expire failure information of executors */
+  val expireTimeInMilliseconds: Long
+
+  /** Return executors in blacklist which are related to given taskIndex */
+  def getExecutorBlacklist(
+  executorIdToFailureStatus: mutable.HashMap[String, FailureStatus],
+  atomTask: BlacklistAtomTask, clock: Clock): Set[String]
+
+  /** Return all nodes in blacklist */
+  def getNodeBlacklist(
+      executorIdToFailureStatus: mutable.HashMap[String, FailureStatus]): Set[String]
+
+  /**
+   * Default implementation to remove failure executors from HashMap based on given time period.
+   * The return value identity whether or not it updated anything
+   */
+  def expireExecutorsInBlackList(
+      executorIdToFailureStatus: mutable.HashMap[String, FailureStatus], clock: Clock): Boolean = {
+    val now = clock.getTimeMillis()
+    val expiredKey = executorIdToFailureStatus.filter {
+      case (executorid, failureStatus) => {
+        (now - failureStatus.updatedTime) >= expireTimeInMilliseconds
+      }
+    }.keySet
+
+    if (expiredKey.isEmpty) {
+      false
+    } else {
+      executorIdToFailureStatus --= expiredKey
+      true
+    }
+  }
+}
+
+/**
+ * This strategy is simply based on given threshold and is taskId unrelated. An executor will be
+ * in blacklist, if it failed more than "maxFailureTaskNumber" times. A node will be in blacklist,
+ * if there are more than "maxBlackExecutorNumber" executors on it in executor blacklist.
+ *
+ * In this case, provided taskId will be ignored. The benefit for taskId unrelated strategy is that
+ * different taskSets can learn experience from other taskSet to avoid allocating tasks on
+ * problematic executors.
+ */
+private[scheduler] class SimpleStrategy(
+    maxFailureTaskNumber: Int,
+    maxBlackExecutorNumber: Int,
+    val expireTimeInMilliseconds: Long
+  )extends BlacklistStrategy {
+
+  private def getSelectedExecutorMap(
+      executorIdToFailureStatus: mutable.HashMap[String, FailureStatus]) = {
+    executorIdToFailureStatus.filter{
+      case (id, failureStatus) => failureStatus.totalNumFailures > maxFailureTaskNumber
+    }
+  }
+
+  // As this is a taskId unrelated strategy, the input taskId will be ignored
+  def getExecutorBlacklist(
+      executorIdToFailureStatus: mutable.HashMap[String, FailureStatus],
+      atomTask: BlacklistAtomTask, clock: Clock): Set[String] = {
--- End diff --

each arg on its own line


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


