GitHub user markhamstra commented on a diff in the pull request:
https://github.com/apache/spark/pull/186#discussion_r11452854
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
---
@@ -1128,13 +1108,155 @@ class DAGScheduler(
}
def stop() {
+ logInfo("Stopping DAGScheduler")
if (eventProcessActor != null) {
eventProcessActor ! StopDAGScheduler
}
+ if (dagSchedulerActorSupervisor != null) {
+ dagSchedulerActorSupervisor ! StopDAGScheduler
+ }
--- End diff --
This doesn't look right. What you are doing is putting a StopDAGScheduler
message on the eventProcessActor's queue and then immediately putting a
StopDAGScheduler message on the supervisor's queue. That's a race, and it isn't
likely to produce the ordering you want: the supervisor's queue is generally
going to be much shorter than the eventProcessActor's, so the supervisor is
likely to execute context.stop(self) before the eventProcessActor handles its
StopDAGScheduler message. The eventProcessActor will then be stopped before the
active jobs are failed, before SparkListenerJobEnd events are posted to the
listener bus, etc.
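To make the race concrete, here is a toy, single-threaded model of the two mailboxes. It is not Akka: the one-message-per-tick scheduling, the `Work` messages, and the backlog size are all assumptions for illustration. It only shows that when the eventProcessActor already has a backlog, the supervisor's StopDAGScheduler can stop the child before the child ever reaches its own StopDAGScheduler case.

```scala
import scala.collection.mutable

// Toy, single-threaded model of two mailboxes; this is NOT Akka. Each "tick",
// the child (eventProcessActor) and then the supervisor handle at most one message.
object StopRaceModel {
  sealed trait Msg
  final case class Work(id: Int) extends Msg // hypothetical queued work
  case object StopDAGScheduler extends Msg

  /** True if the child reached its StopDAGScheduler (job-failing) case before being stopped. */
  def childSawStop(pendingChildWork: Int): Boolean = {
    val childBox = mutable.Queue[Msg]()
    val supervisorBox = mutable.Queue[Msg]() // typically empty
    // Hypothetical backlog already queued for the eventProcessActor.
    (1 to pendingChildWork).foreach(i => childBox.enqueue(Work(i)))
    // stop() as written in the diff: one StopDAGScheduler to each actor.
    childBox.enqueue(StopDAGScheduler)
    supervisorBox.enqueue(StopDAGScheduler)

    var childAlive = true
    var sawStop = false
    while (childAlive && childBox.nonEmpty) {
      if (childBox.dequeue() == StopDAGScheduler) sawStop = true
      // Supervisor's StopDAGScheduler => context.stop(self), which also stops the child.
      if (supervisorBox.nonEmpty && supervisorBox.dequeue() == StopDAGScheduler)
        childAlive = false
    }
    sawStop
  }
}
```

In this model `childSawStop(0)` is true only because the child is arbitrarily scheduled first within a tick; with any backlog, e.g. `childSawStop(3)`, it is false. With real actors even the empty-backlog case depends on the dispatcher.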
What I think you want to do is:
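```scala
import akka.actor.PoisonPill

def stop() {
  logInfo("Stopping DAGScheduler")
  // Stopping the supervisor first stops its child (the eventProcessActor),
  // whose postStop then fails the active jobs and notifies listeners.
  dagSchedulerActorSupervisor ! PoisonPill
  metadataCleaner.cancel()
  taskScheduler.stop()
}
```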
You don't need the StopDAGScheduler message, since you are effectively
using it to do what the built-in PoisonPill already does. When the supervisor
gets the PoisonPill, it will stop its children, which will then execute their
postStop methods. So, move the `for (job <- dagScheduler.activeJobs)` loop from
the StopDAGScheduler case in the eventProcessActor into its postStop.
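The hook ordering this relies on can be sketched with a tiny lifecycle model. This is not Akka's API: `ToyActor`, `receivePoisonPill`, and the log strings are hypothetical. It only mirrors the documented Akka behavior that a stopping actor stops its children before its own postStop runs, with the job-failing loop moved into the child's postStop.

```scala
import scala.collection.mutable

// Toy lifecycle model (NOT Akka): records the order in which postStop hooks run.
object PoisonPillModel {
  val log = mutable.ArrayBuffer[String]()

  trait ToyActor {
    def name: String
    def postStop(): Unit = log += s"$name.postStop"
  }

  // Hypothetical stand-in for the eventProcessActor, with cleanup moved into postStop.
  class EventProcessActor(activeJobs: Seq[Int]) extends ToyActor {
    val name = "eventProcessActor"
    override def postStop(): Unit = {
      for (job <- activeJobs) log += s"fail job $job; post SparkListenerJobEnd"
      super.postStop()
    }
  }

  class Supervisor(children: Seq[ToyActor]) extends ToyActor {
    val name = "supervisor"
    // Models the supervisor handling PoisonPill: children stop (running postStop) first.
    def receivePoisonPill(): Unit = {
      children.foreach(_.postStop())
      postStop()
    }
  }
}
```

Calling `new PoisonPillModel.Supervisor(Seq(new PoisonPillModel.EventProcessActor(Seq(1, 2)))).receivePoisonPill()` logs both job failures, then the child's postStop, then the supervisor's.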
The end result is that dagScheduler.stop() will send the supervisor a
PoisonPill; that PoisonPill will quickly come to the head of the supervisor's
message queue, which will typically be empty; the supervisor will stop further
message processing by the eventProcessActor; the eventProcessActor will run its
postStop method, which will fail the active jobs and notify the listeners; then
the supervisor's postStop will run, and both the supervisor and its child will
be shut down.
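For contrast with the race, the same kind of toy mailbox model (again not Akka; the per-tick scheduling and `Work` backlog are assumptions) can sketch the proposed flow: PoisonPill is the only message sent, it sits at the head of the supervisor's otherwise empty queue, and the child's cleanup runs from postStop no matter how long the child's backlog is. In this model the remaining backlog is simply left unprocessed.

```scala
import scala.collection.mutable

// Toy, single-threaded model (NOT Akka) of the proposed stop(): only the supervisor
// is sent PoisonPill, and the child's job-failing cleanup lives in its postStop.
object ProposedStopModel {
  sealed trait Msg
  final case class Work(id: Int) extends Msg // hypothetical queued work
  case object PoisonPill extends Msg         // stand-in for akka.actor.PoisonPill

  final case class Outcome(workHandled: Int, jobsFailed: Boolean, unprocessed: Int)

  def stop(pendingChildWork: Int): Outcome = {
    val childBox = mutable.Queue[Msg]()
    val supervisorBox = mutable.Queue[Msg]() // typically empty before stop()
    (1 to pendingChildWork).foreach(i => childBox.enqueue(Work(i)))
    supervisorBox.enqueue(PoisonPill) // so PoisonPill is at the head right away

    var handled = 0
    var jobsFailed = false
    var childAlive = true
    while (childAlive) {
      // Child handles at most one message per tick.
      if (childBox.nonEmpty) { childBox.dequeue(); handled += 1 }
      // Supervisor handles one message per tick; on PoisonPill it stops the child,
      // whose postStop fails the active jobs and posts SparkListenerJobEnd events.
      if (supervisorBox.nonEmpty && supervisorBox.dequeue() == PoisonPill) {
        jobsFailed = true
        childAlive = false
      }
    }
    Outcome(handled, jobsFailed, childBox.size)
  }
}
```

Here `stop(0)` and `stop(1000)` both report `jobsFailed = true`; the backlog only changes how many messages go unprocessed.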
I still need to look at why dagScheduler.stop() is getting called from
DAGSchedulerSuite more often than I expect, but it'll probably be a day or two
before I can get back to this.