JoshRosen commented on code in PR #37531:
URL: https://github.com/apache/spark/pull/37531#discussion_r949338426
##########
core/src/main/scala/org/apache/spark/TaskContextImpl.scala:
##########
@@ -191,20 +191,69 @@ private[spark] class TaskContextImpl(
}
}
- val errorMsgs = new ArrayBuffer[String](2)
+ val listenerExceptions = new ArrayBuffer[Throwable](2)
var listenerOption: Option[T] = None
while ({listenerOption = getNextListenerOrDeregisterThread();
listenerOption.nonEmpty}) {
val listener = listenerOption.get
try {
callback(listener)
} catch {
case e: Throwable =>
- errorMsgs += e.getMessage
+ // A listener failed. Temporarily clear the listenerInvocationThread and markTaskFailed.
+ //
+ // One of the following cases applies (#3 being the interesting one):
+ //
+ // 1. [[Task.doRunTask]] is currently calling [[markTaskFailed]] because the task body
+ // failed, and now a failure listener has failed here (not necessarily the first to
+ // fail). Then calling [[markTaskFailed]] again here is a no-op, and we simply resume
+ // running the remaining failure listeners. [[Task.doRunTask]] will then call
+ // [[markTaskCompleted]] after this method returns.
+ //
+ // 2. The task body failed, [[Task.doRunTask]] already called [[markTaskFailed]],
+ // [[Task.doRunTask]] is currently calling [[markTaskCompleted]], and now a completion
+ // listener has failed here (not necessarily the first one to fail). Then calling
+ // [[markTaskFailed]] again here is a no-op, and we simply resume running the
+ // remaining completion listeners.
+ //
+ // 3. [[Task.doRunTask]] is currently calling [[markTaskCompleted]] because the task body
+ // succeeded, and now a completion listener has failed here (the first one to
+ // fail). Then our call to [[markTaskFailed]] here will run all failure listeners
+ // before returning, after which we will resume running the remaining completion
+ // listeners.
+ //
+ // 4. [[Task.doRunTask]] is currently calling [[markTaskCompleted]] because the task body
+ // succeeded, but [[markTaskFailed]] is currently running because a completion listener
+ // has failed, and now a failure listener has failed (not necessarily the first one to
+ // fail). Then calling [[markTaskFailed]] again here will have no effect, and we simply
+ // resume running the remaining failure listeners; we will resume running the remaining
+ // completion listeners after this call returns.
+ //
+ // 5. [[Task.doRunTask]] is currently calling [[markTaskCompleted]] because the task body
+ // succeeded, [[markTaskFailed]] already ran because a completion listener previously
+ // failed, and now another completion listener has failed. Then our call to
+ // [[markTaskFailed]] here will have no effect and we simply resume running the
+ // remaining completion handlers.
+ try {
+ listenerInvocationThread = None
Review Comment:
To be more precise, it's **_duplicate_ invocations of the _same_ listener**
that I am concerned about.
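To make that concern concrete, here is a minimal, single-threaded Scala model of the five cases above. It assumes one particular safeguard: every listener is removed from its buffer *before* it is called. Under that assumption, a reentrant `markTaskFailed()` triggered by a failing listener can never reach that listener again. This is a hypothetical sketch, not the code in this PR. `ListenerSketch`, `Context`, `onComplete`, `onFailure`, and `drain` are illustrative names, and the cross-thread role of `listenerInvocationThread` is deliberately left out.

```scala
import scala.collection.mutable

// Hypothetical, simplified model of reentrant listener invocation. The names
// here are illustrative only; this is not Spark's TaskContextImpl API, and it
// ignores the cross-thread role of listenerInvocationThread entirely.
object ListenerSketch {
  type Listener = Context => Unit

  final class Context {
    val invocations = mutable.ArrayBuffer.empty[String] // every listener call, in order
    val errors = mutable.ArrayBuffer.empty[Throwable]   // plays the role of listenerExceptions

    private val completionListeners = mutable.ArrayBuffer.empty[(String, Listener)]
    private val failureListeners = mutable.ArrayBuffer.empty[(String, Listener)]
    private var completed = false
    private var failed = false

    def onComplete(name: String)(f: Listener): Unit = completionListeners += ((name, f))
    def onFailure(name: String)(f: Listener): Unit = failureListeners += ((name, f))

    def markTaskCompleted(): Unit =
      if (!completed) { completed = true; drain(completionListeners) }

    // Only the first call runs the failure listeners; later (reentrant) calls
    // are no-ops, as in cases 1, 2, 4 and 5 of the quoted comment.
    def markTaskFailed(): Unit =
      if (!failed) { failed = true; drain(failureListeners) }

    // Runs listeners newest-first. Each one is removed from the buffer BEFORE it
    // is invoked, so a reentrant markTaskFailed() triggered by its failure can
    // never see it again: every listener runs at most once.
    private def drain(listeners: mutable.ArrayBuffer[(String, Listener)]): Unit =
      while (listeners.nonEmpty) {
        val (name, f) = listeners.remove(listeners.length - 1)
        invocations += name
        try f(this)
        catch {
          case e: Throwable =>
            errors += e
            markTaskFailed() // case 3: runs the failure listeners, then we resume here
        }
      }
  }
}
```

Suppose completion listeners `A`, `B`, and `C` are registered in that order, `B` and `C` throw, and a single failure listener `F` also throws. Then `markTaskCompleted()` invokes `C`, `F`, `B`, `A`, each exactly once. The key design choice is removing each listener before invoking it. Iterating over a snapshot, or removing the listener only after it returns, would let a reentrant call pick up an in-flight listener a second time.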
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]