mridulm commented on code in PR #37528:
URL: https://github.com/apache/spark/pull/37528#discussion_r951813638


##########
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala:
##########
@@ -2596,28 +2605,39 @@ class TaskSetManagerSuite
       .getDeclaredMethod("ensureExecutorIsTracked",
         classOf[String], classOf[Int])
     ensureExecutorIsTracked.setAccessible(true)
-    val executorTracker = ensureExecutorIsTracked.invoke(executorMonitor,
+
+    val executorTracker1 = ensureExecutorIsTracked.invoke(executorMonitor,
       "exec1".asInstanceOf[AnyRef],
       ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID.asInstanceOf[AnyRef])
-    executorTracker.asInstanceOf[executorMonitor.Tracker].updateRunningTasks(1)
+    
executorTracker1.asInstanceOf[executorMonitor.Tracker].updateRunningTasks(1)
+    val executorTracker2 = ensureExecutorIsTracked.invoke(executorMonitor,
+      "exec2".asInstanceOf[AnyRef],
+      ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID.asInstanceOf[AnyRef])
+    
executorTracker2.asInstanceOf[executorMonitor.Tracker].updateRunningTasks(1)
 
-    // assert exec1 is not idle
+    // assert exec1 and exec2 is not idle

Review Comment:
   super nit: `is not` -> `are not`



##########
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala:
##########
@@ -2563,6 +2565,81 @@ class TaskSetManagerSuite
     assert(!manager.isZombie)
   }
 
+  test("SPARK-40094: Send TaskEnd if task failed with " +
+    "NotSerializableException or TaskOutputFileAlreadyExistException") {
+    val sparkConf = new SparkConf()
+      .setMaster("local-cluster[1,1,1024]")
+      .setAppName("SPARK-40094")
+      .set(config.DYN_ALLOCATION_TESTING, true)
+      .set(TEST_DYNAMIC_ALLOCATION_SCHEDULE_ENABLED, false)
+      .set(config.DYN_ALLOCATION_ENABLED, true)
+      .set(config.DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT, 1L)
+
+    // setup spark context and init ExecutorAllocationManager
+    sc = new SparkContext(sparkConf)
+    sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
+    // replace dagScheduler to let handleFailedTask send TaskEnd
+    sched.dagScheduler = sc.dagScheduler
+
+    val taskSet1 = FakeTask.createTaskSet(1)
+    val manager1 = new TaskSetManager(sched, taskSet1, MAX_TASK_FAILURES)
+    val taskSet2 = FakeTask.createTaskSet(1)
+    val manager2 = new TaskSetManager(sched, taskSet2, MAX_TASK_FAILURES)
+    assert(sched.taskSetsFailed.isEmpty)
+
+
+    val offerResult1 = manager1.resourceOffer("exec1", "host1", ANY)._1
+    assert(offerResult1.isDefined,
+      "Expect resource offer on iteration 0 to return a task")
+    assert(offerResult1.get.index === 0)
+
+    val offerResult2 = manager2.resourceOffer("exec2", "host2", ANY)._1
+    assert(offerResult2.isDefined,
+      "Expect resource offer on iteration 0 to return a task")
+    assert(offerResult2.get.index === 0)
+
+    val executorMonitor = sc.executorAllocationManager.get.executorMonitor
+
+    // reflection to mock ExecutorMonitor.onTaskStart
+    val ensureExecutorIsTracked = classOf[ExecutorMonitor]
+      .getDeclaredMethod("ensureExecutorIsTracked",
+        classOf[String], classOf[Int])
+    ensureExecutorIsTracked.setAccessible(true)
+
+    val executorTracker1 = ensureExecutorIsTracked.invoke(executorMonitor,
+      "exec1".asInstanceOf[AnyRef],
+      ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID.asInstanceOf[AnyRef])
+    
executorTracker1.asInstanceOf[executorMonitor.Tracker].updateRunningTasks(1)
+    val executorTracker2 = ensureExecutorIsTracked.invoke(executorMonitor,
+      "exec2".asInstanceOf[AnyRef],
+      ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID.asInstanceOf[AnyRef])
+    
executorTracker2.asInstanceOf[executorMonitor.Tracker].updateRunningTasks(1)
+
+    // assert exec1 and exec2 is not idle
+    val method = classOf[ExecutorMonitor].getDeclaredMethod("isExecutorIdle",
+      classOf[String])

Review Comment:
   Relax the visibility of the method to the `scheduler` package and avoid reflection?



##########
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala:
##########
@@ -2563,6 +2565,81 @@ class TaskSetManagerSuite
     assert(!manager.isZombie)
   }
 
+  test("SPARK-40094: Send TaskEnd if task failed with " +
+    "NotSerializableException or TaskOutputFileAlreadyExistException") {
+    val sparkConf = new SparkConf()
+      .setMaster("local-cluster[1,1,1024]")
+      .setAppName("SPARK-40094")
+      .set(config.DYN_ALLOCATION_TESTING, true)
+      .set(TEST_DYNAMIC_ALLOCATION_SCHEDULE_ENABLED, false)
+      .set(config.DYN_ALLOCATION_ENABLED, true)
+      .set(config.DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT, 1L)
+
+    // setup spark context and init ExecutorAllocationManager
+    sc = new SparkContext(sparkConf)
+    sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
+    // replace dagScheduler to let handleFailedTask send TaskEnd
+    sched.dagScheduler = sc.dagScheduler
+
+    val taskSet1 = FakeTask.createTaskSet(1)
+    val manager1 = new TaskSetManager(sched, taskSet1, MAX_TASK_FAILURES)
+    val taskSet2 = FakeTask.createTaskSet(1)
+    val manager2 = new TaskSetManager(sched, taskSet2, MAX_TASK_FAILURES)
+    assert(sched.taskSetsFailed.isEmpty)
+
+
+    val offerResult1 = manager1.resourceOffer("exec1", "host1", ANY)._1
+    assert(offerResult1.isDefined,
+      "Expect resource offer on iteration 0 to return a task")
+    assert(offerResult1.get.index === 0)
+
+    val offerResult2 = manager2.resourceOffer("exec2", "host2", ANY)._1
+    assert(offerResult2.isDefined,
+      "Expect resource offer on iteration 0 to return a task")
+    assert(offerResult2.get.index === 0)
+
+    val executorMonitor = sc.executorAllocationManager.get.executorMonitor
+
+    // reflection to mock ExecutorMonitor.onTaskStart
+    val ensureExecutorIsTracked = classOf[ExecutorMonitor]
+      .getDeclaredMethod("ensureExecutorIsTracked",
+        classOf[String], classOf[Int])
+    ensureExecutorIsTracked.setAccessible(true)

Review Comment:
   Instead, do we want to call `onExecutorAdded`? That is the behavior we want 
to trigger, right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to