yadavay-amzn commented on code in PR #56628:
URL: https://github.com/apache/spark/pull/56628#discussion_r3472121136
##########
core/src/main/scala/org/apache/spark/executor/Executor.scala:
##########
@@ -571,10 +571,18 @@ private[spark] class Executor(
try {
logError(log"Executor launch task ${MDC(TASK_NAME, taskDescription.name)} failed," +
log" reason: ${MDC(REASON, t.getMessage)}")
+ // SPARK-57465: If the thread pool rejected the task because the executor is shutting
+ // down, report a non-counting failure so the task can be rescheduled elsewhere.
+ val reason: TaskFailedReason = t match {
+ case _: RejectedExecutionException if executorShutdown.get() =>
Review Comment:
Added two `ExecutorSuite` tests using the SPARK-54087 reflection pattern (mock
`threadPool.execute` to throw, then capture and deserialize the `statusUpdate`
reason): (a) `executorShutdown` set -> `ExecutorShutdownFailure`
(`countTowardsTaskFailures=false`); (b) flag unset -> `ExceptionFailure`. Both
tests drive `launchTask` directly, so the actual mapping is now covered. The
shared setup is extracted into a helper.
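The mapping these two tests pin down can be sketched as a small pattern match with a guard. This is a minimal sketch under stated assumptions: `TaskFailedReason`, `ExceptionFailure`, and `ExecutorShutdownFailure` below are simplified stand-ins (the real Spark classes carry more fields, and the `ExecutorShutdownFailure` fields are not shown in this thread), and `failureReasonFor` is a hypothetical helper. In the PR the match is written inline in `Executor.launchTask`.

```scala
import java.util.concurrent.RejectedExecutionException
import java.util.concurrent.atomic.AtomicBoolean

// Simplified, hypothetical stand-ins for Spark's task failure reasons.
sealed trait TaskFailedReason {
  def countTowardsTaskFailures: Boolean = true
}
final case class ExceptionFailure(className: String, description: String)
    extends TaskFailedReason
final case class ExecutorShutdownFailure(message: String) extends TaskFailedReason {
  // The rejection is not the task's fault, so it should not consume a retry.
  override def countTowardsTaskFailures: Boolean = false
}

// Hypothetical helper mirroring the match added to Executor.launchTask:
// only a rejection that happens while the executor is shutting down becomes
// a non-counting failure; anything else stays an ordinary ExceptionFailure.
def failureReasonFor(t: Throwable, executorShutdown: AtomicBoolean): TaskFailedReason =
  t match {
    case e: RejectedExecutionException if executorShutdown.get() =>
      ExecutorShutdownFailure(Option(e.getMessage).getOrElse(""))
    case other =>
      ExceptionFailure(other.getClass.getName, Option(other.getMessage).getOrElse(""))
  }

val rejected = new RejectedExecutionException("rejected: executor shutting down")

// (a) shutdown flag set: non-counting failure
val shuttingDown = new AtomicBoolean(true)
println(failureReasonFor(rejected, shuttingDown))

// (b) shutdown flag unset: ordinary ExceptionFailure
val running = new AtomicBoolean(false)
println(failureReasonFor(rejected, running))
```

Keeping the flag check in the guard means a `RejectedExecutionException` raised for any other reason still surfaces as a counting `ExceptionFailure`, which is exactly the contrast test (b) checks.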
##########
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala:
##########
@@ -2921,6 +2921,82 @@ class TaskSetManagerSuite
s"\nCaptured logs:\n${logs.mkString("\n")}")
}
+ // SPARK-57465: Demonstrates that RejectedExecutionException (wrapped as ExceptionFailure)
+ // counts toward task failure budget, consuming retries for a non-task-fault.
+ test("SPARK-57465: RejectedExecutionException counts toward task failures (bug repro)") {
+ sc = new SparkContext("local", "test")
+ sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
+ val taskSet = FakeTask.createTaskSet(1)
+ val clock = new ManualClock(1) // start at 1 so markFinished(time > 0) passes
+ val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
+
+ // Simulate the failure that Executor.launchTask sends when threadPool.execute throws
+ // RejectedExecutionException: it wraps as ExceptionFailure (which has
+ // countTowardsTaskFailures = true by default).
+ val rejectedExc = new java.util.concurrent.RejectedExecutionException(
+ "Task org.apache.spark.executor.Executor$TaskRunner rejected from " +
+ "java.util.concurrent.ThreadPoolExecutor [Shutting down]")
+ val failureReason = new ExceptionFailure(
+ rejectedExc.getClass.getName,
+ rejectedExc.getMessage,
+ rejectedExc.getStackTrace,
+ org.apache.spark.util.Utils.exceptionString(rejectedExc),
+ None)
+
Review Comment:
Dropped it and folded the contrast into the positive test: after the
non-counting shutdown failures, a normal `ExceptionFailure` now increments
`numFailures` in the same test.
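The contrast that the folded test checks can be modelled as a per-task failure counter gated on `countTowardsTaskFailures`. This is a hypothetical simplification, not `TaskSetManager`'s actual code: `FailureBudget` and its `handleFailedTask` are illustrative names, `ExecutorShutdownFailure` is reduced to a case object (its real fields are not shown here), and the only behaviour assumed from the thread is that `ExecutorShutdownFailure` reports `countTowardsTaskFailures = false` while `ExceptionFailure` counts.

```scala
// Hypothetical, simplified failure reasons carrying only the counting flag.
sealed trait TaskFailedReason { def countTowardsTaskFailures: Boolean }
case object ExecutorShutdownFailure extends TaskFailedReason {
  val countTowardsTaskFailures = false
}
final case class ExceptionFailure(className: String) extends TaskFailedReason {
  val countTowardsTaskFailures = true
}

// Hypothetical per-task failure budget, loosely modelled on numFailures.
final class FailureBudget(numTasks: Int, maxTaskFailures: Int) {
  val numFailures: Array[Int] = Array.fill(numTasks)(0)

  /** Records a failure and returns true if the task set should be aborted. */
  def handleFailedTask(index: Int, reason: TaskFailedReason): Boolean =
    if (!reason.countTowardsTaskFailures) {
      false // non-counting failures never consume a retry
    } else {
      numFailures(index) += 1
      numFailures(index) >= maxTaskFailures
    }
}

val budget = new FailureBudget(numTasks = 1, maxTaskFailures = 4)
// Repeated shutdown rejections leave the budget untouched.
(1 to 3).foreach(_ => budget.handleFailedTask(0, ExecutorShutdownFailure))
println(budget.numFailures(0))
// An ordinary exception in the same run still counts.
budget.handleFailedTask(0, ExceptionFailure("java.lang.RuntimeException"))
println(budget.numFailures(0))
```

Asserting both outcomes against the same counter, as the folded test does, shows that the new case exempts only shutdown rejections rather than disabling failure counting altogether.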
##########
core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala:
##########
@@ -552,6 +553,14 @@ class JsonProtocolSuite extends SparkFunSuite {
assertEquals(expectedDenied, JsonProtocol.taskEndReasonFromJson(oldDenied))
}
+ test("SPARK-57465: ExecutorShutdownFailure round-trip serialization") {
Review Comment:
Removed: `testTaskEndReason(ExecutorShutdownFailure(...))` already covers
the round-trip.
##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -1057,6 +1057,11 @@ private[spark] class TaskSetManager(
log"Not counting this failure towards the maximum number of failures for the task.")
None
+ case _: ExecutorShutdownFailure =>
+ logInfo(log"${MDC(TASK_NAME, taskName(tid))} was rejected because its executor was" +
+ log" shutting down. Not counting this towards the maximum number of failures.")
Review Comment:
Aligned the wording with the adjacent `ExecutorLostFailure` message.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]