wangshengjie123 commented on code in PR #37528:
URL: https://github.com/apache/spark/pull/37528#discussion_r952102558


##########
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala:
##########
@@ -2563,6 +2565,81 @@ class TaskSetManagerSuite
     assert(!manager.isZombie)
   }
 
+  test("SPARK-40094: Send TaskEnd if task failed with " +
+    "NotSerializableException or TaskOutputFileAlreadyExistException") {
+    val sparkConf = new SparkConf()
+      .setMaster("local-cluster[1,1,1024]")
+      .setAppName("SPARK-40094")
+      .set(config.DYN_ALLOCATION_TESTING, true)
+      .set(TEST_DYNAMIC_ALLOCATION_SCHEDULE_ENABLED, false)
+      .set(config.DYN_ALLOCATION_ENABLED, true)
+      .set(config.DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT, 1L)
+
+    // setup spark context and init ExecutorAllocationManager
+    sc = new SparkContext(sparkConf)
+    sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
+    // replace dagScheduler to let handleFailedTask send TaskEnd
+    sched.dagScheduler = sc.dagScheduler
+
+    val taskSet1 = FakeTask.createTaskSet(1)
+    val manager1 = new TaskSetManager(sched, taskSet1, MAX_TASK_FAILURES)
+    val taskSet2 = FakeTask.createTaskSet(1)
+    val manager2 = new TaskSetManager(sched, taskSet2, MAX_TASK_FAILURES)
+    assert(sched.taskSetsFailed.isEmpty)
+
+
+    val offerResult1 = manager1.resourceOffer("exec1", "host1", ANY)._1
+    assert(offerResult1.isDefined,
+      "Expect resource offer on iteration 0 to return a task")
+    assert(offerResult1.get.index === 0)
+
+    val offerResult2 = manager2.resourceOffer("exec2", "host2", ANY)._1
+    assert(offerResult2.isDefined,
+      "Expect resource offer on iteration 0 to return a task")
+    assert(offerResult2.get.index === 0)
+
+    val executorMonitor = sc.executorAllocationManager.get.executorMonitor
+
+    // reflection to mock ExecutorMonitor.onTaskStart
+    val ensureExecutorIsTracked = classOf[ExecutorMonitor]
+      .getDeclaredMethod("ensureExecutorIsTracked",
+        classOf[String], classOf[Int])
+    ensureExecutorIsTracked.setAccessible(true)
+
+    val executorTracker1 = ensureExecutorIsTracked.invoke(executorMonitor,
+      "exec1".asInstanceOf[AnyRef],
+      ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID.asInstanceOf[AnyRef])
+    
executorTracker1.asInstanceOf[executorMonitor.Tracker].updateRunningTasks(1)
+    val executorTracker2 = ensureExecutorIsTracked.invoke(executorMonitor,
+      "exec2".asInstanceOf[AnyRef],
+      ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID.asInstanceOf[AnyRef])
+    
executorTracker2.asInstanceOf[executorMonitor.Tracker].updateRunningTasks(1)
+
+    // assert exec1 and exec2 is not idle
+    val method = classOf[ExecutorMonitor].getDeclaredMethod("isExecutorIdle",
+      classOf[String])

Review Comment:
   I changed the methods `isExecutorIdle` and `ensureExecutorIsTracked` in 
`ExecutorMonitor` and removed all the reflection.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to