ryan-johnson-databricks commented on code in PR #37531:
URL: https://github.com/apache/spark/pull/37531#discussion_r949063130
##########
core/src/main/scala/org/apache/spark/TaskContextImpl.scala:
##########
@@ -191,20 +191,69 @@ private[spark] class TaskContextImpl(
}
}
- val errorMsgs = new ArrayBuffer[String](2)
+ val listenerExceptions = new ArrayBuffer[Throwable](2)
var listenerOption: Option[T] = None
while ({listenerOption = getNextListenerOrDeregisterThread();
listenerOption.nonEmpty}) {
val listener = listenerOption.get
try {
callback(listener)
} catch {
case e: Throwable =>
- errorMsgs += e.getMessage
+ // A listener failed. Temporarily clear the listenerInvocationThread and markTaskFailed.
+ //
+ // One of the following cases applies (#3 being the interesting one):
+ //
+ // 1. [[Task.doRunTask]] is currently calling [[markTaskFailed]] because the task body
+ //    failed, and now a failure listener has failed here (not necessarily the first to
+ //    fail). Then calling [[markTaskFailed]] again here is a no-op, and we simply resume
+ //    running the remaining failure listeners. [[Task.doRunTask]] will then call
+ //    [[markTaskCompleted]] after this method returns.
+ //
+ // 2. The task body failed, [[Task.doRunTask]] already called [[markTaskFailed]],
+ //    [[Task.doRunTask]] is currently calling [[markTaskCompleted]], and now a completion
+ //    listener has failed here (not necessarily the first one to fail). Then calling
+ //    [[markTaskFailed]] again here is a no-op, and we simply resume running the
+ //    remaining completion listeners.
+ //
+ // 3. [[Task.doRunTask]] is currently calling [[markTaskCompleted]] because the task body
+ //    succeeded, and now a completion listener has failed here (the first one to
+ //    fail). Then our call to [[markTaskFailed]] here will run all failure listeners
+ //    before returning, after which we will resume running the remaining completion
+ //    listeners.
+ //
+ // 4. [[Task.doRunTask]] is currently calling [[markTaskCompleted]] because the task body
+ //    succeeded, but [[markTaskFailed]] is currently running because a completion listener
+ //    has failed, and now a failure listener has failed (not necessarily the first one to
+ //    fail). Then calling [[markTaskFailed]] again here will have no effect, and we simply
+ //    resume running the remaining failure listeners; we will resume running the remaining
+ //    completion listeners after this call returns.
+ //
+ // 5. [[Task.doRunTask]] is currently calling [[markTaskCompleted]] because the task body
+ //    succeeded, [[markTaskFailed]] already ran because a completion listener previously
+ //    failed, and now another completion listener has failed. Then our call to
+ //    [[markTaskFailed]] here will have no effect, and we simply resume running the
+ //    remaining completion listeners.
+ try {
+ listenerInvocationThread = None
Review Comment:
This situation is already handled by the existing code:
First, it is [specifically documented](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/TaskContext.scala#L108) that
> There are no ordering guarantees for listeners registered in different threads, or for listeners registered after the task completes
Second, listeners are pushed and popped from the stack in `synchronized` blocks, so every listener will be executed exactly once.
Third, registering a new listener after the task completes will trigger a call to `invokeListeners`, which is a no-op if some other thread is already in that method; only an invocation that finds an empty `listenerInvocationThread` can continue.
Fourth, it's already possible that failure listeners and completion listeners run on different threads, because the `listenerInvocationThread` is cleared between calls.
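The locking argument in the Second and Third points can be illustrated with a toy model. This is a hypothetical sketch, much simpler than `TaskContextImpl`: `ListenerRunner`, `register`, `nextOrRelease` and `invokeAll` are illustrative names, a `List` stands in for the listener stack, and `invocationThread` plays the role of `listenerInvocationThread`. It shows how pushing and popping under `synchronized`, combined with a single owner thread, runs every listener exactly once even when several threads register concurrently.

```scala
import java.util.concurrent.atomic.AtomicIntegerArray
import scala.collection.mutable.ArrayBuffer

// Hypothetical toy model of the listener-invocation guard; not Spark's TaskContextImpl.
final class ListenerRunner {
  // Pending listeners, used as a stack: the most recently registered one runs first.
  private var pending: List[() => Unit] = Nil
  // Plays the role of listenerInvocationThread: the thread currently draining `pending`.
  private var invocationThread: Option[Thread] = None
  // Listener failures are collected so one failure does not stop the others.
  val errors = ArrayBuffer.empty[Throwable]

  // Models registering after the task completed: push, then try to invoke.
  def register(listener: () => Unit): Unit = {
    synchronized { pending = listener :: pending }
    invokeAll()
  }

  // Pops the next listener under the lock, or gives up ownership once the stack is
  // empty. Because both happen under the same lock as the push, a concurrently
  // registered listener is either popped by the current owner or found by its
  // registrant after ownership has been released.
  private def nextOrRelease(): Option[() => Unit] = synchronized {
    pending match {
      case head :: tail =>
        pending = tail
        Some(head)
      case Nil =>
        invocationThread = None
        None
    }
  }

  // A no-op if some thread already owns invocation; only a caller that finds no
  // owner claims it and drains the stack.
  def invokeAll(): Unit = {
    val claimed = synchronized {
      if (invocationThread.isEmpty) {
        invocationThread = Some(Thread.currentThread())
        true
      } else false
    }
    if (claimed) {
      var next = nextOrRelease()
      while (next.isDefined) {
        val listener = next.get
        try listener() catch { case e: Throwable => errors.synchronized { errors += e } }
        next = nextOrRelease()
      }
    }
  }
}

// Usage: four threads each register 25 listeners concurrently; each listener
// increments its own counter.
val runner = new ListenerRunner
val runs = new AtomicIntegerArray(100)
val threads = (0 until 4).map { t =>
  new Thread(() => (0 until 25).foreach { i =>
    runner.register(() => runs.incrementAndGet(t * 25 + i))
  })
}
threads.foreach(_.start())
threads.foreach(_.join())
```

A listener registered by the owning thread itself (for example, from inside another listener) is simply popped by the running loop, because the nested `invokeAll` call finds an owner and returns immediately.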
The only changes my code can cause are:
1. Failure listeners can be invoked in the middle of processing completion listeners, if one of the latter throws (where previously they were not invoked at all).
2. ... in which case different threads might execute the initial completion listeners vs. the remaining completion listeners, but only if other threads are trying to register listeners at the same time.
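To make change 1 concrete, here is a single-threaded toy model, assuming, as the diff's comment states, that `markTaskFailed` does nothing once the task is already marked failed. `ToyTaskContext` and its methods are hypothetical; the model ignores threading and the task body itself (so cases 1 and 2 are not represented), and it only collects listener errors rather than rethrowing them.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical single-threaded toy model of change 1; not Spark's TaskContextImpl.
final class ToyTaskContext {
  // Lists used as stacks: the most recently registered listener runs first.
  private var completionListeners: List[() => Unit] = Nil
  private var failureListeners: List[Throwable => Unit] = Nil
  private var failed = false
  // Listener errors are collected here; this toy does not rethrow them.
  val errors = ArrayBuffer.empty[Throwable]

  def onCompletion(f: () => Unit): Unit = completionListeners = f :: completionListeners
  def onFailure(f: Throwable => Unit): Unit = failureListeners = f :: failureListeners

  // Runs every failure listener at most once. A failing failure listener calls
  // markTaskFailed again, which is a no-op because `failed` is already set (case 4).
  def markTaskFailed(cause: Throwable): Unit = {
    if (!failed) {
      failed = true
      for (l <- failureListeners) {
        try l(cause) catch { case e: Throwable => errors += e; markTaskFailed(e) }
      }
    }
  }

  // The first failing completion listener triggers the failure listeners (case 3);
  // later failures find `failed` already set (case 5). Either way, the remaining
  // completion listeners still run.
  def markTaskCompleted(): Unit = {
    for (l <- completionListeners) {
      try l() catch { case e: Throwable => errors += e; markTaskFailed(e) }
    }
  }
}

// Usage: the middle completion listener throws.
val log = ArrayBuffer.empty[String]
val ctx = new ToyTaskContext
ctx.onCompletion(() => log += "first")
ctx.onCompletion(() => throw new RuntimeException("boom"))
ctx.onCompletion(() => log += "last")
ctx.onFailure(e => log += s"failed:${e.getMessage}")
ctx.markTaskCompleted()
// log now holds: last, failed:boom, first
```

The single `failed` flag is what keeps the re-entrant calls bounded in this model: every nested call to `markTaskFailed` after the first returns immediately.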