squito commented on a change in pull request #24817: [SPARK-27963][core] Allow dynamic allocation without a shuffle service.
URL: https://github.com/apache/spark/pull/24817#discussion_r298274054
 
 

 ##########
 File path: 
core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala
 ##########
 @@ -259,8 +262,84 @@ class ExecutorMonitorSuite extends SparkFunSuite {
     assert(monitor.timedOutExecutors().toSet === Set("2"))
   }
 
+  test("shuffle block tracking") {
+    // Mock the listener bus *only* for the functionality needed by the shuffle tracking code.
+    // Any other event sent through the mock bus will fail.
+    val bus = mock(classOf[LiveListenerBus])
+    doAnswer { invocation =>
+      monitor.onOtherEvent(invocation.getArguments()(0).asInstanceOf[SparkListenerEvent])
+    }.when(bus).post(any())
+
+    conf.set(DYN_ALLOCATION_SHUFFLE_TRACKING, true).set(SHUFFLE_SERVICE_ENABLED, false)
+    monitor = new ExecutorMonitor(conf, client, bus, clock)
+
+    // 3 jobs: 2 and 3 share a shuffle, 1 has a separate shuffle.
+    val stage1 = stageInfo(1, shuffleId = 0)
+    val stage2 = stageInfo(2)
+
+    val stage3 = stageInfo(3, shuffleId = 1)
+    val stage4 = stageInfo(4)
+
+    val stage5 = stageInfo(5, shuffleId = 1)
+    val stage6 = stageInfo(6)
+
+    // Start jobs 1 and 2. Finish a task on each, but don't finish the jobs. This should prevent
+    // the executor from going idle since there are active shuffles.
+    monitor.onJobStart(SparkListenerJobStart(1, clock.getTimeMillis(), Seq(stage1, stage2)))
+    monitor.onJobStart(SparkListenerJobStart(2, clock.getTimeMillis(), Seq(stage3, stage4)))
+
+    monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", null))
+    assert(monitor.timedOutExecutors(idleDeadline) === Seq("1"))
+
+    // First a failed task, to make sure it does not count.
+    monitor.onTaskStart(SparkListenerTaskStart(1, 0, taskInfo("1", 1)))
+    monitor.onTaskEnd(SparkListenerTaskEnd(1, 0, "foo", TaskResultLost, taskInfo("1", 1), null))
+    assert(monitor.timedOutExecutors(idleDeadline) === Seq("1"))
+
+    monitor.onTaskStart(SparkListenerTaskStart(1, 0, taskInfo("1", 1)))
+    monitor.onTaskEnd(SparkListenerTaskEnd(1, 0, "foo", Success, taskInfo("1", 1), null))
+    assert(monitor.timedOutExecutors(idleDeadline).isEmpty)
+
+    monitor.onTaskStart(SparkListenerTaskStart(3, 0, taskInfo("1", 1)))
+    monitor.onTaskEnd(SparkListenerTaskEnd(3, 0, "foo", Success, taskInfo("1", 1), null))
+    assert(monitor.timedOutExecutors(idleDeadline).isEmpty)
+
+    // Finish the jobs. Now the executor should be idle, but with the shuffle timeout, since the
+    // shuffles are not active.
+    monitor.onJobEnd(SparkListenerJobEnd(1, clock.getTimeMillis(), JobSucceeded))
+    assert(!monitor.isExecutorIdle("1"))
+
+    monitor.onJobEnd(SparkListenerJobEnd(2, clock.getTimeMillis(), JobSucceeded))
+    assert(monitor.isExecutorIdle("1"))
+    assert(monitor.timedOutExecutors(idleDeadline).isEmpty)
+    assert(monitor.timedOutExecutors(storageDeadline).isEmpty)
+    assert(monitor.timedOutExecutors(shuffleDeadline) === Seq("1"))
+
+    // Start job 3. Since it shares a shuffle with job 2, the executor should not be considered
+    // idle anymore, even if no tasks are run.
+    monitor.onJobStart(SparkListenerJobStart(3, clock.getTimeMillis(), Seq(stage5, stage6)))
+    assert(!monitor.isExecutorIdle("1"))
+    assert(monitor.timedOutExecutors(shuffleDeadline).isEmpty)
+
+    monitor.onJobEnd(SparkListenerJobEnd(3, clock.getTimeMillis(), JobSucceeded))
+    assert(monitor.timedOutExecutors(idleDeadline).isEmpty)
+    assert(monitor.timedOutExecutors(shuffleDeadline) === Seq("1"))
+
+    // Clean up the shuffles; the executor should now time out at the idle deadline.
 
 Review comment:
   you should also test another job after this, one which references the same shuffles from the earlier jobs, to make sure the executor properly moves out of the idle state.
   
   Also, one thing to keep an eye on: if you re-use old shuffles, the DAGScheduler creates new stages, which then get skipped, but should keep consistent shuffle ids. I'm thinking of simple pipelines that get broken up by a count or something, but are logically all one thing, e.g.
   
   ```scala
   val cachedAfterFirstShuffle = someRdd.reduceByKey { ... }.cache()
   cachedAfterFirstShuffle.count()    // job 1
   cachedAfterFirstShuffle.map(...)   // job 2, etc.
   ```
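   
   A rough sketch of what that extra step in the test might look like, written against this suite's existing helpers (`stageInfo`, `monitor`, `clock`); the job/stage ids and the reuse of `shuffleId = 0` are made up for illustration, not part of the PR:
   
   ```scala
   // Hypothetical job 4 that reuses shuffle 0 from job 1. Starting it should
   // pull the executor back out of the idle state, even with no tasks run.
   val stage7 = stageInfo(7, shuffleId = 0)
   val stage8 = stageInfo(8)
   monitor.onJobStart(SparkListenerJobStart(4, clock.getTimeMillis(), Seq(stage7, stage8)))
   assert(!monitor.isExecutorIdle("1"))
   
   // Once job 4 ends, the executor should go back to idle-with-shuffle-data.
   monitor.onJobEnd(SparkListenerJobEnd(4, clock.getTimeMillis(), JobSucceeded))
   assert(monitor.isExecutorIdle("1"))
   ```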

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
