JoshRosen commented on code in PR #37531:
URL: https://github.com/apache/spark/pull/37531#discussion_r949338426


##########
core/src/main/scala/org/apache/spark/TaskContextImpl.scala:
##########
@@ -191,20 +191,69 @@ private[spark] class TaskContextImpl(
       }
     }
 
-    val errorMsgs = new ArrayBuffer[String](2)
+    val listenerExceptions = new ArrayBuffer[Throwable](2)
     var listenerOption: Option[T] = None
     while ({listenerOption = getNextListenerOrDeregisterThread(); listenerOption.nonEmpty}) {
       val listener = listenerOption.get
       try {
         callback(listener)
       } catch {
         case e: Throwable =>
-          errorMsgs += e.getMessage
+          // A listener failed. Temporarily clear the listenerInvocationThread and markTaskFailed.
+          //
+          // One of the following cases applies (#3 being the interesting one):
+          //
+          // 1. [[Task.doRunTask]] is currently calling [[markTaskFailed]] because the task body
+          //    failed, and now a failure listener has failed here (not necessarily the first to
+          //    fail). Then calling [[markTaskFailed]] again here is a no-op, and we simply resume
+          //    running the remaining failure listeners. [[Task.doRunTask]] will then call
+          //    [[markTaskCompleted]] after this method returns.
+          //
+          // 2. The task body failed, [[Task.doRunTask]] already called [[markTaskFailed]],
+          //    [[Task.doRunTask]] is currently calling [[markTaskCompleted]], and now a completion
+          //    listener has failed here (not necessarily the first one to fail). Then calling
+          //    [[markTaskFailed]] again here is a no-op, and we simply resume running the
+          //    remaining completion listeners.
+          //
+          // 3. [[Task.doRunTask]] is currently calling [[markTaskCompleted]] because the task body
+          //    succeeded, and now a completion listener has failed here (the first one to
+          //    fail). Then our call to [[markTaskFailed]] here will run all failure listeners
+          //    before returning, after which we will resume running the remaining completion
+          //    listeners.
+          //
+          // 4. [[Task.doRunTask]] is currently calling [[markTaskCompleted]] because the task body
+          //    succeeded, but [[markTaskFailed]] is currently running because a completion listener
+          //    has failed, and now a failure listener has failed (not necessarily the first one to
+          //    fail). Then calling [[markTaskFailed]] again here will have no effect, and we simply
+          //    resume running the remaining failure listeners; we will resume running the remaining
+          //    completion listeners after this call returns.
+          //
+          // 5. [[Task.doRunTask]] is currently calling [[markTaskCompleted]] because the task body
+          //    succeeded, [[markTaskFailed]] already ran because a completion listener previously
+          //    failed, and now another completion listener has failed. Then our call to
+          //    [[markTaskFailed]] here will have no effect and we simply resume running the
+          //    remaining completion handlers.
+          try {
+            listenerInvocationThread = None

Review Comment:
   To be more precise, it's **_duplicate_ invocations of the _same_ listener** 
that I am concerned about.
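
   To illustrate the property in question, here is a minimal sketch under a simplified model. `ListenerRunner`, `onCompletion`, `onFailure`, `markFailed`, `markCompleted`, and `invocations` are hypothetical names for illustration only, not Spark's actual API, and the sketch ignores threading entirely. It shows one way to guarantee that no listener runs twice even when a failing completion listener triggers the failure listeners in the middle of the completion loop: each listener is removed from its collection before it is called.

   ```scala
   import scala.collection.mutable

   // Hypothetical, single-threaded model of the listener loop; not Spark's code.
   final class ListenerRunner {
     private type Listener = (String, () => Unit)
     private val completion = mutable.ArrayBuffer.empty[Listener]
     private val failure = mutable.ArrayBuffer.empty[Listener]
     private var failed = false

     // Counts how many times each named listener was invoked.
     val invocations: mutable.Map[String, Int] =
       mutable.Map.empty[String, Int].withDefaultValue(0)

     def onCompletion(name: String)(body: => Unit): Unit =
       completion += ((name, () => body))

     def onFailure(name: String)(body: => Unit): Unit =
       failure += ((name, () => body))

     // Each listener is removed *before* it is called, so a nested or repeated
     // drain can never observe (and re-invoke) a listener that already ran.
     private def drain(listeners: mutable.ArrayBuffer[Listener]): Unit = {
       while (listeners.nonEmpty) {
         // Take the most recently registered listener first.
         val (name, body) = listeners.remove(listeners.length - 1)
         invocations(name) += 1
         try body() catch {
           case _: Throwable => markFailed() // a no-op after the first call
         }
       }
     }

     def markFailed(): Unit = if (!failed) { failed = true; drain(failure) }

     def markCompleted(): Unit = drain(completion)
   }

   // Usage: two completion listeners fail, yet every listener runs exactly once.
   val runner = new ListenerRunner
   runner.onFailure("cleanup") { () }
   runner.onCompletion("a") { () }
   runner.onCompletion("b") { throw new RuntimeException("boom") }
   runner.onCompletion("c") { throw new RuntimeException("boom again") }
   runner.markCompleted()
   ```

   In this model the first completion-listener failure corresponds to case 3 above (the failure listeners run once, nested inside the completion loop), and the second corresponds to case 5 (the `markFailed` call has no effect).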



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

