Refactored agent task launch for better composition [2/2].

This encapsulates a task launch in a single future, which
will come in handy when enforcing the task launch order.
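
For illustration only (not part of this commit), a minimal sketch of
the composition this enables, assuming the libprocess `Future` API
used in the diff below; `unschedule()`, `launch()` and `taskLaunch()`
are hypothetical stand-ins:

  #include <process/future.hpp>

  #include <stout/nothing.hpp>

  using process::Failure;
  using process::Future;

  Future<Nothing> unschedule();  // Hypothetical: the unschedule GC step.
  Future<Nothing> launch();      // Hypothetical: the launch continuation.

  Future<Nothing> taskLaunch()
  {
    return unschedule()
      // `repair` is invoked only if the preceding future failed; it
      // can translate the failure (or recover from it).
      .repair([](const Future<Nothing>& failed) -> Future<Nothing> {
        return Failure("Unschedule failed: " + failed.failure());
      })
      // `then` is invoked only on success, so the stages chain into
      // one `Future<Nothing>` whose completion (and hence ordering)
      // can be observed in one place.
      .then([]() { return launch(); });
  }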

Affected tests are also updated.

Review: https://reviews.apache.org/r/66143/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a9cb8d8c
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a9cb8d8c
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a9cb8d8c

Branch: refs/heads/1.5.x
Commit: a9cb8d8c138b1dbdbf630439e33ddd3c652bee9f
Parents: 10c3a31
Author: Meng Zhu <m...@mesosphere.io>
Authored: Wed Apr 4 16:36:42 2018 -0700
Committer: Greg Mann <gregorywm...@gmail.com>
Committed: Fri Apr 6 23:42:32 2018 -0700

----------------------------------------------------------------------
 src/slave/slave.cpp       | 224 ++++++++++++++++++++++-------------------
 src/slave/slave.hpp       |   2 -
 src/tests/slave_tests.cpp |  58 ++++++-----
 3 files changed, 155 insertions(+), 129 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/a9cb8d8c/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 927d384..2044b6e 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -2206,8 +2206,9 @@ void Slave::run(
       return unschedules;
   };
 
-  // Run the task after the unschedules are done.
-  collect(unschedules)
+  // Handle any unschedule GC failure. If unschedule GC succeeds, trigger
+  // the next continuations.
+  Future<Nothing> taskLaunch = collect(unschedules)
     .repair(defer(self(), onUnscheduleGCFailure))
     .then(defer(
         self(),
@@ -2217,19 +2218,29 @@ void Slave::run(
         task,
         taskGroup,
         resourceVersionUuids,
-        launchExecutor))
-    .recover(defer(self(),
-      [=](const Future<Nothing>& future) -> Future<Nothing> {
-        if (launchExecutor.isSome() && launchExecutor.get()) {
-          // Master expects new executor to be launched for this task launch.
-          // To keep the master executor entries updated, the agent needs to
-          // send 'ExitedExecutorMessage' even though no executor launched.
-          sendExitedExecutorMessage(frameworkId, executorInfo.executor_id());
-        }
+        launchExecutor));
 
-        return future;
+  taskLaunch
+    .onReady(defer(
+        self(),
+        &Self::__run,
+        frameworkInfo,
+        executorInfo,
+        task,
+        taskGroup,
+        resourceVersionUuids,
+        launchExecutor))
+    .onFailed(defer(self(), [=](const string& failure) {
+      if (launchExecutor.isSome() && launchExecutor.get()) {
+        // Master expects new executor to be launched for this task launch.
+        // To keep the master executor entries updated, the agent needs to send
+        // 'ExitedExecutorMessage' even though no executor launched.
+        sendExitedExecutorMessage(frameworkId, executorInfo.executor_id());
       }
-    ));
+    }));
+
+  // TODO(mzhu): Consolidate error handling code in `__run` here with
+  // the then/recover pattern.
 }
 
 
@@ -2330,23 +2341,107 @@ Future<Nothing> Slave::_run(
     authorizations.push_back(authorizeTask(_task, frameworkInfo));
   }
 
-  collect(authorizations)
-    .onAny(defer(self(),
-                 &Self::__run,
-                 lambda::_1,
-                 frameworkInfo,
-                 executorInfo,
-                 task,
-                 taskGroup,
-                 resourceVersionUuids,
-                 launchExecutor));
+  auto onTaskAuthorizationFailure =
+    [=](const string& error, Framework* _framework) {
+      CHECK_NOTNULL(_framework);
 
-  return Nothing();
+      // For failed authorization, we send a TASK_ERROR status update
+      // for all tasks.
+      const TaskStatus::Reason reason = task.isSome()
+        ? TaskStatus::REASON_TASK_UNAUTHORIZED
+        : TaskStatus::REASON_TASK_GROUP_UNAUTHORIZED;
+
+      LOG(ERROR) << "Authorization failed for "
+                 << taskOrTaskGroup(task, taskGroup) << " of framework "
+                 << frameworkId << ": " << error;
+
+      foreach (const TaskInfo& _task, tasks) {
+        _framework->removePendingTask(_task.task_id());
+
+        const StatusUpdate update = protobuf::createStatusUpdate(
+            frameworkId,
+            info.id(),
+            _task.task_id(),
+            TASK_ERROR,
+            TaskStatus::SOURCE_SLAVE,
+            id::UUID::random(),
+            error,
+            reason);
+
+        statusUpdate(update, UPID());
+      }
+
+      if (_framework->idle()) {
+        removeFramework(_framework);
+      }
+  };
+
+  return collect(authorizations)
+    .recover(defer(self(),
+      [=](const Future<list<bool>>& future) -> Future<list<bool>> {
+        CHECK(future.isFailed());
+
+        Framework* _framework = getFramework(frameworkId);
+        if (_framework == nullptr) {
+          const string error =
+            "Authorization failed for " + taskOrTaskGroup(task, taskGroup) +
+            " because the framework " + stringify(frameworkId) +
+            " does not exist";
+
+          LOG(WARNING) << error;
+
+          return Failure(error);
+        }
+
+        const string error =
+          "Failed to authorize " + taskOrTaskGroup(task, taskGroup) +
+          ": " + future.failure();
+
+        onTaskAuthorizationFailure(error, _framework);
+
+        return future;
+      }
+    ))
+    .then(defer(self(),
+      [=](const Future<list<bool>>& future) -> Future<Nothing> {
+        Framework* _framework = getFramework(frameworkId);
+        if (_framework == nullptr) {
+          const string error =
+            "Ignoring running " + taskOrTaskGroup(task, taskGroup) +
+            " because the framework " + stringify(frameworkId) +
+            " does not exist";
+
+          LOG(WARNING) << error;
+
+          return Failure(error);
+        }
+
+        list<bool> authorizations = future.get();
+
+        foreach (const TaskInfo& _task, tasks) {
+          bool authorized = authorizations.front();
+          authorizations.pop_front();
+
+          // If authorization for this task fails, we fail all tasks (in case
+          // of a task group) with this specific error.
+          if (!authorized) {
+            const string error =
+              "Framework " + stringify(frameworkId) +
+              " is not authorized to launch task " + stringify(_task);
+
+            onTaskAuthorizationFailure(error, _framework);
+
+            return Failure(error);
+          }
+        }
+
+        return Nothing();
+      }
+    ));
 }
 
 
 void Slave::__run(
-    const Future<list<bool>>& future,
     const FrameworkInfo& frameworkInfo,
     const ExecutorInfo& executorInfo,
     const Option<TaskInfo>& task,
@@ -2448,85 +2543,6 @@ void Slave::__run(
     CHECK(framework->removePendingTask(_task.task_id()));
   }
 
-  CHECK(!future.isDiscarded());
-
-  // Validate that the task (or tasks in case of task group) are authorized
-  // to be run on this agent.
-  Option<Error> error = None();
-  if (!future.isReady()) {
-    error = Error("Failed to authorize " + taskOrTaskGroup(task, taskGroup) +
-                  ": " + future.failure());
-  }
-
-  if (error.isNone()) {
-    list<bool> authorizations = future.get();
-
-    foreach (const TaskInfo& _task, tasks) {
-      bool authorized = authorizations.front();
-      authorizations.pop_front();
-
-      // If authorization for this task fails, we fail all tasks (in case of
-      // a task group) with this specific error.
-      if (!authorized) {
-        string user = frameworkInfo.user();
-
-        if (_task.has_command() && _task.command().has_user()) {
-          user = _task.command().user();
-        } else if (executorInfo.has_command() &&
-                   executorInfo.command().has_user()) {
-          user = executorInfo.command().user();
-        }
-
-        error = Error("Task '" + stringify(_task.task_id()) + "'"
-                      " is not authorized to launch as"
-                      " user '" + user + "'");
-
-        break;
-      }
-    }
-  }
-
-  // For failed authorization, we send a TASK_ERROR status update for
-  // all tasks.
-  if (error.isSome()) {
-    const TaskStatus::Reason reason = task.isSome()
-      ? TaskStatus::REASON_TASK_UNAUTHORIZED
-      : TaskStatus::REASON_TASK_GROUP_UNAUTHORIZED;
-
-    LOG(ERROR) << "Ignoring running " << taskOrTaskGroup(task, taskGroup)
-               << " of framework " << frameworkId
-               << ": " << error->message;
-
-    foreach (const TaskInfo& _task, tasks) {
-      const StatusUpdate update = protobuf::createStatusUpdate(
-          frameworkId,
-          info.id(),
-          _task.task_id(),
-          TASK_ERROR,
-          TaskStatus::SOURCE_SLAVE,
-          id::UUID::random(),
-          error->message,
-          reason);
-
-      statusUpdate(update, UPID());
-    }
-
-    // Refer to the comment after 'framework->removePendingTask' above
-    // for why we need this.
-    if (framework->idle()) {
-      removeFramework(framework);
-    }
-
-    if (launchExecutor.isSome() && launchExecutor.get()) {
-      // Master expects new executor to be launched for this task(s) launch.
-      // To keep the master executor entries updated, the agent needs to send
-      // 'ExitedExecutorMessage' even though no executor launched.
-      sendExitedExecutorMessage(frameworkId, executorInfo.executor_id());
-    }
-
-    return;
-  }
-
   // Check task invariants.
   //
   // TODO(bbannier): Instead of copy-pasting identical code to deal
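
For context, a minimal sketch (illustrative only, not part of the
patch) of the `recover`/`then` split that the new `_run()` relies on,
assuming the libprocess API used above; `authorize()` and `run()` are
hypothetical stand-ins:

  #include <list>

  #include <process/collect.hpp>
  #include <process/future.hpp>

  #include <stout/nothing.hpp>

  using process::Failure;
  using process::Future;

  Future<bool> authorize();  // Hypothetical per-task authorizer.

  Future<Nothing> run()
  {
    std::list<Future<bool>> authorizations = {authorize(), authorize()};

    return process::collect(authorizations)
      // `recover` observes a failed (or discarded) `collect`; side
      // effects such as TASK_ERROR updates belong here, after which
      // the failure is repropagated.
      .recover([](const Future<std::list<bool>>& failed) {
        return failed;
      })
      // `then` observes only a ready result, and can still turn an
      // unauthorized task into a failed launch.
      .then([](const std::list<bool>& decisions) -> Future<Nothing> {
        for (bool authorized : decisions) {
          if (!authorized) {
            return Failure("Not authorized");
          }
        }

        return Nothing();
      });
  }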

http://git-wip-us.apache.org/repos/asf/mesos/blob/a9cb8d8c/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 28bbcc4..11cbbc6 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -393,7 +393,6 @@ public:
   virtual void exited(const process::UPID& pid);
 
   void __run(
-      const process::Future<std::list<bool>>& future,
       const FrameworkInfo& frameworkInfo,
       const ExecutorInfo& executorInfo,
       const Option<TaskInfo>& task,
@@ -401,7 +400,6 @@ public:
       const std::vector<ResourceVersionUUID>& resourceVersionUuids,
       const Option<bool>& launchExecutor);
 
-
   // This is called when the resource limits of the container have
   // been updated for the given tasks and task groups. If the update is
   // successful, we flush the given tasks to the executor by sending

http://git-wip-us.apache.org/repos/asf/mesos/blob/a9cb8d8c/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 95a61cb..b8669a0 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -1831,12 +1831,14 @@ TEST_F(SlaveTest, GetStateTaskGroupPending)
   const SlaveID slaveId = devolve(offer.agent_id());
 
   // Override the default expectation, which forwards calls to the agent's
-  // unmocked `_run()` method. Instead, we want to do nothing so that tasks
-  // remain in the framework's 'pending' list.
+  // unmocked `_run()` method. Instead, we return a pending future to pause
+  // the original continuation so that tasks remain in the framework's
+  // 'pending' list.
+  Promise<Nothing> promise;
   Future<Nothing> _run;
   EXPECT_CALL(*slave.get()->mock(), _run(_, _, _, _, _, _))
     .WillOnce(DoAll(FutureSatisfy(&_run),
-                    Return(Nothing())));
+                    Return(promise.future())));
 
   // The executor should not be launched.
   EXPECT_CALL(*executor, connected(_))
@@ -4121,9 +4123,11 @@ TEST_F(SlaveTest, KillTaskBetweenRunTaskParts)
   Option<TaskInfo> task_;
   vector<ResourceVersionUUID> resourceVersionUuids;
   Option<bool> launchExecutor;
+
   // Skip what Slave::_run() normally does, save its arguments for
-  // later, tie reaching the critical moment when to kill the task to
-  // a future.
+  // later, and return a pending future to pause the original continuation
+  // so that we can control when the task is killed.
+  Promise<Nothing> promise;
   Future<Nothing> _run;
   EXPECT_CALL(*slave.get()->mock(), _run(_, _, _, _, _, _))
     .WillOnce(DoAll(FutureSatisfy(&_run),
@@ -4133,7 +4137,7 @@ TEST_F(SlaveTest, KillTaskBetweenRunTaskParts)
                     SaveArg<3>(&taskGroup),
                     SaveArg<4>(&resourceVersionUuids),
                     SaveArg<5>(&launchExecutor),
-                    Return(Nothing())));
+                    Return(promise.future())));
 
   driver.launchTasks(offers.get()[0].id(), {task});
 
@@ -4160,17 +4164,18 @@ TEST_F(SlaveTest, KillTaskBetweenRunTaskParts)
   AWAIT_READY(removeFramework);
 
   Future<Nothing> unmocked__run = process::dispatch(slave.get()->pid, [=] {
-    slave.get()->mock()->unmocked__run(
+    return slave.get()->mock()->unmocked__run(
         frameworkInfo,
         executorInfo,
         task_,
         taskGroup,
         resourceVersionUuids,
         launchExecutor);
-
-    return Nothing();
   });
 
+  // Resume the original continuation once `unmocked__run` is complete.
+  promise.associate(unmocked__run);
+
   AWAIT_READY(status);
   EXPECT_EQ(TASK_KILLED, status->state());
 
@@ -4248,8 +4253,8 @@ TEST_F(SlaveTest, KillMultiplePendingTasks)
     .WillOnce(Invoke(slave.get()->mock(), &MockSlave::unmocked_runTask));
 
   // Skip what Slave::_run() normally does, save its arguments for
-  // later, tie reaching the critical moment when to kill the task to
-  // a future.
+  // later, and return a pending future to pause the original continuation
+  // so that we can control when the task is killed.
   FrameworkInfo frameworkInfo1, frameworkInfo2;
   ExecutorInfo executorInfo1, executorInfo2;
   Option<TaskGroupInfo> taskGroup1, taskGroup2;
@@ -4257,6 +4262,7 @@ TEST_F(SlaveTest, KillMultiplePendingTasks)
   vector<ResourceVersionUUID> resourceVersionUuids1, resourceVersionUuids2;
   Option<bool> launchExecutor1, launchExecutor2;
 
+  Promise<Nothing> promise1, promise2;
   Future<Nothing> _run1, _run2;
   EXPECT_CALL(*slave.get()->mock(), _run(_, _, _, _, _, _))
     .WillOnce(DoAll(FutureSatisfy(&_run1),
@@ -4266,7 +4272,7 @@ TEST_F(SlaveTest, KillMultiplePendingTasks)
                     SaveArg<3>(&taskGroup1),
                     SaveArg<4>(&resourceVersionUuids1),
                     SaveArg<5>(&launchExecutor1),
-                    Return(Nothing())))
+                    Return(promise1.future())))
     .WillOnce(DoAll(FutureSatisfy(&_run2),
                     SaveArg<0>(&frameworkInfo2),
                     SaveArg<1>(&executorInfo2),
@@ -4274,7 +4280,7 @@ TEST_F(SlaveTest, KillMultiplePendingTasks)
                     SaveArg<3>(&taskGroup2),
                     SaveArg<4>(&resourceVersionUuids2),
                     SaveArg<5>(&launchExecutor2),
-                    Return(Nothing())));
+                    Return(promise2.future())));
 
   driver.launchTasks(offers.get()[0].id(), {task1, task2});
 
@@ -4310,8 +4316,8 @@ TEST_F(SlaveTest, KillMultiplePendingTasks)
   AWAIT_READY(removeFramework);
 
   // The `__run` continuations should have no effect.
-  process::dispatch(slave.get()->pid, [=] {
-    slave.get()->mock()->unmocked__run(
+  Future<Nothing> unmocked__run1 = process::dispatch(slave.get()->pid, [=] {
+    return slave.get()->mock()->unmocked__run(
         frameworkInfo1,
         executorInfo1,
         task_1,
@@ -4320,8 +4326,8 @@ TEST_F(SlaveTest, KillMultiplePendingTasks)
         launchExecutor1);
   });
 
-  process::dispatch(slave.get()->pid, [=] {
-    slave.get()->mock()->unmocked__run(
+  Future<Nothing> unmocked__run2 = process::dispatch(slave.get()->pid, [=] {
+    return slave.get()->mock()->unmocked__run(
         frameworkInfo2,
         executorInfo2,
         task_2,
@@ -4330,6 +4336,10 @@ TEST_F(SlaveTest, KillMultiplePendingTasks)
         launchExecutor2);
   });
 
+  // Resume the original continuations once both unmocked runs complete.
+  promise1.associate(unmocked__run1);
+  promise2.associate(unmocked__run2);
+
   Clock::settle();
 
   driver.stop();
@@ -7214,8 +7224,9 @@ TEST_F(SlaveTest, KillTaskGroupBetweenRunTaskParts)
   Option<bool> launchExecutor;
 
   // Skip what `Slave::_run()` normally does, save its arguments for
-  // later, till reaching the critical moment when to kill the task
-  // in the future.
+  // later, and return a pending future to pause the original continuation
+  // until the critical moment when we kill the task.
+  Promise<Nothing> promise;
   Future<Nothing> _run;
   EXPECT_CALL(*slave.get()->mock(), _run(_, _, _, _, _, _))
     .WillOnce(DoAll(FutureSatisfy(&_run),
@@ -7225,7 +7236,7 @@ TEST_F(SlaveTest, KillTaskGroupBetweenRunTaskParts)
                     SaveArg<3>(&taskGroup_),
                     SaveArg<4>(&resourceVersionUuids),
                     SaveArg<5>(&launchExecutor),
-                    Return(Nothing())));
+                    Return(promise.future())));
 
   const v1::Offer& offer = offers->offers(0);
   const SlaveID slaveId = devolve(offer.agent_id());
@@ -7286,17 +7297,18 @@ TEST_F(SlaveTest, KillTaskGroupBetweenRunTaskParts)
   AWAIT_READY(removeFramework);
 
   Future<Nothing> unmocked__run = process::dispatch(slave.get()->pid, [=] {
-    slave.get()->mock()->unmocked__run(
+    return slave.get()->mock()->unmocked__run(
         frameworkInfo,
         executorInfo_,
         task_,
         taskGroup_,
         resourceVersionUuids,
         launchExecutor);
-
-    return Nothing();
   });
 
+  // Resume the original continuation once `unmocked__run` is complete.
+  promise.associate(unmocked__run);
+
   AWAIT_READY(update1);
   AWAIT_READY(update2);
 

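The updated tests rely on a pause/resume pattern: the mocked `_run()`
returns a pending future so that continuations chained on it stall,
and `Promise::associate` later ties that future to the real work. A
minimal sketch, assuming the libprocess `Promise`/`Future` API;
`pausedRun()` and `resume()` are illustrative:

  #include <process/future.hpp>

  #include <stout/nothing.hpp>

  using process::Future;
  using process::Promise;

  Promise<Nothing> promise;

  // What the mocked `_run()` returns: a pending future, so
  // continuations chained on it (such as `__run`) do not fire yet.
  Future<Nothing> pausedRun()
  {
    return promise.future();
  }

  // Once the real work has been dispatched, associating its future
  // with the promise completes the pending future with the same
  // result and resumes the stalled chain.
  void resume(const Future<Nothing>& unmockedRun)
  {
    promise.associate(unmockedRun);
  }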