Ngone51 commented on a change in pull request #31249:
URL: https://github.com/apache/spark/pull/31249#discussion_r568358423



##########
File path: core/src/test/scala/org/apache/spark/scheduler/HealthTrackerSuite.scala
##########
@@ -554,6 +554,50 @@ class HealthTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with Mock
     verify(allocationClientMock).killExecutorsOnHost("hostA")
   }
 
+  test("excluding decommission and kills executors when enabled") {
+    val allocationClientMock = mock[ExecutorAllocationClient]
+
+    // verify we decommission when configured
+    conf.set(config.EXCLUDE_ON_FAILURE_KILL_ENABLED, true)
+    conf.set(config.DECOMMISSION_ENABLED.key, "true")
+    conf.set(config.EXCLUDE_ON_FAILURE_DECOMMISSION_ENABLED.key, "true")
+    conf.set(config.MAX_FAILURES_PER_EXEC.key, "1")
+    conf.set(config.MAX_FAILED_EXEC_PER_NODE.key, "2")
+    healthTracker = new HealthTracker(listenerBusMock, conf, 
Some(allocationClientMock), clock)
+
+    // Fail 4 tasks in one task set on executor 1, so that executor gets 
excluded for the whole
+    // application.
+    val taskSetExclude2 = createTaskSetExcludelist(stageId = 0)
+    (0 until 4).foreach { partition =>
+      taskSetExclude2.updateExcludedForFailedTask(
+        "hostA", exec = "1", index = partition, failureReason = "testing")
+    }
+    healthTracker.updateExcludedForSuccessfulTaskSet(0, 0, 
taskSetExclude2.execToFailures)
+
+    val msg1 =
+      "Killing excluded executor id 1 since 
spark.excludeOnFailure.killExcludedExecutors is set."
+
+    verify(allocationClientMock).decommissionExecutor(
+      "1", ExecutorDecommissionInfo(msg1), false)
+
+    val taskSetExclude3 = createTaskSetExcludelist(stageId = 1)
+    // Fail 4 tasks in one task set on executor 2, so that executor gets 
excluded for the whole
+    // application.  Since that's the second executor that is excluded on the 
same node, we also
+    // exclude that node.
+    (0 until 4).foreach { partition =>
+      taskSetExclude3.updateExcludedForFailedTask(
+        "hostA", exec = "2", index = partition, failureReason = "testing")
+    }
+    healthTracker.updateExcludedForSuccessfulTaskSet(0, 0, 
taskSetExclude3.execToFailures)
+
+    val msg2 =
+      "Killing excluded executor id 2 since 
spark.excludeOnFailure.killExcludedExecutors is set."
+    verify(allocationClientMock).decommissionExecutor(
+      "2", ExecutorDecommissionInfo(msg2), false, false)
+    verify(allocationClientMock).decommissionExecutorsOnHost(
+      "hostA")

Review comment:
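       nit: could this `verify(...)` call be written on one line? It fits within the line-length limit:
       `verify(allocationClientMock).decommissionExecutorsOnHost("hostA")`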




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to