Updated Branches: refs/heads/master bfb16a2e1 -> 4173ec935
Fixed MESOS-851: The drivers now ignore queued messages when aborted. Review: https://reviews.apache.org/r/15872 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/853c606c Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/853c606c Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/853c606c Branch: refs/heads/master Commit: 853c606c0a8de6346aeefac9b48874d4d4ac38d5 Parents: bfb16a2 Author: Benjamin Mahler <bmah...@twitter.com> Authored: Tue Nov 26 19:39:28 2013 -0800 Committer: Benjamin Mahler <bmah...@twitter.com> Committed: Tue Dec 3 11:26:18 2013 -0800 ---------------------------------------------------------------------- src/exec/exec.cpp | 14 ++++++++++++-- src/sched/sched.cpp | 25 +++++++++++++++++-------- 2 files changed, 29 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/853c606c/src/exec/exec.cpp ---------------------------------------------------------------------- diff --git a/src/exec/exec.cpp b/src/exec/exec.cpp index a58203b..26a927a 100644 --- a/src/exec/exec.cpp +++ b/src/exec/exec.cpp @@ -407,7 +407,7 @@ protected: void abort() { VLOG(1) << "De-activating the executor libprocess"; - aborted = true; + CHECK(aborted); Lock lock(mutex); pthread_cond_signal(cond); @@ -548,7 +548,7 @@ private: bool connected; // Registered with the slave. UUID connection; // UUID to identify the connection instance. bool local; - bool aborted; + volatile bool aborted; pthread_mutex_t* mutex; pthread_cond_t* cond; const string directory; @@ -742,6 +742,16 @@ Status MesosExecutorDriver::abort() CHECK(process != NULL); + // We set the volatile aborted to true here to prevent any further + // messages from being processed in the ExecutorProcess. However, + // if abort() is called from a different thread than the ExecutorProcess, + // there may be at most one additional message processed. 
+ // TODO(bmahler): Use an atomic boolean. + process->aborted = true; + + // Dispatching here ensures that we still process the outstanding + // requests *from* the executor, since those do proceed when + // aborted is true. dispatch(process, &ExecutorProcess::abort); return status = DRIVER_ABORTED; http://git-wip-us.apache.org/repos/asf/mesos/blob/853c606c/src/sched/sched.cpp ---------------------------------------------------------------------- diff --git a/src/sched/sched.cpp b/src/sched/sched.cpp index b958435..c465356 100644 --- a/src/sched/sched.cpp +++ b/src/sched/sched.cpp @@ -743,18 +743,17 @@ protected: { VLOG(1) << "Aborting framework '" << framework.id() << "'"; - aborted = true; + CHECK(aborted); if (!connected) { VLOG(1) << "Not sending a deactivate message as master is disconnected"; - return; + } else { + DeactivateFrameworkMessage message; + message.mutable_framework_id()->MergeFrom(framework.id()); + CHECK_SOME(master); + send(master.get(), message); } - DeactivateFrameworkMessage message; - message.mutable_framework_id()->MergeFrom(framework.id()); - CHECK_SOME(master); - send(master.get(), message); - Lock lock(mutex); pthread_cond_signal(cond); } @@ -982,7 +981,7 @@ private: bool failover; Result<UPID> master; - volatile bool connected; // Flag to indicate if framework is registered. + bool connected; // Flag to indicate if framework is registered. volatile bool aborted; // Flag to indicate if the driver is aborted. MasterDetector* detector; @@ -1269,6 +1268,16 @@ Status MesosSchedulerDriver::abort() CHECK(process != NULL); + // We set the volatile aborted to true here to prevent any further + // messages from being processed in the SchedulerProcess. However, + // if abort() is called from a different thread than the SchedulerProcess, + // there may be at most one additional message processed. + // TODO(bmahler): Use an atomic boolean. 
+ process->aborted = true; + + // Dispatching here ensures that we still process the outstanding + // requests *from* the scheduler, since those do proceed when + // aborted is true. dispatch(process, &SchedulerProcess::abort); return status = DRIVER_ABORTED;