[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r84799861 --- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala --- @@ -282,6 +331,211 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B assert(!failedTaskSet) } + test("scheduled tasks obey task and stage blacklists") { --- End diff -- Could you stick this in its own PR? This one is so huge that it would be nice to minimize it as much as possible (and the test should be mergeable quickly)
[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r84148084 --- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala --- @@ -282,6 +331,211 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B assert(!failedTaskSet) } + test("scheduled tasks obey task and stage blacklists") { --- End diff -- this test case really should have been part of the last PR, oops.
[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r84149709 --- Diff: core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala --- @@ -17,10 +17,254 @@ package org.apache.spark.scheduler +import java.util.concurrent.atomic.AtomicReference + +import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} + import org.apache.spark.SparkConf import org.apache.spark.internal.Logging import org.apache.spark.internal.config -import org.apache.spark.util.Utils +import org.apache.spark.util.{Clock, SystemClock, Utils} + +/** + * BlacklistTracker is designed to track problematic executors and nodes. It supports blacklisting + * executors and nodes across an entire application (with a periodic expiry). TaskSetManagers add + * additional blacklisting of executors and nodes for individual tasks and stages which works in + * concert with the blacklisting here. + * + * The tracker needs to deal with a variety of workloads, eg.: + * + * * bad user code -- this may lead to many task failures, but that should not count against + * individual executors + * * many small stages -- this may prevent a bad executor for having many failures within one + * stage, but still many failures over the entire application + * * "flaky" executors -- they don't fail every task, but are still faulty enough to merit + * blacklisting + * + * See the design doc on SPARK-8425 for a more in-depth discussion. + * + * THREADING: As with most helpers of TaskSchedulerImpl, this is not thread-safe. Though it is + * called by multiple threads, callers must already have a lock on the TaskSchedulerImpl. The + * one exception is [[nodeBlacklist()]], which can be called without holding a lock. + */ +private[scheduler] class BlacklistTracker ( +conf: SparkConf, +clock: Clock = new SystemClock()) extends Logging { + + BlacklistTracker.validateBlacklistConfs(conf) + private val MAX_FAILURES_PER_EXEC = conf.get(config.MAX_FAILURES_PER_EXEC) + private val MAX_FAILED_EXEC_PER_NODE = conf.get(config.MAX_FAILED_EXEC_PER_NODE) + val BLACKLIST_TIMEOUT_MILLIS = BlacklistTracker.getBlacklistTimeout(conf) + + /** + * A map from executorId to information on task failures. Tracks the time of each task failure, + * so that we can avoid blacklisting executors due to failures that are very far apart. We do not + * actively remove from this as soon as tasks hit their timeouts, to avoid the time it would take + * to do so. But it will not grow too large, because as soon as an executor gets too many + * failures, we blacklist the executor and remove its entry here. + */ + private val executorIdToFailureList = new HashMap[String, ExecutorFailureList]() + val executorIdToBlacklistStatus = new HashMap[String, BlacklistedExecutor]() + val nodeIdToBlacklistExpiryTime = new HashMap[String, Long]() + /** + * An immutable copy of the set of nodes that are currently blacklisted. Kept in an + * AtomicReference to make [[nodeBlacklist()]] thread-safe. + */ + private val _nodeBlacklist = new AtomicReference[Set[String]](Set()) + /** + * Time when the next blacklist will expire. Used as a + * shortcut to avoid iterating over all entries in the blacklist when none will have expired. + */ + var nextExpiryTime: Long = Long.MaxValue + /** + * Mapping from nodes to all of the executors that have been blacklisted on that node. We do *not* + * remove from this when executors are removed from spark, so we can track when we get multiple + * successive blacklisted executors on one node. 
Nonetheless, it will not grow too large because + * there cannot be many blacklisted executors on one node, before we stop requesting more + * executors on that node, and we periodically clean up the list of blacklisted executors. + */ + val nodeToFailedExecs: HashMap[String, HashSet[String]] = new HashMap() --- End diff -- The comment about this not growing too large isn't totally correct. With this PR, we only prevent adding executors on blacklisted nodes in yarn mode. Standalone and mesos could keep allocating executors on the bad node. This is particularly a problem with dynamic allocation. Given the default dynamic allocation timeout of 1 minute, and the blacklist timeout of 1 hour, this means that during that hour, you'll repeatedly spin up executors, blacklist them, and then kill them due to the 1 minute timeout. Not only do we build up `60 * numExecutorsPerNode` entries in the inner hashset during that hour -- we also won't ever clean it up, since we only do that when all the ex
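To make the growth concrete, here is a back-of-the-envelope sketch of the scenario described above -- a 1-hour blacklist timeout combined with a 1-minute dynamic-allocation idle timeout on one bad node. The object name, the timeout values, and `numExecutorsPerNode` are illustrative assumptions, not Spark configuration APIs.

```scala
// Rough estimate of how many entries the inner HashSet in nodeToFailedExecs can
// accumulate for one bad node before anything gets cleaned up (assumed defaults).
object NodeToFailedExecsGrowth {
  def main(args: Array[String]): Unit = {
    val blacklistTimeoutMinutes = 60   // application-level blacklist timeout (1 hour)
    val dynAllocIdleTimeoutMinutes = 1 // dynamic-allocation executor idle timeout
    val numExecutorsPerNode = 4        // hypothetical cluster layout

    // Each idle-timeout cycle can spin up a fresh set of executors on the bad
    // node, blacklist them, and then kill them -- leaving their entries behind.
    val entries = (blacklistTimeoutMinutes / dynAllocIdleTimeoutMinutes) * numExecutorsPerNode
    println(s"~$entries entries in nodeToFailedExecs after one blacklist timeout")
  }
}
```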
[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r79936008 --- Diff: core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala --- @@ -0,0 +1,503 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import org.mockito.Mockito.when +import org.scalatest.BeforeAndAfterEach +import org.scalatest.mock.MockitoSugar + +import org.apache.spark._ +import org.apache.spark.internal.config +import org.apache.spark.util.ManualClock + +class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with MockitoSugar +with LocalSparkContext { + + private val clock = new ManualClock(0) + + private var blacklistTracker: BlacklistTracker = _ + + override def afterEach(): Unit = { +if (blacklistTracker != null) { + blacklistTracker = null +} +super.afterEach() + } + + val allOptions = (('A' to 'Z').map("host" + _) ++ (1 to 100).map{_.toString}).toSet + + /** + * Its easier to write our tests as if we could directly look at the sets of nodes & executors in + * the blacklist. However the api doesn't expose a set (for thread-safety), so this is a simple + * way to test something similar, since we know the universe of values that might appear in these + * sets. 
+ */ + def assertEquivalentToSet(f: String => Boolean, expected: Set[String]): Unit = { +allOptions.foreach { opt => + val actual = f(opt) + val exp = expected.contains(opt) + assert(actual === exp, raw"""for string "$opt" """) +} + } + + def mockTaskSchedWithConf(conf: SparkConf): TaskSchedulerImpl = { +sc = new SparkContext(conf) +val scheduler = mock[TaskSchedulerImpl] +when(scheduler.sc).thenReturn(sc) + when(scheduler.mapOutputTracker).thenReturn(SparkEnv.get.mapOutputTracker) +scheduler + } + + test("Blacklisting individual tasks") { +val conf = new SparkConf().setAppName("test").setMaster("local") + .set(config.BLACKLIST_ENABLED.key, "true") +val scheduler = mockTaskSchedWithConf(conf) +// Task 1 failed on executor 1 +blacklistTracker = new BlacklistTracker(conf, clock) +val taskSet = FakeTask.createTaskSet(10) +val tsm = new TaskSetManager(scheduler, Some(blacklistTracker), taskSet, 4, clock) +tsm.updateBlacklistForFailedTask("hostA", "1", 0) +for { + executor <- (1 to 4).map(_.toString) + index <- 0 until 10 +} { + val exp = (executor == "1" && index == 0) + assert(tsm.isExecutorBlacklistedForTask(executor, index) === exp) +} +assert(blacklistTracker.nodeBlacklist() === Set()) +assertEquivalentToSet(blacklistTracker.isNodeBlacklisted(_), Set()) +assertEquivalentToSet(tsm.isNodeBlacklistedForTaskSet, Set()) +assertEquivalentToSet(tsm.isExecutorBlacklistedForTaskSet, Set()) + +// Task 1 & 2 failed on both executor 1 & 2, so we blacklist all executors on that host, +// for all tasks for the stage. Note the api expects multiple checks for each type of +// blacklist -- this actually fits naturally with its use in the scheduler +tsm.updateBlacklistForFailedTask("hostA", "1", 1) +tsm.updateBlacklistForFailedTask("hostA", "2", 0) +tsm.updateBlacklistForFailedTask("hostA", "2", 1) +// we don't explicitly return the executors in hostA here, but that is OK +for { + executor <- (1 to 4).map(_.toString) + index <- 0 until 10 +} { + withClue(s"exec = $executor; index = $index") { +val badExec = (executor == "1" || executor == "2") +val badPart = (index == 0 || index == 1) +val taskExp = (badExec && badPart) +assert( + tsm.isExecutorBlacklistedForTask(executor, index) === taskExp) +val executorE
[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r79934546 --- Diff: core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala --- @@ -0,0 +1,503 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import org.mockito.Mockito.when +import org.scalatest.BeforeAndAfterEach +import org.scalatest.mock.MockitoSugar + +import org.apache.spark._ +import org.apache.spark.internal.config +import org.apache.spark.util.ManualClock + +class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with MockitoSugar +with LocalSparkContext { + + private val clock = new ManualClock(0) + + private var blacklistTracker: BlacklistTracker = _ + + override def afterEach(): Unit = { +if (blacklistTracker != null) { + blacklistTracker = null +} +super.afterEach() + } + + val allOptions = (('A' to 'Z').map("host" + _) ++ (1 to 100).map{_.toString}).toSet + + /** + * Its easier to write our tests as if we could directly look at the sets of nodes & executors in + * the blacklist. However the api doesn't expose a set (for thread-safety), so this is a simple + * way to test something similar, since we know the universe of values that might appear in these + * sets. 
+ */ + def assertEquivalentToSet(f: String => Boolean, expected: Set[String]): Unit = { +allOptions.foreach { opt => + val actual = f(opt) + val exp = expected.contains(opt) + assert(actual === exp, raw"""for string "$opt" """) +} + } + + def mockTaskSchedWithConf(conf: SparkConf): TaskSchedulerImpl = { +sc = new SparkContext(conf) +val scheduler = mock[TaskSchedulerImpl] +when(scheduler.sc).thenReturn(sc) + when(scheduler.mapOutputTracker).thenReturn(SparkEnv.get.mapOutputTracker) +scheduler + } + + test("Blacklisting individual tasks") { +val conf = new SparkConf().setAppName("test").setMaster("local") + .set(config.BLACKLIST_ENABLED.key, "true") +val scheduler = mockTaskSchedWithConf(conf) +// Task 1 failed on executor 1 +blacklistTracker = new BlacklistTracker(conf, clock) +val taskSet = FakeTask.createTaskSet(10) +val tsm = new TaskSetManager(scheduler, Some(blacklistTracker), taskSet, 4, clock) +tsm.updateBlacklistForFailedTask("hostA", "1", 0) +for { + executor <- (1 to 4).map(_.toString) + index <- 0 until 10 +} { + val exp = (executor == "1" && index == 0) + assert(tsm.isExecutorBlacklistedForTask(executor, index) === exp) +} +assert(blacklistTracker.nodeBlacklist() === Set()) +assertEquivalentToSet(blacklistTracker.isNodeBlacklisted(_), Set()) +assertEquivalentToSet(tsm.isNodeBlacklistedForTaskSet, Set()) +assertEquivalentToSet(tsm.isExecutorBlacklistedForTaskSet, Set()) + +// Task 1 & 2 failed on both executor 1 & 2, so we blacklist all executors on that host, +// for all tasks for the stage. Note the api expects multiple checks for each type of +// blacklist -- this actually fits naturally with its use in the scheduler +tsm.updateBlacklistForFailedTask("hostA", "1", 1) +tsm.updateBlacklistForFailedTask("hostA", "2", 0) +tsm.updateBlacklistForFailedTask("hostA", "2", 1) +// we don't explicitly return the executors in hostA here, but that is OK +for { + executor <- (1 to 4).map(_.toString) + index <- 0 until 10 +} { + withClue(s"exec = $executor; index = $index") { +val badExec = (executor == "1" || executor == "2") +val badPart = (index == 0 || index == 1) +val taskExp = (badExec && badPart) +assert( + tsm.isExecutorBlacklistedForTask(executor, index) === taskExp) +val executorE
[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r79934114 --- Diff: core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala --- @@ -0,0 +1,503 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import org.mockito.Mockito.when +import org.scalatest.BeforeAndAfterEach +import org.scalatest.mock.MockitoSugar + +import org.apache.spark._ +import org.apache.spark.internal.config +import org.apache.spark.util.ManualClock + +class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with MockitoSugar +with LocalSparkContext { + + private val clock = new ManualClock(0) + + private var blacklistTracker: BlacklistTracker = _ + + override def afterEach(): Unit = { +if (blacklistTracker != null) { + blacklistTracker = null +} +super.afterEach() + } + + val allOptions = (('A' to 'Z').map("host" + _) ++ (1 to 100).map{_.toString}).toSet + + /** + * Its easier to write our tests as if we could directly look at the sets of nodes & executors in + * the blacklist. However the api doesn't expose a set (for thread-safety), so this is a simple + * way to test something similar, since we know the universe of values that might appear in these + * sets. + */ + def assertEquivalentToSet(f: String => Boolean, expected: Set[String]): Unit = { +allOptions.foreach { opt => + val actual = f(opt) + val exp = expected.contains(opt) + assert(actual === exp, raw"""for string "$opt" """) +} + } + + def mockTaskSchedWithConf(conf: SparkConf): TaskSchedulerImpl = { +sc = new SparkContext(conf) +val scheduler = mock[TaskSchedulerImpl] +when(scheduler.sc).thenReturn(sc) + when(scheduler.mapOutputTracker).thenReturn(SparkEnv.get.mapOutputTracker) +scheduler + } + + test("Blacklisting individual tasks") { --- End diff -- yeah good point, I agree
[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r79933967 --- Diff: core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala --- @@ -0,0 +1,503 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import org.mockito.Mockito.when +import org.scalatest.BeforeAndAfterEach +import org.scalatest.mock.MockitoSugar + +import org.apache.spark._ +import org.apache.spark.internal.config +import org.apache.spark.util.ManualClock + +class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with MockitoSugar +with LocalSparkContext { + + private val clock = new ManualClock(0) + + private var blacklistTracker: BlacklistTracker = _ + + override def afterEach(): Unit = { +if (blacklistTracker != null) { + blacklistTracker = null +} +super.afterEach() + } + + val allOptions = (('A' to 'Z').map("host" + _) ++ (1 to 100).map{_.toString}).toSet + + /** + * Its easier to write our tests as if we could directly look at the sets of nodes & executors in + * the blacklist. However the api doesn't expose a set (for thread-safety), so this is a simple + * way to test something similar, since we know the universe of values that might appear in these + * sets. 
+ */ + def assertEquivalentToSet(f: String => Boolean, expected: Set[String]): Unit = { +allOptions.foreach { opt => + val actual = f(opt) + val exp = expected.contains(opt) + assert(actual === exp, raw"""for string "$opt" """) +} + } + + def mockTaskSchedWithConf(conf: SparkConf): TaskSchedulerImpl = { +sc = new SparkContext(conf) +val scheduler = mock[TaskSchedulerImpl] +when(scheduler.sc).thenReturn(sc) + when(scheduler.mapOutputTracker).thenReturn(SparkEnv.get.mapOutputTracker) +scheduler + } + + test("Blacklisting individual tasks") { +val conf = new SparkConf().setAppName("test").setMaster("local") + .set(config.BLACKLIST_ENABLED.key, "true") +val scheduler = mockTaskSchedWithConf(conf) +// Task 1 failed on executor 1 +blacklistTracker = new BlacklistTracker(conf, clock) +val taskSet = FakeTask.createTaskSet(10) +val tsm = new TaskSetManager(scheduler, Some(blacklistTracker), taskSet, 4, clock) +tsm.updateBlacklistForFailedTask("hostA", "1", 0) +for { + executor <- (1 to 4).map(_.toString) + index <- 0 until 10 +} { + val exp = (executor == "1" && index == 0) + assert(tsm.isExecutorBlacklistedForTask(executor, index) === exp) +} +assert(blacklistTracker.nodeBlacklist() === Set()) +assertEquivalentToSet(blacklistTracker.isNodeBlacklisted(_), Set()) +assertEquivalentToSet(tsm.isNodeBlacklistedForTaskSet, Set()) +assertEquivalentToSet(tsm.isExecutorBlacklistedForTaskSet, Set()) + +// Task 1 & 2 failed on both executor 1 & 2, so we blacklist all executors on that host, +// for all tasks for the stage. Note the api expects multiple checks for each type of +// blacklist -- this actually fits naturally with its use in the scheduler +tsm.updateBlacklistForFailedTask("hostA", "1", 1) +tsm.updateBlacklistForFailedTask("hostA", "2", 0) +tsm.updateBlacklistForFailedTask("hostA", "2", 1) +// we don't explicitly return the executors in hostA here, but that is OK +for { + executor <- (1 to 4).map(_.toString) + index <- 0 until 10 +} { + withClue(s"exec = $executor; index = $index") { +val badExec = (executor == "1" || executor == "2") +val badPart = (index == 0 || index == 1) +val taskExp = (badExec && badPart) +assert( + tsm.isExecutorBlacklistedForTask(executor, index) === taskExp) +val executorE
[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r79933045 --- Diff: core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala --- @@ -0,0 +1,503 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import org.mockito.Mockito.when +import org.scalatest.BeforeAndAfterEach +import org.scalatest.mock.MockitoSugar + +import org.apache.spark._ +import org.apache.spark.internal.config +import org.apache.spark.util.ManualClock + +class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with MockitoSugar +with LocalSparkContext { + + private val clock = new ManualClock(0) + + private var blacklistTracker: BlacklistTracker = _ + + override def afterEach(): Unit = { +if (blacklistTracker != null) { + blacklistTracker = null +} +super.afterEach() + } + + val allOptions = (('A' to 'Z').map("host" + _) ++ (1 to 100).map{_.toString}).toSet + + /** + * Its easier to write our tests as if we could directly look at the sets of nodes & executors in + * the blacklist. However the api doesn't expose a set (for thread-safety), so this is a simple + * way to test something similar, since we know the universe of values that might appear in these + * sets. 
+ */ + def assertEquivalentToSet(f: String => Boolean, expected: Set[String]): Unit = { +allOptions.foreach { opt => + val actual = f(opt) + val exp = expected.contains(opt) + assert(actual === exp, raw"""for string "$opt" """) +} + } + + def mockTaskSchedWithConf(conf: SparkConf): TaskSchedulerImpl = { +sc = new SparkContext(conf) +val scheduler = mock[TaskSchedulerImpl] +when(scheduler.sc).thenReturn(sc) + when(scheduler.mapOutputTracker).thenReturn(SparkEnv.get.mapOutputTracker) +scheduler + } + + test("Blacklisting individual tasks") { +val conf = new SparkConf().setAppName("test").setMaster("local") + .set(config.BLACKLIST_ENABLED.key, "true") +val scheduler = mockTaskSchedWithConf(conf) +// Task 1 failed on executor 1 +blacklistTracker = new BlacklistTracker(conf, clock) +val taskSet = FakeTask.createTaskSet(10) +val tsm = new TaskSetManager(scheduler, Some(blacklistTracker), taskSet, 4, clock) +tsm.updateBlacklistForFailedTask("hostA", "1", 0) +for { + executor <- (1 to 4).map(_.toString) + index <- 0 until 10 +} { + val exp = (executor == "1" && index == 0) + assert(tsm.isExecutorBlacklistedForTask(executor, index) === exp) +} +assert(blacklistTracker.nodeBlacklist() === Set()) +assertEquivalentToSet(blacklistTracker.isNodeBlacklisted(_), Set()) +assertEquivalentToSet(tsm.isNodeBlacklistedForTaskSet, Set()) +assertEquivalentToSet(tsm.isExecutorBlacklistedForTaskSet, Set()) + +// Task 1 & 2 failed on both executor 1 & 2, so we blacklist all executors on that host, +// for all tasks for the stage. Note the api expects multiple checks for each type of +// blacklist -- this actually fits naturally with its use in the scheduler +tsm.updateBlacklistForFailedTask("hostA", "1", 1) +tsm.updateBlacklistForFailedTask("hostA", "2", 0) +tsm.updateBlacklistForFailedTask("hostA", "2", 1) +// we don't explicitly return the executors in hostA here, but that is OK +for { + executor <- (1 to 4).map(_.toString) + index <- 0 until 10 +} { + withClue(s"exec = $executor; index = $index") { +val badExec = (executor == "1" || executor == "2") +val badPart = (index == 0 || index == 1) +val taskExp = (badExec && badPart) +assert( + tsm.isExecutorBlacklistedForTask(executor, index) === taskExp) +val executorE
[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r79920803 --- Diff: core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala --- @@ -0,0 +1,503 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import org.mockito.Mockito.when +import org.scalatest.BeforeAndAfterEach +import org.scalatest.mock.MockitoSugar + +import org.apache.spark._ +import org.apache.spark.internal.config +import org.apache.spark.util.ManualClock + +class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with MockitoSugar +with LocalSparkContext { + + private val clock = new ManualClock(0) + + private var blacklistTracker: BlacklistTracker = _ + + override def afterEach(): Unit = { +if (blacklistTracker != null) { + blacklistTracker = null +} +super.afterEach() + } + + val allOptions = (('A' to 'Z').map("host" + _) ++ (1 to 100).map{_.toString}).toSet + + /** + * Its easier to write our tests as if we could directly look at the sets of nodes & executors in + * the blacklist. However the api doesn't expose a set (for thread-safety), so this is a simple + * way to test something similar, since we know the universe of values that might appear in these + * sets. 
+ */ + def assertEquivalentToSet(f: String => Boolean, expected: Set[String]): Unit = { +allOptions.foreach { opt => + val actual = f(opt) + val exp = expected.contains(opt) + assert(actual === exp, raw"""for string "$opt" """) +} + } + + def mockTaskSchedWithConf(conf: SparkConf): TaskSchedulerImpl = { +sc = new SparkContext(conf) +val scheduler = mock[TaskSchedulerImpl] +when(scheduler.sc).thenReturn(sc) + when(scheduler.mapOutputTracker).thenReturn(SparkEnv.get.mapOutputTracker) +scheduler + } + + test("Blacklisting individual tasks") { +val conf = new SparkConf().setAppName("test").setMaster("local") + .set(config.BLACKLIST_ENABLED.key, "true") +val scheduler = mockTaskSchedWithConf(conf) +// Task 1 failed on executor 1 +blacklistTracker = new BlacklistTracker(conf, clock) +val taskSet = FakeTask.createTaskSet(10) +val tsm = new TaskSetManager(scheduler, Some(blacklistTracker), taskSet, 4, clock) +tsm.updateBlacklistForFailedTask("hostA", "1", 0) +for { + executor <- (1 to 4).map(_.toString) + index <- 0 until 10 +} { + val exp = (executor == "1" && index == 0) + assert(tsm.isExecutorBlacklistedForTask(executor, index) === exp) +} +assert(blacklistTracker.nodeBlacklist() === Set()) +assertEquivalentToSet(blacklistTracker.isNodeBlacklisted(_), Set()) +assertEquivalentToSet(tsm.isNodeBlacklistedForTaskSet, Set()) +assertEquivalentToSet(tsm.isExecutorBlacklistedForTaskSet, Set()) + +// Task 1 & 2 failed on both executor 1 & 2, so we blacklist all executors on that host, +// for all tasks for the stage. Note the api expects multiple checks for each type of +// blacklist -- this actually fits naturally with its use in the scheduler +tsm.updateBlacklistForFailedTask("hostA", "1", 1) --- End diff -- good point, sorry about that. In this case a search and replace is easy enough so I'll go ahead and update.
[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r79916381 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala --- @@ -592,34 +610,59 @@ private[spark] class TaskSetManager( * failures (this is because the method picks on unscheduled task, and then iterates through each * executor until it finds one that the task hasn't failed on already). */ - private[scheduler] def abortIfCompletelyBlacklisted(executors: Iterable[String]): Unit = { - -val pendingTask: Option[Int] = { - // usually this will just take the last pending task, but because of the lazy removal - // from each list, we may need to go deeper in the list. We poll from the end because - // failed tasks are put back at the end of allPendingTasks, so we're more likely to find - // an unschedulable task this way. - val indexOffset = allPendingTasks.lastIndexWhere { indexInTaskSet => -copiesRunning(indexInTaskSet) == 0 && !successful(indexInTaskSet) - } - if (indexOffset == -1) { -None - } else { -Some(allPendingTasks(indexOffset)) - } -} + private[scheduler] def abortIfCompletelyBlacklisted( + hostToExecutors: HashMap[String, HashSet[String]]): Unit = { +blacklistTracker.foreach { blacklist => + // because this is called in a loop, with multiple resource offers and locality levels, --- End diff -- good point, it is not
[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r79916404 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala --- @@ -592,34 +610,59 @@ private[spark] class TaskSetManager( * failures (this is because the method picks on unscheduled task, and then iterates through each * executor until it finds one that the task hasn't failed on already). */ - private[scheduler] def abortIfCompletelyBlacklisted(executors: Iterable[String]): Unit = { - -val pendingTask: Option[Int] = { - // usually this will just take the last pending task, but because of the lazy removal - // from each list, we may need to go deeper in the list. We poll from the end because - // failed tasks are put back at the end of allPendingTasks, so we're more likely to find - // an unschedulable task this way. - val indexOffset = allPendingTasks.lastIndexWhere { indexInTaskSet => -copiesRunning(indexInTaskSet) == 0 && !successful(indexInTaskSet) - } - if (indexOffset == -1) { -None - } else { -Some(allPendingTasks(indexOffset)) - } -} + private[scheduler] def abortIfCompletelyBlacklisted( + hostToExecutors: HashMap[String, HashSet[String]]): Unit = { +blacklistTracker.foreach { blacklist => + // because this is called in a loop, with multiple resource offers and locality levels, + // we could end up aborting this taskset multiple times without the !isZombie check + if (!isZombie) { +// take any task that needs to be scheduled, and see if we can find some executor it *could* +// run on +val pendingTask: Option[Int] = { + // usually this will just take the last pending task, but because of the lazy removal + // from each list, we may need to go deeper in the list. We poll from the end because + // failed tasks are put back at the end of allPendingTasks, so we're more likely to find + // an unschedulable task this way. + val indexOffset = allPendingTasks.lastIndexWhere { indexInTaskSet => +copiesRunning(indexInTaskSet) == 0 && !successful(indexInTaskSet) + } + if (indexOffset == -1) { +None + } else { +Some(allPendingTasks(indexOffset)) + } +} -// If no executors have registered yet, don't abort the stage, just wait. We probably -// got here because a task set was added before the executors registered. -if (executors.nonEmpty) { - // take any task that needs to be scheduled, and see if we can find some executor it *could* - // run on - pendingTask.foreach { taskId => -if (executors.forall(executorIsBlacklisted(_, taskId))) { - val execs = executors.toIndexedSeq.sorted.mkString("(", ",", ")") - val partition = tasks(taskId).partitionId - abort(s"Aborting ${taskSet} because task $taskId (partition $partition)" + -s" has already failed on executors $execs, and no other executors are available.") +// If no executors have registered yet, don't abort the stage, just wait. We probably +// got here because a task set was added before the executors registered. +if (hostToExecutors.nonEmpty) { --- End diff -- good point
[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r79914706 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala --- @@ -50,22 +49,26 @@ import org.apache.spark.util.{AccumulatorV2, Clock, SystemClock, Utils} *task set will be aborted */ private[spark] class TaskSetManager( -sched: TaskSchedulerImpl, +val sched: TaskSchedulerImpl, +val blacklistTracker: Option[BlacklistTracker], val taskSet: TaskSet, val maxTaskFailures: Int, -clock: Clock = new SystemClock()) +val clock: Clock) extends Schedulable with Logging { - val conf = sched.sc.conf + def this( + sched: TaskSchedulerImpl, + taskSet: TaskSet, + maxTaskFailures: Int, + clock: Clock = new SystemClock()) { +this(sched, None, taskSet, maxTaskFailures, clock) --- End diff -- only a really minor reason -- multiple default params don't work as nicely: all the call sites which only specify a `clock` now have to say `clock = clock` (while with multiple constructors, the types are enough to figure out what to do). But this doesn't really matter, I'll update.
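For readers following the default-parameter discussion above, a minimal self-contained sketch of the call-site difference; the class and parameter names are simplified stand-ins for `TaskSetManager`, not the real signatures.

```scala
object ConstructorStyles {
  class Clock
  class SystemClock extends Clock
  class BlacklistTracker

  // With several defaulted parameters, a caller that only wants to override the
  // clock has to pass it as a named argument:
  class WithDefaults(
      blacklistTracker: Option[BlacklistTracker] = None,
      clock: Clock = new SystemClock)
  val a = new WithDefaults(clock = new SystemClock) // `new WithDefaults(new SystemClock)` would not compile

  // With an auxiliary constructor, the argument types alone pick the overload:
  class WithAuxiliary(val blacklistTracker: Option[BlacklistTracker], val clock: Clock) {
    def this(clock: Clock) = this(None, clock)
  }
  val b = new WithAuxiliary(new SystemClock) // no named argument needed
}
```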
[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r79912771 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -51,13 +50,27 @@ import org.apache.spark.util.{AccumulatorV2, ThreadUtils, Utils} * acquire a lock on us, so we need to make sure that we don't try to lock the backend while * we are holding a lock on ourselves. */ -private[spark] class TaskSchedulerImpl( +private[spark] class TaskSchedulerImpl private[scheduler]( val sc: SparkContext, val maxTaskFailures: Int, +private val blacklistTracker: Option[BlacklistTracker], isLocal: Boolean = false) extends TaskScheduler with Logging { - def this(sc: SparkContext) = this(sc, sc.conf.getInt("spark.task.maxFailures", 4)) + def this(sc: SparkContext) = { +this( + sc, + sc.conf.getInt("spark.task.maxFailures", 4), + TaskSchedulerImpl.createBlacklistTracker(sc.conf)) + } + + def this(sc: SparkContext, maxTaskFailures: Int, isLocal: Boolean) = { +this( + sc, + maxTaskFailures, + TaskSchedulerImpl.createBlacklistTracker(sc.conf), --- End diff -- you can't have a default param which depends on another param (the `sc` in that call). If you really want, I could make it required, and change all the call sites.
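A short sketch of the language restriction referenced here: in Scala, a default argument cannot refer to another parameter in the same parameter list, which is why the quoted diff supplies the tracker from auxiliary constructors instead. The stub types below are hypothetical stand-ins, not Spark classes, and the compiler message is approximate.

```scala
object DefaultParamRestriction {
  class SparkContextStub { val conf: String = "stub-conf" } // stand-in for SparkContext
  class Tracker(conf: String)                               // stand-in for BlacklistTracker

  // Does not compile -- the default value cannot see `sc`:
  //   class Sched(sc: SparkContextStub,
  //               tracker: Option[Tracker] = Some(new Tracker(sc.conf)))
  //   => error: not found: value sc

  // Workaround mirroring the quoted diff: keep the parameter required in the
  // primary constructor and compute it in an auxiliary constructor.
  class Sched(val sc: SparkContextStub, val tracker: Option[Tracker]) {
    def this(sc: SparkContextStub) = this(sc, Some(new Tracker(sc.conf)))
  }

  val sched = new Sched(new SparkContextStub) // tracker built from sc.conf
}
```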
[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r79912594 --- Diff: core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala --- @@ -0,0 +1,393 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import java.util.concurrent.atomic.AtomicReference + +import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config +import org.apache.spark.util.{Clock, SystemClock, Utils} + +/** + * BlacklistTracker is designed to track problematic executors and nodes. It supports blacklisting + * executors and nodes across an entire application (with a periodic expiry). TaskSetManagers add + * additional blacklisting of executors and nodes for individual tasks and stages which works in + * concert with the blacklisting here. + * + * The tracker needs to deal with a variety of workloads, eg.: + * + * * bad user code -- this may lead to many task failures, but that should not count against + * individual executors + * * many small stages -- this may prevent a bad executor for having many failures within one + * stage, but still many failures over the entire application + * * "flaky" executors -- they don't fail every task, but are still faulty enough to merit + * blacklisting + * + * See the design doc on SPARK-8425 for a more in-depth discussion. + * + * THREADING: As with most helpers of TaskSchedulerImpl, this is not thread-safe. Though it is + * called by multiple threads, callers must already have a lock on the TaskSchedulerImpl. The + * one exception is [[nodeBlacklist()]], which can be called without holding a lock. + */ +private[scheduler] class BlacklistTracker ( +conf: SparkConf, +clock: Clock = new SystemClock()) extends Logging { + + BlacklistTracker.validateBlacklistConfs(conf) + private val MAX_FAILURES_PER_EXEC = conf.get(config.MAX_FAILURES_PER_EXEC) + private val MAX_FAILED_EXEC_PER_NODE = conf.get(config.MAX_FAILED_EXEC_PER_NODE) + val BLACKLIST_TIMEOUT_MILLIS = BlacklistTracker.getBlacklistTimeout(conf) + + /** + * A map from executorId to information on task failures. Tracks the time of each task failure, + * so that we can avoid blacklisting executors due to failures that are very far apart. We do not + * actively remove from this as soon as tasks hit their timeouts, to avoid the time it would take + * to do so. But it will not grow too large, because as soon as an executor gets too many + * failures, we blacklist the executor and remove its entry here. 
+ */ + private[scheduler] val executorIdToFailureList: HashMap[String, ExecutorFailureList] = +new HashMap() + val executorIdToBlacklistStatus: HashMap[String, BlacklistedExecutor] = new HashMap() + val nodeIdToBlacklistExpiryTime: HashMap[String, Long] = new HashMap() + /** + * An immutable copy of the set of nodes that are currently blacklisted. Kept in an + * AtomicReference to make [[nodeBlacklist()]] thread-safe. + */ + private val _nodeBlacklist: AtomicReference[Set[String]] = new AtomicReference(Set()) + /** + * Time when the next blacklist will expire. Used as a + * shortcut to avoid iterating over all entries in the blacklist when none will have expired. + */ + private[scheduler] var nextExpiryTime: Long = Long.MaxValue + /** + * Mapping from nodes to all of the executors that have been blacklisted on that node. We do *not* + * remove from this when executors are removed from spark, so we can track when we get multiple + * successive blacklisted executors on one node. Nonetheless, it will not grow too large because + * there cannot be many blacklisted executors on one node, before we stop requesting more
[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r79911314 --- Diff: core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala --- @@ -0,0 +1,393 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import java.util.concurrent.atomic.AtomicReference + +import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config +import org.apache.spark.util.{Clock, SystemClock, Utils} + +/** + * BlacklistTracker is designed to track problematic executors and nodes. It supports blacklisting + * executors and nodes across an entire application (with a periodic expiry). TaskSetManagers add + * additional blacklisting of executors and nodes for individual tasks and stages which works in + * concert with the blacklisting here. + * + * The tracker needs to deal with a variety of workloads, eg.: + * + * * bad user code -- this may lead to many task failures, but that should not count against + * individual executors + * * many small stages -- this may prevent a bad executor for having many failures within one + * stage, but still many failures over the entire application + * * "flaky" executors -- they don't fail every task, but are still faulty enough to merit + * blacklisting + * + * See the design doc on SPARK-8425 for a more in-depth discussion. + * + * THREADING: As with most helpers of TaskSchedulerImpl, this is not thread-safe. Though it is + * called by multiple threads, callers must already have a lock on the TaskSchedulerImpl. The + * one exception is [[nodeBlacklist()]], which can be called without holding a lock. + */ +private[scheduler] class BlacklistTracker ( +conf: SparkConf, +clock: Clock = new SystemClock()) extends Logging { + + BlacklistTracker.validateBlacklistConfs(conf) + private val MAX_FAILURES_PER_EXEC = conf.get(config.MAX_FAILURES_PER_EXEC) + private val MAX_FAILED_EXEC_PER_NODE = conf.get(config.MAX_FAILED_EXEC_PER_NODE) + val BLACKLIST_TIMEOUT_MILLIS = BlacklistTracker.getBlacklistTimeout(conf) + + /** + * A map from executorId to information on task failures. Tracks the time of each task failure, + * so that we can avoid blacklisting executors due to failures that are very far apart. We do not + * actively remove from this as soon as tasks hit their timeouts, to avoid the time it would take + * to do so. But it will not grow too large, because as soon as an executor gets too many + * failures, we blacklist the executor and remove its entry here. 
+ */ + private[scheduler] val executorIdToFailureList: HashMap[String, ExecutorFailureList] = +new HashMap() + val executorIdToBlacklistStatus: HashMap[String, BlacklistedExecutor] = new HashMap() + val nodeIdToBlacklistExpiryTime: HashMap[String, Long] = new HashMap() + /** + * An immutable copy of the set of nodes that are currently blacklisted. Kept in an + * AtomicReference to make [[nodeBlacklist()]] thread-safe. + */ + private val _nodeBlacklist: AtomicReference[Set[String]] = new AtomicReference(Set()) + /** + * Time when the next blacklist will expire. Used as a + * shortcut to avoid iterating over all entries in the blacklist when none will have expired. + */ + private[scheduler] var nextExpiryTime: Long = Long.MaxValue + /** + * Mapping from nodes to all of the executors that have been blacklisted on that node. We do *not* + * remove from this when executors are removed from spark, so we can track when we get multiple + * successive blacklisted executors on one node. Nonetheless, it will not grow too large because + * there cannot be many blacklisted executors on one node, before we stop requesting more
[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r79908638 --- Diff: core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala --- @@ -0,0 +1,393 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import java.util.concurrent.atomic.AtomicReference + +import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config +import org.apache.spark.util.{Clock, SystemClock, Utils} + +/** + * BlacklistTracker is designed to track problematic executors and nodes. It supports blacklisting + * executors and nodes across an entire application (with a periodic expiry). TaskSetManagers add + * additional blacklisting of executors and nodes for individual tasks and stages which works in + * concert with the blacklisting here. + * + * The tracker needs to deal with a variety of workloads, eg.: + * + * * bad user code -- this may lead to many task failures, but that should not count against + * individual executors + * * many small stages -- this may prevent a bad executor for having many failures within one + * stage, but still many failures over the entire application + * * "flaky" executors -- they don't fail every task, but are still faulty enough to merit + * blacklisting + * + * See the design doc on SPARK-8425 for a more in-depth discussion. + * + * THREADING: As with most helpers of TaskSchedulerImpl, this is not thread-safe. Though it is + * called by multiple threads, callers must already have a lock on the TaskSchedulerImpl. The + * one exception is [[nodeBlacklist()]], which can be called without holding a lock. + */ +private[scheduler] class BlacklistTracker ( +conf: SparkConf, +clock: Clock = new SystemClock()) extends Logging { + + BlacklistTracker.validateBlacklistConfs(conf) + private val MAX_FAILURES_PER_EXEC = conf.get(config.MAX_FAILURES_PER_EXEC) + private val MAX_FAILED_EXEC_PER_NODE = conf.get(config.MAX_FAILED_EXEC_PER_NODE) + val BLACKLIST_TIMEOUT_MILLIS = BlacklistTracker.getBlacklistTimeout(conf) + + /** + * A map from executorId to information on task failures. Tracks the time of each task failure, + * so that we can avoid blacklisting executors due to failures that are very far apart. We do not + * actively remove from this as soon as tasks hit their timeouts, to avoid the time it would take + * to do so. But it will not grow too large, because as soon as an executor gets too many + * failures, we blacklist the executor and remove its entry here. 
+ */ + private[scheduler] val executorIdToFailureList: HashMap[String, ExecutorFailureList] = +new HashMap() + val executorIdToBlacklistStatus: HashMap[String, BlacklistedExecutor] = new HashMap() + val nodeIdToBlacklistExpiryTime: HashMap[String, Long] = new HashMap() + /** + * An immutable copy of the set of nodes that are currently blacklisted. Kept in an + * AtomicReference to make [[nodeBlacklist()]] thread-safe. + */ + private val _nodeBlacklist: AtomicReference[Set[String]] = new AtomicReference(Set()) + /** + * Time when the next blacklist will expire. Used as a + * shortcut to avoid iterating over all entries in the blacklist when none will have expired. + */ + private[scheduler] var nextExpiryTime: Long = Long.MaxValue + /** + * Mapping from nodes to all of the executors that have been blacklisted on that node. We do *not* + * remove from this when executors are removed from spark, so we can track when we get multiple + * successive blacklisted executors on one node. Nonetheless, it will not grow too large because + * there cannot be many blacklisted executors on one node, before we stop requesting more
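The `_nodeBlacklist` field in the quoted doc comment is the one piece of state read without the TaskSchedulerImpl lock. A minimal sketch of that publish/read pattern follows; the class and method names are illustrative, not the actual BlacklistTracker code.

```scala
import java.util.concurrent.atomic.AtomicReference

class NodeBlacklistSnapshot {
  // Readers only ever see a complete, immutable Set, so no lock is needed on
  // the read path.
  private val _nodeBlacklist = new AtomicReference[Set[String]](Set.empty)

  // Called while holding the scheduler lock: build a fresh immutable set and
  // publish it atomically.
  def publish(blacklistedNodes: Set[String]): Unit = _nodeBlacklist.set(blacklistedNodes)

  // Safe to call from any thread without holding the scheduler lock.
  def nodeBlacklist(): Set[String] = _nodeBlacklist.get()
}
```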
[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r79869652 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -428,7 +456,7 @@ private[spark] class TaskSchedulerImpl( taskSetManager: TaskSetManager, tid: Long, taskState: TaskState, - reason: TaskEndReason): Unit = synchronized { + reason: TaskFailedReason): Unit = synchronized { --- End diff -- Sure, https://github.com/apache/spark/pull/15181 / https://issues.apache.org/jira/browse/SPARK-17623
[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user tgravescs commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r77893957 --- Diff: core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala --- @@ -51,37 +54,68 @@ class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorM assertDataStructuresEmpty(noFailure = false) } - // even with the blacklist turned on, if maxTaskFailures is not more than the number - // of executors on the bad node, then locality preferences will lead to us cycling through - // the executors on the bad node, and still failing the job + // even with the blacklist turned on, bad configs can lead to job failure. To survive one + // bad node, you need to make sure that + // maxTaskFailures > min(spark.blacklist.maxTaskFailuresPerNode, nExecutorsPerHost) testScheduler( "With blacklist on, job will still fail if there are too many bad executors on bad host", extraConfs = Seq( - // set this to something much longer than the test duration so that executors don't get - // removed from the blacklist during the test - ("spark.scheduler.executorTaskBlacklistTime", "1000") + config.BLACKLIST_ENABLED.key -> "true", + config.MAX_TASK_ATTEMPTS_PER_NODE.key -> "5", + "spark.task.maxFailures" -> "4", + "spark.testing.nHosts" -> "2", + "spark.testing.nExecutorsPerHost" -> "5", + "spark.testing.nCoresPerExecutor" -> "10", + // Blacklisting will normally immediately complain that this config is invalid -- the point + // of this test is to expose that the configuration is unsafe, so skip the validation. + "spark.blacklist.testing.skipValidation" -> "true" ) ) { -val rdd = new MockRDDWithLocalityPrefs(sc, 10, Nil, badHost) +// to reliably reproduce the failure, we have to use 1 task. That way, we ensure this +// 1 task gets rotated through enough bad executors on the host to fail the taskSet, +// before we have a bunch of different tasks fail in the executors so we blacklist them. +// But the point here is -- we never try scheduling tasks on the good host-1, since we +// hit too many failures trying our preferred host-0. +val rdd = new MockRDDWithLocalityPrefs(sc, 1, Nil, badHost) withBackend(badHostBackend _) { - val jobFuture = submit(rdd, (0 until 10).toArray) + val jobFuture = submit(rdd, (0 until 1).toArray) awaitJobTermination(jobFuture, duration) } assertDataStructuresEmpty(noFailure = false) } - // Here we run with the blacklist on, and maxTaskFailures high enough that we'll eventually - // schedule on a good node and succeed the job + --- End diff -- nit: extra empty line
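The constraint stated in the quoted comment -- `maxTaskFailures > min(maxTaskAttemptsPerNode, nExecutorsPerHost)` -- can be checked with a few lines of arithmetic. The sketch below uses the values the quoted test deliberately chooses to violate it; the helper name is illustrative, not a Spark API.

```scala
object BlacklistConfigCheck {
  // True if one bad node cannot, by itself, exhaust a task's failure budget.
  def survivesOneBadNode(
      maxTaskFailures: Int,
      maxTaskAttemptsPerNode: Int,
      nExecutorsPerHost: Int): Boolean =
    maxTaskFailures > math.min(maxTaskAttemptsPerNode, nExecutorsPerHost)

  def main(args: Array[String]): Unit = {
    // From the quoted test: spark.task.maxFailures = 4, maxTaskAttemptsPerNode = 5,
    // nExecutorsPerHost = 5 -- an unsafe combination, so the job can still fail.
    println(survivesOneBadNode(maxTaskFailures = 4, maxTaskAttemptsPerNode = 5, nExecutorsPerHost = 5)) // false
  }
}
```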
[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user tgravescs commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r77893351 --- Diff: core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala --- @@ -51,37 +54,68 @@ class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorM assertDataStructuresEmpty(noFailure = false) } - // even with the blacklist turned on, if maxTaskFailures is not more than the number - // of executors on the bad node, then locality preferences will lead to us cycling through - // the executors on the bad node, and still failing the job + // even with the blacklist turned on, bad configs can lead to job failure. To survive one + // bad node, you need to make sure that + // maxTaskFailures > min(spark.blacklist.maxTaskFailuresPerNode, nExecutorsPerHost) --- End diff -- The comment is out of date: spark.blacklist.maxTaskFailuresPerNode doesn't exist; the actual config is spark.blacklist.task.maxTaskAttemptsPerNode. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
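To make the inequality in the quoted comment concrete, here is a hedged sketch of a "survivable" set of extra confs, written in the style of the quoted BlacklistIntegrationSuite test and using the config key that actually exists in this PR (spark.blacklist.task.maxTaskAttemptsPerNode). The values are chosen only to make the arithmetic visible; this is not a configuration from the PR's test suite.

```scala
// Assumes the scope of the quoted BlacklistIntegrationSuite (org.apache.spark.scheduler),
// where the private[spark] config constants are visible.
// maxTaskFailures (4) > min(maxTaskAttemptsPerNode (2), nExecutorsPerHost (5)) = 2, so one
// bad node can consume at most 2 of a task's 4 allowed attempts before being blacklisted
// for that task, leaving attempts for the healthy host.
val survivableConfs = Seq(
  config.BLACKLIST_ENABLED.key -> "true",
  config.MAX_TASK_ATTEMPTS_PER_NODE.key -> "2",
  "spark.task.maxFailures" -> "4",
  "spark.testing.nHosts" -> "2",
  "spark.testing.nExecutorsPerHost" -> "5",
  "spark.testing.nCoresPerExecutor" -> "10"
)
```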
[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r77892068 --- Diff: core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala --- @@ -0,0 +1,393 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import java.util.concurrent.atomic.AtomicReference + +import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config +import org.apache.spark.util.{Clock, SystemClock, Utils} + +/** + * BlacklistTracker is designed to track problematic executors and nodes. It supports blacklisting + * executors and nodes across an entire application (with a periodic expiry). TaskSetManagers add + * additional blacklisting of executors and nodes for individual tasks and stages which works in + * concert with the blacklisting here. + * + * The tracker needs to deal with a variety of workloads, eg.: + * + * * bad user code -- this may lead to many task failures, but that should not count against + * individual executors + * * many small stages -- this may prevent a bad executor for having many failures within one + * stage, but still many failures over the entire application + * * "flaky" executors -- they don't fail every task, but are still faulty enough to merit + * blacklisting + * + * See the design doc on SPARK-8425 for a more in-depth discussion. + * + * THREADING: As with most helpers of TaskSchedulerImpl, this is not thread-safe. Though it is + * called by multiple threads, callers must already have a lock on the TaskSchedulerImpl. The + * one exception is [[nodeBlacklist()]], which can be called without holding a lock. + */ +private[scheduler] class BlacklistTracker ( +conf: SparkConf, +clock: Clock = new SystemClock()) extends Logging { + + BlacklistTracker.validateBlacklistConfs(conf) + private val MAX_FAILURES_PER_EXEC = conf.get(config.MAX_FAILURES_PER_EXEC) + private val MAX_FAILED_EXEC_PER_NODE = conf.get(config.MAX_FAILED_EXEC_PER_NODE) + val BLACKLIST_TIMEOUT_MILLIS = BlacklistTracker.getBlacklistTimeout(conf) + + /** + * A map from executorId to information on task failures. Tracks the time of each task failure, + * so that we can avoid blacklisting executors due to failures that are very far apart. We do not + * actively remove from this as soon as tasks hit their timeouts, to avoid the time it would take + * to do so. But it will not grow too large, because as soon as an executor gets too many + * failures, we blacklist the executor and remove its entry here. 
+ */ + private[scheduler] val executorIdToFailureList: HashMap[String, ExecutorFailureList] = +new HashMap() + val executorIdToBlacklistStatus: HashMap[String, BlacklistedExecutor] = new HashMap() + val nodeIdToBlacklistExpiryTime: HashMap[String, Long] = new HashMap() + /** + * An immutable copy of the set of nodes that are currently blacklisted. Kept in an + * AtomicReference to make [[nodeBlacklist()]] thread-safe. + */ + private val _nodeBlacklist: AtomicReference[Set[String]] = new AtomicReference(Set()) + /** + * Time when the next blacklist will expire. Used as a + * shortcut to avoid iterating over all entries in the blacklist when none will have expired. + */ + private[scheduler] var nextExpiryTime: Long = Long.MaxValue + /** + * Mapping from nodes to all of the executors that have been blacklisted on that node. We do *not* + * remove from this when executors are removed from spark, so we can track when we get multiple + * successive blacklisted executors on one node. Nonetheless, it will not grow too large because + * there cannot be many blacklisted executors on one node, before we stop requesting
[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r77891521 --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala --- @@ -90,6 +92,54 @@ package object config { .toSequence .createWithDefault(Nil) + // Blacklist confs + private[spark] val BLACKLIST_ENABLED = +ConfigBuilder("spark.blacklist.enabled") + .booleanConf + .createOptional + + private[spark] val MAX_TASK_ATTEMPTS_PER_EXECUTOR = +ConfigBuilder("spark.blacklist.task.maxTaskAttemptsPerExecutor") + .intConf + .createWithDefault(1) + + private[spark] val MAX_TASK_ATTEMPTS_PER_NODE = +ConfigBuilder("spark.blacklist.task.maxTaskAttemptsPerNode") --- End diff -- Ah I see that makes sense. I think the ".tasks" prefix in the config name was supposed to add a clue that it's per-task, but I agree with your sentiment that if we just did spark.blacklist.task.maxFailureTasksPerNode, it wouldn't be clear that it's a per-task threshold. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user tgravescs commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r77890828 --- Diff: core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala --- @@ -0,0 +1,393 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import java.util.concurrent.atomic.AtomicReference + +import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config +import org.apache.spark.util.{Clock, SystemClock, Utils} + +/** + * BlacklistTracker is designed to track problematic executors and nodes. It supports blacklisting + * executors and nodes across an entire application (with a periodic expiry). TaskSetManagers add + * additional blacklisting of executors and nodes for individual tasks and stages which works in + * concert with the blacklisting here. + * + * The tracker needs to deal with a variety of workloads, eg.: + * + * * bad user code -- this may lead to many task failures, but that should not count against + * individual executors + * * many small stages -- this may prevent a bad executor for having many failures within one + * stage, but still many failures over the entire application + * * "flaky" executors -- they don't fail every task, but are still faulty enough to merit + * blacklisting + * + * See the design doc on SPARK-8425 for a more in-depth discussion. + * + * THREADING: As with most helpers of TaskSchedulerImpl, this is not thread-safe. Though it is + * called by multiple threads, callers must already have a lock on the TaskSchedulerImpl. The + * one exception is [[nodeBlacklist()]], which can be called without holding a lock. + */ +private[scheduler] class BlacklistTracker ( +conf: SparkConf, +clock: Clock = new SystemClock()) extends Logging { + + BlacklistTracker.validateBlacklistConfs(conf) + private val MAX_FAILURES_PER_EXEC = conf.get(config.MAX_FAILURES_PER_EXEC) + private val MAX_FAILED_EXEC_PER_NODE = conf.get(config.MAX_FAILED_EXEC_PER_NODE) + val BLACKLIST_TIMEOUT_MILLIS = BlacklistTracker.getBlacklistTimeout(conf) + + /** + * A map from executorId to information on task failures. Tracks the time of each task failure, + * so that we can avoid blacklisting executors due to failures that are very far apart. We do not + * actively remove from this as soon as tasks hit their timeouts, to avoid the time it would take + * to do so. But it will not grow too large, because as soon as an executor gets too many + * failures, we blacklist the executor and remove its entry here. 
+ */ + private[scheduler] val executorIdToFailureList: HashMap[String, ExecutorFailureList] = +new HashMap() + val executorIdToBlacklistStatus: HashMap[String, BlacklistedExecutor] = new HashMap() + val nodeIdToBlacklistExpiryTime: HashMap[String, Long] = new HashMap() + /** + * An immutable copy of the set of nodes that are currently blacklisted. Kept in an + * AtomicReference to make [[nodeBlacklist()]] thread-safe. + */ + private val _nodeBlacklist: AtomicReference[Set[String]] = new AtomicReference(Set()) + /** + * Time when the next blacklist will expire. Used as a + * shortcut to avoid iterating over all entries in the blacklist when none will have expired. + */ + private[scheduler] var nextExpiryTime: Long = Long.MaxValue + /** + * Mapping from nodes to all of the executors that have been blacklisted on that node. We do *not* + * remove from this when executors are removed from spark, so we can track when we get multiple + * successive blacklisted executors on one node. Nonetheless, it will not grow too large because + * there cannot be many blacklisted executors on one node, before we stop requesting mor
[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user tgravescs commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r77888237 --- Diff: docs/configuration.md --- @@ -1209,6 +1209,80 @@ Apart from these, the following properties are also available, and may be useful + spark.blacklist.enabled + +true in cluster mode; +false in local mode + + +If set to "true", prevent Spark from scheduling tasks on executors that have been blacklisted +due to too many task failures. The blacklisting algorithm can be further controlled by the +other "spark.blacklist" configuration options. + + + + spark.blacklist.timeout + 1h + +(Experimental) How long a node or executor is blacklisted for the entire application, before it +is unconditionally removed from the blacklist to attempt running new tasks. + + + + spark.blacklist.task.maxTaskAttemptsPerExecutor + 1 + +(Experimental) For a given task, how many times it can be retried on one executor before the +executor is blacklisted for that task. + + + + spark.blacklist.task.maxTaskAttemptsPerNode + 2 + +(Experimental) For a given task, how many times it can be retried on one node, before the entire +node is blacklisted for that task. + + + + spark.blacklist.stage.maxFailedTasksPerExecutor + 2 + +(Experimental) How many different tasks must fail on one executor, within one stage, before the +executor is blacklisted for that stage. + + + + spark.blacklist.stage.maxFailedExecutorsPerNode + 2 + +(Experimental) How many different executors are marked as failed for a given stage, before the +entire node is marked as failed for the stage. --- End diff -- should be marked as blacklisted for the stage --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user tgravescs commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r77887355 --- Diff: core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala --- @@ -0,0 +1,393 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import java.util.concurrent.atomic.AtomicReference + +import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config +import org.apache.spark.util.{Clock, SystemClock, Utils} + +/** + * BlacklistTracker is designed to track problematic executors and nodes. It supports blacklisting + * executors and nodes across an entire application (with a periodic expiry). TaskSetManagers add + * additional blacklisting of executors and nodes for individual tasks and stages which works in + * concert with the blacklisting here. + * + * The tracker needs to deal with a variety of workloads, eg.: + * + * * bad user code -- this may lead to many task failures, but that should not count against + * individual executors + * * many small stages -- this may prevent a bad executor for having many failures within one + * stage, but still many failures over the entire application + * * "flaky" executors -- they don't fail every task, but are still faulty enough to merit + * blacklisting + * + * See the design doc on SPARK-8425 for a more in-depth discussion. + * + * THREADING: As with most helpers of TaskSchedulerImpl, this is not thread-safe. Though it is + * called by multiple threads, callers must already have a lock on the TaskSchedulerImpl. The + * one exception is [[nodeBlacklist()]], which can be called without holding a lock. + */ +private[scheduler] class BlacklistTracker ( +conf: SparkConf, +clock: Clock = new SystemClock()) extends Logging { + + BlacklistTracker.validateBlacklistConfs(conf) + private val MAX_FAILURES_PER_EXEC = conf.get(config.MAX_FAILURES_PER_EXEC) + private val MAX_FAILED_EXEC_PER_NODE = conf.get(config.MAX_FAILED_EXEC_PER_NODE) + val BLACKLIST_TIMEOUT_MILLIS = BlacklistTracker.getBlacklistTimeout(conf) + + /** + * A map from executorId to information on task failures. Tracks the time of each task failure, + * so that we can avoid blacklisting executors due to failures that are very far apart. We do not + * actively remove from this as soon as tasks hit their timeouts, to avoid the time it would take + * to do so. But it will not grow too large, because as soon as an executor gets too many + * failures, we blacklist the executor and remove its entry here. 
+ */ + private[scheduler] val executorIdToFailureList: HashMap[String, ExecutorFailureList] = +new HashMap() + val executorIdToBlacklistStatus: HashMap[String, BlacklistedExecutor] = new HashMap() + val nodeIdToBlacklistExpiryTime: HashMap[String, Long] = new HashMap() + /** + * An immutable copy of the set of nodes that are currently blacklisted. Kept in an + * AtomicReference to make [[nodeBlacklist()]] thread-safe. + */ + private val _nodeBlacklist: AtomicReference[Set[String]] = new AtomicReference(Set()) + /** + * Time when the next blacklist will expire. Used as a + * shortcut to avoid iterating over all entries in the blacklist when none will have expired. + */ + private[scheduler] var nextExpiryTime: Long = Long.MaxValue + /** + * Mapping from nodes to all of the executors that have been blacklisted on that node. We do *not* + * remove from this when executors are removed from spark, so we can track when we get multiple + * successive blacklisted executors on one node. Nonetheless, it will not grow too large because + * there cannot be many blacklisted executors on one node, before we stop requesting mor
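The doc comment quoted above describes nextExpiryTime as a shortcut for skipping expiry work when nothing can have expired yet. Below is a rough sketch of that pattern, written as if it lived inside a class shaped like the quoted BlacklistTracker; it is not the PR's implementation, and the expiryTime field on BlacklistedExecutor is an assumed name, since that class body is not shown in the quoted diff.

```scala
// Sketch only: walk the blacklist maps only when the earliest recorded expiry has passed,
// then recompute the next expiry so later calls can return immediately.
def applyBlacklistTimeout(now: Long): Unit = {
  if (now >= nextExpiryTime) {
    executorIdToBlacklistStatus.retain { case (_, status) => status.expiryTime > now } // assumed field
    nodeIdToBlacklistExpiryTime.retain { case (_, expiry) => expiry > now }
    _nodeBlacklist.set(nodeIdToBlacklistExpiryTime.keySet.toSet)
    nextExpiryTime = (executorIdToBlacklistStatus.values.map(_.expiryTime) ++
        nodeIdToBlacklistExpiryTime.values).reduceOption(_ min _).getOrElse(Long.MaxValue)
  }
}
```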
[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user tgravescs commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r77879549 --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala --- @@ -90,6 +92,54 @@ package object config { .toSequence .createWithDefault(Nil) + // Blacklist confs + private[spark] val BLACKLIST_ENABLED = +ConfigBuilder("spark.blacklist.enabled") + .booleanConf + .createOptional + + private[spark] val MAX_TASK_ATTEMPTS_PER_EXECUTOR = +ConfigBuilder("spark.blacklist.task.maxTaskAttemptsPerExecutor") + .intConf + .createWithDefault(1) + + private[spark] val MAX_TASK_ATTEMPTS_PER_NODE = +ConfigBuilder("spark.blacklist.task.maxTaskAttemptsPerNode") --- End diff -- The difference is that the task-attempt configs count the same task failing multiple attempts, whereas maxFailedTasks counts the number of distinct tasks that have failed (i.e. different tasks), and I think the naming was just trying to differentiate the two; I don't remember if we discussed actual naming in the design doc. We probably could rename to something like maxFailedTaskAttempts, but that is pretty close to maxFailedTasks, so I'm not sure whether that would be more confusing. @squito can correct me if I'm wrong. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
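To make the distinction concrete, here is a hedged sketch (not from the PR) that sets one config from each family, with the semantics taken from the quoted config package and the docs/configuration.md table earlier in this thread:

```scala
import org.apache.spark.SparkConf

// The ".task" configs are per individual task: they bound how many attempts of the *same*
// task may fail on an executor/node before that executor/node is blacklisted for that one
// task. The ".stage" configs count *distinct* failed tasks within a stage.
val conf = new SparkConf()
  .set("spark.blacklist.enabled", "true")
  .set("spark.blacklist.task.maxTaskAttemptsPerExecutor", "1") // same task, same executor
  .set("spark.blacklist.task.maxTaskAttemptsPerNode", "2")     // same task, any executor on the node
  .set("spark.blacklist.stage.maxFailedTasksPerExecutor", "2") // different tasks, one executor, one stage
```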
[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r77298682 --- Diff: core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala --- @@ -0,0 +1,503 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import org.mockito.Mockito.when +import org.scalatest.BeforeAndAfterEach +import org.scalatest.mock.MockitoSugar + +import org.apache.spark._ +import org.apache.spark.internal.config +import org.apache.spark.util.ManualClock + +class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with MockitoSugar +with LocalSparkContext { + + private val clock = new ManualClock(0) + + private var blacklistTracker: BlacklistTracker = _ + + override def afterEach(): Unit = { +if (blacklistTracker != null) { + blacklistTracker = null +} +super.afterEach() + } + + val allOptions = (('A' to 'Z').map("host" + _) ++ (1 to 100).map{_.toString}).toSet + + /** + * Its easier to write our tests as if we could directly look at the sets of nodes & executors in + * the blacklist. However the api doesn't expose a set (for thread-safety), so this is a simple + * way to test something similar, since we know the universe of values that might appear in these + * sets. 
+ */ + def assertEquivalentToSet(f: String => Boolean, expected: Set[String]): Unit = { +allOptions.foreach { opt => + val actual = f(opt) + val exp = expected.contains(opt) + assert(actual === exp, raw"""for string "$opt" """) +} + } + + def mockTaskSchedWithConf(conf: SparkConf): TaskSchedulerImpl = { +sc = new SparkContext(conf) +val scheduler = mock[TaskSchedulerImpl] +when(scheduler.sc).thenReturn(sc) + when(scheduler.mapOutputTracker).thenReturn(SparkEnv.get.mapOutputTracker) +scheduler + } + + test("Blacklisting individual tasks") { +val conf = new SparkConf().setAppName("test").setMaster("local") + .set(config.BLACKLIST_ENABLED.key, "true") +val scheduler = mockTaskSchedWithConf(conf) +// Task 1 failed on executor 1 +blacklistTracker = new BlacklistTracker(conf, clock) +val taskSet = FakeTask.createTaskSet(10) +val tsm = new TaskSetManager(scheduler, Some(blacklistTracker), taskSet, 4, clock) +tsm.updateBlacklistForFailedTask("hostA", "1", 0) +for { + executor <- (1 to 4).map(_.toString) + index <- 0 until 10 +} { + val exp = (executor == "1" && index == 0) + assert(tsm.isExecutorBlacklistedForTask(executor, index) === exp) +} +assert(blacklistTracker.nodeBlacklist() === Set()) +assertEquivalentToSet(blacklistTracker.isNodeBlacklisted(_), Set()) +assertEquivalentToSet(tsm.isNodeBlacklistedForTaskSet, Set()) +assertEquivalentToSet(tsm.isExecutorBlacklistedForTaskSet, Set()) + +// Task 1 & 2 failed on both executor 1 & 2, so we blacklist all executors on that host, +// for all tasks for the stage. Note the api expects multiple checks for each type of +// blacklist -- this actually fits naturally with its use in the scheduler +tsm.updateBlacklistForFailedTask("hostA", "1", 1) +tsm.updateBlacklistForFailedTask("hostA", "2", 0) +tsm.updateBlacklistForFailedTask("hostA", "2", 1) +// we don't explicitly return the executors in hostA here, but that is OK +for { + executor <- (1 to 4).map(_.toString) + index <- 0 until 10 +} { + withClue(s"exec = $executor; index = $index") { +val badExec = (executor == "1" || executor == "2") +val badPart = (index == 0 || index == 1) +val taskExp = (badExec && badPart) +assert( + tsm.isExecutorBlacklistedForTask(executor, index) === taskExp) +val ex
[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r77298211 --- Diff: core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala --- @@ -0,0 +1,503 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import org.mockito.Mockito.when +import org.scalatest.BeforeAndAfterEach +import org.scalatest.mock.MockitoSugar + +import org.apache.spark._ +import org.apache.spark.internal.config +import org.apache.spark.util.ManualClock + +class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with MockitoSugar +with LocalSparkContext { + + private val clock = new ManualClock(0) + + private var blacklistTracker: BlacklistTracker = _ + + override def afterEach(): Unit = { +if (blacklistTracker != null) { + blacklistTracker = null +} +super.afterEach() + } + + val allOptions = (('A' to 'Z').map("host" + _) ++ (1 to 100).map{_.toString}).toSet + + /** + * Its easier to write our tests as if we could directly look at the sets of nodes & executors in + * the blacklist. However the api doesn't expose a set (for thread-safety), so this is a simple + * way to test something similar, since we know the universe of values that might appear in these + * sets. 
+ */ + def assertEquivalentToSet(f: String => Boolean, expected: Set[String]): Unit = { +allOptions.foreach { opt => + val actual = f(opt) + val exp = expected.contains(opt) + assert(actual === exp, raw"""for string "$opt" """) +} + } + + def mockTaskSchedWithConf(conf: SparkConf): TaskSchedulerImpl = { +sc = new SparkContext(conf) +val scheduler = mock[TaskSchedulerImpl] +when(scheduler.sc).thenReturn(sc) + when(scheduler.mapOutputTracker).thenReturn(SparkEnv.get.mapOutputTracker) +scheduler + } + + test("Blacklisting individual tasks") { +val conf = new SparkConf().setAppName("test").setMaster("local") + .set(config.BLACKLIST_ENABLED.key, "true") +val scheduler = mockTaskSchedWithConf(conf) +// Task 1 failed on executor 1 +blacklistTracker = new BlacklistTracker(conf, clock) +val taskSet = FakeTask.createTaskSet(10) +val tsm = new TaskSetManager(scheduler, Some(blacklistTracker), taskSet, 4, clock) +tsm.updateBlacklistForFailedTask("hostA", "1", 0) +for { + executor <- (1 to 4).map(_.toString) + index <- 0 until 10 +} { + val exp = (executor == "1" && index == 0) + assert(tsm.isExecutorBlacklistedForTask(executor, index) === exp) +} +assert(blacklistTracker.nodeBlacklist() === Set()) +assertEquivalentToSet(blacklistTracker.isNodeBlacklisted(_), Set()) +assertEquivalentToSet(tsm.isNodeBlacklistedForTaskSet, Set()) +assertEquivalentToSet(tsm.isExecutorBlacklistedForTaskSet, Set()) + +// Task 1 & 2 failed on both executor 1 & 2, so we blacklist all executors on that host, +// for all tasks for the stage. Note the api expects multiple checks for each type of +// blacklist -- this actually fits naturally with its use in the scheduler +tsm.updateBlacklistForFailedTask("hostA", "1", 1) +tsm.updateBlacklistForFailedTask("hostA", "2", 0) +tsm.updateBlacklistForFailedTask("hostA", "2", 1) +// we don't explicitly return the executors in hostA here, but that is OK +for { + executor <- (1 to 4).map(_.toString) + index <- 0 until 10 +} { + withClue(s"exec = $executor; index = $index") { +val badExec = (executor == "1" || executor == "2") +val badPart = (index == 0 || index == 1) +val taskExp = (badExec && badPart) +assert( + tsm.isExecutorBlacklistedForTask(executor, index) === taskExp) +val ex
[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r77298118 --- Diff: core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala --- @@ -0,0 +1,503 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import org.mockito.Mockito.when +import org.scalatest.BeforeAndAfterEach +import org.scalatest.mock.MockitoSugar + +import org.apache.spark._ +import org.apache.spark.internal.config +import org.apache.spark.util.ManualClock + +class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with MockitoSugar +with LocalSparkContext { + + private val clock = new ManualClock(0) + + private var blacklistTracker: BlacklistTracker = _ + + override def afterEach(): Unit = { +if (blacklistTracker != null) { + blacklistTracker = null +} +super.afterEach() + } + + val allOptions = (('A' to 'Z').map("host" + _) ++ (1 to 100).map{_.toString}).toSet + + /** + * Its easier to write our tests as if we could directly look at the sets of nodes & executors in + * the blacklist. However the api doesn't expose a set (for thread-safety), so this is a simple + * way to test something similar, since we know the universe of values that might appear in these + * sets. 
+ */ + def assertEquivalentToSet(f: String => Boolean, expected: Set[String]): Unit = { +allOptions.foreach { opt => + val actual = f(opt) + val exp = expected.contains(opt) + assert(actual === exp, raw"""for string "$opt" """) +} + } + + def mockTaskSchedWithConf(conf: SparkConf): TaskSchedulerImpl = { +sc = new SparkContext(conf) +val scheduler = mock[TaskSchedulerImpl] +when(scheduler.sc).thenReturn(sc) + when(scheduler.mapOutputTracker).thenReturn(SparkEnv.get.mapOutputTracker) +scheduler + } + + test("Blacklisting individual tasks") { +val conf = new SparkConf().setAppName("test").setMaster("local") + .set(config.BLACKLIST_ENABLED.key, "true") +val scheduler = mockTaskSchedWithConf(conf) +// Task 1 failed on executor 1 +blacklistTracker = new BlacklistTracker(conf, clock) +val taskSet = FakeTask.createTaskSet(10) +val tsm = new TaskSetManager(scheduler, Some(blacklistTracker), taskSet, 4, clock) +tsm.updateBlacklistForFailedTask("hostA", "1", 0) +for { + executor <- (1 to 4).map(_.toString) + index <- 0 until 10 +} { + val exp = (executor == "1" && index == 0) + assert(tsm.isExecutorBlacklistedForTask(executor, index) === exp) +} +assert(blacklistTracker.nodeBlacklist() === Set()) +assertEquivalentToSet(blacklistTracker.isNodeBlacklisted(_), Set()) +assertEquivalentToSet(tsm.isNodeBlacklistedForTaskSet, Set()) +assertEquivalentToSet(tsm.isExecutorBlacklistedForTaskSet, Set()) + +// Task 1 & 2 failed on both executor 1 & 2, so we blacklist all executors on that host, +// for all tasks for the stage. Note the api expects multiple checks for each type of +// blacklist -- this actually fits naturally with its use in the scheduler +tsm.updateBlacklistForFailedTask("hostA", "1", 1) +tsm.updateBlacklistForFailedTask("hostA", "2", 0) +tsm.updateBlacklistForFailedTask("hostA", "2", 1) +// we don't explicitly return the executors in hostA here, but that is OK +for { + executor <- (1 to 4).map(_.toString) + index <- 0 until 10 +} { + withClue(s"exec = $executor; index = $index") { +val badExec = (executor == "1" || executor == "2") +val badPart = (index == 0 || index == 1) +val taskExp = (badExec && badPart) +assert( + tsm.isExecutorBlacklistedForTask(executor, index) === taskExp) +val ex
[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r77297969 --- Diff: core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala --- @@ -0,0 +1,503 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import org.mockito.Mockito.when +import org.scalatest.BeforeAndAfterEach +import org.scalatest.mock.MockitoSugar + +import org.apache.spark._ +import org.apache.spark.internal.config +import org.apache.spark.util.ManualClock + +class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with MockitoSugar +with LocalSparkContext { + + private val clock = new ManualClock(0) + + private var blacklistTracker: BlacklistTracker = _ + + override def afterEach(): Unit = { +if (blacklistTracker != null) { + blacklistTracker = null +} +super.afterEach() + } + + val allOptions = (('A' to 'Z').map("host" + _) ++ (1 to 100).map{_.toString}).toSet + + /** + * Its easier to write our tests as if we could directly look at the sets of nodes & executors in + * the blacklist. However the api doesn't expose a set (for thread-safety), so this is a simple + * way to test something similar, since we know the universe of values that might appear in these + * sets. 
+ */ + def assertEquivalentToSet(f: String => Boolean, expected: Set[String]): Unit = { +allOptions.foreach { opt => + val actual = f(opt) + val exp = expected.contains(opt) + assert(actual === exp, raw"""for string "$opt" """) +} + } + + def mockTaskSchedWithConf(conf: SparkConf): TaskSchedulerImpl = { +sc = new SparkContext(conf) +val scheduler = mock[TaskSchedulerImpl] +when(scheduler.sc).thenReturn(sc) + when(scheduler.mapOutputTracker).thenReturn(SparkEnv.get.mapOutputTracker) +scheduler + } + + test("Blacklisting individual tasks") { +val conf = new SparkConf().setAppName("test").setMaster("local") + .set(config.BLACKLIST_ENABLED.key, "true") +val scheduler = mockTaskSchedWithConf(conf) +// Task 1 failed on executor 1 +blacklistTracker = new BlacklistTracker(conf, clock) +val taskSet = FakeTask.createTaskSet(10) +val tsm = new TaskSetManager(scheduler, Some(blacklistTracker), taskSet, 4, clock) +tsm.updateBlacklistForFailedTask("hostA", "1", 0) +for { + executor <- (1 to 4).map(_.toString) + index <- 0 until 10 +} { + val exp = (executor == "1" && index == 0) + assert(tsm.isExecutorBlacklistedForTask(executor, index) === exp) +} +assert(blacklistTracker.nodeBlacklist() === Set()) +assertEquivalentToSet(blacklistTracker.isNodeBlacklisted(_), Set()) +assertEquivalentToSet(tsm.isNodeBlacklistedForTaskSet, Set()) +assertEquivalentToSet(tsm.isExecutorBlacklistedForTaskSet, Set()) + +// Task 1 & 2 failed on both executor 1 & 2, so we blacklist all executors on that host, +// for all tasks for the stage. Note the api expects multiple checks for each type of +// blacklist -- this actually fits naturally with its use in the scheduler +tsm.updateBlacklistForFailedTask("hostA", "1", 1) +tsm.updateBlacklistForFailedTask("hostA", "2", 0) +tsm.updateBlacklistForFailedTask("hostA", "2", 1) +// we don't explicitly return the executors in hostA here, but that is OK +for { + executor <- (1 to 4).map(_.toString) + index <- 0 until 10 +} { + withClue(s"exec = $executor; index = $index") { +val badExec = (executor == "1" || executor == "2") +val badPart = (index == 0 || index == 1) +val taskExp = (badExec && badPart) +assert( + tsm.isExecutorBlacklistedForTask(executor, index) === taskExp) +val ex
[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r77297902 --- Diff: core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala --- @@ -0,0 +1,503 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import org.mockito.Mockito.when +import org.scalatest.BeforeAndAfterEach +import org.scalatest.mock.MockitoSugar + +import org.apache.spark._ +import org.apache.spark.internal.config +import org.apache.spark.util.ManualClock + +class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with MockitoSugar +with LocalSparkContext { + + private val clock = new ManualClock(0) + + private var blacklistTracker: BlacklistTracker = _ + + override def afterEach(): Unit = { +if (blacklistTracker != null) { + blacklistTracker = null +} +super.afterEach() + } + + val allOptions = (('A' to 'Z').map("host" + _) ++ (1 to 100).map{_.toString}).toSet + + /** + * Its easier to write our tests as if we could directly look at the sets of nodes & executors in + * the blacklist. However the api doesn't expose a set (for thread-safety), so this is a simple + * way to test something similar, since we know the universe of values that might appear in these + * sets. 
+ */ + def assertEquivalentToSet(f: String => Boolean, expected: Set[String]): Unit = { +allOptions.foreach { opt => + val actual = f(opt) + val exp = expected.contains(opt) + assert(actual === exp, raw"""for string "$opt" """) +} + } + + def mockTaskSchedWithConf(conf: SparkConf): TaskSchedulerImpl = { +sc = new SparkContext(conf) +val scheduler = mock[TaskSchedulerImpl] +when(scheduler.sc).thenReturn(sc) + when(scheduler.mapOutputTracker).thenReturn(SparkEnv.get.mapOutputTracker) +scheduler + } + + test("Blacklisting individual tasks") { +val conf = new SparkConf().setAppName("test").setMaster("local") + .set(config.BLACKLIST_ENABLED.key, "true") +val scheduler = mockTaskSchedWithConf(conf) +// Task 1 failed on executor 1 +blacklistTracker = new BlacklistTracker(conf, clock) +val taskSet = FakeTask.createTaskSet(10) +val tsm = new TaskSetManager(scheduler, Some(blacklistTracker), taskSet, 4, clock) +tsm.updateBlacklistForFailedTask("hostA", "1", 0) +for { + executor <- (1 to 4).map(_.toString) + index <- 0 until 10 +} { + val exp = (executor == "1" && index == 0) + assert(tsm.isExecutorBlacklistedForTask(executor, index) === exp) +} +assert(blacklistTracker.nodeBlacklist() === Set()) +assertEquivalentToSet(blacklistTracker.isNodeBlacklisted(_), Set()) +assertEquivalentToSet(tsm.isNodeBlacklistedForTaskSet, Set()) +assertEquivalentToSet(tsm.isExecutorBlacklistedForTaskSet, Set()) + +// Task 1 & 2 failed on both executor 1 & 2, so we blacklist all executors on that host, +// for all tasks for the stage. Note the api expects multiple checks for each type of +// blacklist -- this actually fits naturally with its use in the scheduler +tsm.updateBlacklistForFailedTask("hostA", "1", 1) +tsm.updateBlacklistForFailedTask("hostA", "2", 0) +tsm.updateBlacklistForFailedTask("hostA", "2", 1) +// we don't explicitly return the executors in hostA here, but that is OK +for { + executor <- (1 to 4).map(_.toString) + index <- 0 until 10 +} { + withClue(s"exec = $executor; index = $index") { +val badExec = (executor == "1" || executor == "2") +val badPart = (index == 0 || index == 1) +val taskExp = (badExec && badPart) +assert( + tsm.isExecutorBlacklistedForTask(executor, index) === taskExp) +val ex
[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r77297807 --- Diff: core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala --- @@ -0,0 +1,503 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import org.mockito.Mockito.when +import org.scalatest.BeforeAndAfterEach +import org.scalatest.mock.MockitoSugar + +import org.apache.spark._ +import org.apache.spark.internal.config +import org.apache.spark.util.ManualClock + +class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with MockitoSugar +with LocalSparkContext { + + private val clock = new ManualClock(0) + + private var blacklistTracker: BlacklistTracker = _ + + override def afterEach(): Unit = { +if (blacklistTracker != null) { + blacklistTracker = null +} +super.afterEach() + } + + val allOptions = (('A' to 'Z').map("host" + _) ++ (1 to 100).map{_.toString}).toSet + + /** + * Its easier to write our tests as if we could directly look at the sets of nodes & executors in + * the blacklist. However the api doesn't expose a set (for thread-safety), so this is a simple + * way to test something similar, since we know the universe of values that might appear in these + * sets. 
+ */ + def assertEquivalentToSet(f: String => Boolean, expected: Set[String]): Unit = { +allOptions.foreach { opt => + val actual = f(opt) + val exp = expected.contains(opt) + assert(actual === exp, raw"""for string "$opt" """) +} + } + + def mockTaskSchedWithConf(conf: SparkConf): TaskSchedulerImpl = { +sc = new SparkContext(conf) +val scheduler = mock[TaskSchedulerImpl] +when(scheduler.sc).thenReturn(sc) + when(scheduler.mapOutputTracker).thenReturn(SparkEnv.get.mapOutputTracker) +scheduler + } + + test("Blacklisting individual tasks") { +val conf = new SparkConf().setAppName("test").setMaster("local") + .set(config.BLACKLIST_ENABLED.key, "true") +val scheduler = mockTaskSchedWithConf(conf) +// Task 1 failed on executor 1 +blacklistTracker = new BlacklistTracker(conf, clock) +val taskSet = FakeTask.createTaskSet(10) +val tsm = new TaskSetManager(scheduler, Some(blacklistTracker), taskSet, 4, clock) +tsm.updateBlacklistForFailedTask("hostA", "1", 0) +for { + executor <- (1 to 4).map(_.toString) + index <- 0 until 10 +} { + val exp = (executor == "1" && index == 0) + assert(tsm.isExecutorBlacklistedForTask(executor, index) === exp) +} +assert(blacklistTracker.nodeBlacklist() === Set()) +assertEquivalentToSet(blacklistTracker.isNodeBlacklisted(_), Set()) +assertEquivalentToSet(tsm.isNodeBlacklistedForTaskSet, Set()) +assertEquivalentToSet(tsm.isExecutorBlacklistedForTaskSet, Set()) + +// Task 1 & 2 failed on both executor 1 & 2, so we blacklist all executors on that host, +// for all tasks for the stage. Note the api expects multiple checks for each type of +// blacklist -- this actually fits naturally with its use in the scheduler +tsm.updateBlacklistForFailedTask("hostA", "1", 1) +tsm.updateBlacklistForFailedTask("hostA", "2", 0) +tsm.updateBlacklistForFailedTask("hostA", "2", 1) +// we don't explicitly return the executors in hostA here, but that is OK +for { + executor <- (1 to 4).map(_.toString) + index <- 0 until 10 +} { + withClue(s"exec = $executor; index = $index") { +val badExec = (executor == "1" || executor == "2") +val badPart = (index == 0 || index == 1) +val taskExp = (badExec && badPart) +assert( + tsm.isExecutorBlacklistedForTask(executor, index) === taskExp) +val ex
[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r77297662 --- Diff: core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala --- @@ -0,0 +1,503 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import org.mockito.Mockito.when +import org.scalatest.BeforeAndAfterEach +import org.scalatest.mock.MockitoSugar + +import org.apache.spark._ +import org.apache.spark.internal.config +import org.apache.spark.util.ManualClock + +class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with MockitoSugar +with LocalSparkContext { + + private val clock = new ManualClock(0) + + private var blacklistTracker: BlacklistTracker = _ + + override def afterEach(): Unit = { +if (blacklistTracker != null) { + blacklistTracker = null +} +super.afterEach() + } + + val allOptions = (('A' to 'Z').map("host" + _) ++ (1 to 100).map{_.toString}).toSet + + /** + * Its easier to write our tests as if we could directly look at the sets of nodes & executors in + * the blacklist. However the api doesn't expose a set (for thread-safety), so this is a simple + * way to test something similar, since we know the universe of values that might appear in these + * sets. 
+ */ + def assertEquivalentToSet(f: String => Boolean, expected: Set[String]): Unit = { +allOptions.foreach { opt => + val actual = f(opt) + val exp = expected.contains(opt) + assert(actual === exp, raw"""for string "$opt" """) +} + } + + def mockTaskSchedWithConf(conf: SparkConf): TaskSchedulerImpl = { +sc = new SparkContext(conf) +val scheduler = mock[TaskSchedulerImpl] +when(scheduler.sc).thenReturn(sc) + when(scheduler.mapOutputTracker).thenReturn(SparkEnv.get.mapOutputTracker) +scheduler + } + + test("Blacklisting individual tasks") { +val conf = new SparkConf().setAppName("test").setMaster("local") + .set(config.BLACKLIST_ENABLED.key, "true") +val scheduler = mockTaskSchedWithConf(conf) +// Task 1 failed on executor 1 +blacklistTracker = new BlacklistTracker(conf, clock) +val taskSet = FakeTask.createTaskSet(10) +val tsm = new TaskSetManager(scheduler, Some(blacklistTracker), taskSet, 4, clock) +tsm.updateBlacklistForFailedTask("hostA", "1", 0) +for { + executor <- (1 to 4).map(_.toString) + index <- 0 until 10 +} { + val exp = (executor == "1" && index == 0) + assert(tsm.isExecutorBlacklistedForTask(executor, index) === exp) +} +assert(blacklistTracker.nodeBlacklist() === Set()) +assertEquivalentToSet(blacklistTracker.isNodeBlacklisted(_), Set()) +assertEquivalentToSet(tsm.isNodeBlacklistedForTaskSet, Set()) +assertEquivalentToSet(tsm.isExecutorBlacklistedForTaskSet, Set()) + +// Task 1 & 2 failed on both executor 1 & 2, so we blacklist all executors on that host, +// for all tasks for the stage. Note the api expects multiple checks for each type of +// blacklist -- this actually fits naturally with its use in the scheduler +tsm.updateBlacklistForFailedTask("hostA", "1", 1) +tsm.updateBlacklistForFailedTask("hostA", "2", 0) +tsm.updateBlacklistForFailedTask("hostA", "2", 1) +// we don't explicitly return the executors in hostA here, but that is OK +for { + executor <- (1 to 4).map(_.toString) + index <- 0 until 10 +} { + withClue(s"exec = $executor; index = $index") { +val badExec = (executor == "1" || executor == "2") +val badPart = (index == 0 || index == 1) +val taskExp = (badExec && badPart) +assert( + tsm.isExecutorBlacklistedForTask(executor, index) === taskExp) +val ex
[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r77297537 --- Diff: core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala --- @@ -0,0 +1,503 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import org.mockito.Mockito.when +import org.scalatest.BeforeAndAfterEach +import org.scalatest.mock.MockitoSugar + +import org.apache.spark._ +import org.apache.spark.internal.config +import org.apache.spark.util.ManualClock + +class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with MockitoSugar +with LocalSparkContext { + + private val clock = new ManualClock(0) + + private var blacklistTracker: BlacklistTracker = _ + + override def afterEach(): Unit = { +if (blacklistTracker != null) { + blacklistTracker = null +} +super.afterEach() + } + + val allOptions = (('A' to 'Z').map("host" + _) ++ (1 to 100).map{_.toString}).toSet + + /** + * Its easier to write our tests as if we could directly look at the sets of nodes & executors in + * the blacklist. However the api doesn't expose a set (for thread-safety), so this is a simple + * way to test something similar, since we know the universe of values that might appear in these + * sets. + */ + def assertEquivalentToSet(f: String => Boolean, expected: Set[String]): Unit = { +allOptions.foreach { opt => + val actual = f(opt) + val exp = expected.contains(opt) + assert(actual === exp, raw"""for string "$opt" """) +} + } + + def mockTaskSchedWithConf(conf: SparkConf): TaskSchedulerImpl = { +sc = new SparkContext(conf) +val scheduler = mock[TaskSchedulerImpl] +when(scheduler.sc).thenReturn(sc) + when(scheduler.mapOutputTracker).thenReturn(SparkEnv.get.mapOutputTracker) +scheduler + } + + test("Blacklisting individual tasks") { --- End diff -- I wonder if this should be a test for the TaskSetManager (so in TaskSetManagerSuite), where you pass in a mock'ed blacklist tracker, and make sure that no calls get made on that blacklist tracker? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
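A rough sketch of the kind of TaskSetManagerSuite-style check suggested above, reusing the helpers visible in the quoted suite (mockTaskSchedWithConf, FakeTask, clock, MockitoSugar's mock) and the TaskSetManager constructor shown there. The choice of a single below-threshold failure is an assumption about when the TaskSetManager should leave the application-level tracker untouched; this is not the PR's actual test.

```scala
import org.mockito.Mockito.verifyZeroInteractions

test("per-task blacklisting does not touch the application-level BlacklistTracker") {
  val conf = new SparkConf().setAppName("test").setMaster("local")
    .set(config.BLACKLIST_ENABLED.key, "true")
  val scheduler = mockTaskSchedWithConf(conf)
  val mockTracker = mock[BlacklistTracker] // MockitoSugar.mock, as in the quoted suite
  val tsm = new TaskSetManager(scheduler, Some(mockTracker), FakeTask.createTaskSet(10), 4, clock)

  // One failure is below every per-executor/per-node threshold, so only the TaskSetManager's
  // own task/stage-level blacklist state should change; the shared tracker stays untouched.
  tsm.updateBlacklistForFailedTask("hostA", "1", 0)
  verifyZeroInteractions(mockTracker)
}
```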
[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r77297372 --- Diff: core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala --- @@ -0,0 +1,503 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import org.mockito.Mockito.when +import org.scalatest.BeforeAndAfterEach +import org.scalatest.mock.MockitoSugar + +import org.apache.spark._ +import org.apache.spark.internal.config +import org.apache.spark.util.ManualClock + +class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with MockitoSugar +with LocalSparkContext { + + private val clock = new ManualClock(0) + + private var blacklistTracker: BlacklistTracker = _ + + override def afterEach(): Unit = { +if (blacklistTracker != null) { + blacklistTracker = null +} +super.afterEach() + } + + val allOptions = (('A' to 'Z').map("host" + _) ++ (1 to 100).map{_.toString}).toSet + + /** + * Its easier to write our tests as if we could directly look at the sets of nodes & executors in + * the blacklist. However the api doesn't expose a set (for thread-safety), so this is a simple + * way to test something similar, since we know the universe of values that might appear in these + * sets. 
+ */ + def assertEquivalentToSet(f: String => Boolean, expected: Set[String]): Unit = { +allOptions.foreach { opt => + val actual = f(opt) + val exp = expected.contains(opt) + assert(actual === exp, raw"""for string "$opt" """) +} + } + + def mockTaskSchedWithConf(conf: SparkConf): TaskSchedulerImpl = { +sc = new SparkContext(conf) +val scheduler = mock[TaskSchedulerImpl] +when(scheduler.sc).thenReturn(sc) + when(scheduler.mapOutputTracker).thenReturn(SparkEnv.get.mapOutputTracker) +scheduler + } + + test("Blacklisting individual tasks") { +val conf = new SparkConf().setAppName("test").setMaster("local") + .set(config.BLACKLIST_ENABLED.key, "true") +val scheduler = mockTaskSchedWithConf(conf) +// Task 1 failed on executor 1 +blacklistTracker = new BlacklistTracker(conf, clock) +val taskSet = FakeTask.createTaskSet(10) +val tsm = new TaskSetManager(scheduler, Some(blacklistTracker), taskSet, 4, clock) +tsm.updateBlacklistForFailedTask("hostA", "1", 0) +for { + executor <- (1 to 4).map(_.toString) + index <- 0 until 10 +} { + val exp = (executor == "1" && index == 0) + assert(tsm.isExecutorBlacklistedForTask(executor, index) === exp) +} +assert(blacklistTracker.nodeBlacklist() === Set()) +assertEquivalentToSet(blacklistTracker.isNodeBlacklisted(_), Set()) +assertEquivalentToSet(tsm.isNodeBlacklistedForTaskSet, Set()) +assertEquivalentToSet(tsm.isExecutorBlacklistedForTaskSet, Set()) + +// Task 1 & 2 failed on both executor 1 & 2, so we blacklist all executors on that host, +// for all tasks for the stage. Note the api expects multiple checks for each type of +// blacklist -- this actually fits naturally with its use in the scheduler +tsm.updateBlacklistForFailedTask("hostA", "1", 1) +tsm.updateBlacklistForFailedTask("hostA", "2", 0) +tsm.updateBlacklistForFailedTask("hostA", "2", 1) +// we don't explicitly return the executors in hostA here, but that is OK +for { + executor <- (1 to 4).map(_.toString) + index <- 0 until 10 +} { + withClue(s"exec = $executor; index = $index") { +val badExec = (executor == "1" || executor == "2") +val badPart = (index == 0 || index == 1) +val taskExp = (badExec && badPart) +assert( + tsm.isExecutorBlacklistedForTask(executor, index) === taskExp) +val ex
[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r77297304
--- Diff: core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala ---
[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r77297192
--- Diff: core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala ---
[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r77297069
--- Diff: core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala ---
[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r77296679
--- Diff: core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala ---
[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r77296487
--- Diff: core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala ---
[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r77296415
--- Diff: core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala ---
[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r77296312
--- Diff: core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala ---
[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r77296176
--- Diff: core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala ---
[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r77296120
--- Diff: core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala ---
+val badExec = (executor == "1" || executor == "2")
+val badPart = (index == 0 || index == 1)
+val taskExp = (badExec && badPart)
--- End diff --
expectTaskBlacklisted? (this is too short to guess the meaning). Or just in-line it below.
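As an illustration of that rename, the quoted assertion could read as below. This is a sketch only: it reuses the identifiers from the quoted test, and expectTaskBlacklisted is just the reviewer's proposed name, not code from the PR.

    withClue(s"exec = $executor; index = $index") {
      val badExec = (executor == "1" || executor == "2")
      val badPart = (index == 0 || index == 1)
      // a name that states the expectation, rather than the terse `taskExp`
      val expectTaskBlacklisted = badExec && badPart
      assert(tsm.isExecutorBlacklistedForTask(executor, index) === expectTaskBlacklisted)
    }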
[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r77296072
--- Diff: core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala ---
+val badExec = (executor == "1" || executor == "2")
+val badPart = (index == 0 || index == 1)
--- End diff --
badIndex? badTaskIndex? (I'm guessing this is leftover from the old partition naming)
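A minimal sketch of that rename, assuming the same test body as quoted above (badIndex is only the reviewer's suggested name):

    val badExec = (executor == "1" || executor == "2")
    // the value compared here is a task index, not a partition, so name it accordingly
    val badIndex = (index == 0 || index == 1)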
[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r77295971
--- Diff: core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala ---
+tsm.updateBlacklistForFailedTask("hostA", "1", 1)
--- End diff --
Don't bother fixing all of these, but in the future, I find tests like this much easier to read if you pass in the parameters as named parameters, so it's obvious what everything means (e.g., updateBlacklistForFailedTask("hostA", exec = "1", index = 1))
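For reference, the named-parameter style from the reviewer's example would make the three quoted calls read roughly like this (a sketch, assuming the exec and index parameter names from updateBlacklistForFailedTask's signature):

    tsm.updateBlacklistForFailedTask("hostA", exec = "1", index = 1)
    tsm.updateBlacklistForFailedTask("hostA", exec = "2", index = 0)
    tsm.updateBlacklistForFailedTask("hostA", exec = "2", index = 1)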
[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r77296004
--- Diff: core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala ---
+// Task 1 & 2 failed on both executor 1 & 2, so we blacklist all executors on that host,
--- End diff --
can you add "should" before "blacklist" (so it's clear what the test is doing versus what the test is verifying)
[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r77295487
--- Diff: core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala ---
+ val allOptions = (('A' to 'Z').map("host" + _) ++ (1 to 100).map{_.toString}).toSet
--- End diff --
allExecutorAndHostIds? allExecutorAndHostNames?
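A sketch of the same val under one of the suggested names; only the name (and a small brace-to-paren cleanup) changes:

    // every executor id ("1" to "100") and host name ("hostA" to "hostZ") the tests may probe
    val allExecutorAndHostNames = (('A' to 'Z').map("host" + _) ++ (1 to 100).map(_.toString)).toSet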
[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r77279586 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala --- @@ -800,6 +842,86 @@ private[spark] class TaskSetManager( maybeFinishTaskSet() } + private[scheduler] def updateBlacklistForFailedTask( + host: String, + exec: String, + index: Int): Unit = { +val execFailures = execToFailures.getOrElseUpdate(exec, new ExecutorFailuresInTaskSet(host)) +execFailures.updateWithFailure(index, clock.getTimeMillis() + + blacklistTracker.get.BLACKLIST_TIMEOUT_MILLIS) + +// check if this task has also failed on other executors on the same host -- if its gone +// over the limit, blacklist it from the entire host +val execsWithFailuresOnNode = nodeToExecsWithFailures.getOrElseUpdate(host, new HashSet()) +execsWithFailuresOnNode += exec +val failuresOnHost = execsWithFailuresOnNode.toIterator.map { exec => + execToFailures.get(exec).map { failures => +// We count task attempts here, not the number of unique executors with failures. This is +// because jobs are aborted based on the number task attempts; if we counted unique +// executors, it would be hard to config to ensure that you try another +// node before hitting the max number of task failures. +failures.taskToFailureCountAndExpiryTime.getOrElse(index, (0, 0))._1 + }.getOrElse(0) +}.sum +if (failuresOnHost >= MAX_TASK_ATTEMPTS_PER_NODE) { + nodeToBlacklistedTasks.getOrElseUpdate(host, new HashSet()) += index +} + +if (execFailures.numUniqueTasksWithFailures >= MAX_FAILURES_PER_EXEC_STAGE) { + if (blacklistedExecs.add(exec)) { +logInfo(s"Blacklisting executor ${exec} for stage $stageId") +// This executor has been pushed into the blacklist for this stage. Let's check if it +// pushes the whole node into the blacklist. +val blacklistedExecutorsOnNode = + execsWithFailuresOnNode.filter(blacklistedExecs.contains(_)) +if (blacklistedExecutorsOnNode.size >= MAX_FAILED_EXEC_PER_NODE_STAGE) { + if (blacklistedNodes.add(host)) { +logInfo(s"Blacklisting ${host} for stage $stageId") + } +} + } +} + } + + /** + * Return true if this executor is blacklisted for the given task. This does *not* + * need to return true if the executor is blacklisted for the entire stage, or blacklisted + * altogether. That is to keep this method as fast as possible in the inner-loop of the + * scheduler, where those filters will have already been applied. + */ + def isExecutorBlacklistedForTask( + executorId: String, + index: Int): Boolean = { +execToFailures.get(executorId) + .map { execFailures => +val count = execFailures.taskToFailureCountAndExpiryTime.getOrElse(index, (0, 0))._1 --- End diff -- i would find this slightly more intuitive as ...AndExpiryTime.get(index).map(_._1).getOrElse(0) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
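The two spellings being compared, shown side by side on a stand-in map with the same shape as taskToFailureCountAndExpiryTime (task index to (failure count, expiry time)). This is an illustrative snippet, not code from the PR:

    import scala.collection.mutable.HashMap

    val taskToFailureCountAndExpiryTime = new HashMap[Int, (Int, Long)]()
    taskToFailureCountAndExpiryTime(5) = (2, 1000L)

    // current form: supply a dummy default tuple, then project out the count
    val countA = taskToFailureCountAndExpiryTime.getOrElse(5, (0, 0))._1
    // suggested form: look up, project, then default, with no dummy tuple
    val countB = taskToFailureCountAndExpiryTime.get(5).map(_._1).getOrElse(0)
    assert(countA == 2 && countB == 2)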
[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r77279202 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala --- @@ -800,6 +842,86 @@ private[spark] class TaskSetManager( maybeFinishTaskSet() } + private[scheduler] def updateBlacklistForFailedTask( + host: String, + exec: String, + index: Int): Unit = { +val execFailures = execToFailures.getOrElseUpdate(exec, new ExecutorFailuresInTaskSet(host)) +execFailures.updateWithFailure(index, clock.getTimeMillis() + + blacklistTracker.get.BLACKLIST_TIMEOUT_MILLIS) + +// check if this task has also failed on other executors on the same host -- if its gone +// over the limit, blacklist it from the entire host +val execsWithFailuresOnNode = nodeToExecsWithFailures.getOrElseUpdate(host, new HashSet()) +execsWithFailuresOnNode += exec +val failuresOnHost = execsWithFailuresOnNode.toIterator.map { exec => + execToFailures.get(exec).map { failures => --- End diff -- make this a flatMap to avoid the getOrElse below? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
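A toy version of the flatMap variant being suggested, with simplified stand-ins for execToFailures and execsWithFailuresOnNode (the real code pulls per-task counts out of ExecutorFailuresInTaskSet; here the inner map just holds a count per task index):

```scala
import scala.collection.mutable.{HashMap, HashSet}

// Stand-ins: executor id -> (task index -> failure count), plus executors that failed on a host.
val execToFailures = HashMap("exec1" -> HashMap(7 -> 2))
val execsWithFailuresOnNode = HashSet("exec1", "exec2")
val index = 7

// Shape used in the diff: Option.map plus a trailing getOrElse(0) for executors with no entry.
val failuresOnHostA = execsWithFailuresOnNode.toIterator.map { exec =>
  execToFailures.get(exec).map(_.getOrElse(index, 0)).getOrElse(0)
}.sum

// Suggested shape: flatMap drops executors with no entry, so the trailing getOrElse goes away.
val failuresOnHostB = execsWithFailuresOnNode.toIterator.flatMap { exec =>
  execToFailures.get(exec).map(_.getOrElse(index, 0))
}.sum

assert(failuresOnHostA == failuresOnHostB) // both are 2
```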
[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r77278880 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala --- @@ -765,14 +807,16 @@ private[spark] class TaskSetManager( case e: TaskFailedReason => // TaskResultLost, TaskKilled, and others logWarning(failureReason) None +} - case e: TaskEndReason => -logError("Unknown TaskEndReason: " + e) -None +// we might rack up a bunch of fetch-failures in rapid succession, due to a bad node. But --- End diff -- I might keep this comment in the FetchFailed task reason (easier to get out of date here), especially since the countTowardsTaskFailures variable is so clearly named --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r77278627 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala --- @@ -592,34 +610,59 @@ private[spark] class TaskSetManager( * failures (this is because the method picks on unscheduled task, and then iterates through each * executor until it finds one that the task hasn't failed on already). */ - private[scheduler] def abortIfCompletelyBlacklisted(executors: Iterable[String]): Unit = { - -val pendingTask: Option[Int] = { - // usually this will just take the last pending task, but because of the lazy removal - // from each list, we may need to go deeper in the list. We poll from the end because - // failed tasks are put back at the end of allPendingTasks, so we're more likely to find - // an unschedulable task this way. - val indexOffset = allPendingTasks.lastIndexWhere { indexInTaskSet => -copiesRunning(indexInTaskSet) == 0 && !successful(indexInTaskSet) - } - if (indexOffset == -1) { -None - } else { -Some(allPendingTasks(indexOffset)) - } -} + private[scheduler] def abortIfCompletelyBlacklisted( + hostToExecutors: HashMap[String, HashSet[String]]): Unit = { +blacklistTracker.foreach { blacklist => + // because this is called in a loop, with multiple resource offers and locality levels, + // we could end up aborting this taskset multiple times without the !isZombie check + if (!isZombie) { +// take any task that needs to be scheduled, and see if we can find some executor it *could* +// run on +val pendingTask: Option[Int] = { + // usually this will just take the last pending task, but because of the lazy removal + // from each list, we may need to go deeper in the list. We poll from the end because + // failed tasks are put back at the end of allPendingTasks, so we're more likely to find + // an unschedulable task this way. + val indexOffset = allPendingTasks.lastIndexWhere { indexInTaskSet => +copiesRunning(indexInTaskSet) == 0 && !successful(indexInTaskSet) + } + if (indexOffset == -1) { +None + } else { +Some(allPendingTasks(indexOffset)) + } +} -// If no executors have registered yet, don't abort the stage, just wait. We probably -// got here because a task set was added before the executors registered. -if (executors.nonEmpty) { - // take any task that needs to be scheduled, and see if we can find some executor it *could* - // run on - pendingTask.foreach { taskId => -if (executors.forall(executorIsBlacklisted(_, taskId))) { - val execs = executors.toIndexedSeq.sorted.mkString("(", ",", ")") - val partition = tasks(taskId).partitionId - abort(s"Aborting ${taskSet} because task $taskId (partition $partition)" + -s" has already failed on executors $execs, and no other executors are available.") +// If no executors have registered yet, don't abort the stage, just wait. We probably +// got here because a task set was added before the executors registered. +if (hostToExecutors.nonEmpty) { --- End diff -- why not check this at the beginning, with the isZombie check? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
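A sketch of the restructuring being asked about, using heavily simplified stand-ins for the TaskSetManager state (only the guard structure is the point): hoisting the emptiness check next to the !isZombie check lets the method return before doing any pending-task search.

```scala
import scala.collection.mutable.{HashMap, HashSet}

// Toy stand-in for the relevant TaskSetManager pieces.
class TaskSetManagerSketch(blacklistEnabled: Boolean) {
  var isZombie = false

  def abortIfCompletelyBlacklisted(hostToExecutors: HashMap[String, HashSet[String]]): Unit = {
    // Combined early guard: nothing to do if blacklisting is off, the task set is already a
    // zombie, or no executors have registered yet (in which case we just wait).
    if (blacklistEnabled && !isZombie && hostToExecutors.nonEmpty) {
      // ...find one pending task and abort if every registered executor is blacklisted for it...
    }
  }
}
```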
[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r77278534 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala --- @@ -592,34 +610,59 @@ private[spark] class TaskSetManager( * failures (this is because the method picks on unscheduled task, and then iterates through each * executor until it finds one that the task hasn't failed on already). */ - private[scheduler] def abortIfCompletelyBlacklisted(executors: Iterable[String]): Unit = { - -val pendingTask: Option[Int] = { - // usually this will just take the last pending task, but because of the lazy removal - // from each list, we may need to go deeper in the list. We poll from the end because - // failed tasks are put back at the end of allPendingTasks, so we're more likely to find - // an unschedulable task this way. - val indexOffset = allPendingTasks.lastIndexWhere { indexInTaskSet => -copiesRunning(indexInTaskSet) == 0 && !successful(indexInTaskSet) - } - if (indexOffset == -1) { -None - } else { -Some(allPendingTasks(indexOffset)) - } -} + private[scheduler] def abortIfCompletelyBlacklisted( + hostToExecutors: HashMap[String, HashSet[String]]): Unit = { +blacklistTracker.foreach { blacklist => + // because this is called in a loop, with multiple resource offers and locality levels, --- End diff -- is this still true after Josh's change? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r77278186 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala --- @@ -50,22 +49,26 @@ import org.apache.spark.util.{AccumulatorV2, Clock, SystemClock, Utils} *task set will be aborted */ private[spark] class TaskSetManager( -sched: TaskSchedulerImpl, +val sched: TaskSchedulerImpl, +val blacklistTracker: Option[BlacklistTracker], val taskSet: TaskSet, val maxTaskFailures: Int, -clock: Clock = new SystemClock()) +val clock: Clock) extends Schedulable with Logging { - val conf = sched.sc.conf + def this( + sched: TaskSchedulerImpl, + taskSet: TaskSet, + maxTaskFailures: Int, + clock: Clock = new SystemClock()) { +this(sched, None, taskSet, maxTaskFailures, clock) --- End diff -- Why a second constructor instead of making blacklist tracker default to none? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
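A toy illustration of the alternative the question points at, using stand-in types rather than Spark's (the real TaskSetManager extends Schedulable and cannot be reproduced this briefly): an Option parameter defaulting to None keeps a single constructor.

```scala
// Stand-in for BlacklistTracker.
class BlacklistTrackerStub

// Single constructor; the blacklist argument simply defaults to None.
class TaskSetManagerSketch(
    val maxTaskFailures: Int,
    val blacklistTracker: Option[BlacklistTrackerStub] = None)

object Usage {
  // Callers that don't care about blacklisting omit the argument...
  val plain = new TaskSetManagerSketch(maxTaskFailures = 4)
  // ...while the scheduler (or a test) can pass a tracker explicitly.
  val withBlacklist = new TaskSetManagerSketch(4, Some(new BlacklistTrackerStub))
}
```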
[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r77276754 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -51,13 +50,27 @@ import org.apache.spark.util.{AccumulatorV2, ThreadUtils, Utils} * acquire a lock on us, so we need to make sure that we don't try to lock the backend while * we are holding a lock on ourselves. */ -private[spark] class TaskSchedulerImpl( +private[spark] class TaskSchedulerImpl private[scheduler]( val sc: SparkContext, val maxTaskFailures: Int, +private val blacklistTracker: Option[BlacklistTracker], isLocal: Boolean = false) extends TaskScheduler with Logging { - def this(sc: SparkContext) = this(sc, sc.conf.getInt("spark.task.maxFailures", 4)) + def this(sc: SparkContext) = { +this( + sc, + sc.conf.getInt("spark.task.maxFailures", 4), + TaskSchedulerImpl.createBlacklistTracker(sc.conf)) + } + + def this(sc: SparkContext, maxTaskFailures: Int, isLocal: Boolean) = { +this( + sc, + maxTaskFailures, + TaskSchedulerImpl.createBlacklistTracker(sc.conf), --- End diff -- is there a reason for adding these alternate constructors, as opposed to making blacklistTracker the last param, and giving it a default of TaskSchedulerImpl.createBlacklistTracker(sc.conf)? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
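Along the same lines, a toy sketch of what the suggested shape could look like, with stand-ins for SparkConf and BlacklistTracker (only the constructor shape matters here; none of this is the merged code). In Scala a default value may refer to earlier parameters, which is what would let a default replace the extra constructors.

```scala
// Stand-ins for SparkConf / BlacklistTracker.
class ConfStub(val blacklistEnabled: Boolean)
class TrackerStub

object SchedulerSketch {
  def createBlacklistTracker(conf: ConfStub): Option[TrackerStub] =
    if (conf.blacklistEnabled) Some(new TrackerStub) else None
}

class SchedulerSketch(
    val conf: ConfStub,
    val maxTaskFailures: Int = 4,
    val isLocal: Boolean = false,
    // The default is computed from the earlier conf parameter, so callers that don't pass a
    // tracker get the same behaviour the auxiliary constructors provide.
    val blacklistTracker: Option[TrackerStub] = SchedulerSketch.createBlacklistTracker(conf))
```

One trade-off worth noting: such a default must be expressible from the other parameters alone, and it is re-evaluated on every instantiation that omits the argument.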
[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r77276516 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -428,7 +456,7 @@ private[spark] class TaskSchedulerImpl( taskSetManager: TaskSetManager, tid: Long, taskState: TaskState, - reason: TaskEndReason): Unit = synchronized { + reason: TaskFailedReason): Unit = synchronized { --- End diff -- Can you stick this change in its own PR, for tracking purposes, and we can merge it quickly? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r77276392 --- Diff: core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala --- [quoted diff omitted: it repeats the BlacklistTracker.scala license header, class doc, and blacklist bookkeeping fields already shown above; the review comment itself is missing from this archived message]
[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r77275984 --- Diff: core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala --- [quoted diff omitted: it repeats the BlacklistTracker.scala license header, class doc, and blacklist bookkeeping fields already shown above; the review comment itself is missing from this archived message]
[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r77275747 --- Diff: core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala --- [quoted diff omitted: it repeats the BlacklistTracker.scala license header, class doc, and blacklist bookkeeping fields already shown above; the review comment itself is missing from this archived message]
[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r77275570 --- Diff: core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala --- [quoted diff omitted: it repeats the BlacklistTracker.scala license header, class doc, and blacklist bookkeeping fields already shown above; the review comment itself is missing from this archived message]
[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r77275187 --- Diff: core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala --- [quoted diff omitted: it repeats the BlacklistTracker.scala license header, class doc, and blacklist bookkeeping fields already shown above; the review comment itself is missing from this archived message]
[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r77275061 --- Diff: core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala --- [quoted diff omitted: it repeats the BlacklistTracker.scala license header, class doc, and blacklist bookkeeping fields already shown above; the review comment itself is missing from this archived message]
[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r77274151 --- Diff: core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala --- [quoted diff omitted: it repeats the BlacklistTracker.scala license header, class doc, and blacklist bookkeeping fields already shown above; the review comment itself is missing from this archived message]
[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r77273920 --- Diff: core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala --- @@ -0,0 +1,393 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import java.util.concurrent.atomic.AtomicReference + +import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config +import org.apache.spark.util.{Clock, SystemClock, Utils} + +/** + * BlacklistTracker is designed to track problematic executors and nodes. It supports blacklisting + * executors and nodes across an entire application (with a periodic expiry). TaskSetManagers add + * additional blacklisting of executors and nodes for individual tasks and stages which works in + * concert with the blacklisting here. + * + * The tracker needs to deal with a variety of workloads, eg.: + * + * * bad user code -- this may lead to many task failures, but that should not count against + * individual executors + * * many small stages -- this may prevent a bad executor for having many failures within one + * stage, but still many failures over the entire application + * * "flaky" executors -- they don't fail every task, but are still faulty enough to merit + * blacklisting + * + * See the design doc on SPARK-8425 for a more in-depth discussion. + * + * THREADING: As with most helpers of TaskSchedulerImpl, this is not thread-safe. Though it is + * called by multiple threads, callers must already have a lock on the TaskSchedulerImpl. The + * one exception is [[nodeBlacklist()]], which can be called without holding a lock. + */ +private[scheduler] class BlacklistTracker ( +conf: SparkConf, +clock: Clock = new SystemClock()) extends Logging { + + BlacklistTracker.validateBlacklistConfs(conf) + private val MAX_FAILURES_PER_EXEC = conf.get(config.MAX_FAILURES_PER_EXEC) + private val MAX_FAILED_EXEC_PER_NODE = conf.get(config.MAX_FAILED_EXEC_PER_NODE) + val BLACKLIST_TIMEOUT_MILLIS = BlacklistTracker.getBlacklistTimeout(conf) + + /** + * A map from executorId to information on task failures. Tracks the time of each task failure, + * so that we can avoid blacklisting executors due to failures that are very far apart. We do not + * actively remove from this as soon as tasks hit their timeouts, to avoid the time it would take + * to do so. But it will not grow too large, because as soon as an executor gets too many + * failures, we blacklist the executor and remove its entry here. 
+ */ + private[scheduler] val executorIdToFailureList: HashMap[String, ExecutorFailureList] = +new HashMap() + val executorIdToBlacklistStatus: HashMap[String, BlacklistedExecutor] = new HashMap() + val nodeIdToBlacklistExpiryTime: HashMap[String, Long] = new HashMap() + /** + * An immutable copy of the set of nodes that are currently blacklisted. Kept in an --- End diff -- Can you change this to have the first sentence say "An immutable copy of the set of nodes that are currently blacklisted (i.e., of the keys in nodeIdToBlacklistExpiryTime). Kept..."? (I keep forgetting why this is necessary) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
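Since the "why is this copy necessary" question keeps coming up, here is a minimal sketch of the pattern, with field names mirroring the quoted diff and everything else about BlacklistTracker omitted: the mutable expiry map is only touched under the scheduler lock, while an immutable snapshot of its key set is published through the AtomicReference so nodeBlacklist() can be read from any thread without locking.

```scala
import java.util.concurrent.atomic.AtomicReference
import scala.collection.mutable.HashMap

object NodeBlacklistSketch {
  // Mutated only while holding the scheduler lock (locking not shown here).
  val nodeIdToBlacklistExpiryTime = new HashMap[String, Long]()
  // Immutable snapshot of the keys above, safe to hand out to other threads.
  private val _nodeBlacklist = new AtomicReference[Set[String]](Set.empty)

  def blacklistNode(node: String, expiryTime: Long): Unit = {
    nodeIdToBlacklistExpiryTime(node) = expiryTime
    // Publish an immutable copy of the currently blacklisted nodes.
    _nodeBlacklist.set(nodeIdToBlacklistExpiryTime.keySet.toSet)
  }

  // Safe to call without holding any lock.
  def nodeBlacklist(): Set[String] = _nodeBlacklist.get()
}
```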
[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r77273256 --- Diff: core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala --- @@ -0,0 +1,393 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import java.util.concurrent.atomic.AtomicReference + +import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config +import org.apache.spark.util.{Clock, SystemClock, Utils} + +/** + * BlacklistTracker is designed to track problematic executors and nodes. It supports blacklisting + * executors and nodes across an entire application (with a periodic expiry). TaskSetManagers add + * additional blacklisting of executors and nodes for individual tasks and stages which works in + * concert with the blacklisting here. + * + * The tracker needs to deal with a variety of workloads, eg.: + * + * * bad user code -- this may lead to many task failures, but that should not count against + * individual executors + * * many small stages -- this may prevent a bad executor for having many failures within one + * stage, but still many failures over the entire application + * * "flaky" executors -- they don't fail every task, but are still faulty enough to merit + * blacklisting + * + * See the design doc on SPARK-8425 for a more in-depth discussion. + * + * THREADING: As with most helpers of TaskSchedulerImpl, this is not thread-safe. Though it is + * called by multiple threads, callers must already have a lock on the TaskSchedulerImpl. The + * one exception is [[nodeBlacklist()]], which can be called without holding a lock. + */ +private[scheduler] class BlacklistTracker ( +conf: SparkConf, +clock: Clock = new SystemClock()) extends Logging { + + BlacklistTracker.validateBlacklistConfs(conf) + private val MAX_FAILURES_PER_EXEC = conf.get(config.MAX_FAILURES_PER_EXEC) + private val MAX_FAILED_EXEC_PER_NODE = conf.get(config.MAX_FAILED_EXEC_PER_NODE) + val BLACKLIST_TIMEOUT_MILLIS = BlacklistTracker.getBlacklistTimeout(conf) + + /** + * A map from executorId to information on task failures. Tracks the time of each task failure, + * so that we can avoid blacklisting executors due to failures that are very far apart. We do not + * actively remove from this as soon as tasks hit their timeouts, to avoid the time it would take + * to do so. But it will not grow too large, because as soon as an executor gets too many + * failures, we blacklist the executor and remove its entry here. 
+ */ + private[scheduler] val executorIdToFailureList: HashMap[String, ExecutorFailureList] = +new HashMap() + val executorIdToBlacklistStatus: HashMap[String, BlacklistedExecutor] = new HashMap() + val nodeIdToBlacklistExpiryTime: HashMap[String, Long] = new HashMap() + /** + * An immutable copy of the set of nodes that are currently blacklisted. Kept in an + * AtomicReference to make [[nodeBlacklist()]] thread-safe. + */ + private val _nodeBlacklist: AtomicReference[Set[String]] = new AtomicReference(Set()) + /** + * Time when the next blacklist will expire. Used as a + * shortcut to avoid iterating over all entries in the blacklist when none will have expired. + */ + private[scheduler] var nextExpiryTime: Long = Long.MaxValue + /** + * Mapping from nodes to all of the executors that have been blacklisted on that node. We do *not* + * remove from this when executors are removed from spark, so we can track when we get multiple + * successive blacklisted executors on one node. Nonetheless, it will not grow too large because + * there cannot be many blacklisted executors on one node, before we stop requesting
[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r77273162 --- Diff: core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala --- @@ -0,0 +1,393 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import java.util.concurrent.atomic.AtomicReference + +import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config +import org.apache.spark.util.{Clock, SystemClock, Utils} + +/** + * BlacklistTracker is designed to track problematic executors and nodes. It supports blacklisting + * executors and nodes across an entire application (with a periodic expiry). TaskSetManagers add + * additional blacklisting of executors and nodes for individual tasks and stages which works in + * concert with the blacklisting here. + * + * The tracker needs to deal with a variety of workloads, eg.: + * + * * bad user code -- this may lead to many task failures, but that should not count against + * individual executors + * * many small stages -- this may prevent a bad executor for having many failures within one + * stage, but still many failures over the entire application + * * "flaky" executors -- they don't fail every task, but are still faulty enough to merit + * blacklisting + * + * See the design doc on SPARK-8425 for a more in-depth discussion. + * + * THREADING: As with most helpers of TaskSchedulerImpl, this is not thread-safe. Though it is + * called by multiple threads, callers must already have a lock on the TaskSchedulerImpl. The + * one exception is [[nodeBlacklist()]], which can be called without holding a lock. + */ +private[scheduler] class BlacklistTracker ( +conf: SparkConf, +clock: Clock = new SystemClock()) extends Logging { + + BlacklistTracker.validateBlacklistConfs(conf) + private val MAX_FAILURES_PER_EXEC = conf.get(config.MAX_FAILURES_PER_EXEC) + private val MAX_FAILED_EXEC_PER_NODE = conf.get(config.MAX_FAILED_EXEC_PER_NODE) + val BLACKLIST_TIMEOUT_MILLIS = BlacklistTracker.getBlacklistTimeout(conf) + + /** + * A map from executorId to information on task failures. Tracks the time of each task failure, + * so that we can avoid blacklisting executors due to failures that are very far apart. We do not + * actively remove from this as soon as tasks hit their timeouts, to avoid the time it would take + * to do so. But it will not grow too large, because as soon as an executor gets too many + * failures, we blacklist the executor and remove its entry here. 
+ */ + private[scheduler] val executorIdToFailureList: HashMap[String, ExecutorFailureList] = +new HashMap() + val executorIdToBlacklistStatus: HashMap[String, BlacklistedExecutor] = new HashMap() + val nodeIdToBlacklistExpiryTime: HashMap[String, Long] = new HashMap() + /** + * An immutable copy of the set of nodes that are currently blacklisted. Kept in an + * AtomicReference to make [[nodeBlacklist()]] thread-safe. + */ + private val _nodeBlacklist: AtomicReference[Set[String]] = new AtomicReference(Set()) + /** + * Time when the next blacklist will expire. Used as a + * shortcut to avoid iterating over all entries in the blacklist when none will have expired. + */ + private[scheduler] var nextExpiryTime: Long = Long.MaxValue --- End diff -- I think private[scheduler] is redundant here since the class is private[scheduler] --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
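A minimal Scala sketch of the visibility point above (hypothetical class and field names, not code from this PR): because the enclosing class is already private[scheduler], qualifying a member as private[scheduler] does not restrict access any further.

package org.apache.spark.scheduler

// Both fields below are reachable only from the org.apache.spark.scheduler package,
// since the class itself is private[scheduler]; the qualifier on the first field is redundant.
private[scheduler] class VisibilitySketch {
  private[scheduler] var nextExpiryQualified: Long = Long.MaxValue  // redundant qualifier
  var nextExpiryUnqualified: Long = Long.MaxValue                   // effectively the same visibility
}

(A plain private, by contrast, would still narrow the field to the class itself.)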
[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r77273111 --- Diff: core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala --- @@ -0,0 +1,393 @@ (quoting the same BlacklistTracker.scala context as the previous message)
[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r77250600 --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala --- @@ -90,6 +92,54 @@ package object config { .toSequence .createWithDefault(Nil) + // Blacklist confs + private[spark] val BLACKLIST_ENABLED = +ConfigBuilder("spark.blacklist.enabled") + .booleanConf + .createOptional + + private[spark] val MAX_TASK_ATTEMPTS_PER_EXECUTOR = +ConfigBuilder("spark.blacklist.task.maxTaskAttemptsPerExecutor") + .intConf + .createWithDefault(1) + + private[spark] val MAX_TASK_ATTEMPTS_PER_NODE = +ConfigBuilder("spark.blacklist.task.maxTaskAttemptsPerNode") --- End diff -- Why is this one "maxTaskAttempts" whereas the others are "maxFailedTasks"? (apologies if we already discussed this... I know we've had lots of naming discussions) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r77005603 --- Diff: core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala --- @@ -0,0 +1,395 @@ (quoting the same BlacklistTracker.scala context as the messages above)
[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user tgravescs commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r76685355 --- Diff: core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala --- @@ -0,0 +1,395 @@ (quoting the same BlacklistTracker.scala context as the messages above)
[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user tgravescs commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r76683789 --- Diff: core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala --- @@ -0,0 +1,395 @@ (quoting the same BlacklistTracker.scala context as the messages above)
[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user tgravescs commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r76683228 --- Diff: core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala --- @@ -0,0 +1,395 @@ (quoting the same BlacklistTracker.scala context as the messages above)
[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user tgravescs commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r76682839 --- Diff: core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala --- @@ -0,0 +1,395 @@ (quoting the same BlacklistTracker.scala context as the messages above)
[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user tgravescs commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r76682765 --- Diff: core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala --- @@ -0,0 +1,395 @@ (quoting the same BlacklistTracker.scala context as the messages above)
[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r76476924 --- Diff: core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala --- @@ -0,0 +1,385 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import java.util.concurrent.atomic.AtomicReference + +import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config +import org.apache.spark.util.{Clock, SystemClock, Utils} + +/** + * BlacklistTracker is designed to track problematic executors and nodes. It supports blacklisting + * executors and nodes across an entire application (with a periodic expiry). TaskSetManagers add + * additional blacklisting of executors and nodes for individual tasks and stages which works in + * concert with the blacklisting here. + * + * The tracker needs to deal with a variety of workloads, eg.: + * + * * bad user code -- this may lead to many task failures, but that should not count against + * individual executors + * * many small stages -- this may prevent a bad executor for having many failures within one + * stage, but still many failures over the entire application + * * "flaky" executors -- they don't fail every task, but are still faulty enough to merit + * blacklisting + * + * See the design doc on SPARK-8425 for a more in-depth discussion. + * + * THREADING: As with most helpers of TaskSchedulerImpl, this is not thread-safe. Though it is + * called by multiple threads, callers must already have a lock on the TaskSchedulerImpl. The + * one exception is [[nodeBlacklist()]], which can be called without holding a lock. + */ +private[scheduler] class BlacklistTracker ( +conf: SparkConf, +clock: Clock = new SystemClock()) extends Logging { + + BlacklistTracker.validateBlacklistConfs(conf) + private val MAX_FAILURES_PER_EXEC = conf.get(config.MAX_FAILURES_PER_EXEC) + private val MAX_FAILED_EXEC_PER_NODE = conf.get(config.MAX_FAILED_EXEC_PER_NODE) + val BLACKLIST_TIMEOUT_MILLIS = BlacklistTracker.getBlacklistTimeout(conf) + + /** + * A map from executorId to information on task failures. Tracks the time of each task failure, + * so that we can avoid blacklisting executors due to failures that are very far apart. + */ + private val executorIdToFailureList: HashMap[String, ExecutorFailureList] = new HashMap() + val executorIdToBlacklistStatus: HashMap[String, BlacklistedExecutor] = new HashMap() + val nodeIdToBlacklistExpiryTime: HashMap[String, Long] = new HashMap() + /** + * An immutable copy of the set of nodes that are currently blacklisted. Kept in an + * AtomicReference to make [[nodeBlacklist()]] thread-safe. 
+ */ + private val _nodeBlacklist: AtomicReference[Set[String]] = new AtomicReference(Set()) + /** + * Time when the next blacklist will expire. Used as a shortcut to avoid iterating over all + * entries in the blacklist when none will have expired. + */ + private var nextExpiryTime: Long = Long.MaxValue + /** + * Mapping from nodes to all of the executors that have been blacklisted on that node. We do *not* + * remove from this when executors are removed from spark, so we can track when we get multiple + * successive blacklisted executors on one node. Nonetheless, it will not grow too large because + * there cannot be many blacklisted executors on one node, before we stop requesting more + * executors on that node, and we periodically clean up the list of blacklisted executors. + */ + val nodeToFailedExecs: HashMap[String, HashSet[String]] = new HashMap() + + def applyBlacklistTimeout(): Unit = { +val now = clock.getTimeMillis() +// quickly check if we've got an
[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user tgravescs commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r76106200 --- Diff: core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala --- @@ -0,0 +1,385 @@ (quoting the same BlacklistTracker.scala context as the previous message)
[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r75700225 --- Diff: core/src/main/scala/org/apache/spark/TaskEndReason.scala --- @@ -204,6 +213,7 @@ case object TaskResultLost extends TaskFailedReason { @DeveloperApi case object TaskKilled extends TaskFailedReason { override def toErrorString: String = "TaskKilled (killed intentionally)" + override val countTowardsTaskFailures: Boolean = false --- End diff -- the switch to a `val` came from an earlier discussion with @kayousterhout ... there was some other confusion, and I thought maybe changing to a val would make it clearer that it is a constant. But I don't think either of us feels strongly; the argument to switch to a val was pretty weak. I can change it back.
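To illustrate the val-versus-def question in this exchange, a small Scala sketch with hypothetical names (not the PR's actual TaskEndReason hierarchy): overriding a def with a val is legal, the val is evaluated once at construction, and a def override is re-evaluated on every call -- for a plain Boolean literal the observable behaviour is the same, so the choice is mostly stylistic.

sealed trait FailedReasonSketch {
  // Most reasons count against the task failure limit.
  def countTowardsTaskFailures: Boolean = true
}

case object KilledSketch extends FailedReasonSketch {
  override val countTowardsTaskFailures: Boolean = false  // val: fixed when the object is created
}

case object CommitDeniedSketch extends FailedReasonSketch {
  override def countTowardsTaskFailures: Boolean = false  // def: recomputed on each call
}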
[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r75700278 --- Diff: docs/configuration.md --- @@ -1178,6 +1178,80 @@ Apart from these, the following properties are also available, and may be useful + spark.blacklist.enabled + +true in cluster mode; +false in local mode + + +If set to "true", prevent Spark from scheduling tasks on executors that have been blacklisted +due to too many task failures. The blacklisting algorithm can be further controlled by the +other "spark.blacklist" configuration options. + + + + spark.blacklist.timeout + 1h + +(Experimental) How long a node or executor is blacklisted for the entire application, before it +is unconditionally removed from the blacklist to attempt running new tasks. + + + + spark.blacklist.task.maxTaskAttemptsPerExecutor + 2 --- End diff -- oops, forgot to update this -- good catch, thanks --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
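As a side note, a minimal sketch of what enabling these settings looks like from user code; the keys come from the documentation table quoted above, and the values here are illustrative rather than recommendations (the maxTaskAttemptsPerExecutor default is discussed further below in the thread).

import org.apache.spark.SparkConf

// Turn on application-level blacklisting and set the documented knobs explicitly.
val conf = new SparkConf()
  .setAppName("blacklist-example")
  .set("spark.blacklist.enabled", "true")
  .set("spark.blacklist.timeout", "1h")
  .set("spark.blacklist.task.maxTaskAttemptsPerExecutor", "1")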
[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user jsoltren commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r75508919 --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala --- @@ -90,6 +92,54 @@ package object config { .toSequence .createWithDefault(Nil) + // Blacklist confs + private[spark] val BLACKLIST_ENABLED = +ConfigBuilder("spark.blacklist.enabled") + .booleanConf + .createOptional + + private[spark] val MAX_TASK_ATTEMPTS_PER_EXECUTOR = +ConfigBuilder("spark.blacklist.task.maxTaskAttemptsPerExecutor") + .intConf + .createWithDefault(1) --- End diff -- I'll file a separate JIRA for this once I think it through a little more. My preference is to see error strings and default values in config files instead of buried in the code. It can be better for readability, and makes it easier to support internationalization. So, for example, instead of createWithDefault(1), do something like createWithDefault(DEFAULT_TASK_ATTEMPTS_PER_EXECUTOR), with some other config file containing a line DEFAULT_TASK_ATTEMPTS_PER_EXECUTOR = 1. But there are bigger fish to fry and I see no reason at all to hold up this change for issues like this. I hope this is more clear than my terse explanation above. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
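To make that suggestion concrete, a rough sketch with hypothetical names (BlacklistConfDefaults and DEFAULT_TASK_ATTEMPTS_PER_EXECUTOR are invented for illustration; the ConfigBuilder call mirrors the hunk quoted above and would live in the same config package object):

// Hypothetical single home for the default values, so each default is named once
// instead of appearing as a bare literal at the createWithDefault call site.
private[spark] object BlacklistConfDefaults {
  val DEFAULT_TASK_ATTEMPTS_PER_EXECUTOR: Int = 1
}

private[spark] val MAX_TASK_ATTEMPTS_PER_EXECUTOR =
  ConfigBuilder("spark.blacklist.task.maxTaskAttemptsPerExecutor")
    .intConf
    .createWithDefault(BlacklistConfDefaults.DEFAULT_TASK_ATTEMPTS_PER_EXECUTOR)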
[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user tgravescs commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r75504152 --- Diff: docs/configuration.md --- @@ -1178,6 +1178,80 @@ Apart from these, the following properties are also available, and may be useful + spark.blacklist.enabled + +true in cluster mode; +false in local mode + + +If set to "true", prevent Spark from scheduling tasks on executors that have been blacklisted +due to too many task failures. The blacklisting algorithm can be further controlled by the +other "spark.blacklist" configuration options. + + + + spark.blacklist.timeout + 1h + +(Experimental) How long a node or executor is blacklisted for the entire application, before it +is unconditionally removed from the blacklist to attempt running new tasks. + + + + spark.blacklist.task.maxTaskAttemptsPerExecutor + 2 --- End diff -- default is actually 1 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user tgravescs commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r75502930 --- Diff: core/src/main/scala/org/apache/spark/TaskEndReason.scala --- @@ -222,7 +232,7 @@ case class TaskCommitDenied( * towards failing the stage. This is intended to prevent spurious stage failures in cases * where many speculative tasks are launched and denied to commit. */ - override def countTowardsTaskFailures: Boolean = false + override val countTowardsTaskFailures: Boolean = false --- End diff -- def? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user tgravescs commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r75502668 --- Diff: core/src/main/scala/org/apache/spark/TaskEndReason.scala --- @@ -204,6 +213,7 @@ case object TaskResultLost extends TaskFailedReason { @DeveloperApi case object TaskKilled extends TaskFailedReason { override def toErrorString: String = "TaskKilled (killed intentionally)" + override val countTowardsTaskFailures: Boolean = false --- End diff -- why is this a val and not def? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user tgravescs commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r75501468 --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala --- @@ -90,6 +92,54 @@ (same blacklist config hunk quoted earlier in the thread) + private[spark] val MAX_TASK_ATTEMPTS_PER_EXECUTOR = +ConfigBuilder("spark.blacklist.task.maxTaskAttemptsPerExecutor") + .intConf + .createWithDefault(1) --- End diff -- Not sure I follow your nit here; this is how the ConfigEntries work: you define the config, type, default, etc. If you have issues with it in general, it should be a separate JIRA against the Config stuff.
[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user jsoltren commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r75392592 --- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala --- @@ -809,32 +821,65 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg test("Kill other task attempts when one attempt belonging to the same task succeeds") { sc = new SparkContext("local", "test") sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2")) -val taskSet = FakeTask.createTaskSet(4) +val taskSet = FakeTask.createTaskSet(5) // Set the speculation multiplier to be 0 so speculative tasks are launched immediately sc.conf.set("spark.speculation.multiplier", "0.0") +sc.conf.set("spark.speculation.quantile", "0.6") val clock = new ManualClock() val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock) val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task => task.metrics.internalAccums } // Offer resources for 4 tasks to start +val tasks = new ArrayBuffer[TaskDescription]() for ((k, v) <- List( "exec1" -> "host1", "exec1" -> "host1", +"exec1" -> "host1", "exec2" -> "host2", "exec2" -> "host2")) { val taskOption = manager.resourceOffer(k, v, NO_PREF) assert(taskOption.isDefined) val task = taskOption.get assert(task.executorId === k) + tasks += task } -assert(sched.startedTasks.toSet === Set(0, 1, 2, 3)) -// Complete the 3 tasks and leave 1 task in running +assert(sched.startedTasks.toSet === (0 until 5).toSet) +// Complete 3 tasks and leave 2 task in running for (id <- Set(0, 1, 2)) { manager.handleSuccessfulTask(id, createTaskResult(id, accumUpdatesByTask(id))) assert(sched.endedTasks(id) === Success) } +def runningTaskForIndex(index: Int): TaskDescription = { + val t = tasks.find { task => task.index == index && !sched.endedTasks.contains(task.taskId) } + t match { +case Some(x) => x +case None => + throw new RuntimeException(s"couldn't find index $index in " + +s"tasks: ${tasks.map{t => t.index -> t.taskId}} with endedTasks:" + +s" ${sched.endedTasks.keys}") + } +} + +// have each of the running tasks fail 3 times (not enough to abort the stage) +(3 until 6).foreach { attempt => --- End diff -- I worry about another hardcoded policy here. Fine for now, perhaps something to revisit later. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
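For what it's worth, a small sketch of one way to name that policy instead of hard-coding the range (a hypothetical refactor of the quoted loop: failRunningTasksOnce is an invented stand-in for the loop body shown above, and it assumes MAX_TASK_FAILURES is 4 in this suite, so three extra failures per task stay one short of aborting the stage).

// Derive the injected failure count from the limit the suite already passes to the
// TaskSetManager, so the test does not encode the abort policy a second time.
val extraFailuresPerTask = MAX_TASK_FAILURES - 1

(3 until 3 + extraFailuresPerTask).foreach { attempt =>
  failRunningTasksOnce(attempt)  // hypothetical helper standing in for the quoted loop body
}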
[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user jsoltren commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r75369742 --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala --- @@ -90,6 +92,54 @@ (same blacklist config hunk quoted earlier in the thread) + private[spark] val MAX_TASK_ATTEMPTS_PER_EXECUTOR = +ConfigBuilder("spark.blacklist.task.maxTaskAttemptsPerExecutor") + .intConf + .createWithDefault(1) --- End diff -- Nit: I worry about mixing mechanism (defining the config attribute) and policy (setting a default value) here. Granted, this change follows the existing pattern.
[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r74671218 --- Diff: core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala --- @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import java.util.concurrent.atomic.AtomicReference + +import scala.collection.mutable.{HashMap, HashSet} + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config +import org.apache.spark.util.{Clock, SystemClock, Utils} + +/** + * BlacklistTracker is designed to track problematic executors and nodes. It supports blacklisting + * specific (executor, task) pairs within a stage, blacklisting entire executors and nodes for a + * stage, and blacklisting executors and nodes across an entire application (with a periodic + * expiry). + * + * The tracker needs to deal with a variety of workloads, eg.: bad user code, which may lead to many + * task failures, but that should not count against individual executors; many small stages, which + * may prevent a bad executor for having many failures within one stage, but still many failures + * over the entire application; "flaky" executors, that don't fail every task, but are still + * faulty; etc. + * + * THREADING: As with most helpers of TaskSchedulerImpl, this is not thread-safe. Though it is + * called by multiple threads, callers must already have a lock on the TaskSchedulerImpl. The + * one exception is [[nodeBlacklist()]], which can be called without holding a lock. + */ +private[scheduler] class BlacklistTracker ( +conf: SparkConf, +clock: Clock = new SystemClock()) extends Logging { + + private val MAX_FAILURES_PER_EXEC = conf.get(config.MAX_FAILURES_PER_EXEC) + private val MAX_FAILED_EXEC_PER_NODE = conf.get(config.MAX_FAILED_EXEC_PER_NODE) + val EXECUTOR_RECOVERY_MILLIS = BlacklistTracker.getBlacklistExpiryTime(conf) + + // a count of failed tasks for each executor. Only counts failures after tasksets complete + // successfully + private val executorIdToFailureCount: HashMap[String, Int] = new HashMap() + private val executorIdToBlacklistStatus: HashMap[String, BlacklistedExecutor] = new HashMap() + private val nodeIdToBlacklistExpiryTime: HashMap[String, Long] = new HashMap() + private val _nodeBlacklist: AtomicReference[Set[String]] = new AtomicReference(Set()) + private var nextExpiryTime: Long = Long.MaxValue + // for blacklisted executors, the node it is on. We do *not* remove from this when executors are + // removed from spark, so we can track when we get multiple successive blacklisted executors on + // one node. 
+ val nodeToFailedExecs: HashMap[String, HashSet[String]] = new HashMap() + + def expireExecutorsInBlacklist(): Unit = { +val now = clock.getTimeMillis() +// quickly check if we've got anything to expire from blacklist -- if not, avoid doing any work +if (now > nextExpiryTime) { + val execsToClear = executorIdToBlacklistStatus.filter(_._2.expiryTime < now).keys + if (execsToClear.nonEmpty) { +logInfo(s"Removing executors $execsToClear from blacklist during periodic recovery") +execsToClear.foreach { exec => + val status = executorIdToBlacklistStatus.remove(exec).get + val failedExecsOnNode = nodeToFailedExecs(status.node) + failedExecsOnNode.remove(exec) + if (failedExecsOnNode.isEmpty) { +nodeToFailedExecs.remove(status.node) + } +} + } + if (executorIdToBlacklistStatus.nonEmpty) { +nextExpiryTime = executorIdToBlacklistStatus.map{_._2.expiryTime}.min + } else { +nextExpiryTime = Long.MaxValue + } + val nodesToClear = nodeIdToBlacklistExpiryTime.filter(_._2 < now)
[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r74670495 --- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala --- @@ -281,15 +323,216 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B assert(!failedTaskSet) } + test("scheduled tasks obey task and stage blacklists") { +val blacklist = mock[BlacklistTracker] +taskScheduler = setupSchedulerWithMockTsm(blacklist) +(0 to 2).foreach { stageId => + val taskSet = FakeTask.createTaskSet(numTasks = 2, stageId = stageId, stageAttemptId = 0) + taskScheduler.submitTasks(taskSet) +} + +val offers = Seq( + new WorkerOffer("executor0", "host0", 1), + new WorkerOffer("executor1", "host1", 1), + new WorkerOffer("executor2", "host1", 1), + new WorkerOffer("executor3", "host2", 10) +) + +// setup our mock blacklist: +// stage 0 is blacklisted on node "host1" +// stage 1 is blacklisted on executor "executor3" +// stage 0, part 0 is blacklisted on executor 0 +// (later stubs take precedence over earlier ones) +when(blacklist.isNodeBlacklisted(anyString())).thenReturn(false) +when(blacklist.isExecutorBlacklisted(anyString())).thenReturn(false) +// setup some defaults, then override them with particulars +stageToMockTsm.values.foreach { tsm => + when(tsm.isNodeBlacklistedForTaskSet(anyString())).thenReturn(false) + when(tsm.isExecutorBlacklistedForTaskSet(anyString())).thenReturn(false) + when(tsm.isExecutorBlacklistedForTask(anyString(), anyInt())).thenReturn(false) --- End diff -- It is used because the TSM is partially being tested as well here. It's not a total mock; it's a real TSM with just the blacklisting methods replaced. With some of the other changes you have suggested, this is actually mostly just checking the TSM, so I could move these tests over there if it would help. I am trying to have some tests which aren't super narrow unit tests, that test a slightly larger chunk of scheduling to ensure the components interact correctly, but maybe in this case it would be clearer in the other suite. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
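For readers following along, a rough sketch of the "real TSM with only the blacklist checks replaced" approach described here, using a Mockito spy instead of the suite's `setupSchedulerWithMockTsm` helper. The TaskSetManager constructor arguments and the Mockito 1.x matcher imports are assumptions, not code from this PR:

```scala
import org.mockito.Matchers.{anyInt, anyString}
import org.mockito.Mockito.{spy, when}

// Build a real TaskSetManager, then stub only the blacklist lookups; everything else
// keeps its real behaviour, so the scheduling logic is still exercised end to end.
val tsm = spy(new TaskSetManager(sched, taskSet, maxTaskFailures = 4))
when(tsm.isNodeBlacklistedForTaskSet(anyString())).thenReturn(false)
when(tsm.isExecutorBlacklistedForTaskSet(anyString())).thenReturn(false)
when(tsm.isExecutorBlacklistedForTask(anyString(), anyInt())).thenReturn(false)
```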
[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r74669603 --- Diff: core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala --- @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import java.util.concurrent.atomic.AtomicReference + +import scala.collection.mutable.{HashMap, HashSet} + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config +import org.apache.spark.util.{Clock, SystemClock, Utils} + +/** + * BlacklistTracker is designed to track problematic executors and nodes. It supports blacklisting + * specific (executor, task) pairs within a stage, blacklisting entire executors and nodes for a + * stage, and blacklisting executors and nodes across an entire application (with a periodic + * expiry). + * + * The tracker needs to deal with a variety of workloads, eg.: bad user code, which may lead to many + * task failures, but that should not count against individual executors; many small stages, which + * may prevent a bad executor for having many failures within one stage, but still many failures + * over the entire application; "flaky" executors, that don't fail every task, but are still + * faulty; etc. + * + * THREADING: As with most helpers of TaskSchedulerImpl, this is not thread-safe. Though it is + * called by multiple threads, callers must already have a lock on the TaskSchedulerImpl. The + * one exception is [[nodeBlacklist()]], which can be called without holding a lock. + */ +private[scheduler] class BlacklistTracker ( +conf: SparkConf, +clock: Clock = new SystemClock()) extends Logging { + + private val MAX_FAILURES_PER_EXEC = conf.get(config.MAX_FAILURES_PER_EXEC) + private val MAX_FAILED_EXEC_PER_NODE = conf.get(config.MAX_FAILED_EXEC_PER_NODE) + val EXECUTOR_RECOVERY_MILLIS = BlacklistTracker.getBlacklistExpiryTime(conf) + + // a count of failed tasks for each executor. Only counts failures after tasksets complete + // successfully + private val executorIdToFailureCount: HashMap[String, Int] = new HashMap() + private val executorIdToBlacklistStatus: HashMap[String, BlacklistedExecutor] = new HashMap() + private val nodeIdToBlacklistExpiryTime: HashMap[String, Long] = new HashMap() + private val _nodeBlacklist: AtomicReference[Set[String]] = new AtomicReference(Set()) + private var nextExpiryTime: Long = Long.MaxValue + // for blacklisted executors, the node it is on. We do *not* remove from this when executors are + // removed from spark, so we can track when we get multiple successive blacklisted executors on + // one node. 
+ val nodeToFailedExecs: HashMap[String, HashSet[String]] = new HashMap() + + def expireExecutorsInBlacklist(): Unit = { +val now = clock.getTimeMillis() +// quickly check if we've got anything to expire from blacklist -- if not, avoid doing any work +if (now > nextExpiryTime) { + val execsToClear = executorIdToBlacklistStatus.filter(_._2.expiryTime < now).keys + if (execsToClear.nonEmpty) { +logInfo(s"Removing executors $execsToClear from blacklist during periodic recovery") +execsToClear.foreach { exec => + val status = executorIdToBlacklistStatus.remove(exec).get + val failedExecsOnNode = nodeToFailedExecs(status.node) + failedExecsOnNode.remove(exec) + if (failedExecsOnNode.isEmpty) { +nodeToFailedExecs.remove(status.node) + } +} + } + if (executorIdToBlacklistStatus.nonEmpty) { +nextExpiryTime = executorIdToBlacklistStatus.map{_._2.expiryTime}.min + } else { +nextExpiryTime = Long.MaxValue + } + val nodesToClear = nodeIdToBlacklistExpiryTime.filter(_._2 < now).keys
[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r74339234 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -52,13 +51,23 @@ import org.apache.spark.util.{AccumulatorV2, ThreadUtils, Utils} * acquire a lock on us, so we need to make sure that we don't try to lock the backend while * we are holding a lock on ourselves. */ -private[spark] class TaskSchedulerImpl( +private[spark] class TaskSchedulerImpl private[scheduler]( --- End diff -- I was thinking of the second option: make it completely private, but leave it as a constructor parameter -- the reason you gave for keeping the parameter (being able to pass in a mock for testing) seems important. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
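A signature-only sketch of the option settled on here: keep the constructor parameter so tests can still inject a mock tracker, but drop the `private[scheduler]` qualifier on the field. This mirrors the diff above with only the visibility changed; it is not the final code.

```scala
private[spark] class TaskSchedulerImpl private[scheduler](
    val sc: SparkContext,
    val maxTaskFailures: Int,
    private val blacklistTracker: Option[BlacklistTracker],   // was private[scheduler]
    private val clock: Clock = new SystemClock,
    isLocal: Boolean = false)
  extends TaskScheduler with Logging
```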
[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r74335937 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -52,13 +51,23 @@ import org.apache.spark.util.{AccumulatorV2, ThreadUtils, Utils} * acquire a lock on us, so we need to make sure that we don't try to lock the backend while * we are holding a lock on ourselves. */ -private[spark] class TaskSchedulerImpl( +private[spark] class TaskSchedulerImpl private[scheduler]( --- End diff -- just to make sure I understand -- do you want to eliminate having it be a parameter for the constructor to `TaskSchedulerImpl` completely, so that it is always created from the conf? I think that would prevent proper testing, since I couldn't eg. pass in a mock. Or keep the constructor, but just make `blacklistTracker` completely private, (not `private[scheduler]`)? That should be straightforward, all the tests which directly create a TaskSetManager would just use `None` for the blacklist. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r74333404 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala --- @@ -770,9 +794,19 @@ private[spark] class TaskSetManager( logError("Unknown TaskEndReason: " + e) None } -// always add to failed executors -failedExecutors.getOrElseUpdate(index, new HashMap[String, Long]()). - put(info.executorId, clock.getTimeMillis()) + +// we might rack up a bunch of fetch-failures in rapid succession, due to a bad node. But +// that bad node will get handled separately by spark's stage-failure handling mechanism. It +// shouldn't penalize *this* executor at all, so don't count it as a task-failure as far as +// the blacklist is concerned. +val countTowardsTaskFailures = reason match { + case fail: TaskFailedReason => fail.countTowardsTaskFailures --- End diff -- Cool thanks for fixing this!! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r74333246 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala --- @@ -800,6 +832,78 @@ private[spark] class TaskSetManager( maybeFinishTaskSet() } + private[scheduler] def updateBlacklistForFailedTask( + host: String, + exec: String, + index: Int): Unit = { +val failureStatus = execToFailures.getOrElseUpdate(exec, new FailureStatus(host)) +failureStatus.totalFailures += 1 +failureStatus.tasksWithFailures += index + +// check if this task has also failed on other executors on the same host, and if so, blacklist +// this task from the host +val execsWithFailuresOnNode = nodesToExecsWithFailures.getOrElseUpdate(host, new HashSet()) +execsWithFailuresOnNode += exec +val failuresOnHost = (for { + exec <- execsWithFailuresOnNode.toIterator + failures <- execToFailures.get(exec) +} yield { + if (failures.tasksWithFailures.contains(index)) 1 else 0 +}).sum --- End diff -- I use `for / yield` to avoid lots of nesting (`execToFailures.get(_)` returns an Option that you need to deal with), but also don't care much --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
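For comparison, the same count written without `for`/`yield`, flatMapping over the `Option` that `execToFailures.get` returns; the names are the ones used in the diff above.

```scala
// Equivalent to the for/yield + sum: look up each executor's failure record (skipping
// executors with no entry) and count those that have already failed this task index.
val failuresOnHost = execsWithFailuresOnNode.toIterator
  .flatMap(exec => execToFailures.get(exec))
  .count(_.tasksWithFailures.contains(index))
```

Which of the two reads better is largely a matter of taste, which matches the "don't care much" above.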
[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r74332993 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala --- @@ -770,9 +794,19 @@ private[spark] class TaskSetManager( logError("Unknown TaskEndReason: " + e) None } -// always add to failed executors -failedExecutors.getOrElseUpdate(index, new HashMap[String, Long]()). - put(info.executorId, clock.getTimeMillis()) + +// we might rack up a bunch of fetch-failures in rapid succession, due to a bad node. But +// that bad node will get handled separately by spark's stage-failure handling mechanism. It +// shouldn't penalize *this* executor at all, so don't count it as a task-failure as far as +// the blacklist is concerned. +val countTowardsTaskFailures = reason match { + case fail: TaskFailedReason => fail.countTowardsTaskFailures --- End diff -- yeah, agree. There is also the code on the old line 788 which made it seem like it was possible for it to be something else, which is what this line is replacing. I wondered about doing this, but decided for the smaller change, though I'm fine either way. This means we effectively move the cast to [`TaskResultGetter.enqueueFailedTask`](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala#L128). I will make that change. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
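A sketch of the two shapes being discussed. The first is the pattern from the diff; the `case _` fallback is an assumption for illustration, since the PR's actual fallback branch is not shown here. The second is what the flag read looks like once the cast to `TaskFailedReason` happens upstream in `TaskResultGetter.enqueueFailedTask`.

```scala
import org.apache.spark.{TaskEndReason, TaskFailedReason}

// Current shape: the reason arrives as a TaskEndReason, so the flag is read via a match.
def countsTowardFailures(reason: TaskEndReason): Boolean = reason match {
  case failed: TaskFailedReason => failed.countTowardsTaskFailures
  case _ => false   // assumed fallback for this sketch
}

// After moving the cast upstream: callers hand over a TaskFailedReason directly,
// and the match disappears.
def countsTowardFailures(reason: TaskFailedReason): Boolean =
  reason.countTowardsTaskFailures
```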
[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r74327484 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -52,13 +51,23 @@ import org.apache.spark.util.{AccumulatorV2, ThreadUtils, Utils} * acquire a lock on us, so we need to make sure that we don't try to lock the backend while * we are holding a lock on ourselves. */ -private[spark] class TaskSchedulerImpl( +private[spark] class TaskSchedulerImpl private[scheduler]( val sc: SparkContext, val maxTaskFailures: Int, +private[scheduler] val blacklistTracker: Option[BlacklistTracker], +private val clock: Clock = new SystemClock, isLocal: Boolean = false) extends TaskScheduler with Logging { - def this(sc: SparkContext) = this(sc, sc.conf.getInt("spark.task.maxFailures", 4)) + def this(sc: SparkContext) = { +this(sc, sc.conf.getInt("spark.task.maxFailures", 4), + TaskSchedulerImpl.createBlacklistTracker(sc.conf)) --- End diff -- I was thinking of this part of the Scala style guide: http://docs.scala-lang.org/style/indentation.html#line_wrapping I know Spark doesn't follow this super consistently, but I do think it helps readability (esp. in cases like this where there are some long-ish individual parameters, so it's hard to scan the list), so it's good to do IMO where we remember. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
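Concretely, the linked line-wrapping rule applied to the call in the diff puts each argument on its own line so the individual parameters are easy to scan:

```scala
// The secondary constructor from the diff, wrapped one argument per line.
def this(sc: SparkContext) = {
  this(
    sc,
    sc.conf.getInt("spark.task.maxFailures", 4),
    TaskSchedulerImpl.createBlacklistTracker(sc.conf))
}
```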
[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r74326330 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -52,13 +51,23 @@ import org.apache.spark.util.{AccumulatorV2, ThreadUtils, Utils} * acquire a lock on us, so we need to make sure that we don't try to lock the backend while * we are holding a lock on ourselves. */ -private[spark] class TaskSchedulerImpl( +private[spark] class TaskSchedulerImpl private[scheduler]( val sc: SparkContext, val maxTaskFailures: Int, +private[scheduler] val blacklistTracker: Option[BlacklistTracker], +private val clock: Clock = new SystemClock, isLocal: Boolean = false) extends TaskScheduler with Logging { - def this(sc: SparkContext) = this(sc, sc.conf.getInt("spark.task.maxFailures", 4)) + def this(sc: SparkContext) = { +this(sc, sc.conf.getInt("spark.task.maxFailures", 4), + TaskSchedulerImpl.createBlacklistTracker(sc.conf)) --- End diff -- I don't mind changing this, but I didn't think that was our style convention. I thought that was only for declaring the parameters of methods -- not for their call sites. Maybe it's different for invocations of `this()`, but I even see counter-examples to that, e.g.: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/TaskEndReason.scala#L133 https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala#L47 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r72571864 --- Diff: core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala --- @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import java.util.concurrent.atomic.AtomicReference + +import scala.collection.mutable.{HashMap, HashSet} + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config +import org.apache.spark.util.{Clock, SystemClock, Utils} + +/** + * BlacklistTracker is designed to track problematic executors and nodes. It supports blacklisting + * specific (executor, task) pairs within a stage, blacklisting entire executors and nodes for a + * stage, and blacklisting executors and nodes across an entire application (with a periodic + * expiry). + * + * The tracker needs to deal with a variety of workloads, eg.: bad user code, which may lead to many + * task failures, but that should not count against individual executors; many small stages, which + * may prevent a bad executor for having many failures within one stage, but still many failures + * over the entire application; "flaky" executors, that don't fail every task, but are still + * faulty; etc. + * + * THREADING: As with most helpers of TaskSchedulerImpl, this is not thread-safe. Though it is + * called by multiple threads, callers must already have a lock on the TaskSchedulerImpl. The + * one exception is [[nodeBlacklist()]], which can be called without holding a lock. + */ +private[scheduler] class BlacklistTracker ( +conf: SparkConf, +clock: Clock = new SystemClock()) extends Logging { + + private val MAX_FAILURES_PER_EXEC = conf.get(config.MAX_FAILURES_PER_EXEC) + private val MAX_FAILED_EXEC_PER_NODE = conf.get(config.MAX_FAILED_EXEC_PER_NODE) + val EXECUTOR_RECOVERY_MILLIS = BlacklistTracker.getBlacklistExpiryTime(conf) + + // a count of failed tasks for each executor. Only counts failures after tasksets complete + // successfully + private val executorIdToFailureCount: HashMap[String, Int] = new HashMap() + private val executorIdToBlacklistStatus: HashMap[String, BlacklistedExecutor] = new HashMap() + private val nodeIdToBlacklistExpiryTime: HashMap[String, Long] = new HashMap() + private val _nodeBlacklist: AtomicReference[Set[String]] = new AtomicReference(Set()) + private var nextExpiryTime: Long = Long.MaxValue + // for blacklisted executors, the node it is on. We do *not* remove from this when executors are + // removed from spark, so we can track when we get multiple successive blacklisted executors on + // one node. 
+ val nodeToFailedExecs: HashMap[String, HashSet[String]] = new HashMap() + + def expireExecutorsInBlacklist(): Unit = { +val now = clock.getTimeMillis() +// quickly check if we've got anything to expire from blacklist -- if not, avoid doing any work +if (now > nextExpiryTime) { + val execsToClear = executorIdToBlacklistStatus.filter(_._2.expiryTime < now).keys + if (execsToClear.nonEmpty) { +logInfo(s"Removing executors $execsToClear from blacklist during periodic recovery") +execsToClear.foreach { exec => + val status = executorIdToBlacklistStatus.remove(exec).get + val failedExecsOnNode = nodeToFailedExecs(status.node) + failedExecsOnNode.remove(exec) + if (failedExecsOnNode.isEmpty) { +nodeToFailedExecs.remove(status.node) + } +} + } + if (executorIdToBlacklistStatus.nonEmpty) { +nextExpiryTime = executorIdToBlacklistStatus.map{_._2.expiryTime}.min + } else { +nextExpiryTime = Long.MaxValue + } + val nodesToClear = nodeIdToBlacklistExpiryTime.filter(_._2 < now)
[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r72571189 --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala --- @@ -97,6 +97,49 @@ package object config { .toSequence .createWithDefault(Nil) + // Blacklist confs + private[spark] val BLACKLIST_ENABLED = +ConfigBuilder("spark.blacklist.enabled") +.booleanConf +.createOptional + + private[spark] val MAX_TASK_ATTEMPTS_PER_NODE = +ConfigBuilder("spark.blacklist.maxTaskAttemptsPerNode") +.intConf +.createWithDefault(2) + + private[spark] val MAX_FAILURES_PER_EXEC = +ConfigBuilder("spark.blacklist.maxFailedTasksPerExecutor") +.intConf +.createWithDefault(2) + + private[spark] val MAX_FAILURES_PER_EXEC_STAGE = +ConfigBuilder("spark.blacklist.maxFailedTasksPerExecutorStage") --- End diff -- Agree re:stage -- seems more accessible to users --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r72571257 --- Diff: core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala --- @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import java.util.concurrent.atomic.AtomicReference + +import scala.collection.mutable.{HashMap, HashSet} + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config +import org.apache.spark.util.{Clock, SystemClock, Utils} + +/** + * BlacklistTracker is designed to track problematic executors and nodes. It supports blacklisting + * specific (executor, task) pairs within a stage, blacklisting entire executors and nodes for a + * stage, and blacklisting executors and nodes across an entire application (with a periodic + * expiry). + * + * The tracker needs to deal with a variety of workloads, eg.: bad user code, which may lead to many + * task failures, but that should not count against individual executors; many small stages, which + * may prevent a bad executor for having many failures within one stage, but still many failures + * over the entire application; "flaky" executors, that don't fail every task, but are still + * faulty; etc. + * + * THREADING: As with most helpers of TaskSchedulerImpl, this is not thread-safe. Though it is + * called by multiple threads, callers must already have a lock on the TaskSchedulerImpl. The + * one exception is [[nodeBlacklist()]], which can be called without holding a lock. + */ +private[scheduler] class BlacklistTracker ( +conf: SparkConf, +clock: Clock = new SystemClock()) extends Logging { + + private val MAX_FAILURES_PER_EXEC = conf.get(config.MAX_FAILURES_PER_EXEC) + private val MAX_FAILED_EXEC_PER_NODE = conf.get(config.MAX_FAILED_EXEC_PER_NODE) + val EXECUTOR_RECOVERY_MILLIS = BlacklistTracker.getBlacklistExpiryTime(conf) + + // a count of failed tasks for each executor. Only counts failures after tasksets complete + // successfully + private val executorIdToFailureCount: HashMap[String, Int] = new HashMap() + private val executorIdToBlacklistStatus: HashMap[String, BlacklistedExecutor] = new HashMap() + private val nodeIdToBlacklistExpiryTime: HashMap[String, Long] = new HashMap() + private val _nodeBlacklist: AtomicReference[Set[String]] = new AtomicReference(Set()) + private var nextExpiryTime: Long = Long.MaxValue + // for blacklisted executors, the node it is on. We do *not* remove from this when executors are + // removed from spark, so we can track when we get multiple successive blacklisted executors on + // one node. 
+ val nodeToFailedExecs: HashMap[String, HashSet[String]] = new HashMap() + + def expireExecutorsInBlacklist(): Unit = { +val now = clock.getTimeMillis() +// quickly check if we've got anything to expire from blacklist -- if not, avoid doing any work +if (now > nextExpiryTime) { + val execsToClear = executorIdToBlacklistStatus.filter(_._2.expiryTime < now).keys + if (execsToClear.nonEmpty) { +logInfo(s"Removing executors $execsToClear from blacklist during periodic recovery") +execsToClear.foreach { exec => + val status = executorIdToBlacklistStatus.remove(exec).get + val failedExecsOnNode = nodeToFailedExecs(status.node) + failedExecsOnNode.remove(exec) + if (failedExecsOnNode.isEmpty) { +nodeToFailedExecs.remove(status.node) + } +} + } + if (executorIdToBlacklistStatus.nonEmpty) { +nextExpiryTime = executorIdToBlacklistStatus.map{_._2.expiryTime}.min + } else { +nextExpiryTime = Long.MaxValue + } + val nodesToClear = nodeIdToBlacklistExpiryTime.filter(_._2 < now).keys
[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r72571040 --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala --- @@ -97,6 +97,49 @@ package object config { .toSequence .createWithDefault(Nil) + // Blacklist confs + private[spark] val BLACKLIST_ENABLED = +ConfigBuilder("spark.blacklist.enabled") +.booleanConf +.createOptional + + private[spark] val MAX_TASK_ATTEMPTS_PER_NODE = +ConfigBuilder("spark.blacklist.maxTaskAttemptsPerNode") +.intConf +.createWithDefault(2) + + private[spark] val MAX_FAILURES_PER_EXEC = +ConfigBuilder("spark.blacklist.maxFailedTasksPerExecutor") +.intConf +.createWithDefault(2) + + private[spark] val MAX_FAILURES_PER_EXEC_STAGE = +ConfigBuilder("spark.blacklist.maxFailedTasksPerExecutorStage") --- End diff -- yes, agreed. I like both versions better than the current names, no strong preference. Do you have any opinion on my question of using "stage" vs. "taskset" from the design doc? I lean towards sticking w/ stage despite its inaccuracy. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r72541736 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala --- @@ -592,7 +599,9 @@ private[spark] class TaskSetManager( * failures (this is because the method picks on unscheduled task, and then iterates through each * executor until it finds one that the task hasn't failed on already). */ - private[scheduler] def abortIfCompletelyBlacklisted(executors: Iterable[String]): Unit = { + private[scheduler] def abortIfCompletelyBlacklisted( + executorsByHost: HashMap[String, HashSet[String]], + blacklist: BlacklistTracker): Unit = { --- End diff -- looks like you don't need to pass this in anymore, since it's now part of the class? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14079: [SPARK-8425][CORE] New Blacklist Mechanism
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/14079#discussion_r72541451 --- Diff: core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala --- @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import java.util.concurrent.atomic.AtomicReference + +import scala.collection.mutable.{HashMap, HashSet} + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config +import org.apache.spark.util.{Clock, SystemClock, Utils} + +/** + * BlacklistTracker is designed to track problematic executors and nodes. It supports blacklisting + * specific (executor, task) pairs within a stage, blacklisting entire executors and nodes for a + * stage, and blacklisting executors and nodes across an entire application (with a periodic + * expiry). + * + * The tracker needs to deal with a variety of workloads, eg.: bad user code, which may lead to many + * task failures, but that should not count against individual executors; many small stages, which + * may prevent a bad executor for having many failures within one stage, but still many failures + * over the entire application; "flaky" executors, that don't fail every task, but are still + * faulty; etc. + * + * THREADING: As with most helpers of TaskSchedulerImpl, this is not thread-safe. Though it is + * called by multiple threads, callers must already have a lock on the TaskSchedulerImpl. The + * one exception is [[nodeBlacklist()]], which can be called without holding a lock. + */ +private[scheduler] class BlacklistTracker ( +conf: SparkConf, +clock: Clock = new SystemClock()) extends Logging { + + private val MAX_FAILURES_PER_EXEC = conf.get(config.MAX_FAILURES_PER_EXEC) + private val MAX_FAILED_EXEC_PER_NODE = conf.get(config.MAX_FAILED_EXEC_PER_NODE) + val EXECUTOR_RECOVERY_MILLIS = BlacklistTracker.getBlacklistExpiryTime(conf) + + // a count of failed tasks for each executor. Only counts failures after tasksets complete + // successfully + private val executorIdToFailureCount: HashMap[String, Int] = new HashMap() + private val executorIdToBlacklistStatus: HashMap[String, BlacklistedExecutor] = new HashMap() + private val nodeIdToBlacklistExpiryTime: HashMap[String, Long] = new HashMap() + private val _nodeBlacklist: AtomicReference[Set[String]] = new AtomicReference(Set()) + private var nextExpiryTime: Long = Long.MaxValue + // for blacklisted executors, the node it is on. We do *not* remove from this when executors are + // removed from spark, so we can track when we get multiple successive blacklisted executors on + // one node. 
--- End diff -- Also can you add a sentence here explaining why you don't expect this to grow to be infinitely large for a long-running app? (assume this is because there will be a max # of executors on each node in this map before the node as a whole will be blacklisted?) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
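One possible wording for the requested comment, combining Kay's parenthetical with the cleanup that `expireExecutorsInBlacklist()` already performs in the diff; this is a suggestion, not text from the PR.

```scala
/**
 * For each node, the executors on it that are currently blacklisted. We do *not* remove
 * entries when executors are removed from Spark, so that repeated bad executors on the
 * same node can be detected. The map still stays small for a long-running app: a node
 * stops accumulating entries once enough of its executors fail for the node itself to be
 * blacklisted, and expireExecutorsInBlacklist() drops an executor's entry (and the node's,
 * once it is empty) when that executor's blacklist status expires.
 */
val nodeToFailedExecs: HashMap[String, HashSet[String]] = new HashMap()
```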