Made the command executor use the unversioned protobufs internally. Previously, the command executor lived in the `v1` namespace and used the v1 protobufs. This change moves the code to the `mesos::internal` namespace and switches it to the unversioned protobufs.
Review: https://reviews.apache.org/r/50411/ Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/64842e4c Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/64842e4c Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/64842e4c Branch: refs/heads/master Commit: 64842e4cdc24900f586f87bb645b6d6d88da4804 Parents: 7070d1e Author: Anand Mazumdar <an...@apache.org> Authored: Wed Aug 10 11:04:02 2016 -0700 Committer: Anand Mazumdar <an...@apache.org> Committed: Wed Aug 10 11:04:02 2016 -0700 ---------------------------------------------------------------------- src/launcher/executor.cpp | 180 +++++++++++++++++------------------ src/launcher/posix/executor.cpp | 9 +- src/launcher/posix/executor.hpp | 6 +- 3 files changed, 91 insertions(+), 104 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/64842e4c/src/launcher/executor.cpp ---------------------------------------------------------------------- diff --git a/src/launcher/executor.cpp b/src/launcher/executor.cpp index 52d20af..0e23d4f 100644 --- a/src/launcher/executor.cpp +++ b/src/launcher/executor.cpp @@ -28,9 +28,6 @@ #include <mesos/mesos.hpp> -#include <mesos/v1/executor.hpp> -#include <mesos/v1/mesos.hpp> - #include <mesos/type_utils.hpp> #include <process/clock.hpp> @@ -110,25 +107,14 @@ using process::Subprocess; using process::Time; using process::Timer; -using mesos::internal::devolve; -using mesos::internal::evolve; -using mesos::internal::HealthChecker; -using mesos::internal::TaskHealthStatus; - -using mesos::internal::protobuf::frameworkHasCapability; +using mesos::executor::Call; +using mesos::executor::Event; -using mesos::v1::ExecutorID; -using mesos::v1::FrameworkID; - -using mesos::v1::executor::Call; -using mesos::v1::executor::Event; using mesos::v1::executor::Mesos; using mesos::v1::executor::MesosBase; using mesos::v1::executor::V0ToV1Adapter; 
- namespace mesos { -namespace v1 { namespace internal { class CommandExecutor: public ProtobufProcess<CommandExecutor> @@ -190,65 +176,60 @@ public: state = DISCONNECTED; } - void received(queue<Event> events) + void received(const Event& event) { - while (!events.empty()) { - Event event = events.front(); - events.pop(); - - cout << "Received " << event.type() << " event" << endl; - - switch (event.type()) { - case Event::SUBSCRIBED: { - cout << "Subscribed executor on " - << event.subscribed().agent_info().hostname() << endl; - - frameworkInfo = event.subscribed().framework_info(); - state = SUBSCRIBED; - break; - } - - case Event::LAUNCH: { - launch(event.launch().task()); - break; - } - - case Event::KILL: { - Option<KillPolicy> override = event.kill().has_kill_policy() - ? Option<KillPolicy>(event.kill().kill_policy()) - : None(); - - kill(event.kill().task_id(), override); - break; - } - - case Event::ACKNOWLEDGED: { - // Remove the corresponding update. - updates.erase(UUID::fromBytes(event.acknowledged().uuid()).get()); - - // Remove the corresponding task. - task = None(); - break; - } - - case Event::SHUTDOWN: { - shutdown(); - break; - } - - case Event::MESSAGE: { - break; - } - - case Event::ERROR: { - cerr << "Error: " << event.error().message() << endl; - break; - } - - case Event::UNKNOWN: { - LOG(WARNING) << "Received an UNKNOWN event and ignored"; - break; - } + cout << "Received " << event.type() << " event" << endl; + + switch (event.type()) { + case Event::SUBSCRIBED: { + cout << "Subscribed executor on " + << event.subscribed().slave_info().hostname() << endl; + + frameworkInfo = event.subscribed().framework_info(); + state = SUBSCRIBED; + break; + } + + case Event::LAUNCH: { + launch(event.launch().task()); + break; + } + + case Event::KILL: { + Option<KillPolicy> override = event.kill().has_kill_policy() + ? 
Option<KillPolicy>(event.kill().kill_policy()) + : None(); + + kill(event.kill().task_id(), override); + break; + } + + case Event::ACKNOWLEDGED: { + // Remove the corresponding update. + updates.erase(UUID::fromBytes(event.acknowledged().uuid()).get()); + + // Remove the corresponding task. + task = None(); + break; + } + + case Event::SHUTDOWN: { + shutdown(); + break; + } + + case Event::MESSAGE: { + break; + } + + case Event::ERROR: { + cerr << "Error: " << event.error().message() << endl; + break; + } + + case Event::UNKNOWN: { + LOG(WARNING) << "Received an UNKNOWN event and ignored"; + break; } } } @@ -271,31 +252,45 @@ protected: // after the process has spawned. if (value.isSome() && value.get() == "1") { mesos.reset(new Mesos( - mesos::ContentType::PROTOBUF, + ContentType::PROTOBUF, defer(self(), &Self::connected), defer(self(), &Self::disconnected), - defer(self(), &Self::received, lambda::_1))); + defer(self(), [this](queue<v1::executor::Event> events) { + while(!events.empty()) { + const v1::executor::Event& event = events.front(); + received(devolve(event)); + + events.pop(); + } + }))); } else { mesos.reset(new V0ToV1Adapter( defer(self(), &Self::connected), defer(self(), &Self::disconnected), - defer(self(), &Self::received, lambda::_1))); + defer(self(), [this](queue<v1::executor::Event> events) { + while(!events.empty()) { + const v1::executor::Event& event = events.front(); + received(devolve(event)); + + events.pop(); + } + }))); } } void taskHealthUpdated( - const mesos::TaskID& taskID, + const TaskID& taskID, const bool healthy, const bool initiateTaskKill) { cout << "Received task health update, healthy: " << stringify(healthy) << endl; - update(evolve(taskID), TASK_RUNNING, healthy); + update(taskID, TASK_RUNNING, healthy); if (initiateTaskKill) { killedByHealthCheck = true; - kill(evolve(taskID)); + kill(taskID); } } @@ -323,7 +318,7 @@ protected: subscribe->add_unacknowledged_tasks()->MergeFrom(task.get()); } - mesos->send(call); + 
mesos->send(evolve(call)); delay(Seconds(1), self(), &Self::doReliableRegistration); } @@ -362,7 +357,7 @@ protected: ABORT("Failed to parse JSON: " + object.error()); } - Try<CommandInfo> parse = protobuf::parse<CommandInfo>(object.get()); + Try<CommandInfo> parse = ::protobuf::parse<CommandInfo>(object.get()); if (parse.isError()) { ABORT("Failed to parse protobuf: " + parse.error()); } @@ -418,9 +413,9 @@ protected: if (task->has_health_check()) { Try<Owned<HealthChecker>> _checker = HealthChecker::create( - devolve(task->health_check()), + task->health_check(), self(), - devolve(task->task_id())); + task->task_id()); if (_checker.isError()) { // TODO(gilbert): Consider ABORT and return a TASK_FAILED here. @@ -559,9 +554,9 @@ private: CHECK_SOME(taskId); CHECK(taskId.get() == _taskId); - if (frameworkHasCapability( - devolve(frameworkInfo.get()), - mesos::FrameworkInfo::Capability::TASK_KILLING_STATE)) { + if (protobuf::frameworkHasCapability( + frameworkInfo.get(), + FrameworkInfo::Capability::TASK_KILLING_STATE)) { update(taskId.get(), TASK_KILLING); } @@ -714,7 +709,7 @@ private: // Capture the status update. 
updates[uuid] = call.update(); - mesos->send(call); + mesos->send(evolve(call)); } enum State @@ -757,7 +752,6 @@ private: }; } // namespace internal { -} // namespace v1 { } // namespace mesos { @@ -811,8 +805,8 @@ public: int main(int argc, char** argv) { Flags flags; - FrameworkID frameworkId; - ExecutorID executorId; + mesos::FrameworkID frameworkId; + mesos::ExecutorID executorId; #ifdef __WINDOWS__ process::Winsock winsock; @@ -869,8 +863,8 @@ int main(int argc, char** argv) shutdownGracePeriod = parse.get(); } - Owned<mesos::v1::internal::CommandExecutor> executor( - new mesos::v1::internal::CommandExecutor( + Owned<mesos::internal::CommandExecutor> executor( + new mesos::internal::CommandExecutor( flags.launcher_dir, flags.rootfs, flags.sandbox_directory, http://git-wip-us.apache.org/repos/asf/mesos/blob/64842e4c/src/launcher/posix/executor.cpp ---------------------------------------------------------------------- diff --git a/src/launcher/posix/executor.cpp b/src/launcher/posix/executor.cpp index 6814b9f..43573ca 100644 --- a/src/launcher/posix/executor.cpp +++ b/src/launcher/posix/executor.cpp @@ -24,8 +24,6 @@ #include <stout/os/raw/argv.hpp> -#include "internal/devolve.hpp" - #include "launcher/posix/executor.hpp" #ifdef __linux__ @@ -48,16 +46,14 @@ using std::endl; using std::string; using std::vector; -using mesos::internal::devolve; using mesos::internal::slave::MESOS_CONTAINERIZER; using mesos::internal::slave::MesosContainerizerLaunch; namespace mesos { -namespace v1 { namespace internal { pid_t launchTaskPosix( - const mesos::v1::CommandInfo& command, + const CommandInfo& command, const string& launcherDir, const Option<string>& user, const Option<string>& rootfs, @@ -85,7 +81,7 @@ pid_t launchTaskPosix( // Prepare the flags to pass to the launch process. 
MesosContainerizerLaunch::Flags launchFlags; - launchFlags.command = JSON::protobuf(devolve(command)); + launchFlags.command = JSON::protobuf(command); if (rootfs.isSome()) { CHECK_SOME(sandboxDirectory); @@ -127,5 +123,4 @@ pid_t launchTaskPosix( } } // namespace internal { -} // namespace v1 { } // namespace mesos { http://git-wip-us.apache.org/repos/asf/mesos/blob/64842e4c/src/launcher/posix/executor.hpp ---------------------------------------------------------------------- diff --git a/src/launcher/posix/executor.hpp b/src/launcher/posix/executor.hpp index a508089..9e46726 100644 --- a/src/launcher/posix/executor.hpp +++ b/src/launcher/posix/executor.hpp @@ -19,16 +19,15 @@ #include <string> -#include <mesos/v1/mesos.hpp> +#include <mesos/mesos.hpp> #include <stout/option.hpp> namespace mesos { -namespace v1 { namespace internal { pid_t launchTaskPosix( - const mesos::v1::CommandInfo& command, + const CommandInfo& command, const std::string& launcherDir, const Option<std::string>& user, const Option<std::string>& rootfs, @@ -36,7 +35,6 @@ pid_t launchTaskPosix( const Option<std::string>& workingDirectory); } // namespace internal { -} // namespace v1 { } // namespace mesos { #endif // __LAUNCHER_POSIX_EXECUTOR_HPP__