[GitHub] spark issue #21486: [SPARK-24387][Core] Heartbeat-timeout executor is added ...
Github user lirui-apache commented on the issue:

https://github.com/apache/spark/pull/21486

cc @vanzin @andrewor14

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21486: [SPARK-24387][Core] Heartbeat-timeout executor is added ...
Github user lirui-apache commented on the issue:

https://github.com/apache/spark/pull/21486

Hi guys, any further comments?
[GitHub] spark pull request #21486: [SPARK-24387][Core] Heartbeat-timeout executor is...
Github user lirui-apache commented on a diff in the pull request:

https://github.com/apache/spark/pull/21486#discussion_r193044306

--- Diff: core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala ---
```
@@ -207,6 +210,54 @@ class HeartbeatReceiverSuite
     assert(fakeClusterManager.getExecutorIdsToKill === Set(executorId1, executorId2))
   }

+  test("expired executor should not be offered again (SPARK-24387)") {
+    scheduler = spy(new TaskSchedulerImpl(sc))
```
--- End diff --

I need a functioning scheduler rather than just a mock, and I also need to stub the `executorHeartbeatReceived` method, so I use a spy here.
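The reason for choosing a spy over a mock can be shown in miniature. With Mockito, `spy(obj)` keeps the real method implementations unless a method is explicitly stubbed, and `doReturn(...).when(spy).method(...)` stubs a method without invoking the real one. The Scala sketch below mimics that idea without Mockito by subclassing a real object and overriding only one method. `ToyScheduler` and `ToySchedulerSpy` are hypothetical stand-ins, not Spark or Mockito classes.

```scala
// Hypothetical "real" object: most behaviour should stay intact in the test.
class ToyScheduler {
  private var offers = List.empty[String]

  // Real behaviour the test relies on.
  def addOffer(executorId: String): Unit = { offers = executorId :: offers }
  def currentOffers: List[String] = offers.reverse

  // Method that would need a running cluster; the test wants to stub it.
  def executorHeartbeatReceived(executorId: String): Boolean =
    throw new IllegalStateException("needs a running cluster")
}

// Hand-rolled "spy": inherits every real method and overrides only the stubbed one,
// similar in spirit to doReturn(true).when(spy).executorHeartbeatReceived(...).
class ToySchedulerSpy extends ToyScheduler {
  var heartbeatCalls = 0

  override def executorHeartbeatReceived(executorId: String): Boolean = {
    heartbeatCalls += 1
    true
  }
}
```

A plain mock would stub every method, so `addOffer` and `currentOffers` would do nothing; the spy keeps them working while neutralizing the one method that cannot run in the test.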
[GitHub] spark pull request #21486: [SPARK-24387][Core] Heartbeat-timeout executor is...
Github user lirui-apache commented on a diff in the pull request:

https://github.com/apache/spark/pull/21486#discussion_r192614888

--- Diff: core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala ---
```
@@ -197,14 +197,14 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
         if (now - lastSeenMs > executorTimeoutMs) {
           logWarning(s"Removing executor $executorId with no recent heartbeats: " +
             s"${now - lastSeenMs} ms exceeds timeout $executorTimeoutMs ms")
-          scheduler.executorLost(executorId, SlaveLost("Executor heartbeat " +
-            s"timed out after ${now - lastSeenMs} ms"))
           // Asynchronously kill the executor to avoid blocking the current thread
           killExecutorThread.submit(new Runnable {
             override def run(): Unit = Utils.tryLogNonFatalError {
               // Note: we want to get an executor back after expiring this one,
               // so do not simply call `sc.killExecutor` here (SPARK-8119)
               sc.killAndReplaceExecutor(executorId)
```
--- End diff --

Yes:

```
private[spark] def killAndReplaceExecutor(executorId: String): Boolean = {
  schedulerBackend match {
    case b: ExecutorAllocationClient =>
      b.killExecutors(Seq(executorId), adjustTargetNumExecutors = false, countFailures = true,
        force = true).nonEmpty
    case _ =>
      logWarning("Killing executors is not supported by current scheduler.")
      false
  }
}
```
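The diff excerpt only shows the early `scheduler.executorLost(...)` call being removed; the PR description says the call now happens after the executor is killed. Assuming it is moved into the same `Runnable`, right after `sc.killAndReplaceExecutor(executorId)`, the expiry path can be sketched as below. `ExpiryOrderSketch`, `tryLogNonFatal`, and the recorded event strings are hypothetical; `tryLogNonFatal` is only a simplified stand-in for `Utils.tryLogNonFatalError`.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

// Hypothetical model of the patched expiry path: the kill request and the
// executorLost notification both run on a dedicated single thread, in that order.
object ExpiryOrderSketch {
  // Simplified stand-in for Utils.tryLogNonFatalError: log and swallow non-fatal errors.
  def tryLogNonFatal(block: => Unit): Unit =
    try block catch { case NonFatal(e) => System.err.println(s"Uncaught exception: $e") }

  def expire(executorId: String): Seq[String] = {
    val events = ArrayBuffer.empty[String] // written only by the kill thread
    val killExecutorThread = Executors.newSingleThreadExecutor()
    try {
      val future = killExecutorThread.submit(new Runnable {
        override def run(): Unit = tryLogNonFatal {
          // Assumed order after the patch: kill first (marks the executor pending-to-remove)...
          events += s"killAndReplaceExecutor($executorId)"
          // ...then report the loss, which may revive offers.
          events += s"executorLost($executorId)"
        }
      })
      // Waiting here only makes the sketch deterministic; the real code does not block.
      future.get(10, TimeUnit.SECONDS)
    } finally killExecutorThread.shutdown()
    events.toList
  }
}
```

Running both steps on the single `killExecutorThread` keeps them ordered relative to each other without blocking the thread that checks heartbeats.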
[GitHub] spark pull request #21486: [SPARK-24387][Core] Heartbeat-timeout executor is...
Github user lirui-apache commented on a diff in the pull request:

https://github.com/apache/spark/pull/21486#discussion_r192614874

--- Diff: core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala ---
```
@@ -207,6 +210,55 @@ class HeartbeatReceiverSuite
     assert(fakeClusterManager.getExecutorIdsToKill === Set(executorId1, executorId2))
   }

+  test("expired host should not be offered again") {
+    scheduler = spy(new TaskSchedulerImpl(sc))
+    scheduler.setDAGScheduler(sc.dagScheduler)
+    when(sc.taskScheduler).thenReturn(scheduler)
+    doReturn(true).when(scheduler).executorHeartbeatReceived(any(), any(), any())
+
+    // Set up a fake backend and cluster manager to simulate killing executors
+    val rpcEnv = sc.env.rpcEnv
+    val fakeClusterManager = new FakeClusterManager(rpcEnv)
+    val fakeClusterManagerRef = rpcEnv.setupEndpoint("fake-cm", fakeClusterManager)
+    val fakeSchedulerBackend = new FakeSchedulerBackend(scheduler, rpcEnv, fakeClusterManagerRef)
+    when(sc.schedulerBackend).thenReturn(fakeSchedulerBackend)
+
+    fakeSchedulerBackend.start()
+    val dummyExecutorEndpoint1 = new FakeExecutorEndpoint(rpcEnv)
+    val dummyExecutorEndpointRef1 = rpcEnv.setupEndpoint("fake-executor-1", dummyExecutorEndpoint1)
+    fakeSchedulerBackend.driverEndpoint.askSync[Boolean](
+      RegisterExecutor(executorId1, dummyExecutorEndpointRef1, "1.2.3.4", 2, Map.empty))
+    heartbeatReceiverRef.askSync[Boolean](TaskSchedulerIsSet)
+    addExecutorAndVerify(executorId1)
+    triggerHeartbeat(executorId1, executorShouldReregister = false)
+
+    scheduler.initialize(fakeSchedulerBackend)
+    sc.requestTotalExecutors(0, 0, Map.empty)
```
--- End diff --

I thought we didn't need a replacement executor for this test case. I'll update the total number of executors to 1 to be closer to the real use case.
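`killAndReplaceExecutor` passes `adjustTargetNumExecutors = false`, so the target number of executors is not reduced after the kill and the cluster manager is expected to start a replacement. That is why the requested total matters here: with `sc.requestTotalExecutors(0, 0, Map.empty)` no replacement is wanted, while a total of 1 mirrors a job that keeps one executor. The toy model below illustrates the difference; `ToyAllocation` and its methods are hypothetical and are not Spark's `ExecutorAllocationClient` API.

```scala
// Hypothetical toy model of a target executor count and a cluster manager that
// launches replacements; not Spark's API.
final class ToyAllocation(initialTarget: Int, initialLive: Set[String]) {
  private var target = initialTarget
  private var live = initialLive
  private var launchedCount = 0

  // Mirrors the meaning of adjustTargetNumExecutors: when false, the target is left unchanged.
  def killExecutors(ids: Seq[String], adjustTargetNumExecutors: Boolean): Unit = {
    live --= ids
    if (adjustTargetNumExecutors) target = math.max(0, target - ids.size)
  }

  // Cluster-manager side: launch replacements until the live count reaches the target.
  def reconcile(): Set[String] = {
    var launched = Set.empty[String]
    while (live.size < target) {
      launchedCount += 1
      val id = s"replacement-$launchedCount"
      live += id
      launched += id
    }
    launched
  }

  def liveExecutors: Set[String] = live
}
```

With a target of 0 the kill leaves nothing running; with a target of 1 the same kill is followed by one replacement executor.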
[GitHub] spark pull request #21486: [SPARK-24387][Core] Heartbeat-timeout executor is...
Github user lirui-apache commented on a diff in the pull request:

https://github.com/apache/spark/pull/21486#discussion_r192614878

--- Diff: core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala ---
```
@@ -207,6 +210,55 @@ class HeartbeatReceiverSuite
     assert(fakeClusterManager.getExecutorIdsToKill === Set(executorId1, executorId2))
   }

+  test("expired host should not be offered again") {
```
--- End diff --

It should be `executor`. Thanks for catching that; I'll update it.
[GitHub] spark pull request #21486: [SPARK-24387][Core] Heartbeat-timeout executor is...
GitHub user lirui-apache opened a pull request:

https://github.com/apache/spark/pull/21486

[SPARK-24387][Core] Heartbeat-timeout executor is added back and used again

## What changes were proposed in this pull request?

When an executor's heartbeat is lost, we call `scheduler.executorLost` before we tell the backend to kill the executor. `TaskSchedulerImpl` asks the backend to revive offers in `executorLost`. If this is the only executor, the backend may offer it to `TaskSchedulerImpl` again, and the retried task can then be scheduled on this same executor.

This patch proposes to call `scheduler.executorLost` after the executor is killed. At that point, the executor has been marked as pending-to-remove and won't be offered again.

## How was this patch tested?

Added a new test case in `HeartbeatReceiverSuite`. Without the fix, this test case fails.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/lirui-apache/spark SPARK-24387

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21486.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #21486

commit 189f2696dab47a23b3f2a48a313a72dc4ec77c80
Author: Rui Li
Date: 2018-06-02T08:25:10Z

Call executorLost after the executor is killed
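The race described above can be reproduced with a toy model. `ToyBackend` and `HeartbeatExpiry` are hypothetical: killing an executor only marks it pending-to-remove, and `reviveOffers()` stands in for the offers that `executorLost` triggers through the backend. With the old order the expired executor is still offered; with the patched order it is not.

```scala
// Hypothetical toy backend: registered executors stay registered until they exit,
// but executors pending removal are never offered.
final class ToyBackend(registered: Set[String]) {
  private var pendingToRemove = Set.empty[String]

  // Killing only marks the executor; it has not exited yet.
  def killExecutor(id: String): Unit = { pendingToRemove += id }

  // Offers go to registered executors that are not pending removal.
  def reviveOffers(): Set[String] = registered.filterNot(pendingToRemove.contains)
}

object HeartbeatExpiry {
  // Returns the executors offered when executorLost triggers a revive.
  def expire(backend: ToyBackend, id: String, lostBeforeKill: Boolean): Set[String] =
    if (lostBeforeKill) {
      val offered = backend.reviveOffers() // old order: executorLost -> reviveOffers first
      backend.killExecutor(id)
      offered
    } else {
      backend.killExecutor(id)             // patched order: mark pending-to-remove first
      backend.reviveOffers()
    }
}
```

With a single executor, the old order offers `exec-1` again, so a retried task could land on the expired executor; the patched order offers nothing.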