Github user markhamstra commented on a diff in the pull request:

    https://github.com/apache/spark/pull/186#discussion_r11452854
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -1128,13 +1108,155 @@ class DAGScheduler(
       }
     
       def stop() {
    +    logInfo("Stopping DAGScheduler")
         if (eventProcessActor != null) {
           eventProcessActor ! StopDAGScheduler
         }
    +    if (dagSchedulerActorSupervisor != null) {
    +      dagSchedulerActorSupervisor ! StopDAGScheduler
    +    }
    --- End diff --
    
    This doesn't look right.  What you are doing is putting a StopDAGScheduler 
message on the eventProcessActor's queue and then immediately putting a 
StopDAGScheduler message on the supervisor's queue.  That's a race that is 
unlikely to produce the desired ordering, because the supervisor's queue is 
generally going to be much shorter than the eventProcessActor's.  As a result, 
the supervisor is likely to execute context.stop(self) before the 
eventProcessActor handles its StopDAGScheduler message.  That will result in 
the eventProcessActor being stopped before the active jobs are failed, 
SparkListenerJobEnd events are posted to the listener bus, etc.
    
    What I think you want to do is:
    ```scala
    def stop() {
      logInfo("Stopping DAGScheduler")
      dagSchedulerActorSupervisor ! PoisonPill.getInstance
      metadataCleaner.cancel()
      taskScheduler.stop()
    }
    ```
    You don't need the StopDAGScheduler message, since you are effectively 
using it to do what the built-in PoisonPill does.  When the supervisor gets the 
PoisonPill, it will stop its children, which will then execute their postStop 
methods.  So, move the `for (job <- dagScheduler.activeJobs)` stuff from the 
StopDAGScheduler case in the eventProcessActor into its postStop.
    
    The end result is that dagScheduler.stop() will send the supervisor a 
PoisonPill; that PoisonPill will quickly come to the head of the supervisor's 
message queue, which will typically be empty; the supervisor will stop further 
message processing by the eventProcessActor; the eventProcessActor will run its 
postStop method, which will fail the active jobs and notify the listeners; then 
the supervisor's postStop will run, and the supervisor and child will be 
shut down.
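
To make that sequence concrete, here is a rough, standalone sketch. It is not code from this PR: `EventProcessor`, `Supervisor`, `Trace` and `ShutdownSketch` are hypothetical stand-ins, and it assumes a recent classic Akka API (`gracefulStop`, `system.terminate()`) rather than the Akka version Spark builds against. It only shows that a PoisonPill sent to the parent causes the child's postStop to run before the parent's.

```scala
import scala.collection.mutable.ListBuffer
import scala.concurrent.Await
import scala.concurrent.duration._

import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.pattern.gracefulStop

// Hypothetical helper: thread-safe record of the order in which hooks run.
object Trace {
  private val events = ListBuffer.empty[String]
  def clear(): Unit = synchronized { events.clear() }
  def record(e: String): Unit = synchronized { events += e }
  def snapshot: List[String] = synchronized { events.toList }
}

// Hypothetical stand-in for the eventProcessActor: cleanup lives in postStop,
// not in a dedicated stop message.
class EventProcessor extends Actor {
  def receive: Receive = {
    case event => println(s"processing $event")
  }

  // Runs whenever this actor is stopped, including when its parent stops it.
  // In the real scheduler this is where active jobs would be failed.
  override def postStop(): Unit = Trace.record("child postStop")
}

// Hypothetical stand-in for the supervisor: owns the child and forwards to it.
class Supervisor extends Actor {
  private val child: ActorRef =
    context.actorOf(Props(new EventProcessor), "eventProcessor")

  def receive: Receive = {
    case msg => child.forward(msg)
  }

  // Runs only after all children have terminated.
  override def postStop(): Unit = Trace.record("supervisor postStop")
}

object ShutdownSketch {
  def run(): List[String] = {
    Trace.clear()
    val system = ActorSystem("shutdown-sketch")
    try {
      val supervisor = system.actorOf(Props(new Supervisor), "supervisor")
      supervisor ! "JobSubmitted"
      // gracefulStop sends PoisonPill by default and completes once the
      // supervisor (and therefore its child) has terminated.
      Await.result(gracefulStop(supervisor, 5.seconds), 10.seconds)
    } finally {
      Await.result(system.terminate(), 10.seconds)
    }
    Trace.snapshot
  }

  def main(args: Array[String]): Unit = run().foreach(println)
}
```

Note that the sketch makes no promise that the child handles "JobSubmitted" before it is stopped; messages still sitting in the child's mailbox at that point are not processed, which is why the cleanup belongs in postStop.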
    
    I still need to look at why dagScheduler.stop() is getting called from 
DAGSchedulerSuite more often than I expect, but it'll probably be a day or two 
before I can get back to this. 

