vanzin commented on a change in pull request #24817: [SPARK-27963][core] Allow 
dynamic allocation without a shuffle service.
URL: https://github.com/apache/spark/pull/24817#discussion_r298395270
 
 

 ##########
 File path: 
core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala
 ##########
 @@ -259,8 +262,84 @@ class ExecutorMonitorSuite extends SparkFunSuite {
     assert(monitor.timedOutExecutors().toSet === Set("2"))
   }
 
+  test("shuffle block tracking") {
+    // Mock the listener bus *only* for the functionality needed by the 
shuffle tracking code.
+    // Any other event sent through the mock bus will fail.
+    val bus = mock(classOf[LiveListenerBus])
+    doAnswer { invocation =>
+      
monitor.onOtherEvent(invocation.getArguments()(0).asInstanceOf[SparkListenerEvent])
+    }.when(bus).post(any())
+
+    conf.set(DYN_ALLOCATION_SHUFFLE_TRACKING, 
true).set(SHUFFLE_SERVICE_ENABLED, false)
+    monitor = new ExecutorMonitor(conf, client, bus, clock)
+
+    // 3 jobs: 2 and 3 share a shuffle, 1 has a separate shuffle.
+    val stage1 = stageInfo(1, shuffleId = 0)
+    val stage2 = stageInfo(2)
+
+    val stage3 = stageInfo(3, shuffleId = 1)
+    val stage4 = stageInfo(4)
+
+    val stage5 = stageInfo(5, shuffleId = 1)
+    val stage6 = stageInfo(6)
+
+    // Start jobs 1 and 2. Finish a task on each, but don't finish the jobs. 
This should prevent the
+    // executor from going idle since there are active shuffles.
+    monitor.onJobStart(SparkListenerJobStart(1, clock.getTimeMillis(), 
Seq(stage1, stage2)))
+    monitor.onJobStart(SparkListenerJobStart(2, clock.getTimeMillis(), 
Seq(stage3, stage4)))
+
+    monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), 
"1", null))
+    assert(monitor.timedOutExecutors(idleDeadline) === Seq("1"))
+
+    // First a failed task, to make sure it does not count.
+    monitor.onTaskStart(SparkListenerTaskStart(1, 0, taskInfo("1", 1)))
+    monitor.onTaskEnd(SparkListenerTaskEnd(1, 0, "foo", TaskResultLost, 
taskInfo("1", 1), null))
+    assert(monitor.timedOutExecutors(idleDeadline) === Seq("1"))
+
+    monitor.onTaskStart(SparkListenerTaskStart(1, 0, taskInfo("1", 1)))
+    monitor.onTaskEnd(SparkListenerTaskEnd(1, 0, "foo", Success, taskInfo("1", 
1), null))
+    assert(monitor.timedOutExecutors(idleDeadline).isEmpty)
+
+    monitor.onTaskStart(SparkListenerTaskStart(3, 0, taskInfo("1", 1)))
+    monitor.onTaskEnd(SparkListenerTaskEnd(3, 0, "foo", Success, taskInfo("1", 
1), null))
+    assert(monitor.timedOutExecutors(idleDeadline).isEmpty)
+
+    // Finish the jobs, now the executor should be idle, but with the shuffle 
timeout, since the
+    // shuffles are not active.
+    monitor.onJobEnd(SparkListenerJobEnd(1, clock.getTimeMillis(), 
JobSucceeded))
+    assert(!monitor.isExecutorIdle("1"))
+
+    monitor.onJobEnd(SparkListenerJobEnd(2, clock.getTimeMillis(), 
JobSucceeded))
+    assert(monitor.isExecutorIdle("1"))
+    assert(monitor.timedOutExecutors(idleDeadline).isEmpty)
+    assert(monitor.timedOutExecutors(storageDeadline).isEmpty)
+    assert(monitor.timedOutExecutors(shuffleDeadline) === Seq("1"))
+
+    // Start job 3. Since it shares a shuffle with job 2, the executor should 
not be considered
+    // idle anymore, even if no tasks are run.
+    monitor.onJobStart(SparkListenerJobStart(3, clock.getTimeMillis(), 
Seq(stage5, stage6)))
+    assert(!monitor.isExecutorIdle("1"))
+    assert(monitor.timedOutExecutors(shuffleDeadline).isEmpty)
+
+    monitor.onJobEnd(SparkListenerJobEnd(3, clock.getTimeMillis(), 
JobSucceeded))
+    assert(monitor.timedOutExecutors(idleDeadline).isEmpty)
+    assert(monitor.timedOutExecutors(shuffleDeadline) === Seq("1"))
+
+    // Clean up the shuffles, executor should now time out at the idle 
deadline.
 
 Review comment:
   That's job 3, isn't it? (L318)

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to