Enforced task launch order on the agent. Until now, Mesos has not guaranteed in-order task launch on the agent. There are two asynchronous steps (unscheduling GC and task authorization) in the agent's task launch path. Depending on the CPU scheduling order, a later task launch may finish these two steps before its predecessors and reach the executor launch stage first, resulting in out-of-order task delivery.
This patch enforces the task delivery order by sequencing task launch after the two asynchronous steps, specifically right before entering `__run()`. Review: https://reviews.apache.org/r/66144/ Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3f8b19a9 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3f8b19a9 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3f8b19a9 Branch: refs/heads/1.5.x Commit: 3f8b19a92b7f28e6efcf1cd9397b380c995e9948 Parents: 2d075e3 Author: Meng Zhu <m...@mesosphere.io> Authored: Wed Apr 4 16:36:52 2018 -0700 Committer: Greg Mann <gregorywm...@gmail.com> Committed: Fri Apr 6 23:42:39 2018 -0700 ---------------------------------------------------------------------- src/slave/slave.cpp | 156 +++++++++++++++++++++++++++++++++++------------ src/slave/slave.hpp | 14 +++++ 2 files changed, 132 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/3f8b19a9/src/slave/slave.cpp ---------------------------------------------------------------------- diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp index 2044b6e..0d89915 100644 --- a/src/slave/slave.cpp +++ b/src/slave/slave.cpp @@ -2206,10 +2206,12 @@ void Slave::run( return unschedules; }; - // Handle any unschedule GC failure. If unschedule GC succeeds, trigger - // the next continuations. + // `taskLaunch` encapsulates each task's launch steps from this point + // to the end of `_run` (the completion of task authorization). Future<Nothing> taskLaunch = collect(unschedules) + // Handle the failure iff unschedule GC fails. .repair(defer(self(), onUnscheduleGCFailure)) + // If unschedule GC succeeds, trigger the next continuation. 
.then(defer( self(), &Self::_run, @@ -2220,27 +2222,80 @@ void Slave::run( resourceVersionUuids, launchExecutor)); - taskLaunch - .onReady(defer( - self(), - &Self::__run, - frameworkInfo, - executorInfo, - task, - taskGroup, - resourceVersionUuids, - launchExecutor)) - .onFailed(defer(self(), [=](const string& failure) { - if (launchExecutor.isSome() && launchExecutor.get()) { - // Master expects new executor to be launched for this task launch. - // To keep the master executor entries updated, the agent needs to send - // 'ExitedExecutorMessage' even though no executor launched. - sendExitedExecutorMessage(frameworkId, executorInfo.executor_id()); - } + // Use a sequence to ensure that task launch order is preserved. + framework->taskLaunchSequences[executorId] + .add<Nothing>([taskLaunch]() -> Future<Nothing> { + // We use this sequence only to maintain the task launching order. If the + // sequence is deleted, we do not want the resulting discard event to + // propagate up the chain, which would prevent the previous `.then()` or + // `.repair()` callbacks from being invoked. Thus, we use `undiscardable` + // to protect each `taskLaunch`. + return undiscardable(taskLaunch); + }) + // We register `onAny` on the future returned by the sequence (referred to + // as `seqFuture` below). The following scenarios could happen: + // + // (1) `seqFuture` becomes ready. This happens when all previous tasks' + // `taskLaunch` futures are in non-pending state AND this task's own + // `taskLaunch` future is in ready state. The `onReady` call registered + // below will be triggered and continue the success path. + // + // (2) `seqFuture` becomes failed. This happens when all previous tasks' + // `taskLaunch` futures are in non-pending state AND this task's own + // `taskLaunch` future is in failed state (e.g. due to unschedule GC + // failure or some other failure). The `onFailed` call registered below + // will be triggered to handle the failure. 
+ // + // (3) `seqFuture` becomes discarded. This happens when the sequence is + // destructed (see declaration of `taskLaunchSequences` on its lifecycle) + // while the `seqFuture` is still pending. In this case, we wait until + // this task's own `taskLaunch` future becomes non-pending and trigger + // callbacks accordingly. + // + // TODO(mzhu): In case (3), the destruction of the sequence means that the + // agent will eventually discover that the executor is absent and drop + // the task. While `__run` is capable of handling this case, it is more + // optimal to handle the failure earlier here rather than waiting for + // the `taskLaunch` transition and directing control to `__run`. + .onAny(defer(self(), [=](const Future<Nothing>&) { + // We only want to execute the following callbacks once the work performed + // in the `taskLaunch` chain is complete. Thus, we add them onto the + // `taskLaunch` chain rather than dispatching directly. + taskLaunch + .onReady(defer( + self(), + &Self::__run, + frameworkInfo, + executorInfo, + task, + taskGroup, + resourceVersionUuids, + launchExecutor)) + .onFailed(defer(self(), [=](const string& failure) { + Framework* _framework = getFramework(frameworkId); + if (_framework == nullptr) { + LOG(WARNING) << "Ignoring running " + << taskOrTaskGroup(task, taskGroup) + << " because the framework " << stringify(frameworkId) + << " does not exist"; + } + + if (launchExecutor.isSome() && launchExecutor.get()) { + // Master expects a new executor to be launched for this task(s). + // To keep the master executor entries updated, the agent needs to + // send `ExitedExecutorMessage` even though no executor launched. + sendExitedExecutorMessage(frameworkId, executorInfo.executor_id()); + + // See the declaration of `taskLaunchSequences` regarding its + // lifecycle management. 
+ if (_framework != nullptr) { + _framework->taskLaunchSequences.erase(executorInfo.executor_id()); + } + } + })); })); - // TODO(mzhu): Consolidate error handling code in `__run` here with - // then/recover pattern. + // TODO(mzhu): Consolidate error handling code in `__run` here. } @@ -2377,10 +2432,8 @@ Future<Nothing> Slave::_run( }; return collect(authorizations) - .recover(defer(self(), + .repair(defer(self(), [=](const Future<list<bool>>& future) -> Future<list<bool>> { - CHECK(future.isFailed()); - Framework* _framework = getFramework(frameworkId); if (_framework == nullptr) { const string error = @@ -2469,10 +2522,13 @@ void Slave::__run( << " does not exist"; if (launchExecutor.isSome() && launchExecutor.get()) { - // Master expects new executor to be launched for this task(s) launch. + // Master expects a new executor to be launched for this task(s). // To keep the master executor entries updated, the agent needs to send - // 'ExitedExecutorMessage' even though no executor launched. + // `ExitedExecutorMessage` even though no executor launched. sendExitedExecutorMessage(frameworkId, executorInfo.executor_id()); + + // There is no need to clean up the task launch sequence here since + // the framework (along with the sequence) no longer exists. } return; @@ -2498,10 +2554,14 @@ void Slave::__run( } if (launchExecutor.isSome() && launchExecutor.get()) { - // Master expects new executor to be launched for this task(s) launch. + // Master expects a new executor to be launched for this task(s). // To keep the master executor entries updated, the agent needs to send - // 'ExitedExecutorMessage' even though no executor launched. + // `ExitedExecutorMessage` even though no executor launched. sendExitedExecutorMessage(frameworkId, executorInfo.executor_id()); + + // See the declaration of `taskLaunchSequences` regarding its lifecycle + // management. 
+ framework->taskLaunchSequences.erase(executorInfo.executor_id()); } return; @@ -2530,10 +2590,14 @@ void Slave::__run( << " because it has been killed in the meantime"; if (launchExecutor.isSome() && launchExecutor.get()) { - // Master expects new executor to be launched for this task(s) launch. + // Master expects a new executor to be launched for this task(s). // To keep the master executor entries updated, the agent needs to send - // 'ExitedExecutorMessage' even though no executor launched. + // `ExitedExecutorMessage` even though no executor launched. sendExitedExecutorMessage(frameworkId, executorInfo.executor_id()); + + // See the declaration of `taskLaunchSequences` regarding its lifecycle + // management. + framework->taskLaunchSequences.erase(executorInfo.executor_id()); } return; @@ -2621,10 +2685,14 @@ void Slave::__run( } if (launchExecutor.isSome() && launchExecutor.get()) { - // Master expects new executor to be launched for this task(s) launch. + // Master expects a new executor to be launched for this task(s). // To keep the master executor entries updated, the agent needs to send - // 'ExitedExecutorMessage' even though no executor launched. + // `ExitedExecutorMessage` even though no executor launched. sendExitedExecutorMessage(frameworkId, executorInfo.executor_id()); + + // See the declaration of `taskLaunchSequences` regarding its lifecycle + // management. + framework->taskLaunchSequences.erase(executorInfo.executor_id()); } return; @@ -2695,10 +2763,14 @@ void Slave::__run( } if (launchExecutor.isSome() && launchExecutor.get()) { - // Master expects new executor to be launched for this task(s) launch. + // Master expects a new executor to be launched for this task(s). // To keep the master executor entries updated, the agent needs to send - // 'ExitedExecutorMessage' even though no executor launched. + // `ExitedExecutorMessage` even though no executor launched. 
sendExitedExecutorMessage(frameworkId, executorInfo.executor_id()); + + // See the declaration of `taskLaunchSequences` regarding its lifecycle + // management. + framework->taskLaunchSequences.erase(executorInfo.executor_id()); } return; @@ -2755,10 +2827,14 @@ void Slave::__run( } if (launchExecutor.isSome() && launchExecutor.get()) { - // Master expects new executor to be launched for this task(s) launch. + // Master expects a new executor to be launched for this task(s). // To keep the master executor entries updated, the agent needs to send - // 'ExitedExecutorMessage' even though no executor launched. + // `ExitedExecutorMessage` even though no executor launched. sendExitedExecutorMessage(frameworkId, executorInfo.executor_id()); + + // See the declaration of `taskLaunchSequences` regarding its lifecycle + // management. + framework->taskLaunchSequences.erase(executorInfo.executor_id()); } return; @@ -2865,9 +2941,9 @@ void Slave::__run( statusUpdate(update, UPID()); } - // Master expects new executor to be launched for this task(s) launch. + // Master expects a new executor to be launched for this task(s). // To keep the master executor entries updated, the agent needs to - // send 'ExitedExecutorMessage' even though no executor launched. + // send `ExitedExecutorMessage` even though no executor launched. if (executor->state == Executor::TERMINATED) { sendExitedExecutorMessage(frameworkId, executorInfo.executor_id()); } else { @@ -8959,6 +9035,10 @@ void Framework::destroyExecutor(const ExecutorID& executorId) Executor* executor = executors[executorId]; executors.erase(executorId); + // See the declaration of `taskLaunchSequences` regarding its + // lifecycle management. + taskLaunchSequences.erase(executorId); + // Pass ownership of the executor pointer. 
completedExecutors.push_back(Owned<Executor>(executor)); } http://git-wip-us.apache.org/repos/asf/mesos/blob/3f8b19a9/src/slave/slave.hpp ---------------------------------------------------------------------- diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp index 11cbbc6..ca8cc65 100644 --- a/src/slave/slave.hpp +++ b/src/slave/slave.hpp @@ -53,6 +53,7 @@ #include <process/process.hpp> #include <process/protobuf.hpp> #include <process/shared.hpp> +#include <process/sequence.hpp> #include <stout/boundedhashmap.hpp> #include <stout/bytes.hpp> @@ -1138,6 +1139,19 @@ public: // Executors with pending tasks. hashmap<ExecutorID, hashmap<TaskID, TaskInfo>> pendingTasks; + // Sequences in this map are used to enforce the order of tasks launched on + // each executor. + // + // Note on the lifecycle of the sequence: if the corresponding executor struct + // has not been created, we tie the lifecycle of the sequence to the first + // task in the sequence (which must have the `launch_executor` flag set to + // true modulo MESOS-3870). If the task fails to launch before creating the + // executor struct, we will delete the sequence. Once the executor struct is + // created, we tie the lifecycle of the sequence to the executor struct. + // + // TODO(mzhu): Create the executor struct early and put the sequence in it. + hashmap<ExecutorID, process::Sequence> taskLaunchSequences; + // Pending task groups. This is needed for correctly sending // TASK_KILLED status updates for all tasks in the group if // any of the tasks are killed while pending.