Github user lirui-apache commented on a diff in the pull request:
https://github.com/apache/spark/pull/21486#discussion_r192614874
--- Diff: core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
---
@@ -207,6 +210,55 @@ class HeartbeatReceiverSuite
assert(fakeClusterManager.getExecutorIdsToKill === Set(executorId1,
executorId2))
}
+ test("expired host should not be offered again") {
+ scheduler = spy(new TaskSchedulerImpl(sc))
+ scheduler.setDAGScheduler(sc.dagScheduler)
+ when(sc.taskScheduler).thenReturn(scheduler)
+ doReturn(true).when(scheduler).executorHeartbeatReceived(any(), any(),
any())
+
+ // Set up a fake backend and cluster manager to simulate killing
executors
+ val rpcEnv = sc.env.rpcEnv
+ val fakeClusterManager = new FakeClusterManager(rpcEnv)
+ val fakeClusterManagerRef = rpcEnv.setupEndpoint("fake-cm",
fakeClusterManager)
+ val fakeSchedulerBackend = new FakeSchedulerBackend(scheduler, rpcEnv,
fakeClusterManagerRef)
+ when(sc.schedulerBackend).thenReturn(fakeSchedulerBackend)
+
+ fakeSchedulerBackend.start()
+ val dummyExecutorEndpoint1 = new FakeExecutorEndpoint(rpcEnv)
+ val dummyExecutorEndpointRef1 =
rpcEnv.setupEndpoint("fake-executor-1", dummyExecutorEndpoint1)
+ fakeSchedulerBackend.driverEndpoint.askSync[Boolean](
+ RegisterExecutor(executorId1, dummyExecutorEndpointRef1, "1.2.3.4",
2, Map.empty))
+ heartbeatReceiverRef.askSync[Boolean](TaskSchedulerIsSet)
+ addExecutorAndVerify(executorId1)
+ triggerHeartbeat(executorId1, executorShouldReregister = false)
+
+ scheduler.initialize(fakeSchedulerBackend)
+ sc.requestTotalExecutors(0, 0, Map.empty)
--- End diff --
Just thought we don't need a replacement executor for this test case. I will
update the total number to 1 to be closer to a real use case.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]