ryan-johnson-databricks commented on code in PR #37531:
URL: https://github.com/apache/spark/pull/37531#discussion_r949063130


##########
core/src/main/scala/org/apache/spark/TaskContextImpl.scala:
##########
@@ -191,20 +191,69 @@ private[spark] class TaskContextImpl(
       }
     }
 
-    val errorMsgs = new ArrayBuffer[String](2)
+    val listenerExceptions = new ArrayBuffer[Throwable](2)
     var listenerOption: Option[T] = None
     while ({listenerOption = getNextListenerOrDeregisterThread(); listenerOption.nonEmpty}) {
       val listener = listenerOption.get
       try {
         callback(listener)
       } catch {
         case e: Throwable =>
-          errorMsgs += e.getMessage
+          // A listener failed. Temporarily clear the listenerInvocationThread and markTaskFailed.
+          //
+          // One of the following cases applies (#3 being the interesting one):
+          //
+          // 1. [[Task.doRunTask]] is currently calling [[markTaskFailed]] because the task body
+          //    failed, and now a failure listener has failed here (not necessarily the first to
+          //    fail). Then calling [[markTaskFailed]] again here is a no-op, and we simply resume
+          //    running the remaining failure listeners. [[Task.doRunTask]] will then call
+          //    [[markTaskCompleted]] after this method returns.
+          //
+          // 2. The task body failed, [[Task.doRunTask]] already called [[markTaskFailed]],
+          //    [[Task.doRunTask]] is currently calling [[markTaskCompleted]], and now a completion
+          //    listener has failed here (not necessarily the first one to fail). Then calling
+          //    [[markTaskFailed]] again here is a no-op, and we simply resume running the
+          //    remaining completion listeners.
+          //
+          // 3. [[Task.doRunTask]] is currently calling [[markTaskCompleted]] because the task body
+          //    succeeded, and now a completion listener has failed here (the first one to
+          //    fail). Then our call to [[markTaskFailed]] here will run all failure listeners
+          //    before returning, after which we will resume running the remaining completion
+          //    listeners.
+          //
+          // 4. [[Task.doRunTask]] is currently calling [[markTaskCompleted]] because the task body
+          //    succeeded, but [[markTaskFailed]] is currently running because a completion listener
+          //    has failed, and now a failure listener has failed (not necessarily the first one to
+          //    fail). Then calling [[markTaskFailed]] again here will have no effect, and we simply
+          //    resume running the remaining failure listeners; we will resume running the remaining
+          //    completion listeners after this call returns.
+          //
+          // 5. [[Task.doRunTask]] is currently calling [[markTaskCompleted]] because the task body
+          //    succeeded, [[markTaskFailed]] already ran because a completion listener previously
+          //    failed, and now another completion listener has failed. Then our call to
+          //    [[markTaskFailed]] here will have no effect and we simply resume running the
+          //    remaining completion handlers.
+          try {
+            listenerInvocationThread = None

Review Comment:
   This situation is already handled by the existing code:
   
   First, it is [specifically documented](https://src.dev.databricks.com/databricks/runtime/-/blob/core/src/main/scala/org/apache/spark/TaskContext.scala?L138) that
   
   > There are no ordering guarantees for listeners registered in different threads, or for listeners registered after the task completes
   
   Second, listeners are pushed and popped from the stack in `synchronized` blocks, and only an invocation that finds the `listenerInvocationThread` empty can continue, so every listener will be executed exactly once.
   
   Third, registering a new listener after the task completes will trigger a call to `invokeListeners`, which is a no-op if some other thread is already in that method.
   
   Fourth, it's already possible for failure listeners and completion listeners to run on different threads, because the `listenerInvocationThread` is cleared between calls.
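
   To make the second and third points concrete, here is a minimal sketch of the guard as I understand it. It is not the actual `TaskContextImpl` code: `ListenerRunner`, `invocationThread`, `nextOrRelease` and `invokeAll` are hypothetical names, and exception handling is left out to keep the claim/release logic visible.

   ```scala
   import scala.collection.mutable

   // Hypothetical stand-in for the listener bookkeeping; not the real TaskContextImpl.
   class ListenerRunner[T] {
     private val listeners = new mutable.ArrayBuffer[T] // used as a stack (last in, first out)
     private var invocationThread: Option[Thread] = None

     def register(listener: T): Unit = synchronized { listeners += listener }

     // Pops the next listener, or releases the claim once none remain.
     private def nextOrRelease(): Option[T] = synchronized {
       if (listeners.isEmpty) {
         invocationThread = None
         None
       } else {
         Some(listeners.remove(listeners.length - 1))
       }
     }

     // Returns false (a no-op) when another invocation already holds the claim.
     def invokeAll(callback: T => Unit): Boolean = {
       val claimed = synchronized {
         if (invocationThread.nonEmpty) {
           false
         } else {
           invocationThread = Some(Thread.currentThread())
           true
         }
       }
       if (claimed) {
         var next = nextOrRelease()
         while (next.nonEmpty) {
           callback(next.get) // runs outside the lock; error handling omitted in this sketch
           next = nextOrRelease()
         }
       }
       claimed
     }
   }
   ```

   Because each listener is popped under the lock before it runs, no listener can be handed to two invocations, and an `invokeAll` that arrives while the claim is held returns immediately.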
   
   The only changes my code can cause are:
   1. Failure listeners can be invoked in the middle of processing completion listeners, if one of the latter throws (where previously they were not invoked at all).
   2. ... in which case different threads might execute the initial completion listeners vs. the remaining completion listeners. But only if other threads are trying to register listeners at the same time.
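
   A rough single-threaded model of change 1, under the assumption that the task body succeeded. Everything here is hypothetical (`ListenerOrderingSketch`, the four named listeners, the `log` buffer), and unlike the PR it does not collect or rethrow the listener exceptions; it only shows where the failure listeners land in the ordering.

   ```scala
   import scala.collection.mutable

   // Hypothetical single-threaded model; not the real TaskContextImpl.
   class ListenerOrderingSketch {
     val log = mutable.ArrayBuffer[String]()
     private var failed = false

     private def record(msg: String): Unit = { log += msg }

     // Completion listeners in the order they will run; B and D throw.
     private val completionListeners = mutable.Queue[() => Unit](
       () => record("completion A"),
       () => throw new RuntimeException("completion B failed"),
       () => record("completion C"),
       () => throw new RuntimeException("completion D failed"))

     // Runs the failure listeners once; later calls are no-ops.
     def markTaskFailed(): Unit = {
       if (!failed) {
         failed = true
         record("failure listeners")
       }
     }

     // A throwing completion listener triggers markTaskFailed, then the loop resumes.
     def markTaskCompleted(): Unit = {
       while (completionListeners.nonEmpty) {
         val listener = completionListeners.dequeue()
         try listener() catch { case _: Throwable => markTaskFailed() }
       }
     }
   }
   ```

   Calling `markTaskCompleted()` on a fresh instance leaves `log` as `completion A`, `failure listeners`, `completion C`: the failure listeners run when B throws, and D's failure does not run them a second time.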
   




