Enforced task launch order on the agent.

Until now, Mesos has not guaranteed in-order task launch
on the agent. The agent task launch path contains two
asynchronous steps (unschedule GC and task authorization).
Depending on the CPU scheduling order, a later task launch
may finish these two steps before its predecessors and reach
the executor launch stage first, resulting in out-of-order
task delivery.

This patch enforces the task delivery order by sequencing
task launch after the two asynchronous steps, specifically
right before entering `__run()`.
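
The ordering idea can be illustrated with a minimal, standalone sketch.
It is not the libprocess `Sequence` API and not the code in this patch:
`LaunchSequence`, `add`, `complete`, and `simulateLaunchOrder` are
hypothetical names, and the sketch is single-threaded and ignores the
failure and discard handling that the real patch has to deal with. It
only shows that a task whose asynchronous steps finish early is still
held back until every earlier task has been released.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a per-executor launch sequence: tasks are
// registered in arrival order, and a task is released only once it and
// every task registered before it have finished their asynchronous steps.
class LaunchSequence
{
public:
  // Registers a task in arrival order; `launch` is invoked on release.
  std::size_t add(std::function<void()> launch)
  {
    entries.push_back(Entry{std::move(launch), false});
    return entries.size() - 1;
  }

  // Marks the asynchronous steps of task `index` as finished, then
  // releases as many tasks from the head of the queue as possible.
  void complete(std::size_t index)
  {
    assert(index < entries.size());
    entries[index].done = true;

    while (next < entries.size() && entries[next].done) {
      entries[next].launch();
      ++next;
    }
  }

private:
  struct Entry
  {
    std::function<void()> launch;
    bool done;
  };

  std::vector<Entry> entries;
  std::size_t next = 0; // Index of the first task not yet released.
};

// Simulates three tasks for one executor whose asynchronous steps finish
// in the order t3, t1, t2, and returns the order in which they launch.
std::vector<std::string> simulateLaunchOrder()
{
  std::vector<std::string> launched;
  LaunchSequence sequence;

  std::size_t t1 = sequence.add([&launched]() { launched.push_back("t1"); });
  std::size_t t2 = sequence.add([&launched]() { launched.push_back("t2"); });
  std::size_t t3 = sequence.add([&launched]() { launched.push_back("t3"); });

  sequence.complete(t3); // Finishes first, but must wait for t1 and t2.
  sequence.complete(t1); // Releases t1 only.
  sequence.complete(t2); // Releases t2, then the waiting t3.

  return launched;
}
```

In this sketch the release callback plays the role of entering
`__run()`; one such sequence per executor would correspond to the
`taskLaunchSequences` map introduced in the diff below.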

Review: https://reviews.apache.org/r/66144/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3f8b19a9
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3f8b19a9
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3f8b19a9

Branch: refs/heads/1.5.x
Commit: 3f8b19a92b7f28e6efcf1cd9397b380c995e9948
Parents: 2d075e3
Author: Meng Zhu <m...@mesosphere.io>
Authored: Wed Apr 4 16:36:52 2018 -0700
Committer: Greg Mann <gregorywm...@gmail.com>
Committed: Fri Apr 6 23:42:39 2018 -0700

----------------------------------------------------------------------
 src/slave/slave.cpp | 156 +++++++++++++++++++++++++++++++++++------------
 src/slave/slave.hpp |  14 +++++
 2 files changed, 132 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/3f8b19a9/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 2044b6e..0d89915 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -2206,10 +2206,12 @@ void Slave::run(
       return unschedules;
   };
 
-  // Handle any unschedule GC failure. If unschedule GC succeeds, trigger
-  // the next continuations.
+  // `taskLaunch` encapsulates each task's launch steps from this point
+  // to the end of `_run` (the completion of task authorization).
   Future<Nothing> taskLaunch = collect(unschedules)
+    // Handle the failure iff unschedule GC fails.
     .repair(defer(self(), onUnscheduleGCFailure))
+    // If unschedule GC succeeds, trigger the next continuation.
     .then(defer(
         self(),
         &Self::_run,
@@ -2220,27 +2222,80 @@ void Slave::run(
         resourceVersionUuids,
         launchExecutor));
 
-  taskLaunch
-    .onReady(defer(
-        self(),
-        &Self::__run,
-        frameworkInfo,
-        executorInfo,
-        task,
-        taskGroup,
-        resourceVersionUuids,
-        launchExecutor))
-    .onFailed(defer(self(), [=](const string& failure) {
-      if (launchExecutor.isSome() && launchExecutor.get()) {
-        // Master expects new executor to be launched for this task launch.
-        // To keep the master executor entries updated, the agent needs to send
-        // 'ExitedExecutorMessage' even though no executor launched.
-        sendExitedExecutorMessage(frameworkId, executorInfo.executor_id());
-      }
+  // Use a sequence to ensure that task launch order is preserved.
+  framework->taskLaunchSequences[executorId]
+    .add<Nothing>([taskLaunch]() -> Future<Nothing> {
+      // We use this sequence only to maintain the task launching order. If the
+      // sequence is deleted, we do not want the resulting discard event to
+      // propagate up the chain, which would prevent the previous `.then()` or
+      // `.repair()` callbacks from being invoked. Thus, we use `undiscardable`
+      // to protect each `taskLaunch`.
+      return undiscardable(taskLaunch);
+    })
+    // We register `onAny` on the future returned by the sequence (referred to
+    // as `seqFuture` below). The following scenarios could happen:
+    //
+    // (1) `seqFuture` becomes ready. This happens when all previous tasks'
+    // `taskLaunch` futures are in non-pending state AND this task's own
+    // `taskLaunch` future is in ready state. The `onReady` call registered
+    // below will be triggered and continue the success path.
+    //
+    // (2) `seqFuture` becomes failed. This happens when all previous tasks'
+    // `taskLaunch` futures are in non-pending state AND this task's own
+    // `taskLaunch` future is in failed state (e.g. due to unschedule GC
+    // failure or some other failure). The `onFailed` call registered below
+    // will be triggered to handle the failure.
+    //
+    // (3) `seqFuture` becomes discarded. This happens when the sequence is
+    // destructed (see declaration of `taskLaunchSequences` on its lifecycle)
+    // while the `seqFuture` is still pending. In this case, we wait until
+    // this task's own `taskLaunch` future becomes non-pending and trigger
+    // callbacks accordingly.
+    //
+    // TODO(mzhu): In case (3), the destruction of the sequence means that the
+    // agent will eventually discover that the executor is absent and drop
+    // the task. While `__run` is capable of handling this case, it is more
+    // optimal to handle the failure earlier here rather than waiting for
+    // the `taskLaunch` transition and directing control to `__run`.
+    .onAny(defer(self(), [=](const Future<Nothing>&) {
+      // We only want to execute the following callbacks once the work performed
+      // in the `taskLaunch` chain is complete. Thus, we add them onto the
+      // `taskLaunch` chain rather than dispatching directly.
+      taskLaunch
+        .onReady(defer(
+            self(),
+            &Self::__run,
+            frameworkInfo,
+            executorInfo,
+            task,
+            taskGroup,
+            resourceVersionUuids,
+            launchExecutor))
+        .onFailed(defer(self(), [=](const string& failure) {
+          Framework* _framework = getFramework(frameworkId);
+          if (_framework == nullptr) {
+            LOG(WARNING) << "Ignoring running "
+                         << taskOrTaskGroup(task, taskGroup)
+                         << " because the framework " << stringify(frameworkId)
+                         << " does not exist";
+          }
+
+          if (launchExecutor.isSome() && launchExecutor.get()) {
+            // Master expects a new executor to be launched for this task(s).
+            // To keep the master executor entries updated, the agent needs to
+            // send `ExitedExecutorMessage` even though no executor launched.
+            sendExitedExecutorMessage(frameworkId, executorInfo.executor_id());
+
+            // See the declaration of `taskLaunchSequences` regarding its
+            // lifecycle management.
+            if (_framework != nullptr) {
+              _framework->taskLaunchSequences.erase(executorInfo.executor_id());
+            }
+          }
+        }));
     }));
 
-  // TODO(mzhu): Consolidate error handling code in `__run` here with
-  // then/recover pattern.
+  // TODO(mzhu): Consolidate error handling code in `__run` here.
 }
 
 
@@ -2377,10 +2432,8 @@ Future<Nothing> Slave::_run(
   };
 
   return collect(authorizations)
-    .recover(defer(self(),
+    .repair(defer(self(),
       [=](const Future<list<bool>>& future) -> Future<list<bool>> {
-        CHECK(future.isFailed());
-
         Framework* _framework = getFramework(frameworkId);
         if (_framework == nullptr) {
           const string error =
@@ -2469,10 +2522,13 @@ void Slave::__run(
                  << " does not exist";
 
     if (launchExecutor.isSome() && launchExecutor.get()) {
-      // Master expects new executor to be launched for this task(s) launch.
+      // Master expects a new executor to be launched for this task(s).
       // To keep the master executor entries updated, the agent needs to send
-      // 'ExitedExecutorMessage' even though no executor launched.
+      // `ExitedExecutorMessage` even though no executor launched.
       sendExitedExecutorMessage(frameworkId, executorInfo.executor_id());
+
+      // There is no need to clean up the task launch sequence here since
+      // the framework (along with the sequence) no longer exists.
     }
 
     return;
@@ -2498,10 +2554,14 @@ void Slave::__run(
     }
 
     if (launchExecutor.isSome() && launchExecutor.get()) {
-      // Master expects new executor to be launched for this task(s) launch.
+      // Master expects a new executor to be launched for this task(s).
       // To keep the master executor entries updated, the agent needs to send
-      // 'ExitedExecutorMessage' even though no executor launched.
+      // `ExitedExecutorMessage` even though no executor launched.
       sendExitedExecutorMessage(frameworkId, executorInfo.executor_id());
+
+      // See the declaration of `taskLaunchSequences` regarding its lifecycle
+      // management.
+      framework->taskLaunchSequences.erase(executorInfo.executor_id());
     }
 
     return;
@@ -2530,10 +2590,14 @@ void Slave::__run(
                  << " because it has been killed in the meantime";
 
     if (launchExecutor.isSome() && launchExecutor.get()) {
-      // Master expects new executor to be launched for this task(s) launch.
+      // Master expects a new executor to be launched for this task(s).
       // To keep the master executor entries updated, the agent needs to send
-      // 'ExitedExecutorMessage' even though no executor launched.
+      // `ExitedExecutorMessage` even though no executor launched.
       sendExitedExecutorMessage(frameworkId, executorInfo.executor_id());
+
+      // See the declaration of `taskLaunchSequences` regarding its lifecycle
+      // management.
+      framework->taskLaunchSequences.erase(executorInfo.executor_id());
     }
 
     return;
@@ -2621,10 +2685,14 @@ void Slave::__run(
     }
 
     if (launchExecutor.isSome() && launchExecutor.get()) {
-      // Master expects new executor to be launched for this task(s) launch.
+      // Master expects a new executor to be launched for this task(s).
       // To keep the master executor entries updated, the agent needs to send
-      // 'ExitedExecutorMessage' even though no executor launched.
+      // `ExitedExecutorMessage` even though no executor launched.
       sendExitedExecutorMessage(frameworkId, executorInfo.executor_id());
+
+      // See the declaration of `taskLaunchSequences` regarding its lifecycle
+      // management.
+      framework->taskLaunchSequences.erase(executorInfo.executor_id());
     }
 
     return;
@@ -2695,10 +2763,14 @@ void Slave::__run(
     }
 
     if (launchExecutor.isSome() && launchExecutor.get()) {
-      // Master expects new executor to be launched for this task(s) launch.
+      // Master expects a new executor to be launched for this task(s).
       // To keep the master executor entries updated, the agent needs to send
-      // 'ExitedExecutorMessage' even though no executor launched.
+      // `ExitedExecutorMessage` even though no executor launched.
       sendExitedExecutorMessage(frameworkId, executorInfo.executor_id());
+
+      // See the declaration of `taskLaunchSequences` regarding its lifecycle
+      // management.
+      framework->taskLaunchSequences.erase(executorInfo.executor_id());
     }
 
     return;
@@ -2755,10 +2827,14 @@ void Slave::__run(
     }
 
     if (launchExecutor.isSome() && launchExecutor.get()) {
-      // Master expects new executor to be launched for this task(s) launch.
+      // Master expects a new executor to be launched for this task(s).
       // To keep the master executor entries updated, the agent needs to send
-      // 'ExitedExecutorMessage' even though no executor launched.
+      // `ExitedExecutorMessage` even though no executor launched.
       sendExitedExecutorMessage(frameworkId, executorInfo.executor_id());
+
+      // See the declaration of `taskLaunchSequences` regarding its lifecycle
+      // management.
+      framework->taskLaunchSequences.erase(executorInfo.executor_id());
     }
 
     return;
@@ -2865,9 +2941,9 @@ void Slave::__run(
             statusUpdate(update, UPID());
           }
 
-          // Master expects new executor to be launched for this task(s) launch.
+          // Master expects a new executor to be launched for this task(s).
           // To keep the master executor entries updated, the agent needs to
-          // send 'ExitedExecutorMessage' even though no executor launched.
+          // send `ExitedExecutorMessage` even though no executor launched.
           if (executor->state == Executor::TERMINATED) {
             sendExitedExecutorMessage(frameworkId, executorInfo.executor_id());
           } else {
@@ -8959,6 +9035,10 @@ void Framework::destroyExecutor(const ExecutorID& executorId)
     Executor* executor = executors[executorId];
     executors.erase(executorId);
 
+    // See the declaration of `taskLaunchSequences` regarding its
+    // lifecycle management.
+    taskLaunchSequences.erase(executorId);
+
     // Pass ownership of the executor pointer.
     completedExecutors.push_back(Owned<Executor>(executor));
   }

http://git-wip-us.apache.org/repos/asf/mesos/blob/3f8b19a9/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 11cbbc6..ca8cc65 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -53,6 +53,7 @@
 #include <process/process.hpp>
 #include <process/protobuf.hpp>
 #include <process/shared.hpp>
+#include <process/sequence.hpp>
 
 #include <stout/boundedhashmap.hpp>
 #include <stout/bytes.hpp>
@@ -1138,6 +1139,19 @@ public:
   // Executors with pending tasks.
   hashmap<ExecutorID, hashmap<TaskID, TaskInfo>> pendingTasks;
 
+  // Sequences in this map are used to enforce the order of tasks launched on
+  // each executor.
+  //
+  // Note on the lifecycle of the sequence: if the corresponding executor struct
+  // has not been created, we tie the lifecycle of the sequence to the first
+  // task in the sequence (which must have the `launch_executor` flag set to
+  // true modulo MESOS-3870). If the task fails to launch before creating the
+  // executor struct, we will delete the sequence. Once the executor struct is
+  // created, we tie the lifecycle of the sequence to the executor struct.
+  //
+  // TODO(mzhu): Create the executor struct early and put the sequence in it.
+  hashmap<ExecutorID, process::Sequence> taskLaunchSequences;
+
   // Pending task groups. This is needed for correctly sending
   // TASK_KILLED status updates for all tasks in the group if
   // any of the tasks are killed while pending.
