[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...
Github user squito commented on the pull request: https://github.com/apache/spark/pull/8760#issuecomment-220069649 Hi everyone, sorry this has sat around for so long, but this is a super important feature, so I'd like to finish up the missing pieces and get this in (with credit to @mwws still, of course). Other than just fixing merge conflicts, it seems there are two things which need to be addressed: (1) addressing performance concerns with some profiling, and (2) investigating a lock-free implementation (I think this can be done, but I need to look more carefully). I'll also build on the tests introduced here: https://github.com/apache/spark/pull/8559. I'll open a WIP PR in the next day or two at least, but please let me know if I'm missing any major concerns.
[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/8760#issuecomment-187586804 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/51737/
[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/8760#issuecomment-187586803 Merged build finished. Test PASSed.
[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/8760#issuecomment-187586614 **[Test build #51737 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/51737/consoleFull)** for PR 8760 at commit [`ecad5ff`](https://github.com/apache/spark/commit/ecad5ffe2bcc02f9b181a209172f22294e6af4ef).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...
Github user mwws commented on the pull request: https://github.com/apache/spark/pull/8760#issuecomment-187548596 Besides what @squito stated above, I have simplified the configuration to resolve @kayousterhout's concern. By default, the exact same blacklist logic is applied as before. Users can enable the new blacklist logic by setting `spark.scheduler.blacklist.advancedStrategy=true` as an experimental feature. Compared with the original logic, the "advanced strategy" enables a node blacklist: if more than one executor on the same node is blacklisted for a given stage, we put all executors on that node into the blacklist for that stage. For more details, refer to the comments and the [unit test](https://github.com/mwws/spark/blob/SPARK-8426/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala). The code has also been rebased onto the latest master.
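A minimal sketch of the node-blacklist rule described above; the config key is verbatim from the comment, while the helper function and its inputs are hypothetical, not the PR's actual API:

```scala
import org.apache.spark.SparkConf

// Opt in to the experimental strategy (config key quoted from the comment above).
val conf = new SparkConf().set("spark.scheduler.blacklist.advancedStrategy", "true")

// Hypothetical helper: a node joins the stage's blacklist once more than one of
// its executors is already blacklisted for that stage.
def nodeBlacklistForStage(
    stageExecutorBlacklist: Set[String],
    executorsByNode: Map[String, Set[String]]): Set[String] = {
  executorsByNode.collect {
    case (node, execs) if (execs & stageExecutorBlacklist).size > 1 => node
  }.toSet
}
```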
[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/8760#issuecomment-187547524 **[Test build #51737 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/51737/consoleFull)** for PR 8760 at commit [`ecad5ff`](https://github.com/apache/spark/commit/ecad5ffe2bcc02f9b181a209172f22294e6af4ef).
[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...
Github user squito commented on the pull request: https://github.com/apache/spark/pull/8760#issuecomment-185965488 @kayousterhout @markhamstra one other thing to follow up on was the earlier idea of using speculation to test whether blacklisted executors can be added back. While I think it's a neat _idea_, I really do not think it needs to be included here. First, speculation has always been somewhat poorly tested and it's taken a while to root out some race conditions -- I don't think users would want speculation turned on without explicitly requesting it, and I think we can still do useful blacklisting without requiring users to turn it on. Second, I think it could lead to a lot of user confusion from the errors generated by these speculative tasks. Third, it would add a lot of complexity, and it can always be added as an option later on. I think the other concern is how this will affect scheduler throughput. I ran the spark-perf scheduler throughput tests (just locally) and didn't see any significant performance difference. However, those tests aren't really stressing this part of the code -- in particular, there are no failed executors -- so it's probably worth thinking of another test for this. The per-task overhead introduced with the lock-less approach will just be one call to `clock.getTimeMillis()`, which is pretty negligible, plus an occasional "blip" in the scheduling when the blacklists are updated. That would be non-zero, but I also doubt it would be a problem; still, some profiling is in order.
[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...
Github user mwws commented on the pull request: https://github.com/apache/spark/pull/8760#issuecomment-185602366 @chenghao-intel Sure, I will update soon.
[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...
Github user chenghao-intel commented on the pull request: https://github.com/apache/spark/pull/8760#issuecomment-185212009 @mwws can you update the code?
[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...
Github user chenghao-intel commented on a diff in the pull request: https://github.com/apache/spark/pull/8760#discussion_r53165346

--- Diff: core/src/main/scala/org/apache/spark/scheduler/BlacklistStrategy.scala ---
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable
+
+import org.apache.spark.SparkConf
+import org.apache.spark.util.Clock
+
+/**
+ * The interface to determine executor blacklist and node blacklist.
+ */
+private[scheduler] trait BlacklistStrategy {
+  /** Define a time interval to expire failure information of executors */
+  val expireTimeInMilliseconds: Long
+
+  /** Return executors in blacklist which are related to given stage and partition */
+  def getExecutorBlacklist(
+      executorIdToFailureStatus: mutable.HashMap[String, FailureStatus],
+      atomTask: StageAndPartition,
+      clock: Clock): Set[String]
+
+  /** Return all nodes in blacklist */
+  def getNodeBlacklist(
+      executorIdToFailureStatus: mutable.HashMap[String, FailureStatus],
+      clock: Clock): Set[String]
+
+  /**
+   * Return all nodes in blacklist for the specified stage. By default it returns the same
+   * result as getNodeBlacklist. It can be overridden in a strategy implementation.
+   */
+  def getNodeBlacklistForStage(
+      executorIdToFailureStatus: mutable.HashMap[String, FailureStatus],
+      stageId: Int,
+      clock: Clock): Set[String] = getNodeBlacklist(executorIdToFailureStatus, clock)
+
+  /**
+   * Choose which executors should be removed from the blacklist. Return true if any executors
+   * are removed from the blacklist, false otherwise. The default implementation removes
+   * executors from the blacklist after [[expireTimeInMilliseconds]].
+   */
+  def expireExecutorsInBlackList(
+      executorIdToFailureStatus: mutable.HashMap[String, FailureStatus],
+      clock: Clock): Boolean = {
+    val now = clock.getTimeMillis()
+    val expiredKey = executorIdToFailureStatus.filter {
+      case (executorid, failureStatus) => {
+        (now - failureStatus.updatedTime) >= expireTimeInMilliseconds
+      }
+    }.keySet
+
+    if (expiredKey.isEmpty) {
+      false
+    } else {
+      executorIdToFailureStatus --= expiredKey
+      true
+    }
+  }
+}
+
+/**
+ * This strategy adds an executor to the blacklist for all tasks when the executor has too many
+ * task failures. An executor is placed in the blacklist when there are more than
+ * [[maxFailedTasks]] failed tasks. Furthermore, all executors on one node are put into the
+ * blacklist if there are more than [[maxBlacklistedExecutors]] blacklisted executors on that
+ * node. The benefit of this strategy is that different task sets can learn from the experience
+ * of other task sets to avoid allocating tasks on problematic executors.
+ */
+private[scheduler] class ExecutorAndNodeStrategy(
+    maxFailedTasks: Int,
+    maxBlacklistedExecutors: Int,
+    val expireTimeInMilliseconds: Long
+  ) extends BlacklistStrategy {
+
+  private def getExecutorBlacklistInfo(
+      executorIdToFailureStatus: mutable.HashMap[String, FailureStatus]) = {
+    executorIdToFailureStatus.filter {
+      case (id, failureStatus) => failureStatus.totalNumFailures > maxFailedTasks
+    }
+  }
+
+  // As this is a task-unrelated strategy, the input StageAndPartition info will be ignored
+  def getExecutorBlacklist(
+      executorIdToFailureStatus: mutable.HashMap[String, FailureStatus],
+      atomTask: StageAndPartition,
+      clock: Clock): Set[String] = {
+    getExecutorBlacklistInfo(executorIdToFailureStatus).keys.toSet
+  }
+
+  def getNodeBlacklist(
+      executorIdToFailureStatus: mutable.HashMap[String, FailureStatus],
+      clock: Clock): Set[String] = {
+
[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/8760#discussion_r52685053

--- Diff: core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala ---
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.util.concurrent.TimeUnit
+
+import scala.collection.mutable
+
+import org.apache.spark.SparkConf
+import org.apache.spark.Success
+import org.apache.spark.TaskEndReason
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.util.SystemClock
+import org.apache.spark.util.ThreadUtils
+import org.apache.spark.util.Utils
+import org.apache.spark.util.Clock
+
+/**
+ * BlacklistTracker is designed to track problematic executors and nodes at the application
+ * level. It is shared by all TaskSets, so that when a new TaskSet arrives, it can benefit
+ * from the previous experience of other TaskSets.
+ *
+ * Once a task finishes, the callback method in TaskSetManager should update the
+ * executorIdToFailureStatus map.
+ */
+private[spark] class BlacklistTracker(
+    sparkConf: SparkConf,
+    clock: Clock = new SystemClock()) extends BlacklistCache {
+  // maintain an ExecutorId --> FailureStatus HashMap
+  private val executorIdToFailureStatus: mutable.HashMap[String, FailureStatus] =
+    mutable.HashMap()
+
+  // Apply the Strategy pattern here to swap in different blacklist detection logic
+  private val strategy = BlacklistStrategy(sparkConf)
+
+  // A daemon thread to expire blacklisted executors periodically
+  private val scheduler = ThreadUtils.newDaemonSingleThreadScheduledExecutor(
+    "spark-scheduler-blacklist-expire-timer")
+
+  private val recoverPeriod = sparkConf.getTimeAsSeconds(
+    "spark.scheduler.blacklist.recoverPeriod", "60s")
+
+  def start(): Unit = {
+    val scheduleTask = new Runnable() {
+      override def run(): Unit = {
+        Utils.logUncaughtExceptions(expireExecutorsInBlackList())
+      }
+    }
+    scheduler.scheduleAtFixedRate(scheduleTask, 0L, recoverPeriod, TimeUnit.SECONDS)
+  }
+
+  def stop(): Unit = {
+    scheduler.shutdown()
+    scheduler.awaitTermination(10, TimeUnit.SECONDS)
+  }
+
+  // The actual implementation is delegated to the strategy
+  /** VisibleForTesting */
+  private[scheduler] def expireExecutorsInBlackList(): Unit = synchronized {
+    val updated = strategy.expireExecutorsInBlackList(executorIdToFailureStatus, clock)
+    if (updated) {
+      invalidateCache()
+    }
+  }

--- End diff --

This is a good point. Effort was made to avoid doing too much work while holding this lock, by caching the set of blacklisted nodes and executors. But maybe we can do a bit better.

The only reason we need to synchronize is b/c of the background thread that expires executors from the blacklist -- this is otherwise only called from a `TaskSetManager`, which in turn can only get called from threads with a lock on the `TaskScheduler`. So if, instead of updating the cache in the background thread, we just have each of the methods check for themselves whether the blacklist needs to be updated, I think we could completely eliminate the need for the lock. You'd still occasionally be pausing scheduling to run updateFailedExecutors, but even with 1000s of executors this seems pretty minor, and it does not run very often (every 60s by default). We could avoid the overhead of synchronization for scheduling every task, however.
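A rough sketch of the lock-free shape described here, assuming every caller already holds the `TaskScheduler` lock; all names are illustrative, and `recompute` is a placeholder rather than the PR's actual logic:

```scala
// Sketch only: drop the background expiry thread and have each read check
// staleness itself, so no extra lock is needed beyond the TaskScheduler's.
trait Clock { def getTimeMillis(): Long }

class LazyExpiringBlacklist(expiryMs: Long, clock: Clock) {
  private var lastUpdatedMs = 0L
  private var cached: Set[String] = Set.empty

  // Only ever called from threads holding the TaskScheduler lock.
  def blacklistedExecutors(): Set[String] = {
    val now = clock.getTimeMillis()         // the single per-call cost noted above
    if (now - lastUpdatedMs >= expiryMs) {  // occasional "blip": recompute on read
      cached = recompute(now)
      lastUpdatedMs = now
    }
    cached
  }

  // Placeholder: drop expired failure records and rebuild the blacklisted set.
  private def recompute(now: Long): Set[String] = Set.empty
}
```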
[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/8760#discussion_r52682469

--- Diff: yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala ---
@@ -21,6 +21,7 @@ import java.util.Collections
 import java.util.concurrent._
 import java.util.regex.Pattern
 
+import scala.collection.immutable.Set

--- End diff --

This is very subjective, but I find it really confusing to have fully imported both immutable and mutable collections in the same file. (a) If you really want the immutable one, I'd just fully qualify it where you need it, as `collection.immutable.Set`. Or maybe (b) you could just use a mutable Set here for consistency? Or (c) if the code here doesn't care whether it's mutable or immutable, you could take a `collection.Set` and just fully qualify it where you use it?
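As a small illustration of option (c), with an entirely hypothetical method name: accepting the common supertype lets callers pass either flavor of Set without extra imports.

```scala
// Illustrative only: `scala.collection.Set` is the common parent of the mutable
// and immutable Set types, so either one can be passed without conversion.
def logBlacklistedNodes(nodes: scala.collection.Set[String]): Unit = {
  nodes.foreach(node => println(s"blacklisted node: $node"))
}
```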
[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/8760#discussion_r52681082

--- Diff: core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala ---
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.util.concurrent.TimeUnit
+
+import scala.collection.mutable
+
+import org.apache.spark.SparkConf
+import org.apache.spark.Success
+import org.apache.spark.TaskEndReason
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.util.SystemClock
+import org.apache.spark.util.ThreadUtils
+import org.apache.spark.util.Utils
+import org.apache.spark.util.Clock
+
+/**
+ * BlacklistTracker is designed to track problematic executors and nodes at the application
+ * level. It is shared by all TaskSets, so that when a new TaskSet arrives, it can benefit
+ * from the previous experience of other TaskSets.
+ *
+ * Once a task finishes, the callback method in TaskSetManager should update the
+ * executorIdToFailureStatus map.
+ */
+private[spark] class BlacklistTracker(
+    sparkConf: SparkConf,
+    clock: Clock = new SystemClock()) extends BlacklistCache {
+  // maintain an ExecutorId --> FailureStatus HashMap
+  private val executorIdToFailureStatus: mutable.HashMap[String, FailureStatus] =
+    mutable.HashMap()
+
+  // Apply the Strategy pattern here to swap in different blacklist detection logic
+  private val strategy = BlacklistStrategy(sparkConf)
+
+  // A daemon thread to expire blacklisted executors periodically
+  private val scheduler = ThreadUtils.newDaemonSingleThreadScheduledExecutor(
+    "spark-scheduler-blacklist-expire-timer")
+
+  private val recoverPeriod = sparkConf.getTimeAsSeconds(
+    "spark.scheduler.blacklist.recoverPeriod", "60s")
+
+  def start(): Unit = {
+    val scheduleTask = new Runnable() {
+      override def run(): Unit = {
+        Utils.logUncaughtExceptions(expireExecutorsInBlackList())
+      }
+    }
+    scheduler.scheduleAtFixedRate(scheduleTask, 0L, recoverPeriod, TimeUnit.SECONDS)
+  }
+
+  def stop(): Unit = {
+    scheduler.shutdown()
+    scheduler.awaitTermination(10, TimeUnit.SECONDS)
+  }
+
+  // The actual implementation is delegated to the strategy
+  /** VisibleForTesting */
+  private[scheduler] def expireExecutorsInBlackList(): Unit = synchronized {
+    val updated = strategy.expireExecutorsInBlackList(executorIdToFailureStatus, clock)
+    if (updated) {
+      invalidateCache()
+    }
+  }
+
+  def updateFailedExecutors(
+      stageId: Int,
+      partation: Int,
+      info: TaskInfo,
+      reason: TaskEndReason): Unit = synchronized {
+
+    val atomTask = StageAndPartition(stageId, partation)
+    reason match {
+      // If the task succeeded, remove the related record from executorIdToFailureStatus
+      case Success =>
+        removeFailedExecutorsForTaskId(info.executorId, stageId, partation)
+
+      // If the task failed, update the latest failure time and failedTaskIds
+      case _ =>
+        val executorId = info.executorId
+        executorIdToFailureStatus.get(executorId) match {
+          case Some(failureStatus) =>
+            failureStatus.updatedTime = clock.getTimeMillis()
+            val failedTimes = failureStatus.numFailuresPerTask.getOrElse(atomTask, 0) + 1
+            failureStatus.numFailuresPerTask(atomTask) = failedTimes
+          case None =>
+            val failedTasks = mutable.HashMap(atomTask -> 1)
+            val failureStatus = new FailureStatus(
+              clock.getTimeMillis(), info.host, failedTasks)
+            executorIdToFailureStatus(executorId) = failureStatus
+        }
+        invalidateCache()
+    }
+  }
+
+  /** remove the executorId from executorIdToFailureStatus */
+  def
[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/8760#discussion_r52681156

--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -270,19 +260,11 @@ private[spark] class TaskSetManager(
     taskAttempts(taskIndex).exists(_.host == host)
   }
 
-  /**
-   * Is this re-execution of a failed task on an executor it already failed in before
-   * EXECUTOR_TASK_BLACKLIST_TIMEOUT has elapsed ?
-   */
-  private def executorIsBlacklisted(execId: String, taskId: Int): Boolean = {
-    if (failedExecutors.contains(taskId)) {
-      val failed = failedExecutors.get(taskId).get
-
-      return failed.contains(execId) &&
-        clock.getTimeMillis() - failed.get(execId).get < EXECUTOR_TASK_BLACKLIST_TIMEOUT
-    }
+  var blacklistTracker = sched.sc.blacklistTracker

--- End diff --

can be `private` I think
[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/8760#discussion_r52681359

--- Diff: core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala ---
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import org.mockito.Mockito.when
+import org.scalatest.BeforeAndAfter
+import org.scalatest.mock.MockitoSugar
+
+import org.apache.spark.ExceptionFailure
+import org.apache.spark.SparkConf
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.Success
+import org.apache.spark.TaskEndReason
+import org.apache.spark.util.ManualClock
+
+class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfter with MockitoSugar {
+
+  val FAILURE: TaskEndReason = new ExceptionFailure(
+    "Fake",
+    "fake failure",
+    Array.empty[StackTraceElement],
+    "fake stack trace",
+    None,
+    None)
+
+  val stage1 = 1
+  val stage2 = 2
+
+  val partation1 = 1
+  val partation2 = 2
+  val partation3 = 3

--- End diff --

typo: partition here and lots of other places
[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...
Github user chenghao-intel commented on the pull request: https://github.com/apache/spark/pull/8760#issuecomment-175342880 @mwws can you rebase the code? @kayousterhout any more comments on this?
[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/8760#issuecomment-171210808 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/49294/
[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/8760#issuecomment-171210609 **[Test build #49294 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/49294/consoleFull)** for PR 8760 at commit [`25a7c6f`](https://github.com/apache/spark/commit/25a7c6fc329687310f1ae88e56bcd72a5c259b61).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/8760#issuecomment-171210806 Merged build finished. Test PASSed.
[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/8760#issuecomment-171180163 **[Test build #49294 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/49294/consoleFull)** for PR 8760 at commit [`25a7c6f`](https://github.com/apache/spark/commit/25a7c6fc329687310f1ae88e56bcd72a5c259b61).
[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...
Github user chenghao-intel commented on the pull request: https://github.com/apache/spark/pull/8760#issuecomment-170442019 @squito @kayousterhout @mridulm do you have any further comments on this?
[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...
Github user mwws commented on the pull request: https://github.com/apache/spark/pull/8760#issuecomment-170444236 Jenkins, retest this please
[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/8760#issuecomment-170445768 **[Test build #49099 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/49099/consoleFull)** for PR 8760 at commit [`d94bdd7`](https://github.com/apache/spark/commit/d94bdd7d16b77317bdfe5851e9ba297a229b4113).
[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/8760#issuecomment-170446175 **[Test build #49099 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/49099/consoleFull)** for PR 8760 at commit [`d94bdd7`](https://github.com/apache/spark/commit/d94bdd7d16b77317bdfe5851e9ba297a229b4113).
* This patch **fails Scala style tests**.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/8760#issuecomment-170446186 Merged build finished. Test FAILed.
[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/8760#issuecomment-170446190 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/49099/
[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/8760#issuecomment-167728825 **[Test build #48401 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/48401/consoleFull)** for PR 8760 at commit [`d94bdd7`](https://github.com/apache/spark/commit/d94bdd7d16b77317bdfe5851e9ba297a229b4113).
[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...
Github user mwws commented on the pull request: https://github.com/apache/spark/pull/8760#issuecomment-167728914 @mridulm I have changed the interface a little bit and created `AdvancedSingleTaskStrategy` to support the use case you described above. With this new strategy, we enable the node blacklist while limiting its scope of influence to each stage. A unit test, *AdvancedSingleTask strategy works*, has been added to show how it works.
[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/8760#issuecomment-167741271 Merged build finished. Test FAILed.
[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/8760#issuecomment-167741274 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/48401/
[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/8760#issuecomment-167741137 **[Test build #48401 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/48401/consoleFull)** for PR 8760 at commit [`d94bdd7`](https://github.com/apache/spark/commit/d94bdd7d16b77317bdfe5851e9ba297a229b4113).
* This patch **fails Spark unit tests**.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...
Github user mwws commented on the pull request: https://github.com/apache/spark/pull/8760#issuecomment-167052293 @squito @mridulm I am afraid an additional state cannot help here; your use case requires a "TaskSet-related node blacklist", which is not supported by the current implementation. I am making a change to support it.
[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...
Github user squito commented on the pull request: https://github.com/apache/spark/pull/8760#issuecomment-165280794 @mridulm going back to one of your previous comments:

> What would be more ideal is an implementation which allows for blacklisting a single task (as is current), to multiple tasks for a taskset on a single executor (in case the taskset has issues on the executor - memory constraints perhaps), to blacklisting the node: time bound of course, and for a specific taskset and/or all tasksets until the timeout expires (directory cleanup catching up, etc). Did I miss this in there somewhere?

This isn't specifically included, but I think it could be added as an alternate strategy without a huge change. If I understand correctly, you're basically proposing adding another blacklisted state, where an executor is completely blacklisted, but only for *one taskset*, and after it's blacklisted for multiple tasksets it's promoted to being blacklisted for all tasksets (still subject to some timeout or something)? I don't think that would be too hard, but I'm also trying to find the balance between adding too many knobs -- is there a clear use case for it?
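A hedged sketch of the promotion idea in that question -- an executor blacklisted by enough distinct tasksets graduates to an application-wide blacklist. The class, names, and threshold are all hypothetical, not part of the PR:

```scala
import scala.collection.mutable

// Sketch only: track which tasksets have blacklisted an executor and promote it
// to the app-wide blacklist once enough distinct tasksets agree.
class PromotionTracker(promoteAfter: Int = 2) {
  private val blacklistingTaskSets = mutable.Map.empty[String, mutable.Set[Int]]
  private val appWideBlacklist = mutable.Set.empty[String]

  def blacklistForTaskSet(execId: String, taskSetId: Int): Unit = {
    val sets = blacklistingTaskSets.getOrElseUpdate(execId, mutable.Set.empty)
    sets += taskSetId
    if (sets.size >= promoteAfter) appWideBlacklist += execId
  }

  def isBlacklistedForAllTaskSets(execId: String): Boolean =
    appWideBlacklist.contains(execId)
}
```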
[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...
Github user squito commented on the pull request: https://github.com/apache/spark/pull/8760#issuecomment-165279831 @markhamstra I'm not entirely sure I understand the idea of de-blacklisting executors via speculation. The two seem rather independent, and I'm actually pretty interested in this feature even w/ speculation turned off. I don't really see how you'd leverage speculative tasks for this; it seems like it would complicate things a lot by tying them together, but maybe I just need the details spelled out more clearly?
[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...
Github user mridulm commented on the pull request: https://github.com/apache/spark/pull/8760#issuecomment-165304077 @squito Unless an executor or node is dying or has resource issues, in most cases the failure is not repeatable across tasksets, due to differing execution/resource characteristics. So a blanket blacklist can be extremely suboptimal. For example: a taskset requiring disk usage (shuffle) vs. a taskset executing collect() -- the latter is not impacted by disk space being full or being reclaimed in the background, while the former will keep failing. An additional state might most probably handle the requirement -- I will be able to comment more when I actually see the design, of course :-)
[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...
Github user markhamstra commented on the pull request: https://github.com/apache/spark/pull/8760#issuecomment-161729752 I still like the idea of using speculative tasks to parole blacklisted executors and hosts, but that new role for speculative execution will complicate https://github.com/apache/spark/pull/8683.
[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...
Github user squito commented on the pull request: https://github.com/apache/spark/pull/8760#issuecomment-161100737 @kayousterhout this is pretty important for users running larger clusters, e.g. a few hundred nodes. We've seen cases where there is some weird misconfiguration on a node, which leads to all tasks failing on that node. The job will eventually succeed if you use the existing blacklist behavior, but with even one bad node you get so many task-failure messages that it's very hard to make any sense of what is going on -- apps with thousands of tasks per stage and tens of stages generate tons of task failures. This leads to users complaining that things are broken, when things are actually "ok" in the sense that jobs run successfully to completion, and to users being unable to find the "real" failures hidden in all the bogus messages. Cluster managers don't help here -- the problem is when there is a gap between the manager's notion of a "healthy" node and the particular Spark application (e.g., one particular library fails to load, or there's bad memory or something). There is also an efficiency advantage to blacklisting the entire executor, rather than just letting tasks fail on your bad executors. In general the tasks will fail quickly and just get rescheduled (I've only actually seen one case where the tasks did not fail almost immediately), so maybe this is more minor, but it seems like a nice thing to do right. But beyond just the initial task scheduling, this PR adds the ability for the cluster manager to avoid giving out more executors on the bad node. Plus, this opens the ability for Spark to actively request a new executor -- e.g., if you request 5 executors and one is bad, rather than running with 80% of your requested resources, you should just throw away that executor and get another one (if the cluster has capacity) -- though that is not directly included in this patch; in earlier discussion @mwws thought it best to push that off to follow-up work. I also like the idea of minimizing user configuration, but I also recognize that we are working from a position of some ignorance about the failure modes people encounter and how they want them fixed. E.g., I imagine that everyone would want to use the `ExecutorAndNode` strategy and not change any other configuration, but perhaps some users want the old behavior and more knobs. That's also why I'm reluctant to change the default behavior (but not drawing a hard line either). After providing some options we could try to collect more feedback on what works. You've got a good point about only blacklisting executors across jobs if the tasks succeed elsewhere. I was initially only thinking about a linear Spark application -- in that case, if some tasks in a job fail everywhere, then the job fails and the Spark app is done in any case. But of course apps may try to recover from failed jobs, or even have completely independent jobs, in a "job server" style deployment. Another point you made on another PR is that we have to ensure that we don't prevent any progress from being made if all executors get blacklisted and there is nowhere for any tasks to run. We should make sure we have some sensible behavior there, e.g., fail the job.
[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...
Github user kayousterhout commented on the pull request: https://github.com/apache/spark/pull/8760#issuecomment-161132665 I had one other idea here: for blacklisted executors / hosts, we *could* periodically run a speculative copy of a task there. If the task succeeds, we can un-blacklist the executor, and if the task fails, we haven't lost anything, because the task is just a speculative copy, so there's another copy of the task running elsewhere. That would allow us to more aggressively re-try the executor, without sacrificing performance of any running jobs.
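To pin down the control flow of that idea (all names hypothetical): the scheduler would only probe a blacklisted executor with a copy of a task that already has a healthy attempt running elsewhere, so a failed probe costs nothing.

```scala
// Sketch: apply the outcome of a speculative "probe" task to the blacklist.
case class ProbeResult(execId: String, succeeded: Boolean)

def applyProbeResult(blacklist: Set[String], result: ProbeResult): Set[String] = {
  if (result.succeeded) blacklist - result.execId // probe succeeded: un-blacklist
  else blacklist // probe failed: no harm done, the original attempt runs elsewhere
}
```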
[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...
Github user markhamstra commented on the pull request: https://github.com/apache/spark/pull/8760#issuecomment-161133600 @kayousterhout Yeah, that's kind of interesting -- kind of like "arbeit macht frei", except it's not a lie. Probably shouldn't have gone there
[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...
Github user mwws commented on the pull request: https://github.com/apache/spark/pull/8760#issuecomment-161165662 @kayousterhout I believe in most cases users can just use the default configuration or set `spark.scheduler.blacklist.strategy` to `executorAndNode`. I also like the idea of minimizing user configuration, but at the same time it would be better to provide extensibility and flexibility, in case the "smart" system cannot satisfy all user requirements. About "running a speculative copy of a task": it might be doable, but as I understand it, it adds a lot of complexity to maintaining the correct status of the TaskSet/Stage that the "speculative task" belongs to. What if all other tasks have finished and the "speculative task" is still running? Should the speculative task be retried like normal tasks? What if the "speculative task" succeeds but the original task fails? Should the next stage be aware of these "speculative tasks"? Etc. And from a system-monitoring point of view, additional duplicated tasks are created, which may confuse users.
[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...
Github user kayousterhout commented on the pull request: https://github.com/apache/spark/pull/8760#issuecomment-161070513 Before fixing style issues on this change, I think it's worth considering whether this is the right approach for blacklisting. Based on discussion here and in #3541 and #10045, it seems like there are a few high-level goals here:

(1) Minimize the required user configuration (ideally this should just work without any configuration)
(2) If an executor/host isn't working well, stop using it for the particular job
(3) If an executor/host is failing for multiple jobs, stop using it across all jobs
(4) Eventually re-try the executor/host, since the failure may be transient
(5) Don't overcorrect for bad tasks / jobs (that fail regardless of where they're run)

My concern with the approach in this PR is that it requires a lot of user configuration, and as has been discussed in the various PRs, the appropriate blacklist timeout is very workload-dependent and type-of-failure-dependent. I wonder if it would make sense to do something like an exponentially increasing timeout (where each consecutive failure triggers a longer timeout) to lower the configuration overhead. I pinged @mateiz to see if he has any other ideas about how to do this gracefully. The other issue is (5). One way to handle that is to only "count" a task failure if the task fails on the executor *and* succeeds elsewhere. It would be great if we could make blacklisting work well out-of-the-box, so I think it's worth putting some thought into the right approach here. It would be useful to get other folks' feedback about whether these are the right goals and whether there are better ideas for how to achieve them.
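A minimal sketch of the exponentially increasing timeout floated above; the base and cap values are illustrative, not anything from the PR:

```scala
// Each consecutive failure doubles how long the executor stays blacklisted,
// up to a cap, so no per-workload timeout tuning is required.
def blacklistTimeoutMs(
    consecutiveFailures: Int,
    baseMs: Long = 60 * 1000L,               // illustrative: 1 minute
    capMs: Long = 60 * 60 * 1000L): Long = { // illustrative: 1 hour
  require(consecutiveFailures >= 1)
  math.min(capMs, baseMs << math.min(consecutiveFailures - 1, 20))
}
```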
[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/8760#discussion_r46249345

--- Diff: core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala ---
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.util.concurrent.TimeUnit
+
+import scala.collection.mutable
+
+import org.apache.spark.SparkConf
+import org.apache.spark.Success
+import org.apache.spark.TaskEndReason
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.util.SystemClock
+import org.apache.spark.util.ThreadUtils
+import org.apache.spark.util.Utils
+import org.apache.spark.util.Clock
+
+/**
+ * BlacklistTracker is designed to track problematic executors and nodes at the application
+ * level. It is shared by all TaskSets, so that when a new TaskSet arrives, it can benefit
+ * from the previous experience of other TaskSets.
+ *
+ * Once a task finishes, the callback method in TaskSetManager should update the
+ * executorIdToFailureStatus map.
+ */
+private[spark] class BlacklistTracker(
+    sparkConf: SparkConf,
+    clock: Clock = new SystemClock()) extends BlacklistCache {
+  // maintain an ExecutorId --> FailureStatus HashMap
+  private val executorIdToFailureStatus: mutable.HashMap[String, FailureStatus] =
+    mutable.HashMap()
+
+  // Apply the Strategy pattern here to swap in different blacklist detection logic
+  private val strategy = BlacklistStrategy(sparkConf)
+
+  // A daemon thread to expire blacklisted executors periodically
+  private val scheduler = ThreadUtils.newDaemonSingleThreadScheduledExecutor(
+    "spark-scheduler-blacklist-expire-timer")
+
+  private val recoverPeriod = sparkConf.getTimeAsSeconds(
+    "spark.scheduler.blacklist.recoverPeriod", "60s")
+
+  def start(): Unit = {
+    val scheduleTask = new Runnable() {
+      override def run(): Unit = {
+        Utils.logUncaughtExceptions(expireExecutorsInBlackList())
+      }
+    }
+    scheduler.scheduleAtFixedRate(scheduleTask, 0L, recoverPeriod, TimeUnit.SECONDS)
+  }
+
+  def stop(): Unit = {
+    scheduler.shutdown()
+    scheduler.awaitTermination(10, TimeUnit.SECONDS)
+  }
+
+  // The actual implementation is delegated to the strategy
+  /** VisibleForTesting */
+  private[scheduler] def expireExecutorsInBlackList(): Unit = synchronized {
+    val updated = strategy.expireExecutorsInBlackList(executorIdToFailureStatus, clock)
+    if (updated) {
+      invalidateCache()
+    }
+  }
+
+  def updateFailedExecutors(
+      stageId: Int,
+      partation: Int,
+      info: TaskInfo,
+      reason: TaskEndReason): Unit = synchronized {
+
+    val atomTask = StageAndPartition(stageId, partation)
+    reason match {
+      // If the task succeeded, remove the related record from executorIdToFailureStatus
+      case Success =>
+        removeFailedExecutorsForTaskId(info.executorId, stageId, partation)
+
+      // If the task failed, update the latest failure time and failedTaskIds
+      case _ =>
+        val executorId = info.executorId
+        executorIdToFailureStatus.get(executorId) match {
+          case Some(failureStatus) =>
+            failureStatus.updatedTime = clock.getTimeMillis()
+            val failedTimes = failureStatus.numFailuresPerTask.getOrElse(atomTask, 0) + 1
+            failureStatus.numFailuresPerTask(atomTask) = failedTimes
+          case None =>
+            val failedTasks = mutable.HashMap(atomTask -> 1)
+            val failureStatus = new FailureStatus(
+              clock.getTimeMillis(), info.host, failedTasks)
+            executorIdToFailureStatus(executorId) = failureStatus
+        }
+        invalidateCache()
+    }
+  }
+
+  /** remove the executorId from executorIdToFailureStatus */
+  def
[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...
Github user mridulm commented on the pull request: https://github.com/apache/spark/pull/8760#issuecomment-160890985 Since I am no longer very familiar with the scheduler codebase, I am sort of giving up on my review halfway through - save the notes already added. Hopefully someone else more active can give a +1!
[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...
Github user kayousterhout commented on the pull request: https://github.com/apache/spark/pull/8760#issuecomment-160728616 @davies thanks -- part of what I was wondering is whether the existing blacklist mechanism is good enough, since many of the potential issues mentioned seem like things that would cause tasks to quickly fail, so I'm wondering how big the performance penalty is of making each task set learn on its own that a particular executor is bad. Curious to hear others weigh in here about whether this is a problem they're experiencing (@squito have you seen this issue in production?).
[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...
Github user kayousterhout commented on the pull request: https://github.com/apache/spark/pull/8760#issuecomment-160707369 @davies is this the issue you were running into recently (Patrick mentioned you'd been experiencing a similar issue)?
[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...
Github user davies commented on the pull request: https://github.com/apache/spark/pull/8760#issuecomment-160709640 @kayousterhout I ran into an issue with a bad host (a library couldn't be installed); it could be worked around by specifying spark.scheduler.executorTaskBlacklistTime, but unfortunately that's 0 by default (meaning disabled). I think we should at least increase the default value of spark.scheduler.executorTaskBlacklistTime in 1.6.
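Concretely, the workaround described above amounts to giving the timeout a nonzero value; the 10-minute figure below is illustrative, not a recommendation:

```scala
import org.apache.spark.SparkConf

// Sketch of the workaround: a nonzero value (in milliseconds) enables
// per-task executor blacklisting; the default of 0 leaves it disabled.
val conf = new SparkConf()
  .setAppName("example")
  .set("spark.scheduler.executorTaskBlacklistTime", "600000") // 10 minutes, illustrative
```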
[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...
Github user kayousterhout commented on the pull request: https://github.com/apache/spark/pull/8760#issuecomment-160734431 I remembered why this discussion sounds so familiar: there was a long discussion about host-level blacklisting on this PR: https://github.com/apache/spark/pull/3541
[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...
Github user mwws commented on the pull request: https://github.com/apache/spark/pull/8760#issuecomment-160814564 `spark.scheduler.executorTaskBlacklistTime` is renamed to `spark.scheduler.blacklist.timeout` in this PR so that blacklist-related configuration follows the unified pattern spark.scheduler.blacklist.*. By the way, the usage of this timeout configuration was previously missing from the configuration doc; I have added it back under the new name.
[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...
Github user mwws commented on the pull request: https://github.com/apache/spark/pull/8760#issuecomment-160864884 @kayousterhout It's an interesting discussion on PR #3541. As I understand it, this PR can satisfy both use cases. For @davies, you might want to use `ExecutorAndNodeStrategy` by changing the `spark.scheduler.blacklist.strategy` configuration to enable host-level blacklisting. By default, if 3 executors on the same host are blacklisted, we consider the host blacklisted as well; the thresholds are also configurable. For @mridulm, I tried to keep the same behavior as before in this PR; the default "SingleTaskStrategy" is what you want. It maintains an executor-level blacklist for every TaskSet, and the timeout mechanism is retained. (Although `spark.scheduler.executorTaskBlacklistTime` is renamed to `spark.scheduler.blacklist.timeout`, the old configuration name is still supported for backward compatibility, so nothing needs to change on your side.)
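In other words, opting in to host-level blacklisting would look something like the following. The exact value accepted by `spark.scheduler.blacklist.strategy` is an assumption here, since the discussion does not spell it out:

```scala
import org.apache.spark.SparkConf

// Hedged sketch of the configuration described above. The strategy value
// ("executorAndNode") is assumed; the timeout key is the renamed
// spark.scheduler.executorTaskBlacklistTime (old name still honored).
val conf = new SparkConf()
  .set("spark.scheduler.blacklist.strategy", "executorAndNode")
  .set("spark.scheduler.blacklist.timeout", "60s")
```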
[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/8760#discussion_r46247212

--- Diff: core/src/main/scala/org/apache/spark/scheduler/BlacklistStrategy.scala ---
@@ -0,0 +1,151 @@
[Apache license header, imports, and the BlacklistStrategy trait elided]
+/**
+ * This strategy adds an executor to the blacklist for all tasks when the executor has too many
+ * task failures. An executor is placed in the blacklist when there are more than
+ * [[maxFailedTasks]] failed tasks. Furthermore, all executors in one node are put into the
+ * blacklist if there are more than [[maxBlacklistedExecutors]] blacklisted executors on one node.
+ * The benefit of this strategy is that different taskSets can learn experience from other taskSet
+ * to avoid allocating tasks on problematic executors.
+ */
+private[scheduler] class ExecutorAndNodeStrategy(
+    maxFailedTasks: Int,
+    maxBlacklistedExecutors: Int,
+    val expireTimeInMilliseconds: Long
+  ) extends BlacklistStrategy {
+
+  private def getSelectedExecutorMap(

--- End diff --

This needs to be renamed to something more appropriate - filterFailedExecutors or some such - so that it is clear what it does. Selected Executor does not really convey what it is supposed to do.
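For illustration, the suggested rename might look like the sketch below. The signature is inferred from the quoted diff, and `FailureStatus` and the threshold are the PR's names, so treat this as a hypothetical shape rather than the actual patch:

```scala
import scala.collection.mutable

// Sketch only: same filtering logic under a clearer name. FailureStatus and
// maxFailedTasks stand in for the PR's type and threshold.
def filterFailedExecutors(
    executorIdToFailureStatus: mutable.HashMap[String, FailureStatus],
    maxFailedTasks: Int): mutable.HashMap[String, FailureStatus] =
  executorIdToFailureStatus.filter { case (_, failureStatus) =>
    // keep only executors whose total task failures exceed the threshold
    failureStatus.numFailuresPerTask.values.sum > maxFailedTasks
  }
```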
[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/8760#discussion_r46248240

--- Diff: core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala ---
[quote trimmed to the method under discussion]
+  // The actual implementation is delegated to strategy
+  /** VisibleForTesting */
+  private[scheduler] def expireExecutorsInBlackList(): Unit = synchronized {
+    val updated = strategy.expireExecutorsInBlackList(executorIdToFailureStatus, clock)
+    if (updated) {
+      invalidateCache()
+    }
+  }

--- End diff --

@kayousterhout Would be good to have your thoughts here since I am fairly out of touch with the scheduler now. If I read it correctly, multiple methods in this class are directly called from the scheduler as part of identifying, starting, and updating tasks to be run from TaskSetManager/scheduler, and they all use the same lock as updateFailedExecutors - which can actually run fairly non-trivial computation as the number of failures in the system increases. Is this a concern, or is it alleviated somehow and I missed it?
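One common way to relieve that kind of pressure, sketched below purely for discussion (this is not code from the PR), is to publish an immutable snapshot of the blacklist through a volatile field so the scheduler's hot read path never takes the lock; only writers (task-end updates and the expiry timer) synchronize:

```scala
// Discussion sketch, not PR code: reads are lock-free against a published
// immutable snapshot; writers synchronize, recompute, and republish.
class BlacklistSnapshot {
  @volatile private var blacklisted: Set[String] = Set.empty

  /** Hot path: called by the scheduler on every resource offer; never blocks. */
  def executorBlacklist: Set[String] = blacklisted

  /** Cold path: task-end events and the expiry timer update under the lock. */
  def recordUpdate(newBlacklist: Set[String]): Unit = synchronized {
    blacklisted = newBlacklist
  }
}
```

Since the snapshot is an immutable `Set`, readers can never observe a half-updated structure, and write contention stays on the comparatively rare failure/expiry events.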
[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/8760#discussion_r46248217

--- Diff: core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala ---
[quote trimmed to the lines under discussion]
+  def updateFailedExecutors(
+      stageId: Int,
+      partation: Int,

--- End diff --

sp ("partation" should be "partition")
[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...
Github user mridulm commented on the pull request: https://github.com/apache/spark/pull/8760#issuecomment-160884866 What would be more ideal is an implementation which allows for blacklisting a single task (as is current), through multiple tasks of a taskset on a single executor (in case the taskset has issues on the executor - memory constraints perhaps), up to blacklisting the node: time-bound of course, and for a specific taskset and/or all tasksets until the timeout expires (directory cleanup catching up, etc.). Did I miss this in there somewhere?
[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/8760#issuecomment-160490927 **[Test build #46861 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/46861/consoleFull)** for PR 8760 at commit [`19e8cc0`](https://github.com/apache/spark/commit/19e8cc0fde56ab2055e44f5501534764d6dfcfe1).
[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/8760#issuecomment-160505192 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/46861/
[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/8760#issuecomment-160505191 Merged build finished. Test PASSed.
[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/8760#issuecomment-160505134 **[Test build #46861 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/46861/consoleFull)** for PR 8760 at commit [`19e8cc0`](https://github.com/apache/spark/commit/19e8cc0fde56ab2055e44f5501534764d6dfcfe1). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...
Github user kayousterhout commented on the pull request: https://github.com/apache/spark/pull/8760#issuecomment-160506398 This seems potentially useful, but adds a lot of complexity. Can you elaborate on the use cases where this problem has been arising? My vague memory of previous discussions about adding similar functionality is that the various cluster managers already have functionality to fail workers that are repeatedly failing?
[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...
Github user mwws commented on the pull request: https://github.com/apache/spark/pull/8760#issuecomment-160544588 @kayousterhout For example, tasks can fail because a host can't access a source/network, because the local storage of a host is full, because of broken class linkage, or because of other run-time exceptions. In such cases, the worker will stay alive and the cluster manager can't help much, so further tasks will still be submitted to the problematic executors/nodes. The original code base already contains a similar blacklist mechanism, `executorIsBlacklisted`, but the blacklist is maintained in every TaskSetManager (called `failedExecutors`) and not shared, which means a new TaskSet cannot benefit from the previous experience of other TaskSets. This change adds complexity, but from a user perspective, one can just use the built-in `SingleNodeStrategy`, which behaves the same as the original logic, or enable the built-in `ExecutorAndNodeStrategy` by only changing configuration. At the same time, it provides flexibility for advanced users to customize it.
[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/8760#issuecomment-160076073 **[Test build #46811 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/46811/consoleFull)** for PR 8760 at commit [`fc3d38e`](https://github.com/apache/spark/commit/fc3d38e04ef99006ee4c20d20d52d9e871c2dfb8).
[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/8760#issuecomment-160107616 Merged build finished. Test FAILed.
[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/8760#issuecomment-160107618 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/46811/
[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/8760#issuecomment-160107546 **[Test build #46811 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/46811/consoleFull)** for PR 8760 at commit [`fc3d38e`](https://github.com/apache/spark/commit/fc3d38e04ef99006ee4c20d20d52d9e871c2dfb8). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...
Github user mwws commented on a diff in the pull request: https://github.com/apache/spark/pull/8760#discussion_r46021072

--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -636,7 +622,8 @@ private[spark] class TaskSetManager(
         logInfo("Ignoring task-finished event for " + info.id + " in stage " + taskSet.id +
           " because task " + index + " has already completed successfully")
       }
-      failedExecutors.remove(index)
+
+      blacklistTracker.foreach(_.updateFailureExecutors(stageId, info, Success))

--- End diff --

Thanks for pointing it out, I will change it.
[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/8760#discussion_r45793155

--- Diff: core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala ---
@@ -0,0 +1,200 @@
[Apache license header and imports elided]
+class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfter with LocalSparkContext {
+
+  val FAILURE: TaskEndReason = new ExceptionFailure(
+    "Fake",
+    "fake failure",
+    Array.empty[StackTraceElement],
+    "fake stack trace",
+    None,
+    None)
+
+  val stage1 = 1
+  val stage2 = 2
+  // Variable name can indicate basic information of taskInfo
+  // The format is "taskInfo_executorId_taskIndex_hostName"
+  val taskInfo_1_1_hostA = new TaskInfo(1L, 1, 1, 0L, "1", "hostA", TaskLocality.ANY, false)
+  val taskInfo_1_2_hostA = new TaskInfo(2L, 2, 1, 0L, "1", "hostA", TaskLocality.ANY, false)
+  val taskInfo_2_1_hostA = new TaskInfo(3L, 1, 1, 0L, "2", "hostA", TaskLocality.ANY, false)
+  val taskInfo_2_2_hostA = new TaskInfo(4L, 2, 1, 0L, "2", "hostA", TaskLocality.ANY, false)
+  val taskInfo_3_3_hostB = new TaskInfo(5L, 3, 1, 0L, "3", "hostB", TaskLocality.ANY, false)
+
+  val clock = new ManualClock(0)
+
+  test ("expireExecutorsInBlacklist works") {
+    // expire time is set to 6s
+    val conf = new SparkConf().setAppName("test").setMaster("local")
+      .set("spark.ui.enabled", "false")
+      .set("spark.scheduler.executorTaskBlacklistTime", "6000")
+
+    sc = new SparkContext(conf)
+    val scheduler = new TaskSchedulerImpl(sc, 1)
+
+    val tracker = new BlacklistTracker(conf, clock)
+    // Executor 1 into blacklist at Time 00:00:00
+    tracker.updateFailureExecutors(stage1, taskInfo_1_1_hostA, FAILURE)
+    assert(tracker.executorBlacklist(scheduler, stage1, 1) === Set("1"))
+
+    clock.setTime(2000)
+    tracker.expireExecutorsInBlackList()
+    assert(tracker.executorBlacklist(scheduler, stage1, 1) === Set("1"))
+    // Executor 1 failed again at Time 00:00:02
+    tracker.updateFailureExecutors(stage1, taskInfo_1_1_hostA, FAILURE)
+
+    clock.setTime(3000)
+    // Executor 2 failed at Time 00:00:03
+    tracker.updateFailureExecutors(stage1, taskInfo_2_1_hostA, FAILURE)
+    assert(tracker.executorBlacklist(scheduler, stage1, 1) === Set("1", "2"))
+
+    clock.setTime(6000)
+    tracker.expireExecutorsInBlackList()
+    assert(tracker.executorBlacklist(scheduler, stage1, 1) === Set("1", "2"))
+
+    clock.setTime(8000)
+    tracker.expireExecutorsInBlackList()
+    assert(tracker.executorBlacklist(scheduler, stage1, 1) === Set("2"))
+
+    clock.setTime(1)
+    tracker.expireExecutorsInBlackList()
+    assert(tracker.executorBlacklist(scheduler, stage1, 1) === Set())
+  }
+
+  test("blacklist feature is off by default") {
+    val conf = new SparkConf().setAppName("test").setMaster("local")
+      .set("spark.ui.enabled", "false")
+    sc = new SparkContext(conf)
+
+    val scheduler = new TaskSchedulerImpl(sc, 1)
+
+    val tracker = new BlacklistTracker(conf, clock)
+    tracker.updateFailureExecutors(stage1, taskInfo_1_1_hostA, FAILURE)
+    tracker.updateFailureExecutors(stage1, taskInfo_2_1_hostA, FAILURE)
+    assert(tracker.executorBlacklist(scheduler, stage1, 1) === Set())
+    assert(tracker.executorBlacklist(scheduler, stage1, 1) === Set())
+
[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/8760#discussion_r45792679

--- Diff: core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala ---
[same excerpt as quoted in the previous comment; elided]
[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/8760#discussion_r45790406

--- Diff: core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala ---
@@ -0,0 +1,247 @@
[Apache license header and imports elided]
+/**
+ * BlacklistTracker is design to track problematic executors and node on application level.
+ * It is shared by all TaskSet, so that once a new TaskSet coming, it could be benefit from
+ * previous experience of other TaskSet.
+ *
+ * Once task finished, the callback method in TaskSetManager should update
+ * executorIdToFailureStatus Map.
+ */
+private[spark] class BlacklistTracker(
+    sparkConf: SparkConf,
+    clock: Clock = new SystemClock()) extends BlacklistCache {
[remainder of quote elided]
[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/8760#discussion_r45790885

--- Diff: core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala ---
[quote trimmed to the lines under discussion]
+        executorIdToFailureStatus.get(executorId) match {
+          case Some(failureStatus) =>
+            failureStatus.updatedTime = clock.getTimeMillis()
+            val failedTimes = failureStatus.numFailuresPerTask.getOrElse(atomTask, 0) + 1
+            failureStatus.numFailuresPerTask.update(atomTask, failedTimes)

--- End diff --

nit: rather than calling `update`, it's more idiomatic to write this as `failureStatus.numFailuresPerTask(atomTask) = failedTimes` (Scala syntax sugar for the `update` method)
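For readers less familiar with Scala, the two spellings being contrasted compile to the same call; `m(k) = v` is sugar for `m.update(k, v)`:

```scala
import scala.collection.mutable

val failures = mutable.HashMap[String, Int]()
failures.update("exec-1", 1) // explicit method call
failures("exec-1") = 2       // idiomatic sugar; desugars to update("exec-1", 2)
assert(failures("exec-1") == 2)
```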
[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/8760#discussion_r45790903

--- Diff: core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala ---
[quote trimmed to the lines under discussion]
+          case None =>
+            val failedTasks = mutable.HashMap(atomTask -> 1)
+            val failureStatus = new FailureStatus(
+              clock.getTimeMillis(), info.host, failedTasks)
+            executorIdToFailureStatus.update(executorId, failureStatus)

--- End diff --

same here on `update`
[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...
Github user squito commented on the pull request: https://github.com/apache/spark/pull/8760#issuecomment-159402366 sorry for the delay again, @mwws, thanks for updating. My comments are now just cosmetic. As it's a scheduler change, pinging the usual folks: @markhamstra @kayousterhout @mateiz
[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/8760#discussion_r45791721 --- Diff: core/src/main/scala/org/apache/spark/scheduler/BlacklistStrategy.scala --- @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import scala.collection.mutable +import org.apache.spark.Logging +import org.apache.spark.SparkConf +import org.apache.spark.util.SystemClock +import org.apache.spark.util.Clock + +/** + * The interface to determine executor blacklist and node blacklist. + */ +private [scheduler] trait BlacklistStrategy { + /** Defined a time interval to expire failure information of executors */ + val expireTimeInMilliseconds: Long + + /** Return executors in blacklist which are related to given taskIndex */ + def getExecutorBlacklist( + executorIdToFailureStatus: mutable.HashMap[String, FailureStatus], + atomTask: BlacklistAtomTask, clock: Clock): Set[String] + + /** Return all nodes in blacklist */ + def getNodeBlacklist( + executorIdToFailureStatus: mutable.HashMap[String, FailureStatus]): Set[String] + + /** + * Default implementation to remove failure executors from HashMap based on given time period. + * The return value identity whether or not it updated anything + */ + def expireExecutorsInBlackList( + executorIdToFailureStatus: mutable.HashMap[String, FailureStatus], clock: Clock): Boolean = { +val now = clock.getTimeMillis() +val expiredKey = executorIdToFailureStatus.filter { + case (executorid, failureStatus) => { +(now - failureStatus.updatedTime) >= expireTimeInMilliseconds + } +}.keySet + +if (expiredKey.isEmpty) { + false +} else { + executorIdToFailureStatus --= expiredKey + true +} + } +} + +/** + * This strategy is simply based on given threshold and is taskId unrelated. An executor will be + * in blacklist, if it failed more than "maxFailureTaskNumber" times. A node will be in blacklist, + * if there are more than "maxBlackExecutorNumber" executors on it in executor blacklist. + * + * In this case, provided taskId will be ignored. The benefit for taskId unrelated strategy is that + * different taskSets can learn experience from other taskSet to avoid allocating tasks on + * problematic executors. + */ +private[scheduler] class SimpleStrategy( +maxFailureTaskNumber: Int, +maxBlackExecutorNumber: Int, --- End diff -- can you change these params to `maxFailedTasks` and `maxBlacklistedExecutors`, and change the comment to: This strategy adds an executor to the blacklist for *all* tasks when the executor has too many task failures. An executor is placed in the blacklist when there are more than [[maxFailedTasks]] failed tasks. 
Furthermore, all executors on a node are put into the blacklist if there are more than [[maxBlacklistedExecutors]] blacklisted executors on that node. The benefit of this strategy is that different taskSets can learn from the experience of earlier taskSets and avoid allocating tasks to problematic executors. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
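For illustration, a minimal sketch of the suggested rename (hypothetical; it assumes the `BlacklistStrategy` trait and the `FailureStatus.totalNumFailures` field visible in the diff above):

```scala
import scala.collection.mutable

/**
 * This strategy adds an executor to the blacklist for *all* tasks when the executor has
 * too many task failures. An executor is placed in the blacklist when it has more than
 * [[maxFailedTasks]] failed tasks; all executors on a node are blacklisted when more than
 * [[maxBlacklistedExecutors]] executors on that node are already blacklisted.
 */
private[scheduler] class SimpleStrategy(
    maxFailedTasks: Int,
    maxBlacklistedExecutors: Int,
    val expireTimeInMilliseconds: Long) extends BlacklistStrategy {

  // Executors whose total failure count exceeds the per-executor threshold.
  private def executorsOverThreshold(
      executorIdToFailureStatus: mutable.HashMap[String, FailureStatus]) = {
    executorIdToFailureStatus.filter {
      case (_, failureStatus) => failureStatus.totalNumFailures > maxFailedTasks
    }
  }
  // getExecutorBlacklist / getNodeBlacklist stay as in the original diff,
  // reading the renamed thresholds above.
}
```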
[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/8760#discussion_r45790035 --- Diff: core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala --- @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import java.util.concurrent.TimeUnit +import scala.collection.mutable +import org.apache.spark.SparkConf +import org.apache.spark.Success +import org.apache.spark.TaskEndReason +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.util.SystemClock +import org.apache.spark.util.ThreadUtils +import org.apache.spark.util.Utils +import org.apache.spark.util.Clock + +import com.google.common.annotations.VisibleForTesting + +/** + * BlacklistTracker is design to track problematic executors and node on application level. + * It is shared by all TaskSet, so that once a new TaskSet coming, it could be benefit from + * previous experience of other TaskSet. + * + * Once task finished, the callback method in TaskSetManager should update + * executorIdToFailureStatus Map. 
+ */ +private[spark] class BlacklistTracker( +sparkConf: SparkConf, +clock: Clock = new SystemClock()) extends BlacklistCache{ + // maintain a ExecutorId --> FailureStatus HashMap + private val executorIdToFailureStatus: mutable.HashMap[String, FailureStatus] = mutable.HashMap() + + // Apply Strategy pattern here to change different blacklist detection logic + private val strategy = BlacklistStrategy(sparkConf) + + // A daemon thread to expire blacklist executor periodically + private val scheduler = ThreadUtils.newDaemonSingleThreadScheduledExecutor( + "spark-scheduler-blacklist-expire-timer") + + private val recoverPeriod = sparkConf.getTimeAsSeconds( +"spark.scheduler.blacklist.recoverPeriod", "60s") + + def start(): Unit = { +val scheduleTask = new Runnable() { + override def run(): Unit = { +Utils.logUncaughtExceptions(expireExecutorsInBlackList()) + } +} +scheduler.scheduleAtFixedRate(scheduleTask, 0L, recoverPeriod, TimeUnit.SECONDS) + } + + def stop(): Unit = { +scheduler.shutdown() +scheduler.awaitTermination(10, TimeUnit.SECONDS) + } + + // The actual implementation is delegated to strategy + /** VisibleForTesting */ + private[scheduler] def expireExecutorsInBlackList(): Unit = synchronized { +val updated = strategy.expireExecutorsInBlackList(executorIdToFailureStatus, clock) +if (updated) { + invalidAllCache() +} + } + + def updateFailureExecutors( + stageId: Int, info: TaskInfo, reason: TaskEndReason) : Unit = synchronized { +val index = info.index +val atomTask = BlacklistAtomTask(stageId, index) +reason match { + // If task succeeding, remove related record from executorIdToFailureStatus + case Success => +removeFailureExecutorsForTaskId(info.executorId, stageId, index) + + // If task failing, update latest failure time and failedTaskIds + case _ => +val executorId = info.executorId +executorIdToFailureStatus.get(executorId) match { + case Some(failureStatus) => +failureStatus.updatedTime = clock.getTimeMillis() +val failedTimes = failureStatus.numFailuresPerTask.getOrElse(atomTask, 0) + 1 +failureStatus.numFailuresPerTask.update(atomTask, failedTimes) + case None => +val failedTasks = mutable.HashMap(atomTask -> 1) +val failureStatus = new FailureStatus( + clock.getTimeMillis(), info.host, failedTasks) +executorIdToFailureStatus.update(executorId, failureStatus) +} +invalidAllCache() +} + } + + /** remove the executorId from
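The quoted scaladoc describes the intended call pattern: the tracker is application-level state, fed by per-task callbacks and queried at scheduling time. A hedged sketch of that flow, using only the signatures visible in this diff and in the test suite quoted later; `handleTaskCompletion` is a hypothetical caller name, and `sparkConf`, `scheduler`, `stageId`, and `taskIndex` are assumed to be in scope:

```scala
// Sketch: wiring a BlacklistTracker into a scheduler, per the design above.
val tracker = new BlacklistTracker(sparkConf)
tracker.start()  // starts the daemon thread that periodically expires stale entries

// Every task completion (success or failure) is reported to the shared tracker:
def handleTaskCompletion(stageId: Int, info: TaskInfo, reason: TaskEndReason): Unit = {
  tracker.updateFailureExecutors(stageId, info, reason)
}

// At scheduling time, any TaskSet can consult the application-level blacklist:
val excluded: Set[String] = tracker.executorBlacklist(scheduler, stageId, taskIndex)

tracker.stop()   // shuts down the expiry thread
```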
[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/8760#discussion_r45788633 --- Diff: core/src/main/scala/org/apache/spark/scheduler/BlacklistStrategy.scala --- @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import scala.collection.mutable +import org.apache.spark.Logging +import org.apache.spark.SparkConf +import org.apache.spark.util.SystemClock +import org.apache.spark.util.Clock + +/** + * The interface to determine executor blacklist and node blacklist. + */ +private [scheduler] trait BlacklistStrategy { + /** Defined a time interval to expire failure information of executors */ + val expireTimeInMilliseconds: Long + + /** Return executors in blacklist which are related to given taskIndex */ + def getExecutorBlacklist( + executorIdToFailureStatus: mutable.HashMap[String, FailureStatus], + atomTask: BlacklistAtomTask, clock: Clock): Set[String] + + /** Return all nodes in blacklist */ + def getNodeBlacklist( + executorIdToFailureStatus: mutable.HashMap[String, FailureStatus]): Set[String] + + /** + * Default implementation to remove failure executors from HashMap based on given time period. + * The return value identity whether or not it updated anything + */ + def expireExecutorsInBlackList( + executorIdToFailureStatus: mutable.HashMap[String, FailureStatus], clock: Clock): Boolean = { +val now = clock.getTimeMillis() +val expiredKey = executorIdToFailureStatus.filter { + case (executorid, failureStatus) => { +(now - failureStatus.updatedTime) >= expireTimeInMilliseconds + } +}.keySet + +if (expiredKey.isEmpty) { + false +} else { + executorIdToFailureStatus --= expiredKey + true +} + } +} + +/** + * This strategy is simply based on given threshold and is taskId unrelated. An executor will be + * in blacklist, if it failed more than "maxFailureTaskNumber" times. A node will be in blacklist, + * if there are more than "maxBlackExecutorNumber" executors on it in executor blacklist. + * + * In this case, provided taskId will be ignored. The benefit for taskId unrelated strategy is that + * different taskSets can learn experience from other taskSet to avoid allocating tasks on + * problematic executors. 
+ */ +private[scheduler] class SimpleStrategy( +maxFailureTaskNumber: Int, +maxBlackExecutorNumber: Int, +val expireTimeInMilliseconds: Long + )extends BlacklistStrategy { + + private def getSelectedExecutorMap( + executorIdToFailureStatus: mutable.HashMap[String, FailureStatus]) = { +executorIdToFailureStatus.filter{ + case (id, failureStatus) => failureStatus.totalNumFailures > maxFailureTaskNumber +} + } + + // As this is a taskId unrelated strategy, the input taskId will be ignored + def getExecutorBlacklist( + executorIdToFailureStatus: mutable.HashMap[String, FailureStatus], + atomTask: BlacklistAtomTask, clock: Clock): Set[String] = { +getSelectedExecutorMap(executorIdToFailureStatus).keys.toSet + } + + def getNodeBlacklist( + executorIdToFailureStatus: mutable.HashMap[String, FailureStatus]): Set[String] = { +getSelectedExecutorMap(executorIdToFailureStatus) + .groupBy{case (id, failureStatus) => failureStatus.host} + .filter {case (host, executorIdToFailureStatus) => +executorIdToFailureStatus.size > maxBlackExecutorNumber} + .keys.toSet + } +} + +/** + * This strategy is applied as default to keep the same semantics as original. It's an taskId + * related strategy. If an executor failed running "task A", then we think this executor is + * blacked for "task A". And we think the executor is still healthy for other
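The quote is cut off before the default strategy's implementation, but the trait above pins down its shape. A purely speculative sketch of a per-task strategy matching the description (an executor is excluded only for the exact task it failed on), built from the quoted signatures and the `numFailuresPerTask` map seen in `FailureStatus`:

```scala
import scala.collection.mutable
import org.apache.spark.util.Clock

// Speculative sketch only: the PR's real default strategy is truncated above.
private[scheduler] class PerTaskStrategySketch(
    val expireTimeInMilliseconds: Long) extends BlacklistStrategy {

  // Blacklist an executor only for the specific (stage, task index) it failed on.
  def getExecutorBlacklist(
      executorIdToFailureStatus: mutable.HashMap[String, FailureStatus],
      atomTask: BlacklistAtomTask, clock: Clock): Set[String] = {
    executorIdToFailureStatus.filter {
      case (_, failureStatus) => failureStatus.numFailuresPerTask.contains(atomTask)
    }.keys.toSet
  }

  // A per-task strategy has no notion of a globally bad node.
  def getNodeBlacklist(
      executorIdToFailureStatus: mutable.HashMap[String, FailureStatus]): Set[String] =
    Set.empty
}
```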
[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/8760#discussion_r45792961 --- Diff: core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala --- @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import org.apache.spark.SparkFunSuite +import org.scalatest.BeforeAndAfter +import org.apache.spark.SparkConf +import org.apache.spark.TaskEndReason +import org.apache.spark.Success +import org.apache.spark.ExceptionFailure +import org.apache.spark.LocalSparkContext +import org.apache.spark.SparkContext +import org.mockito.Mockito.{when, spy} +import org.apache.spark.util.ManualClock + +class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfter with LocalSparkContext { + + val FAILURE: TaskEndReason = new ExceptionFailure( + "Fake", + "fake failure", + Array.empty[StackTraceElement], + "fake stack trace", + None, + None) + + val stage1 = 1 + val stage2 = 2 + // Variable name can indicate basic information of taskInfo + // The format is "taskInfo_executorId_taskIndex_hostName" + val taskInfo_1_1_hostA = new TaskInfo(1L, 1, 1, 0L, "1", "hostA", TaskLocality.ANY, false) + val taskInfo_1_2_hostA = new TaskInfo(2L, 2, 1, 0L, "1", "hostA", TaskLocality.ANY, false) + val taskInfo_2_1_hostA = new TaskInfo(3L, 1, 1, 0L, "2", "hostA", TaskLocality.ANY, false) + val taskInfo_2_2_hostA = new TaskInfo(4L, 2, 1, 0L, "2", "hostA", TaskLocality.ANY, false) + val taskInfo_3_3_hostB = new TaskInfo(5L, 3, 1, 0L, "3", "hostB", TaskLocality.ANY, false) + + val clock = new ManualClock(0) + + test ("expireExecutorsInBlacklist works") { +// expire time is set to 6s +val conf = new SparkConf().setAppName("test").setMaster("local") + .set("spark.ui.enabled", "false") + .set("spark.scheduler.executorTaskBlacklistTime", "6000") + +sc = new SparkContext(conf) +val scheduler = new TaskSchedulerImpl(sc, 1) + +val tracker = new BlacklistTracker(conf, clock) +// Executor 1 into blacklist at Time 00:00:00 +tracker.updateFailureExecutors(stage1, taskInfo_1_1_hostA, FAILURE) +assert(tracker.executorBlacklist(scheduler, stage1, 1) === Set("1")) + +clock.setTime(2000) +tracker.expireExecutorsInBlackList() +assert(tracker.executorBlacklist(scheduler, stage1, 1) === Set("1")) +// Executor 1 failed again at Time 00::00:02 +tracker.updateFailureExecutors(stage1, taskInfo_1_1_hostA, FAILURE) + +clock.setTime(3000) +// Executor 2 failed at Time 00:00:03 +tracker.updateFailureExecutors(stage1, taskInfo_2_1_hostA, FAILURE) +assert(tracker.executorBlacklist(scheduler, stage1, 1) === Set("1", "2")) + +clock.setTime(6000) +tracker.expireExecutorsInBlackList() +assert(tracker.executorBlacklist(scheduler, stage1, 1) === Set("1", "2")) + +clock.setTime(8000) 
+tracker.expireExecutorsInBlackList() +assert(tracker.executorBlacklist(scheduler, stage1, 1) === Set("2")) + +clock.setTime(1) +tracker.expireExecutorsInBlackList() +assert(tracker.executorBlacklist(scheduler, stage1, 1) === Set()) + } + + test("blacklist feature is off by default") { +val conf = new SparkConf().setAppName("test").setMaster("local") + .set("spark.ui.enabled", "false") +sc = new SparkContext(conf) + +val scheduler = new TaskSchedulerImpl(sc, 1) + +val tracker = new BlacklistTracker(conf, clock) +tracker.updateFailureExecutors(stage1, taskInfo_1_1_hostA, FAILURE) +tracker.updateFailureExecutors(stage1, taskInfo_2_1_hostA, FAILURE) +assert(tracker.executorBlacklist(scheduler, stage1, 1) === Set()) +assert(tracker.executorBlacklist(scheduler, stage1, 1) === Set()) + +
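The test drives expiry with a `ManualClock` rather than real sleeps. The core arithmetic it exercises is a plain elapsed-time comparison, the same one `expireExecutorsInBlackList` performs against `failureStatus.updatedTime`; a self-contained illustration, assuming Spark's `org.apache.spark.util.ManualClock` as imported in the suite above:

```scala
import org.apache.spark.util.ManualClock

val clock = new ManualClock(0)
val failureTime = clock.getTimeMillis()      // failure recorded at t = 0 ms

clock.setTime(2000)                          // 2s later: still within the 6s window
assert(clock.getTimeMillis() - failureTime < 6000)

clock.setTime(8000)                          // 8s later: past the window, entry expires
assert(clock.getTimeMillis() - failureTime >= 6000)
```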
[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/8760#discussion_r45769593 --- Diff: core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala --- @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import java.util.concurrent.TimeUnit +import scala.collection.mutable +import org.apache.spark.SparkConf +import org.apache.spark.Success +import org.apache.spark.TaskEndReason +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.util.SystemClock +import org.apache.spark.util.ThreadUtils +import org.apache.spark.util.Utils +import org.apache.spark.util.Clock + +import com.google.common.annotations.VisibleForTesting + +/** + * BlacklistTracker is design to track problematic executors and node on application level. + * It is shared by all TaskSet, so that once a new TaskSet coming, it could be benefit from + * previous experience of other TaskSet. + * + * Once task finished, the callback method in TaskSetManager should update + * executorIdToFailureStatus Map. 
+ */ +private[spark] class BlacklistTracker( +sparkConf: SparkConf, +clock: Clock = new SystemClock()) extends BlacklistCache{ + // maintain a ExecutorId --> FailureStatus HashMap + private val executorIdToFailureStatus: mutable.HashMap[String, FailureStatus] = mutable.HashMap() + + // Apply Strategy pattern here to change different blacklist detection logic + private val strategy = BlacklistStrategy(sparkConf) + + // A daemon thread to expire blacklist executor periodically + private val scheduler = ThreadUtils.newDaemonSingleThreadScheduledExecutor( + "spark-scheduler-blacklist-expire-timer") + + private val recoverPeriod = sparkConf.getTimeAsSeconds( +"spark.scheduler.blacklist.recoverPeriod", "60s") + + def start(): Unit = { +val scheduleTask = new Runnable() { + override def run(): Unit = { +Utils.logUncaughtExceptions(expireExecutorsInBlackList()) + } +} +scheduler.scheduleAtFixedRate(scheduleTask, 0L, recoverPeriod, TimeUnit.SECONDS) + } + + def stop(): Unit = { +scheduler.shutdown() +scheduler.awaitTermination(10, TimeUnit.SECONDS) + } + + // The actual implementation is delegated to strategy + /** VisibleForTesting */ + private[scheduler] def expireExecutorsInBlackList(): Unit = synchronized { +val updated = strategy.expireExecutorsInBlackList(executorIdToFailureStatus, clock) +if (updated) { + invalidAllCache() +} + } + + def updateFailureExecutors( + stageId: Int, info: TaskInfo, reason: TaskEndReason) : Unit = synchronized { +val index = info.index +val atomTask = BlacklistAtomTask(stageId, index) +reason match { + // If task succeeding, remove related record from executorIdToFailureStatus + case Success => +removeFailureExecutorsForTaskId(info.executorId, stageId, index) + + // If task failing, update latest failure time and failedTaskIds --- End diff -- If the task failed --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/8760#discussion_r45769579 --- Diff: core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala --- @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import java.util.concurrent.TimeUnit +import scala.collection.mutable +import org.apache.spark.SparkConf +import org.apache.spark.Success +import org.apache.spark.TaskEndReason +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.util.SystemClock +import org.apache.spark.util.ThreadUtils +import org.apache.spark.util.Utils +import org.apache.spark.util.Clock + +import com.google.common.annotations.VisibleForTesting + +/** + * BlacklistTracker is design to track problematic executors and node on application level. + * It is shared by all TaskSet, so that once a new TaskSet coming, it could be benefit from + * previous experience of other TaskSet. + * + * Once task finished, the callback method in TaskSetManager should update + * executorIdToFailureStatus Map. 
+ */ +private[spark] class BlacklistTracker( +sparkConf: SparkConf, +clock: Clock = new SystemClock()) extends BlacklistCache{ + // maintain a ExecutorId --> FailureStatus HashMap + private val executorIdToFailureStatus: mutable.HashMap[String, FailureStatus] = mutable.HashMap() + + // Apply Strategy pattern here to change different blacklist detection logic + private val strategy = BlacklistStrategy(sparkConf) + + // A daemon thread to expire blacklist executor periodically + private val scheduler = ThreadUtils.newDaemonSingleThreadScheduledExecutor( + "spark-scheduler-blacklist-expire-timer") + + private val recoverPeriod = sparkConf.getTimeAsSeconds( +"spark.scheduler.blacklist.recoverPeriod", "60s") + + def start(): Unit = { +val scheduleTask = new Runnable() { + override def run(): Unit = { +Utils.logUncaughtExceptions(expireExecutorsInBlackList()) + } +} +scheduler.scheduleAtFixedRate(scheduleTask, 0L, recoverPeriod, TimeUnit.SECONDS) + } + + def stop(): Unit = { +scheduler.shutdown() +scheduler.awaitTermination(10, TimeUnit.SECONDS) + } + + // The actual implementation is delegated to strategy + /** VisibleForTesting */ + private[scheduler] def expireExecutorsInBlackList(): Unit = synchronized { +val updated = strategy.expireExecutorsInBlackList(executorIdToFailureStatus, clock) +if (updated) { + invalidAllCache() +} + } + + def updateFailureExecutors( + stageId: Int, info: TaskInfo, reason: TaskEndReason) : Unit = synchronized { +val index = info.index +val atomTask = BlacklistAtomTask(stageId, index) +reason match { + // If task succeeding, remove related record from executorIdToFailureStatus --- End diff -- If the task succeeded --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/8760#discussion_r45756966 --- Diff: core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala --- @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import java.util.concurrent.TimeUnit +import scala.collection.mutable +import org.apache.spark.SparkConf +import org.apache.spark.Success +import org.apache.spark.TaskEndReason +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.util.SystemClock +import org.apache.spark.util.ThreadUtils +import org.apache.spark.util.Utils +import org.apache.spark.util.Clock + +import com.google.common.annotations.VisibleForTesting --- End diff -- actually remove the google import, not used anymore --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/8760#discussion_r45761512 --- Diff: core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala --- @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import java.util.concurrent.TimeUnit +import scala.collection.mutable +import org.apache.spark.SparkConf +import org.apache.spark.Success +import org.apache.spark.TaskEndReason +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.util.SystemClock +import org.apache.spark.util.ThreadUtils +import org.apache.spark.util.Utils +import org.apache.spark.util.Clock + +import com.google.common.annotations.VisibleForTesting + +/** + * BlacklistTracker is design to track problematic executors and node on application level. + * It is shared by all TaskSet, so that once a new TaskSet coming, it could be benefit from + * previous experience of other TaskSet. + * + * Once task finished, the callback method in TaskSetManager should update + * executorIdToFailureStatus Map. + */ +private[spark] class BlacklistTracker( +sparkConf: SparkConf, +clock: Clock = new SystemClock()) extends BlacklistCache{ + // maintain a ExecutorId --> FailureStatus HashMap + private val executorIdToFailureStatus: mutable.HashMap[String, FailureStatus] = mutable.HashMap() + + // Apply Strategy pattern here to change different blacklist detection logic + private val strategy = BlacklistStrategy(sparkConf) + + // A daemon thread to expire blacklist executor periodically + private val scheduler = ThreadUtils.newDaemonSingleThreadScheduledExecutor( + "spark-scheduler-blacklist-expire-timer") + + private val recoverPeriod = sparkConf.getTimeAsSeconds( +"spark.scheduler.blacklist.recoverPeriod", "60s") + + def start(): Unit = { +val scheduleTask = new Runnable() { + override def run(): Unit = { +Utils.logUncaughtExceptions(expireExecutorsInBlackList()) + } +} +scheduler.scheduleAtFixedRate(scheduleTask, 0L, recoverPeriod, TimeUnit.SECONDS) + } + + def stop(): Unit = { +scheduler.shutdown() +scheduler.awaitTermination(10, TimeUnit.SECONDS) + } + + // The actual implementation is delegated to strategy + /** VisibleForTesting */ + private[scheduler] def expireExecutorsInBlackList(): Unit = synchronized { +val updated = strategy.expireExecutorsInBlackList(executorIdToFailureStatus, clock) +if (updated) { + invalidAllCache() +} + } + + def updateFailureExecutors( --- End diff -- `updateFailedExecutors` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/8760#discussion_r45756368 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala --- @@ -636,7 +622,8 @@ private[spark] class TaskSetManager( logInfo("Ignoring task-finished event for " + info.id + " in stage " + taskSet.id + " because task " + index + " has already completed successfully") } -failedExecutors.remove(index) + +blacklistTracker.foreach(_.updateFailureExecutors(stageId, info, Success)) --- End diff -- There is one really annoying detail here. If there are multiple attempts for one stage (which happens on an executor failure), `info.index` does not reference the same thing across different `TaskSet`s. Attempt 2 might only need to run a subset of tasks, in which case index 7 could be partition 58. I think what you want here is actually `tasks(index).partition`, which will refer to the same logical task across all attempts of a stage. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
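To make the pitfall concrete: attempt 1 of a stage runs every partition, so task index and partition coincide; a retry attempt only re-runs the failed subset, so the same index names a different partition. A small standalone illustration, with plain arrays standing in for each attempt's `tasks` array:

```scala
// Attempt 1 runs all 100 partitions: index i corresponds to partition i.
val attempt1 = (0 until 100).toArray               // attempt1(7) == 7

// Attempt 2 re-runs only the failed partitions, say 12, 58 and 91:
val attempt2 = Array(12, 58, 91)                   // attempt2(1) == 58

// Keying failure state by (stageId, index) therefore conflates different logical
// tasks across attempts; the partition is the stable identifier.
assert(attempt1(7) == 7)
assert(attempt2(1) == 58)
```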
[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/8760#discussion_r45759735 --- Diff: core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala --- @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import org.apache.spark.SparkFunSuite +import org.scalatest.BeforeAndAfter +import org.apache.spark.SparkConf +import org.apache.spark.TaskEndReason +import org.apache.spark.Success +import org.apache.spark.ExceptionFailure +import org.apache.spark.LocalSparkContext +import org.apache.spark.SparkContext +import org.mockito.Mockito.{when, spy} +import org.apache.spark.util.ManualClock + +class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfter with LocalSparkContext { + + val FAILURE: TaskEndReason = new ExceptionFailure( + "Fake", + "fake failure", + Array.empty[StackTraceElement], + "fake stack trace", + None, + None) + + val stage1 = 1 + val stage2 = 2 + // Variable name can indicate basic information of taskInfo + // The format is "taskInfo_executorId_taskIndex_hostName" + val taskInfo_1_1_hostA = new TaskInfo(1L, 1, 1, 0L, "1", "hostA", TaskLocality.ANY, false) + val taskInfo_1_2_hostA = new TaskInfo(2L, 2, 1, 0L, "1", "hostA", TaskLocality.ANY, false) + val taskInfo_2_1_hostA = new TaskInfo(3L, 1, 1, 0L, "2", "hostA", TaskLocality.ANY, false) + val taskInfo_2_2_hostA = new TaskInfo(4L, 2, 1, 0L, "2", "hostA", TaskLocality.ANY, false) + val taskInfo_3_3_hostB = new TaskInfo(5L, 3, 1, 0L, "3", "hostB", TaskLocality.ANY, false) + + val clock = new ManualClock(0) + + test ("expireExecutorsInBlacklist works") { +// expire time is set to 6s +val conf = new SparkConf().setAppName("test").setMaster("local") + .set("spark.ui.enabled", "false") + .set("spark.scheduler.executorTaskBlacklistTime", "6000") + +sc = new SparkContext(conf) +val scheduler = new TaskSchedulerImpl(sc, 1) --- End diff -- You can avoid creating a `SparkContext` in all of these tests with mockito. Add the `MockitoSugar` trait and then you can do: ```scala val scheduler = mock[TaskSchedulerImpl] when(scheduler.getExecutorsAliveOnHost(any[String])).thenReturn(Some(Set[String]())) ``` (or whatever else you want that method to return). Then you don't need to mix in `LocalSparkContext`. It'll speed up the tests a bit. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. 
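One note on the snippet above: to compile, the suite would also need the matcher and sugar imports, roughly as follows, assuming the ScalaTest 2.x / Mockito 1.x combination Spark used at the time:

```scala
import org.mockito.Matchers.any        // Mockito 1.x; later Mockito moved this to ArgumentMatchers
import org.mockito.Mockito.when
import org.scalatest.mock.MockitoSugar // supplies the mock[T] factory when mixed into the suite

// With the suite declared as, e.g.:
//   class BlacklistTrackerSuite extends SparkFunSuite with MockitoSugar { ... }
// the two lines from the review comment then compile as-is.
```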
[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/8760#discussion_r45756882

--- Diff: core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala ---
@@ -0,0 +1,247 @@
+[Apache license header]
+
+package org.apache.spark.scheduler
+
+import java.util.concurrent.TimeUnit
+import scala.collection.mutable
+import org.apache.spark.SparkConf
+import org.apache.spark.Success
+import org.apache.spark.TaskEndReason
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.util.SystemClock
+import org.apache.spark.util.ThreadUtils
+import org.apache.spark.util.Utils
+import org.apache.spark.util.Clock
+
+import com.google.common.annotations.VisibleForTesting
+
+/**
+ * BlacklistTracker is design to track problematic executors and node on application level.
+ * It is shared by all TaskSet, so that once a new TaskSet coming, it could be benefit from
+ * previous experience of other TaskSet.
+ *
+ * Once task finished, the callback method in TaskSetManager should update
+ * executorIdToFailureStatus Map.
+ */
[... continues with the same BlacklistTracker class body quoted above; the archived message is cut off before the review comment ...]
[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/8760#discussion_r45757164

--- Diff: core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala ---
@@ -0,0 +1,200 @@
+[Apache license header]
+
+package org.apache.spark.scheduler
+
+import org.apache.spark.SparkFunSuite
+import org.scalatest.BeforeAndAfter
+import org.apache.spark.SparkConf
+import org.apache.spark.TaskEndReason
+import org.apache.spark.Success
+import org.apache.spark.ExceptionFailure
+import org.apache.spark.LocalSparkContext
+import org.apache.spark.SparkContext
+import org.mockito.Mockito.{when, spy}
+import org.apache.spark.util.ManualClock
--- End diff --

nit: import grouping and ordering
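Applied to the import block flagged above, one plausible ordering per the Spark style guide (third-party imports first, then `org.apache.spark`, alphabetized within each group; the exact grouping is the reviewer's call) would be:

```scala
import org.mockito.Mockito.{spy, when}
import org.scalatest.BeforeAndAfter

import org.apache.spark.{ExceptionFailure, LocalSparkContext, SparkConf, SparkContext,
  SparkFunSuite, Success, TaskEndReason}
import org.apache.spark.util.ManualClock
```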
[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/8760#discussion_r45771875

--- Diff: core/src/main/scala/org/apache/spark/scheduler/BlacklistStrategy.scala ---
@@ -0,0 +1,146 @@
+[Apache license header]
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable
+import org.apache.spark.Logging
+import org.apache.spark.SparkConf
+import org.apache.spark.util.SystemClock
+import org.apache.spark.util.Clock
+
+/**
+ * The interface to determine executor blacklist and node blacklist.
+ */
+private [scheduler] trait BlacklistStrategy {
+  /** Defined a time interval to expire failure information of executors */
+  val expireTimeInMilliseconds: Long
+
+  /** Return executors in blacklist which are related to given taskIndex */
+  def getExecutorBlacklist(
+      executorIdToFailureStatus: mutable.HashMap[String, FailureStatus],
+      atomTask: BlacklistAtomTask, clock: Clock): Set[String]
+
+  /** Return all nodes in blacklist */
+  def getNodeBlacklist(
+      executorIdToFailureStatus: mutable.HashMap[String, FailureStatus]): Set[String]
+
+  /**
+   * Default implementation to remove failure executors from HashMap based on given time period.
+   * The return value identity whether or not it updated anything
+   */
+  def expireExecutorsInBlackList(
+      executorIdToFailureStatus: mutable.HashMap[String, FailureStatus], clock: Clock): Boolean = {
+    val now = clock.getTimeMillis()
+    val expiredKey = executorIdToFailureStatus.filter {
+      case (executorid, failureStatus) => {
+        (now - failureStatus.updatedTime) >= expireTimeInMilliseconds
+      }
+    }.keySet
+
+    if (expiredKey.isEmpty) {
+      false
+    } else {
+      executorIdToFailureStatus --= expiredKey
+      true
+    }
+  }
+}
+
+/**
+ * This strategy is simply based on given threshold and is taskId unrelated. An executor will be
+ * in blacklist, if it failed more than "maxFailureTaskNumber" times. A node will be in blacklist,
+ * if there are more than "maxBlackExecutorNumber" executors on it in executor blacklist.
+ *
+ * In this case, provided taskId will be ignored. The benefit for taskId unrelated strategy is that
+ * different taskSets can learn experience from other taskSet to avoid allocating tasks on
+ * problematic executors.
+ */
+private[scheduler] class SimpleStrategy(
+    maxFailureTaskNumber: Int,
+    maxBlackExecutorNumber: Int,
+    val expireTimeInMilliseconds: Long
+  )extends BlacklistStrategy {
+
+  private def getSelectedExecutorMap(
+      executorIdToFailureStatus: mutable.HashMap[String, FailureStatus]) = {
+    executorIdToFailureStatus.filter{
+      case (id, failureStatus) => failureStatus.totalNumFailures > maxFailureTaskNumber
+    }
+  }
+
+  // As this is a taskId unrelated strategy, the input taskId will be ignored
+  def getExecutorBlacklist(
+      executorIdToFailureStatus: mutable.HashMap[String, FailureStatus],
+      atomTask: BlacklistAtomTask, clock: Clock): Set[String] = {
+    getSelectedExecutorMap(executorIdToFailureStatus).keys.toSet
+  }
+
+  def getNodeBlacklist(
+      executorIdToFailureStatus: mutable.HashMap[String, FailureStatus]): Set[String] = {
+    getSelectedExecutorMap(executorIdToFailureStatus)
+      .groupBy{case (id, failureStatus) => failureStatus.host}
+      .filter {case (host, executorIdToFailureStatus) =>
+        executorIdToFailureStatus.size > maxBlackExecutorNumber}
+      .keys.toSet
+  }
+}
+
+/**
+ * This strategy is applied as default to keep the same semantics as original. It's an taskId
+ * related strategy. If an executor failed running "task A", then we think this executor is
+ * blacked for "task A". And we think the executor is still healthy for other
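Since several comments in this thread anchor on the trait's default `expireExecutorsInBlackList`, here is a tiny self-contained sketch of the same expiry rule, with a hypothetical `Status` record standing in for the PR's `FailureStatus`, to make the windowing semantics concrete:

```scala
import scala.collection.mutable

// Hypothetical stand-in for the PR's FailureStatus; illustration only.
case class Status(var updatedTime: Long, host: String)

object ExpireSketch extends App {
  // Same rule as the trait's default above: drop every executor whose last
  // failure is older than the expiry window, and report whether anything
  // changed so callers can invalidate their caches.
  def expire(
      statuses: mutable.HashMap[String, Status],
      nowMillis: Long,
      expireTimeInMilliseconds: Long): Boolean = {
    val expired = statuses.filter { case (_, s) =>
      nowMillis - s.updatedTime >= expireTimeInMilliseconds
    }.keySet
    if (expired.isEmpty) false else { statuses --= expired; true }
  }

  val m = mutable.HashMap("1" -> Status(0L, "hostA"), "2" -> Status(5000L, "hostA"))
  assert(expire(m, 6000L, 6000L)) // "1" expires: 6000 - 0 >= 6000
  assert(m.keySet == Set("2"))    // "2" is still inside the window
}
```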
[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/8760#discussion_r45755581

--- Diff: core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala ---
[... same BlacklistTracker.scala context as quoted in full above; the archived message is cut off before the review comment ...]
[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/8760#discussion_r45755594

--- Diff: core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala ---
[... same BlacklistTracker.scala context as quoted in full above; the archived message is cut off before the review comment ...]
[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/8760#discussion_r45756446

--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -34,6 +34,8 @@
 import org.apache.spark.scheduler.SchedulingMode._
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.util.{Clock, SystemClock, Utils}
 
+import com.google.common.annotations.VisibleForTesting
--- End diff --

nit: unused import now that we don't use this annotation anymore
[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/8760#discussion_r45759807

--- Diff: core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala ---
[... same BlacklistTrackerSuite.scala context as quoted in full above, through `val scheduler = new TaskSchedulerImpl(sc, 1)`, then: ...]
+
+    val tracker = new BlacklistTracker(conf, clock)
+    // Executor 1 into blacklist at Time 00:00:00
+    tracker.updateFailureExecutors(stage1, taskInfo_1_1_hostA, FAILURE)
+    assert(tracker.executorBlacklist(scheduler, stage1, 1) === Set("1"))
+
+    clock.setTime(2000)
+    tracker.expireExecutorsInBlackList()
+    assert(tracker.executorBlacklist(scheduler, stage1, 1) === Set("1"))
+    // Executor 1 failed again at Time 00:00:02
+    tracker.updateFailureExecutors(stage1, taskInfo_1_1_hostA, FAILURE)
+
+    clock.setTime(3000)
+    // Executor 2 failed at Time 00:00:03
+    tracker.updateFailureExecutors(stage1, taskInfo_2_1_hostA, FAILURE)
+    assert(tracker.executorBlacklist(scheduler, stage1, 1) === Set("1", "2"))
+
+    clock.setTime(6000)
+    tracker.expireExecutorsInBlackList()
+    assert(tracker.executorBlacklist(scheduler, stage1, 1) === Set("1", "2"))
+
+    clock.setTime(8000)
+    tracker.expireExecutorsInBlackList()
+    assert(tracker.executorBlacklist(scheduler, stage1, 1) === Set("2"))
+
+    clock.setTime(1)
+    tracker.expireExecutorsInBlackList()
+    assert(tracker.executorBlacklist(scheduler, stage1, 1) === Set())
+  }
+
+  test("blacklist feature is off by default") {
+    val conf = new SparkConf().setAppName("test").setMaster("local")
+      .set("spark.ui.enabled", "false")
+    sc = new SparkContext(conf)
+
+    val scheduler = new TaskSchedulerImpl(sc, 1)
+
+    val tracker = new BlacklistTracker(conf, clock)
+    tracker.updateFailureExecutors(stage1, taskInfo_1_1_hostA, FAILURE)
+    tracker.updateFailureExecutors(stage1, taskInfo_2_1_hostA, FAILURE)
+    assert(tracker.executorBlacklist(scheduler, stage1, 1) === Set())
+    assert(tracker.executorBlacklist(scheduler, stage1, 1) === Set())
[... the archived message is cut off here, before the review comment ...]
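For context on the `clock.setTime(...)` calls driving these tests, Spark's `ManualClock` is just a settable millisecond counter, roughly:

```scala
import org.apache.spark.util.ManualClock

object ManualClockSketch extends App {
  val clock = new ManualClock(0)        // starts at t = 0 ms
  clock.setTime(2000)                   // jump directly to t = 2000 ms
  assert(clock.getTimeMillis() == 2000)
  clock.advance(1000)                   // relative step to t = 3000 ms
  assert(clock.getTimeMillis() == 3000)
}
```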
[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/8760#discussion_r45760745

--- Diff: yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala ---
@@ -21,10 +21,12 @@
 import java.util.Collections
 import java.util.concurrent._
 import java.util.regex.Pattern
 
+import scala.collection.immutable.Set
 import scala.collection.mutable
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
 import scala.collection.JavaConverters._
+
--- End diff --

nit: delete extra blank line
[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/8760#discussion_r45751778

--- Diff: core/src/main/scala/org/apache/spark/scheduler/BlacklistStrategy.scala ---
@@ -0,0 +1,146 @@
+[Apache license header]
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable
+import org.apache.spark.Logging
--- End diff --

nit: blank line between the `scala` and the `org.apache.spark` imports
[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/8760#discussion_r45751946

--- Diff: core/src/main/scala/org/apache/spark/scheduler/BlacklistStrategy.scala ---
[... same BlacklistStrategy.scala context as quoted in full above, through: ...]
+  /** Return executors in blacklist which are related to given taskIndex */
+  def getExecutorBlacklist(
+      executorIdToFailureStatus: mutable.HashMap[String, FailureStatus],
+      atomTask: BlacklistAtomTask, clock: Clock): Set[String]
--- End diff --

nit: each arg on its own line
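Applied to the signature flagged above, the nit would yield something like this (a sketch of the style, not a change in behavior):

```scala
def getExecutorBlacklist(
    executorIdToFailureStatus: mutable.HashMap[String, FailureStatus],
    atomTask: BlacklistAtomTask,
    clock: Clock): Set[String]
```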
[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/8760#discussion_r45752011

--- Diff: core/src/main/scala/org/apache/spark/scheduler/BlacklistStrategy.scala ---
[... same BlacklistStrategy.scala context as quoted in full above, through: ...]
+  def expireExecutorsInBlackList(
+      executorIdToFailureStatus: mutable.HashMap[String, FailureStatus], clock: Clock): Boolean = {
--- End diff --

each arg on its own line
[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/8760#discussion_r45752836

--- Diff: core/src/main/scala/org/apache/spark/scheduler/BlacklistStrategy.scala ---
[... same BlacklistStrategy.scala context as quoted in full above, through: ...]
+  /**
+   * Default implementation to remove failure executors from HashMap based on given time period.
+   * The return value identity whether or not it updated anything
+   */
--- End diff --

Reword to: "Choose which executors (if any) should be removed from the blacklist. Returns true if any executors are removed from the blacklist, false otherwise. The default implementation removes executors from the blacklist after [[expireTimeInMilliseconds]]."
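Dropped into the trait, the suggested wording would read roughly as follows (a sketch; the scaladoc link target is assumed to be the trait's `expireTimeInMilliseconds` member quoted earlier):

```scala
/**
 * Choose which executors (if any) should be removed from the blacklist. Returns true if any
 * executors are removed from the blacklist, false otherwise. The default implementation
 * removes executors from the blacklist after [[expireTimeInMilliseconds]].
 */
def expireExecutorsInBlackList(
    executorIdToFailureStatus: mutable.HashMap[String, FailureStatus],
    clock: Clock): Boolean
```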
[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/8760#discussion_r45752965

--- Diff: core/src/main/scala/org/apache/spark/scheduler/BlacklistStrategy.scala ---
[... same BlacklistStrategy.scala context as quoted in full above, through: ...]
+private[scheduler] class SimpleStrategy(
+    maxFailureTaskNumber: Int,
+    maxBlackExecutorNumber: Int,
+    val expireTimeInMilliseconds: Long
+  )extends BlacklistStrategy {
--- End diff --

nit: space after `)`
[GitHub] spark pull request: [Spark-8426] [scheduler] enhance blacklist mec...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/8760#discussion_r45753251

--- Diff: core/src/main/scala/org/apache/spark/scheduler/BlacklistStrategy.scala ---
[... same BlacklistStrategy.scala context as quoted in full above, through: ...]
+  // As this is a taskId unrelated strategy, the input taskId will be ignored
+  def getExecutorBlacklist(
+      executorIdToFailureStatus: mutable.HashMap[String, FailureStatus],
+      atomTask: BlacklistAtomTask, clock: Clock): Set[String] = {
--- End diff --

each arg on its own line