Made the master set `launch_executor` in RunTask(Group)Message. By setting the new `launch_executor` field in RunTaskMessage and RunTaskGroupMessage, the master can control executor creation on the agent.
Also refactored the `addTask()` logic. Added two new functions: `isLaunchExecutor()` checks whether the executor with a given ID needs to be launched on the agent; `addExecutor()` adds an executor to the framework and slave. Review: https://reviews.apache.org/r/65504/ Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ce7f1f6a Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ce7f1f6a Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ce7f1f6a Branch: refs/heads/master Commit: ce7f1f6a0807b96b92cb4c755c52f36e1a8e2853 Parents: 7c29031 Author: Meng Zhu <m...@mesosphere.io> Authored: Tue Feb 13 22:44:58 2018 -0800 Committer: Greg Mann <gregorywm...@gmail.com> Committed: Wed Feb 14 02:29:38 2018 -0800 ---------------------------------------------------------------------- src/master/master.cpp | 112 +++++++++++++++++++++++++++++++-------------- src/master/master.hpp | 19 ++++++-- 2 files changed, 92 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/ce7f1f6a/src/master/master.cpp ---------------------------------------------------------------------- diff --git a/src/master/master.cpp b/src/master/master.cpp index d7d2286..b06d7a6 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -3830,44 +3830,56 @@ Future<bool> Master::authorizeSlave( } -Resources Master::addTask( - const TaskInfo& task, +bool Master::isLaunchExecutor( + const ExecutorID& executorId, Framework* framework, - Slave* slave) + Slave* slave) const { CHECK_NOTNULL(framework); CHECK_NOTNULL(slave); - CHECK(slave->connected) << "Adding task " << task.task_id() - << " to disconnected agent " << *slave; // The resources consumed.
- Resources resources = task.resources(); + if (!slave->hasExecutor(framework->id(), executorId)) { + CHECK(!framework->hasExecutor(slave->id, executorId)) + << "Executor '" << executorId + << "' known to the framework " << *framework + << " but unknown to the agent " << *slave; + return true; + } + + return false; +} + - // Determine if this task launches an executor, and if so make sure - // the slave and framework state has been updated accordingly. +void Master::addExecutor( + const ExecutorInfo& executorInfo, + Framework* framework, + Slave* slave) +{ + CHECK_NOTNULL(framework); + CHECK_NOTNULL(slave); + CHECK(slave->connected) << "Adding executor " << executorInfo.executor_id() + << " to disconnected agent " << *slave; - if (task.has_executor()) { - // TODO(benh): Refactor this code into Slave::addTask. - if (!slave->hasExecutor(framework->id(), task.executor().executor_id())) { - CHECK(!framework->hasExecutor(slave->id, task.executor().executor_id())) - << "Executor '" << task.executor().executor_id() - << "' known to the framework " << *framework - << " but unknown to the agent " << *slave; + slave->addExecutor(framework->id(), executorInfo); + framework->addExecutor(slave->id, executorInfo); +} - slave->addExecutor(framework->id(), task.executor()); - framework->addExecutor(slave->id, task.executor()); - resources += task.executor().resources(); - } - } +void Master::addTask( + const TaskInfo& task, + Framework* framework, + Slave* slave) +{ + CHECK_NOTNULL(framework); + CHECK_NOTNULL(slave); + CHECK(slave->connected) << "Adding task " << task.task_id() + << " to disconnected agent " << *slave; // Add the task to the framework and slave. Task* t = new Task(protobuf::createTask(task, TASK_STAGING, framework->id())); slave->addTask(t); framework->addTask(t); - - return resources; } @@ -4900,7 +4912,23 @@ void Master::_accept( // Add task. 
if (pending) { - const Resources consumed = addTask(task, framework, slave); + Resources consumed; + + bool launchExecutor = true; + if (task.has_executor()) { + launchExecutor = isLaunchExecutor( + task.executor().executor_id(), framework, slave); + + // Master tracks the new executor only if the task is not a + // command task. + if (launchExecutor) { + addExecutor(task.executor(), framework, slave); + consumed += task.executor().resources(); + } + } + + addTask(task, framework, slave); + consumed += task.resources(); CHECK(available.contains(consumed)) << available << " does not contain " << consumed; @@ -4942,6 +4970,8 @@ void Master::_accept( message.set_pid(framework->pid.getOrElse(UPID())); message.mutable_task()->MergeFrom(task); + message.set_launch_executor(launchExecutor); + if (HookManager::hooksAvailable()) { // Set labels retrieved from label-decorator hooks. message.mutable_task()->mutable_labels()->CopyFrom( @@ -4960,11 +4990,11 @@ void Master::_accept( CHECK_SOME(downgradeResources(&message)); } - // TODO(bmahler): Consider updating this log message to - // indicate when the executor is also being launched. LOG(INFO) << "Launching task " << task.task_id() << " of framework " << *framework << " with resources " << task.resources() - << " on agent " << *slave; + << " on agent " << *slave << " on " + << (launchExecutor ? 
+ " new executor" : " existing executor"); send(slave->pid, message); } @@ -5123,18 +5153,25 @@ void Master::_accept( set<TaskID> taskIds; Resources totalResources; + Resources executorResources; + + bool launchExecutor = + isLaunchExecutor(executor.executor_id(), framework, slave); + + if (launchExecutor) { + addExecutor(executor, framework, slave); + executorResources = executor.resources(); + totalResources += executorResources; + } + + message.set_launch_executor(launchExecutor); foreach ( TaskInfo& task, *message.mutable_task_group()->mutable_tasks()) { taskIds.insert(task.task_id()); totalResources += task.resources(); - const Resources consumed = addTask(task, framework, slave); - - CHECK(_offeredResources.contains(consumed)) - << _offeredResources << " does not contain " << consumed; - - _offeredResources -= consumed; + addTask(task, framework, slave); if (HookManager::hooksAvailable()) { // Set labels retrieved from label-decorator hooks. @@ -5146,6 +5183,11 @@ void Master::_accept( } } + CHECK(_offeredResources.contains(totalResources)) + << _offeredResources << " does not contain " << totalResources; + + _offeredResources -= totalResources; + // If the agent does not support reservation refinement, downgrade // the task and executor resources to the "pre-reservation-refinement" // format. This cannot contain any refined reservations since @@ -5157,7 +5199,9 @@ void Master::_accept( LOG(INFO) << "Launching task group " << stringify(taskIds) << " of framework " << *framework << " with resources " - << totalResources << " on agent " << *slave; + << totalResources - executorResources << " on agent " + << *slave << " on " + << (launchExecutor ? 
" new executor" : " existing executor"); send(slave->pid, message); http://git-wip-us.apache.org/repos/asf/mesos/blob/ce7f1f6a/src/master/master.hpp ---------------------------------------------------------------------- diff --git a/src/master/master.hpp b/src/master/master.hpp index b434d23..c4d3c80 100644 --- a/src/master/master.hpp +++ b/src/master/master.hpp @@ -859,11 +859,20 @@ protected: const Offer::Operation::Destroy& destroy, const Option<process::http::authentication::Principal>& principal); - // Add the task and its executor (if not already running) to the - // framework and slave. Returns the resources consumed as a result, - // which includes resources for the task and its executor - // (if not already running). - Resources addTask(const TaskInfo& task, Framework* framework, Slave* slave); + // Determine if a new executor needs to be launched. + bool isLaunchExecutor ( + const ExecutorID& executorId, + Framework* framework, + Slave* slave) const; + + // Add executor to the framework and slave. + void addExecutor( + const ExecutorInfo& executorInfo, + Framework* framework, + Slave* slave); + + // Add task to the framework and slave. + void addTask(const TaskInfo& task, Framework* framework, Slave* slave); // Transitions the task, and recovers resources if the task becomes // terminal.