vanzin commented on a change in pull request #24817: [SPARK-27963][core] Allow dynamic allocation without a shuffle service.
URL: https://github.com/apache/spark/pull/24817#discussion_r298395270
##########
File path: core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala
##########
@@ -259,8 +262,84 @@ class ExecutorMonitorSuite extends SparkFunSuite {
     assert(monitor.timedOutExecutors().toSet === Set("2"))
   }
+ test("shuffle block tracking") {
+ // Mock the listener bus *only* for the functionality needed by the
shuffle tracking code.
+ // Any other event sent through the mock bus will fail.
+ val bus = mock(classOf[LiveListenerBus])
+ doAnswer { invocation =>
+
monitor.onOtherEvent(invocation.getArguments()(0).asInstanceOf[SparkListenerEvent])
+ }.when(bus).post(any())
+
+ conf.set(DYN_ALLOCATION_SHUFFLE_TRACKING,
true).set(SHUFFLE_SERVICE_ENABLED, false)
+ monitor = new ExecutorMonitor(conf, client, bus, clock)
+
+ // 3 jobs: 2 and 3 share a shuffle, 1 has a separate shuffle.
+ val stage1 = stageInfo(1, shuffleId = 0)
+ val stage2 = stageInfo(2)
+
+ val stage3 = stageInfo(3, shuffleId = 1)
+ val stage4 = stageInfo(4)
+
+ val stage5 = stageInfo(5, shuffleId = 1)
+ val stage6 = stageInfo(6)
+
+ // Start jobs 1 and 2. Finish a task on each, but don't finish the jobs.
This should prevent the
+ // executor from going idle since there are active shuffles.
+ monitor.onJobStart(SparkListenerJobStart(1, clock.getTimeMillis(),
Seq(stage1, stage2)))
+ monitor.onJobStart(SparkListenerJobStart(2, clock.getTimeMillis(),
Seq(stage3, stage4)))
+
+ monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(),
"1", null))
+ assert(monitor.timedOutExecutors(idleDeadline) === Seq("1"))
+
+ // First a failed task, to make sure it does not count.
+ monitor.onTaskStart(SparkListenerTaskStart(1, 0, taskInfo("1", 1)))
+ monitor.onTaskEnd(SparkListenerTaskEnd(1, 0, "foo", TaskResultLost,
taskInfo("1", 1), null))
+ assert(monitor.timedOutExecutors(idleDeadline) === Seq("1"))
+
+ monitor.onTaskStart(SparkListenerTaskStart(1, 0, taskInfo("1", 1)))
+ monitor.onTaskEnd(SparkListenerTaskEnd(1, 0, "foo", Success, taskInfo("1",
1), null))
+ assert(monitor.timedOutExecutors(idleDeadline).isEmpty)
+
+ monitor.onTaskStart(SparkListenerTaskStart(3, 0, taskInfo("1", 1)))
+ monitor.onTaskEnd(SparkListenerTaskEnd(3, 0, "foo", Success, taskInfo("1",
1), null))
+ assert(monitor.timedOutExecutors(idleDeadline).isEmpty)
+
+ // Finish the jobs, now the executor should be idle, but with the shuffle
timeout, since the
+ // shuffles are not active.
+ monitor.onJobEnd(SparkListenerJobEnd(1, clock.getTimeMillis(),
JobSucceeded))
+ assert(!monitor.isExecutorIdle("1"))
+
+ monitor.onJobEnd(SparkListenerJobEnd(2, clock.getTimeMillis(),
JobSucceeded))
+ assert(monitor.isExecutorIdle("1"))
+ assert(monitor.timedOutExecutors(idleDeadline).isEmpty)
+ assert(monitor.timedOutExecutors(storageDeadline).isEmpty)
+ assert(monitor.timedOutExecutors(shuffleDeadline) === Seq("1"))
+
+ // Start job 3. Since it shares a shuffle with job 2, the executor should
not be considered
+ // idle anymore, even if no tasks are run.
+ monitor.onJobStart(SparkListenerJobStart(3, clock.getTimeMillis(),
Seq(stage5, stage6)))
+ assert(!monitor.isExecutorIdle("1"))
+ assert(monitor.timedOutExecutors(shuffleDeadline).isEmpty)
+
+ monitor.onJobEnd(SparkListenerJobEnd(3, clock.getTimeMillis(),
JobSucceeded))
+ assert(monitor.timedOutExecutors(idleDeadline).isEmpty)
+ assert(monitor.timedOutExecutors(shuffleDeadline) === Seq("1"))
+
+    // Clean up the shuffles; the executor should now time out at the idle deadline.
Review comment:
That's job 3, isn't it? (L318)
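
As context for the assertions in the diff: the test exercises three deadlines (idleDeadline, storageDeadline, shuffleDeadline) against the monitor. Below is a minimal, self-contained sketch of the deadline selection the assertions imply; it is not the ExecutorMonitor implementation, and the state flags (hasActiveShuffle, hasShuffleData) are hypothetical stand-ins for state the real monitor derives from job and task events.

    // Hypothetical sketch, not the ExecutorMonitor code: which deadline applies
    // to an executor when shuffle tracking is enabled without a shuffle service.
    def timeoutAt(
        nowMs: Long,
        hasActiveShuffle: Boolean,  // a running job still references the executor's shuffle data
        hasShuffleData: Boolean,    // the executor holds blocks for some finished shuffle
        idleTimeoutMs: Long,
        shuffleTimeoutMs: Long): Long = {
      if (hasActiveShuffle) {
        Long.MaxValue               // never considered idle while a job may need the data
      } else if (hasShuffleData) {
        nowMs + shuffleTimeoutMs    // idle, but held until the shuffle deadline
      } else {
        nowMs + idleTimeoutMs       // plain idle executor
      }
    }

This mirrors the test's flow: while jobs 1 and 2 run, the executor is not idle; once they finish, it times out only at the shuffle deadline; and once the shuffles are cleaned up, it times out at the idle deadline.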