Refactored agent task launch for better composition [2/2]. This encapsulates a task launch in a single future, which will come in handy when enforcing the task launch order.
Affected tests are also updated. Review: https://reviews.apache.org/r/66143/ Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a9cb8d8c Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a9cb8d8c Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a9cb8d8c Branch: refs/heads/1.5.x Commit: a9cb8d8c138b1dbdbf630439e33ddd3c652bee9f Parents: 10c3a31 Author: Meng Zhu <m...@mesosphere.io> Authored: Wed Apr 4 16:36:42 2018 -0700 Committer: Greg Mann <gregorywm...@gmail.com> Committed: Fri Apr 6 23:42:32 2018 -0700 ---------------------------------------------------------------------- src/slave/slave.cpp | 224 ++++++++++++++++++++++------------------- src/slave/slave.hpp | 2 - src/tests/slave_tests.cpp | 58 ++++++----- 3 files changed, 155 insertions(+), 129 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/a9cb8d8c/src/slave/slave.cpp ---------------------------------------------------------------------- diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp index 927d384..2044b6e 100644 --- a/src/slave/slave.cpp +++ b/src/slave/slave.cpp @@ -2206,8 +2206,9 @@ void Slave::run( return unschedules; }; - // Run the task after the unschedules are done. - collect(unschedules) + // Handle any unschedule GC failure. If unschedule GC succeeds, trigger + // the next continuations. + Future<Nothing> taskLaunch = collect(unschedules) .repair(defer(self(), onUnscheduleGCFailure)) .then(defer( self(), @@ -2217,19 +2218,29 @@ void Slave::run( task, taskGroup, resourceVersionUuids, - launchExecutor)) - .recover(defer(self(), - [=](const Future<Nothing>& future) -> Future<Nothing> { - if (launchExecutor.isSome() && launchExecutor.get()) { - // Master expects new executor to be launched for this task launch. 
- // To keep the master executor entries updated, the agent needs to - // send 'ExitedExecutorMessage' even though no executor launched. - sendExitedExecutorMessage(frameworkId, executorInfo.executor_id()); - } + launchExecutor)); - return future; + taskLaunch + .onReady(defer( + self(), + &Self::__run, + frameworkInfo, + executorInfo, + task, + taskGroup, + resourceVersionUuids, + launchExecutor)) + .onFailed(defer(self(), [=](const string& failure) { + if (launchExecutor.isSome() && launchExecutor.get()) { + // Master expects new executor to be launched for this task launch. + // To keep the master executor entries updated, the agent needs to send + // 'ExitedExecutorMessage' even though no executor launched. + sendExitedExecutorMessage(frameworkId, executorInfo.executor_id()); } - )); + })); + + // TODO(mzhu): Consolidate error handling code in `__run` here with + // then/recover pattern. } @@ -2330,23 +2341,107 @@ Future<Nothing> Slave::_run( authorizations.push_back(authorizeTask(_task, frameworkInfo)); } - collect(authorizations) - .onAny(defer(self(), - &Self::__run, - lambda::_1, - frameworkInfo, - executorInfo, - task, - taskGroup, - resourceVersionUuids, - launchExecutor)); + auto onTaskAuthorizationFailure = + [=](const string& error, Framework* _framework) { + CHECK_NOTNULL(_framework); - return Nothing(); + // For failed authorization, we send a TASK_ERROR status update + // for all tasks. + const TaskStatus::Reason reason = task.isSome() + ? 
TaskStatus::REASON_TASK_UNAUTHORIZED + : TaskStatus::REASON_TASK_GROUP_UNAUTHORIZED; + + LOG(ERROR) << "Authorization failed for " + << taskOrTaskGroup(task, taskGroup) << " of framework " + << frameworkId << ": " << error; + + foreach (const TaskInfo& _task, tasks) { + _framework->removePendingTask(_task.task_id()); + + const StatusUpdate update = protobuf::createStatusUpdate( + frameworkId, + info.id(), + _task.task_id(), + TASK_ERROR, + TaskStatus::SOURCE_SLAVE, + id::UUID::random(), + error, + reason); + + statusUpdate(update, UPID()); + } + + if (_framework->idle()) { + removeFramework(_framework); + } + }; + + return collect(authorizations) + .recover(defer(self(), + [=](const Future<list<bool>>& future) -> Future<list<bool>> { + CHECK(future.isFailed()); + + Framework* _framework = getFramework(frameworkId); + if (_framework == nullptr) { + const string error = + "Authorization failed for " + taskOrTaskGroup(task, taskGroup) + + " because the framework " + stringify(frameworkId) + + " does not exist"; + + LOG(WARNING) << error; + + return Failure(error); + } + + const string error = + "Failed to authorize " + taskOrTaskGroup(task, taskGroup) + + ": " + future.failure(); + + onTaskAuthorizationFailure(error, _framework); + + return future; + } + )) + .then(defer(self(), + [=](const Future<list<bool>>& future) -> Future<Nothing> { + Framework* _framework = getFramework(frameworkId); + if (_framework == nullptr) { + const string error = + "Ignoring running " + taskOrTaskGroup(task, taskGroup) + + " because the framework " + stringify(frameworkId) + + " does not exist"; + + LOG(WARNING) << error; + + return Failure(error); + } + + list<bool> authorizations = future.get(); + + foreach (const TaskInfo& _task, tasks) { + bool authorized = authorizations.front(); + authorizations.pop_front(); + + // If authorization for this task fails, we fail all tasks (in case + // of a task group) with this specific error. 
+ if (!authorized) { + const string error = + "Framework " + stringify(frameworkId) + + " is not authorized to launch task " + stringify(_task); + + onTaskAuthorizationFailure(error, _framework); + + return Failure(error); + } + } + + return Nothing(); + } + )); } void Slave::__run( - const Future<list<bool>>& future, const FrameworkInfo& frameworkInfo, const ExecutorInfo& executorInfo, const Option<TaskInfo>& task, @@ -2448,85 +2543,6 @@ void Slave::__run( CHECK(framework->removePendingTask(_task.task_id())); } - CHECK(!future.isDiscarded()); - - // Validate that the task (or tasks in case of task group) are authorized - // to be run on this agent. - Option<Error> error = None(); - if (!future.isReady()) { - error = Error("Failed to authorize " + taskOrTaskGroup(task, taskGroup) + - ": " + future.failure()); - } - - if (error.isNone()) { - list<bool> authorizations = future.get(); - - foreach (const TaskInfo& _task, tasks) { - bool authorized = authorizations.front(); - authorizations.pop_front(); - - // If authorization for this task fails, we fail all tasks (in case of - // a task group) with this specific error. - if (!authorized) { - string user = frameworkInfo.user(); - - if (_task.has_command() && _task.command().has_user()) { - user = _task.command().user(); - } else if (executorInfo.has_command() && - executorInfo.command().has_user()) { - user = executorInfo.command().user(); - } - - error = Error("Task '" + stringify(_task.task_id()) + "'" - " is not authorized to launch as" - " user '" + user + "'"); - - break; - } - } - } - - // For failed authorization, we send a TASK_ERROR status update for - // all tasks. - if (error.isSome()) { - const TaskStatus::Reason reason = task.isSome() - ? 
TaskStatus::REASON_TASK_UNAUTHORIZED - : TaskStatus::REASON_TASK_GROUP_UNAUTHORIZED; - - LOG(ERROR) << "Ignoring running " << taskOrTaskGroup(task, taskGroup) - << " of framework " << frameworkId - << ": " << error->message; - - foreach (const TaskInfo& _task, tasks) { - const StatusUpdate update = protobuf::createStatusUpdate( - frameworkId, - info.id(), - _task.task_id(), - TASK_ERROR, - TaskStatus::SOURCE_SLAVE, - id::UUID::random(), - error->message, - reason); - - statusUpdate(update, UPID()); - } - - // Refer to the comment after 'framework->removePendingTask' above - // for why we need this. - if (framework->idle()) { - removeFramework(framework); - } - - if (launchExecutor.isSome() && launchExecutor.get()) { - // Master expects new executor to be launched for this task(s) launch. - // To keep the master executor entries updated, the agent needs to send - // 'ExitedExecutorMessage' even though no executor launched. - sendExitedExecutorMessage(frameworkId, executorInfo.executor_id()); - } - - return; - } - // Check task invariants. // // TODO(bbannier): Instead of copy-pasting identical code to deal http://git-wip-us.apache.org/repos/asf/mesos/blob/a9cb8d8c/src/slave/slave.hpp ---------------------------------------------------------------------- diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp index 28bbcc4..11cbbc6 100644 --- a/src/slave/slave.hpp +++ b/src/slave/slave.hpp @@ -393,7 +393,6 @@ public: virtual void exited(const process::UPID& pid); void __run( - const process::Future<std::list<bool>>& future, const FrameworkInfo& frameworkInfo, const ExecutorInfo& executorInfo, const Option<TaskInfo>& task, @@ -401,7 +400,6 @@ public: const std::vector<ResourceVersionUUID>& resourceVersionUuids, const Option<bool>& launchExecutor); - // This is called when the resource limits of the container have // been updated for the given tasks and task groups. 
If the update is // successful, we flush the given tasks to the executor by sending http://git-wip-us.apache.org/repos/asf/mesos/blob/a9cb8d8c/src/tests/slave_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp index 95a61cb..b8669a0 100644 --- a/src/tests/slave_tests.cpp +++ b/src/tests/slave_tests.cpp @@ -1831,12 +1831,14 @@ TEST_F(SlaveTest, GetStateTaskGroupPending) const SlaveID slaveId = devolve(offer.agent_id()); // Override the default expectation, which forwards calls to the agent's - // unmocked `_run()` method. Instead, we want to do nothing so that tasks - // remain in the framework's 'pending' list. + // unmocked `_run()` method. Instead, we return a pending future to pause + // the original continuation so that tasks remain in the framework's + // 'pending' list. + Promise<Nothing> promise; Future<Nothing> _run; EXPECT_CALL(*slave.get()->mock(), _run(_, _, _, _, _, _)) .WillOnce(DoAll(FutureSatisfy(&_run), - Return(Nothing()))); + Return(promise.future()))); // The executor should not be launched. EXPECT_CALL(*executor, connected(_)) @@ -4121,9 +4123,11 @@ TEST_F(SlaveTest, KillTaskBetweenRunTaskParts) Option<TaskInfo> task_; vector<ResourceVersionUUID> resourceVersionUuids; Option<bool> launchExecutor; + // Skip what Slave::_run() normally does, save its arguments for - // later, tie reaching the critical moment when to kill the task to - // a future. + // later, return a pending future to pause the original continuation, + // so that we can control when the task is killed. 
+ Promise<Nothing> promise; Future<Nothing> _run; EXPECT_CALL(*slave.get()->mock(), _run(_, _, _, _, _, _)) .WillOnce(DoAll(FutureSatisfy(&_run), @@ -4133,7 +4137,7 @@ TEST_F(SlaveTest, KillTaskBetweenRunTaskParts) SaveArg<3>(&taskGroup), SaveArg<4>(&resourceVersionUuids), SaveArg<5>(&launchExecutor), - Return(Nothing()))); + Return(promise.future()))); driver.launchTasks(offers.get()[0].id(), {task}); @@ -4160,17 +4164,18 @@ TEST_F(SlaveTest, KillTaskBetweenRunTaskParts) AWAIT_READY(removeFramework); Future<Nothing> unmocked__run = process::dispatch(slave.get()->pid, [=] { - slave.get()->mock()->unmocked__run( + return slave.get()->mock()->unmocked__run( frameworkInfo, executorInfo, task_, taskGroup, resourceVersionUuids, launchExecutor); - - return Nothing(); }); + // Resume the original continuation once `unmocked__run` is complete. + promise.associate(unmocked__run); + AWAIT_READY(status); EXPECT_EQ(TASK_KILLED, status->state()); @@ -4248,8 +4253,8 @@ TEST_F(SlaveTest, KillMultiplePendingTasks) .WillOnce(Invoke(slave.get()->mock(), &MockSlave::unmocked_runTask)); // Skip what Slave::_run() normally does, save its arguments for - // later, tie reaching the critical moment when to kill the task to - // a future. + // later, return a pending future to pause the original continuation, + // so that we can control when the task is killed. 
FrameworkInfo frameworkInfo1, frameworkInfo2; ExecutorInfo executorInfo1, executorInfo2; Option<TaskGroupInfo> taskGroup1, taskGroup2; @@ -4257,6 +4262,7 @@ TEST_F(SlaveTest, KillMultiplePendingTasks) vector<ResourceVersionUUID> resourceVersionUuids1, resourceVersionUuids2; Option<bool> launchExecutor1, launchExecutor2; + Promise<Nothing> promise1, promise2; Future<Nothing> _run1, _run2; EXPECT_CALL(*slave.get()->mock(), _run(_, _, _, _, _, _)) .WillOnce(DoAll(FutureSatisfy(&_run1), @@ -4266,7 +4272,7 @@ TEST_F(SlaveTest, KillMultiplePendingTasks) SaveArg<3>(&taskGroup1), SaveArg<4>(&resourceVersionUuids1), SaveArg<5>(&launchExecutor1), - Return(Nothing()))) + Return(promise1.future()))) .WillOnce(DoAll(FutureSatisfy(&_run2), SaveArg<0>(&frameworkInfo2), SaveArg<1>(&executorInfo2), @@ -4274,7 +4280,7 @@ TEST_F(SlaveTest, KillMultiplePendingTasks) SaveArg<3>(&taskGroup2), SaveArg<4>(&resourceVersionUuids2), SaveArg<5>(&launchExecutor2), - Return(Nothing()))); + Return(promise2.future()))); driver.launchTasks(offers.get()[0].id(), {task1, task2}); @@ -4310,8 +4316,8 @@ TEST_F(SlaveTest, KillMultiplePendingTasks) AWAIT_READY(removeFramework); // The `__run` continuations should have no effect. - process::dispatch(slave.get()->pid, [=] { - slave.get()->mock()->unmocked__run( + Future<Nothing> unmocked__run1 = process::dispatch(slave.get()->pid, [=] { + return slave.get()->mock()->unmocked__run( frameworkInfo1, executorInfo1, task_1, @@ -4320,8 +4326,8 @@ TEST_F(SlaveTest, KillMultiplePendingTasks) launchExecutor1); }); - process::dispatch(slave.get()->pid, [=] { - slave.get()->mock()->unmocked__run( + Future<Nothing> unmocked__run2 = process::dispatch(slave.get()->pid, [=] { + return slave.get()->mock()->unmocked__run( frameworkInfo2, executorInfo2, task_2, @@ -4330,6 +4336,10 @@ TEST_F(SlaveTest, KillMultiplePendingTasks) launchExecutor2); }); + // Resume the original continuation once unmocked__run is complete. 
+ promise1.associate(unmocked__run1); + promise2.associate(unmocked__run2); + Clock::settle(); driver.stop(); @@ -7214,8 +7224,9 @@ TEST_F(SlaveTest, KillTaskGroupBetweenRunTaskParts) Option<bool> launchExecutor; // Skip what `Slave::_run()` normally does, save its arguments for - // later, till reaching the critical moment when to kill the task - // in the future. + // later, return a pending future to pause the original continuation, + // till reaching the critical moment when to kill the task in the future. + Promise<Nothing> promise; Future<Nothing> _run; EXPECT_CALL(*slave.get()->mock(), _run(_, _, _, _, _, _)) .WillOnce(DoAll(FutureSatisfy(&_run), @@ -7225,7 +7236,7 @@ TEST_F(SlaveTest, KillTaskGroupBetweenRunTaskParts) SaveArg<3>(&taskGroup_), SaveArg<4>(&resourceVersionUuids), SaveArg<5>(&launchExecutor), - Return(Nothing()))); + Return(promise.future()))); const v1::Offer& offer = offers->offers(0); const SlaveID slaveId = devolve(offer.agent_id()); @@ -7286,17 +7297,18 @@ TEST_F(SlaveTest, KillTaskGroupBetweenRunTaskParts) AWAIT_READY(removeFramework); Future<Nothing> unmocked__run = process::dispatch(slave.get()->pid, [=] { - slave.get()->mock()->unmocked__run( + return slave.get()->mock()->unmocked__run( frameworkInfo, executorInfo_, task_, taskGroup_, resourceVersionUuids, launchExecutor); - - return Nothing(); }); + // Resume the original continuation once `unmocked__run` is complete. + promise.associate(unmocked__run); + AWAIT_READY(update1); AWAIT_READY(update2);