Github user JoshRosen commented on a diff in the pull request:
https://github.com/apache/spark/pull/4135#discussion_r26726692
--- Diff: core/src/main/scala/org/apache/spark/TaskContextImpl.scala ---
@@ -82,9 +97,27 @@ private[spark] class TaskContextImpl(
     }
   }
-  /** Marks the task for interruption, i.e. cancellation. */
+  /**
+   * Marks the task as interrupted, i.e. cancelled. We add this method so
+   * that extra cleanup can be performed on interruption. For example, we
+   * need to register an "interruption" callback to completely stop a
+   * receiver supervisor.
+   */
   private[spark] def markInterrupted(): Unit = {
     interrupted = true
+    val errorMsgs = new ArrayBuffer[String](2)
+    // Process interruption callbacks in the reverse order of registration
+    onInterruptedCallbacks.reverse.foreach { listener =>
+      try {
+        listener.onTaskInterrupted(this)
+      } catch {
+        case e: Throwable =>
--- End diff ---
It might be dangerous for this code to catch all Throwables, since this
could also catch irrecoverable exceptions like OutOfMemoryError. Instead, it
might be safer to use `case NonFatal(e) =>`. If you agree, then let's also
update the original code in `markTaskCompleted()` to catch NonFatal as well.
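
To illustrate the suggestion, here's a rough sketch of what the `NonFatal`-based loop could look like (the `InterruptionListener` trait and `runCallbacks` helper below are hypothetical stand-ins for the real TaskContext types, not the actual PR code):

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

object NonFatalCatchSketch {
  // Hypothetical stand-in for the interruption listener interface.
  trait InterruptionListener {
    def onTaskInterrupted(): Unit
  }

  // Invokes callbacks in reverse registration order, collecting error messages.
  // NonFatal does not match fatal errors such as OutOfMemoryError, so those
  // propagate instead of being swallowed here.
  def runCallbacks(callbacks: Seq[InterruptionListener]): Seq[String] = {
    val errorMsgs = new ArrayBuffer[String](2)
    callbacks.reverse.foreach { listener =>
      try {
        listener.onTaskInterrupted()
      } catch {
        case NonFatal(e) =>
          // Record the failure but keep running the remaining callbacks.
          errorMsgs += e.getMessage
      }
    }
    errorMsgs.toSeq
  }
}
```

Note that `NonFatal` deliberately excludes things like `VirtualMachineError` (which covers `OutOfMemoryError` and `StackOverflowError`), `ThreadDeath`, `InterruptedException`, `LinkageError`, and `ControlThrowable`, so truly unrecoverable conditions would still surface.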