JoshRosen commented on code in PR #37531:
URL: https://github.com/apache/spark/pull/37531#discussion_r948579391
##########
core/src/main/scala/org/apache/spark/TaskContextImpl.scala:
##########
@@ -191,20 +191,69 @@ private[spark] class TaskContextImpl(
}
}
- val errorMsgs = new ArrayBuffer[String](2)
+ val listenerExceptions = new ArrayBuffer[Throwable](2)
var listenerOption: Option[T] = None
while ({listenerOption = getNextListenerOrDeregisterThread(); listenerOption.nonEmpty}) {
val listener = listenerOption.get
try {
callback(listener)
} catch {
case e: Throwable =>
- errorMsgs += e.getMessage
+ // A listener failed. Temporarily clear the listenerInvocationThread and markTaskFailed.
+ //
+ // One of the following cases applies (#3 being the interesting one):
+ //
+ // 1. [[Task.doRunTask]] is currently calling [[markTaskFailed]] because the task body
+ // failed, and now a failure listener has failed here (not necessarily the first to
+ // fail). Then calling [[markTaskFailed]] again here is a no-op, and we simply resume
+ // running the remaining failure listeners. [[Task.doRunTask]] will then call
+ // [[markTaskCompleted]] after this method returns.
+ //
+ // 2. The task body failed, [[Task.doRunTask]] already called [[markTaskFailed]],
+ // [[Task.doRunTask]] is currently calling [[markTaskCompleted]], and now a completion
+ // listener has failed here (not necessarily the first one to fail). Then calling
+ // [[markTaskFailed]] again here is a no-op, and we simply resume running the
+ // remaining completion listeners.
+ //
+ // 3. [[Task.doRunTask]] is currently calling [[markTaskCompleted]] because the task body
+ // succeeded, and now a completion listener has failed here (the first one to
+ // fail). Then our call to [[markTaskFailed]] here will run all failure listeners
+ // before returning, after which we will resume running the remaining completion
+ // listeners.
+ //
+ // 4. [[Task.doRunTask]] is currently calling [[markTaskCompleted]] because the task body
+ // succeeded, but [[markTaskFailed]] is currently running because a completion listener
+ // has failed, and now a failure listener has failed (not necessarily the first one to
+ // fail). Then calling [[markTaskFailed]] again here will have no effect, and we simply
+ // resume running the remaining failure listeners; we will resume running the remaining
+ // completion listeners after this call returns.
+ //
+ // 5. [[Task.doRunTask]] is currently calling [[markTaskCompleted]] because the task body
+ // succeeded, [[markTaskFailed]] already ran because a completion listener previously
+ // failed, and now another completion listener has failed. Then our call to
+ // [[markTaskFailed]] here will have no effect and we simply resume running the
+ // remaining completion handlers.
+ try {
+ listenerInvocationThread = None
Review Comment:
What if thread A starts invoking listeners, hits an exception, and de-registers
the listenerInvocationThread, and then thread B registers a new completion or
failure listener? Since the task is already completed or failed, the
listener-registering thread would try to immediately invoke all listeners. Could
that lead to already-registered listeners being invoked twice? We wouldn't have
_concurrent_ invocation, but we'd still get a sequential interleaving of
duplicate listener invocations.
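
To make the interleaving concrete, here is a minimal single-threaded sketch of the scenario. It is a toy model, not the PR's code: `ToyListeners`, `invokeAll`, `whileDeregistered`, and the listener names are all hypothetical, and it assumes that listeners stay registered after they have been invoked, which may not match the actual implementation.

```scala
import scala.collection.mutable.ArrayBuffer

// Toy model only: every name here is hypothetical and none of it is Spark code.
// Assumption: listeners stay in `registered` after they have been invoked.
final class ToyListeners(failing: Set[String]) {
  private val registered = ArrayBuffer.empty[String]
  // Stands in for listenerInvocationThread.
  private var invoker: Option[String] = None
  // Records "thread:listener" for every invocation, in order.
  val log = ArrayBuffer.empty[String]

  def register(listener: String): Unit = registered += listener

  // Invokes all registered listeners on behalf of `thread`, unless another
  // invoker is active. `whileDeregistered` runs in the window where the marker
  // is cleared, so the demo can replay the interleaving deterministically.
  def invokeAll(thread: String, whileDeregistered: () => Unit = () => ()): Unit = {
    if (invoker.isEmpty) {
      invoker = Some(thread)
      for (listener <- registered.toList) {
        log += s"$thread:$listener"
        if (failing.contains(listener)) {
          invoker = None        // marker temporarily cleared after a failure
          whileDeregistered()   // another thread may slip in here
          invoker = Some(thread)
        }
      }
      invoker = None
    }
  }
}

object DuplicateInvocationDemo {
  def run(): List[String] = {
    val ctx = new ToyListeners(failing = Set("L1"))
    ctx.register("L1")
    ctx.register("L2")
    // Thread A invokes listeners. While its marker is cleared, thread B
    // registers L3 and, because the task is already done, invokes everything.
    ctx.invokeAll("A", () => {
      ctx.register("L3")
      ctx.invokeAll("B")
    })
    ctx.log.toList
  }

  def main(args: Array[String]): Unit = println(run().mkString(", "))
}
```

In this model `L2` is invoked once by B and once by A, never concurrently, which is the sequential duplicate I'm worried about. Removing each listener from the collection before invoking it would avoid the repeat in the toy; whether `getNextListenerOrDeregisterThread` already guarantees that is not visible in the quoted hunk.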
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]