yadavay-amzn commented on code in PR #56628:
URL: https://github.com/apache/spark/pull/56628#discussion_r3472121136


##########
core/src/main/scala/org/apache/spark/executor/Executor.scala:
##########
@@ -571,10 +571,18 @@ private[spark] class Executor(
         try {
           logError(log"Executor launch task ${MDC(TASK_NAME, taskDescription.name)} failed," +
             log" reason: ${MDC(REASON, t.getMessage)}")
+          // SPARK-57465: If the thread pool rejected the task because the executor is shutting
+          // down, report a non-counting failure so the task can be rescheduled elsewhere.
+          val reason: TaskFailedReason = t match {
+            case _: RejectedExecutionException if executorShutdown.get() =>

Review Comment:
   Added two ExecutorSuite tests that follow the SPARK-54087 reflection pattern: 
mock `threadPool.execute` to throw, then capture and deserialize the reason 
passed to `statusUpdate`. (a) With `executorShutdown` set, the reason is 
`ExecutorShutdownFailure` (`countTowardsTaskFailures=false`); (b) with the flag 
unset, it is `ExceptionFailure`. Both tests drive `launchTask` directly, so the 
actual mapping is now covered. The shared setup is extracted into a helper.
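
For reference, a minimal standalone sketch of the mapping these two tests exercise. The `Sketch*` types and `LaunchFailureMapping.toReason` are hypothetical stand-ins, not Spark's classes; the real `ExecutorShutdownFailure` constructor is not shown in this thread, so the fields below are assumptions.

```scala
import java.util.concurrent.RejectedExecutionException
import java.util.concurrent.atomic.AtomicBoolean

// Simplified stand-ins for illustration only; these are NOT Spark's real classes.
sealed trait SketchFailedReason {
  // Failures count toward the retry budget unless a subtype opts out.
  def countTowardsTaskFailures: Boolean = true
}

final case class SketchExceptionFailure(className: String, description: String)
  extends SketchFailedReason

final case class SketchExecutorShutdownFailure(description: String)
  extends SketchFailedReason {
  override def countTowardsTaskFailures: Boolean = false
}

object LaunchFailureMapping {
  // Same shape as the match in the diff: only a rejection observed while the
  // shutdown flag is set becomes a non-counting failure; everything else is
  // reported as an ordinary exception failure.
  def toReason(t: Throwable, executorShutdown: AtomicBoolean): SketchFailedReason =
    t match {
      case _: RejectedExecutionException if executorShutdown.get() =>
        SketchExecutorShutdownFailure(String.valueOf(t.getMessage))
      case _ =>
        SketchExceptionFailure(t.getClass.getName, String.valueOf(t.getMessage))
    }
}
```

The guard on the flag is the point of case (b): a rejection with the flag unset is not attributed to shutdown, so it still counts as a regular failure.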



##########
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala:
##########
@@ -2921,6 +2921,82 @@ class TaskSetManagerSuite
         s"\nCaptured logs:\n${logs.mkString("\n")}")
   }
 
+  // SPARK-57465: Demonstrates that RejectedExecutionException (wrapped as ExceptionFailure)
+  // counts toward task failure budget, consuming retries for a non-task-fault.
+  test("SPARK-57465: RejectedExecutionException counts toward task failures (bug repro)") {
+    sc = new SparkContext("local", "test")
+    sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
+    val taskSet = FakeTask.createTaskSet(1)
+    val clock = new ManualClock(1) // start at 1 so markFinished(time > 0) passes
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
+
+    // Simulate the failure that Executor.launchTask sends when threadPool.execute throws
+    // RejectedExecutionException: it wraps as ExceptionFailure (which has
+    // countTowardsTaskFailures = true by default).
+    val rejectedExc = new java.util.concurrent.RejectedExecutionException(
+      "Task org.apache.spark.executor.Executor$TaskRunner rejected from " +
+        "java.util.concurrent.ThreadPoolExecutor [Shutting down]")
+    val failureReason = new ExceptionFailure(
+      rejectedExc.getClass.getName,
+      rejectedExc.getMessage,
+      rejectedExc.getStackTrace,
+      org.apache.spark.util.Utils.exceptionString(rejectedExc),
+      None)
+

Review Comment:
   Dropped it and folded the contrast into the positive test: after the 
non-counting shutdown failures, a normal `ExceptionFailure` now increments 
`numFailures` in the same test.
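
A minimal sketch of the contrast that the combined test checks, assuming a simplified failure counter. `FailureBudget`, `ShutdownRejection`, and `OrdinaryException` are hypothetical names for illustration; this is not how `TaskSetManager` is implemented.

```scala
// Hypothetical stand-ins; not Spark's TaskFailedReason hierarchy.
sealed trait Reason {
  def countTowardsTaskFailures: Boolean
}
case object ShutdownRejection extends Reason {
  val countTowardsTaskFailures = false
}
case object OrdinaryException extends Reason {
  val countTowardsTaskFailures = true
}

// Tracks failures for one task against a fixed retry budget.
final class FailureBudget(maxTaskFailures: Int) {
  private var failures = 0

  def numFailures: Int = failures

  // Records one failure and returns true once the budget is exhausted.
  // Non-counting reasons leave the counter untouched.
  def record(reason: Reason): Boolean = {
    if (reason.countTowardsTaskFailures) failures += 1
    failures >= maxTaskFailures
  }
}
```

With a budget of 4, any number of `ShutdownRejection` records leaves `numFailures` at 0, and a following `OrdinaryException` raises it to 1.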



##########
core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala:
##########
@@ -552,6 +553,14 @@ class JsonProtocolSuite extends SparkFunSuite {
     assertEquals(expectedDenied, JsonProtocol.taskEndReasonFromJson(oldDenied))
   }
 
+  test("SPARK-57465: ExecutorShutdownFailure round-trip serialization") {

Review Comment:
   Removed - `testTaskEndReason(ExecutorShutdownFailure(...))` already covers 
the round-trip.



##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -1057,6 +1057,11 @@ private[spark] class TaskSetManager(
           log"Not counting this failure towards the maximum number of failures for the task.")
         None
 
+      case _: ExecutorShutdownFailure =>
+        logInfo(log"${MDC(TASK_NAME, taskName(tid))} was rejected because its executor was" +
+          log" shutting down. Not counting this towards the maximum number of failures.")

Review Comment:
   Aligned the wording with the adjacent `ExecutorLostFailure` message.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]