Updated users of ProtobufProcess::from. Review: https://reviews.apache.org/r/14901
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/1657a845 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/1657a845 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/1657a845 Branch: refs/heads/master Commit: 1657a84569e8455e7c39b6529a70a248dbab08ac Parents: f6793de Author: Benjamin Mahler <bmah...@twitter.com> Authored: Wed Oct 23 20:29:00 2013 -0700 Committer: Benjamin Mahler <bmah...@twitter.com> Committed: Thu Oct 24 14:31:07 2013 -0700 ---------------------------------------------------------------------- src/exec/exec.cpp | 2 +- src/master/master.cpp | 93 +++++++++++++-------------------- src/master/master.hpp | 110 ++++++++++++++++++++++++---------------- src/sasl/authenticatee.hpp | 4 +- src/sched/sched.cpp | 4 +- src/slave/slave.cpp | 16 +++--- src/slave/slave.hpp | 12 +++-- src/tests/cluster.hpp | 2 +- 8 files changed, 127 insertions(+), 116 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/1657a845/src/exec/exec.cpp ---------------------------------------------------------------------- diff --git a/src/exec/exec.cpp b/src/exec/exec.cpp index 7ca21fa..4a598f5 100644 --- a/src/exec/exec.cpp +++ b/src/exec/exec.cpp @@ -233,7 +233,7 @@ protected: VLOG(1) << "Executor::reregistered took " << stopwatch.elapsed(); } - void reconnect(const SlaveID& slaveId) + void reconnect(const UPID& from, const SlaveID& slaveId) { if (aborted) { VLOG(1) << "Ignoring reconnect message from slave " << slaveId http://git-wip-us.apache.org/repos/asf/mesos/blob/1657a845/src/master/master.cpp ---------------------------------------------------------------------- diff --git a/src/master/master.cpp b/src/master/master.cpp index f838a9d..1147cc6 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -714,37 +714,24 @@ void Master::noMasterDetected() } -void Master::registerFramework(const FrameworkInfo& frameworkInfo) +void Master::registerFramework( + const UPID& from, + const FrameworkInfo& frameworkInfo) { if (authenticating.contains(from)) { LOG(INFO) << "Queuing up registration request from " << from << " because authentication is still in progress"; authenticating[from] - .onReady(defer(self(), &Self::_registerFramework, frameworkInfo, from)); - } else { - _registerFramework(frameworkInfo, from); + .onReady(defer(self(), &Self::registerFramework, from, frameworkInfo)); + return; } -} - -void Master::_registerFramework( - const FrameworkInfo& frameworkInfo, - const UPID& from) -{ if (!elected) { LOG(WARNING) << "Ignoring register framework message since not elected yet"; return; } - if (authenticating.contains(from)) { - // This could happen if another authentication request came - // through before we are here. - LOG(WARNING) << "Ignoring registration request from " << from - << " because authentication is back in progress"; - return; - } - if (flags.authenticate && !authenticated.contains(from)) { // This could happen if another authentication request came // through before we are here or if a framework tried to register @@ -754,14 +741,14 @@ void Master::_registerFramework( FrameworkErrorMessage message; message.set_message("Framework at " + stringify(from) + " is not authenticated."); - reply(message); + send(from, message); return; } if (!roles.contains(frameworkInfo.role())) { FrameworkErrorMessage message; message.set_message("Role '" + frameworkInfo.role() + "' is not valid."); - reply(message); + send(from, message); return; } @@ -775,7 +762,7 @@ void Master::_registerFramework( FrameworkRegisteredMessage message; message.mutable_framework_id()->MergeFrom(framework->id); message.mutable_master_info()->MergeFrom(info); - reply(message); + send(from, message); return; } } @@ -792,7 +779,7 @@ void Master::_registerFramework( << "root submissions are disabled on this cluster"; FrameworkErrorMessage message; message.set_message("User 'root' is not allowed to run frameworks"); - reply(message); + send(from, message); delete framework; return; } @@ -802,6 +789,7 @@ void Master::_registerFramework( void Master::reregisterFramework( + const UPID& from, const FrameworkInfo& frameworkInfo, bool failover) { @@ -811,21 +799,13 @@ void Master::reregisterFramework( authenticating[from] .onReady(defer(self(), - &Self::_reregisterFramework, + &Self::reregisterFramework, + from, frameworkInfo, - failover, - from)); - } else { - _reregisterFramework(frameworkInfo, failover, from); + failover)); + return; } -} - -void Master::_reregisterFramework( - const FrameworkInfo& frameworkInfo, - bool failover, - const UPID& from) -{ if (!elected) { LOG(WARNING) << "Ignoring re-register framework message since " << "not elected yet"; @@ -836,16 +816,7 @@ void Master::_reregisterFramework( LOG(ERROR) << "Framework re-registering without an id!"; FrameworkErrorMessage message; message.set_message("Framework reregistered without a framework id"); - reply(message); - return; - } - - if (authenticating.contains(from)) { - // This could happen if another authentication request came - // through before we are here. - LOG(WARNING) - << "Ignoring re-registration request for framework " << frameworkInfo.id() - << " from " << from << " because authentication is back in progress"; + send(from, message); return; } @@ -858,14 +829,14 @@ void Master::_reregisterFramework( FrameworkErrorMessage message; message.set_message("Framework '" + frameworkInfo.id().value() + "' at " + stringify(from) + " is not authenticated."); - reply(message); + send(from, message); return; } if (!roles.contains(frameworkInfo.role())) { FrameworkErrorMessage message; message.set_message("Role '" + frameworkInfo.role() + "' is not valid."); - reply(message); + send(from, message); return; } @@ -915,7 +886,7 @@ void Master::_reregisterFramework( FrameworkReregisteredMessage message; message.mutable_framework_id()->MergeFrom(frameworkInfo.id()); message.mutable_master_info()->MergeFrom(info); - reply(message); + send(from, message); return; } } else { @@ -975,7 +946,9 @@ void Master::_reregisterFramework( } -void Master::unregisterFramework(const FrameworkID& frameworkId) +void Master::unregisterFramework( + const UPID& from, + const FrameworkID& frameworkId) { LOG(INFO) << "Asked to unregister framework " << frameworkId; @@ -991,7 +964,9 @@ void Master::unregisterFramework(const FrameworkID& frameworkId) } -void Master::deactivateFramework(const FrameworkID& frameworkId) +void Master::deactivateFramework( + const UPID& from, + const FrameworkID& frameworkId) { Framework* framework = getFramework(frameworkId); @@ -1211,7 +1186,7 @@ void Master::schedulerMessage(const SlaveID& slaveId, } -void Master::registerSlave(const SlaveInfo& slaveInfo) +void Master::registerSlave(const UPID& from, const SlaveInfo& slaveInfo) { if (!elected) { LOG(WARNING) << "Ignoring register slave message from " @@ -1266,10 +1241,12 @@ void Master::registerSlave(const SlaveInfo& slaveInfo) } -void Master::reregisterSlave(const SlaveID& slaveId, - const SlaveInfo& slaveInfo, - const vector<ExecutorInfo>& executorInfos, - const vector<Task>& tasks) +void Master::reregisterSlave( + const UPID& from, + const SlaveID& slaveId, + const SlaveInfo& slaveInfo, + const vector<ExecutorInfo>& executorInfos, + const vector<Task>& tasks) { if (!elected) { LOG(WARNING) << "Ignoring re-register slave message from " @@ -1460,6 +1437,7 @@ void Master::statusUpdate(const StatusUpdate& update, const UPID& pid) void Master::exitedExecutor( + const UPID& from, const SlaveID& slaveId, const FrameworkID& frameworkId, const ExecutorID& executorId, @@ -1488,7 +1466,7 @@ void Master::exitedExecutor( CHECK(!deactivatedSlaves.contains(from)) << "Received exited message for executor " << executorId << " from " << from - << " which is deactivated slave " << slaveId + << " which is the deactivated slave " << slaveId << "(" << slave->info.hostname() << ")"; // Tell the allocator about the recovered resources. @@ -1545,6 +1523,7 @@ void Master::deactivateSlave(const SlaveID& slaveId) void Master::reconcileTasks( + const UPID& from, const FrameworkID& frameworkId, const std::vector<TaskStatus>& statuses) { @@ -1693,7 +1672,7 @@ void Master::offer(const FrameworkID& frameworkId, } -void Master::authenticate(const UPID& pid) +void Master::authenticate(const UPID& from, const UPID& pid) { // Deactivate the framework if it's already registered. foreachvalue (Framework* framework, frameworks) { @@ -1715,7 +1694,7 @@ void Master::authenticate(const UPID& pid) // Retry after the current authenticator finishes. authenticating[pid] - .onAny(defer(self(), &Self::authenticate, pid)); + .onAny(defer(self(), &Self::authenticate, from, pid)); return; } http://git-wip-us.apache.org/repos/asf/mesos/blob/1657a845/src/master/master.hpp ---------------------------------------------------------------------- diff --git a/src/master/master.hpp b/src/master/master.hpp index 9f5e25b..1eba03f 100644 --- a/src/master/master.hpp +++ b/src/master/master.hpp @@ -89,67 +89,91 @@ public: virtual ~Master(); - void submitScheduler(const std::string& name); - void newMasterDetected(const UPID& pid); + void submitScheduler( + const std::string& name); + void newMasterDetected( + const UPID& pid); void noMasterDetected(); void masterDetectionFailure(); - void registerFramework(const FrameworkInfo& frameworkInfo); - void reregisterFramework(const FrameworkInfo& frameworkInfo, - bool failover); - void unregisterFramework(const FrameworkID& frameworkId); - void deactivateFramework(const FrameworkID& frameworkId); - void resourceRequest(const FrameworkID& frameworkId, - const std::vector<Request>& requests); - void launchTasks(const FrameworkID& frameworkId, - const OfferID& offerId, - const std::vector<TaskInfo>& tasks, - const Filters& filters); - void reviveOffers(const FrameworkID& frameworkId); - void killTask(const FrameworkID& frameworkId, const TaskID& taskId); - void schedulerMessage(const SlaveID& slaveId, - const FrameworkID& frameworkId, - const ExecutorID& executorId, - const std::string& data); - void registerSlave(const SlaveInfo& slaveInfo); - void reregisterSlave(const SlaveID& slaveId, - const SlaveInfo& slaveInfo, - const std::vector<ExecutorInfo>& executorInfos, - const std::vector<Task>& tasks); - void unregisterSlave(const SlaveID& slaveId); - void statusUpdate(const StatusUpdate& update, const UPID& pid); - void exitedExecutor(const SlaveID& slaveId, - const FrameworkID& frameworkId, - const ExecutorID& executorId, - int32_t status); - void deactivateSlave(const SlaveID& slaveId); + void registerFramework( + const process::UPID& from, + const FrameworkInfo& frameworkInfo); + void reregisterFramework( + const process::UPID& from, + const FrameworkInfo& frameworkInfo, + bool failover); + void unregisterFramework( + const process::UPID& from, + const FrameworkID& frameworkId); + void deactivateFramework( + const process::UPID& from, + const FrameworkID& frameworkId); + void resourceRequest( + const FrameworkID& frameworkId, + const std::vector<Request>& requests); + void launchTasks( + const FrameworkID& frameworkId, + const OfferID& offerId, + const std::vector<TaskInfo>& tasks, + const Filters& filters); + void reviveOffers( + const FrameworkID& frameworkId); + void killTask( + const FrameworkID& frameworkId, + const TaskID& taskId); + void schedulerMessage( + const SlaveID& slaveId, + const FrameworkID& frameworkId, + const ExecutorID& executorId, + const std::string& data); + void registerSlave( + const process::UPID& from, + const SlaveInfo& slaveInfo); + void reregisterSlave( + const process::UPID& from, + const SlaveID& slaveId, + const SlaveInfo& slaveInfo, + const std::vector<ExecutorInfo>& executorInfos, + const std::vector<Task>& tasks); + void unregisterSlave( + const SlaveID& slaveId); + void statusUpdate( + const StatusUpdate& update, + const UPID& pid); + void exitedExecutor( + const process::UPID& from, + const SlaveID& slaveId, + const FrameworkID& frameworkId, + const ExecutorID& executorId, + int32_t status); + void deactivateSlave( + const SlaveID& slaveId); // TODO(bmahler): It would be preferred to use a unique libprocess // Process identifier (PID is not sufficient) for identifying the // framework instance, rather than relying on re-registration time. - void frameworkFailoverTimeout(const FrameworkID& frameworkId, - const Time& reregisteredTime); + void frameworkFailoverTimeout( + const FrameworkID& frameworkId, + const Time& reregisteredTime); - void offer(const FrameworkID& framework, - const hashmap<SlaveID, Resources>& resources); + void offer( + const FrameworkID& framework, + const hashmap<SlaveID, Resources>& resources); void reconcileTasks( + const process::UPID& from, const FrameworkID& frameworkId, const std::vector<TaskStatus>& statuses); - void authenticate(const UPID& pid); + void authenticate( + const process::UPID& from, + const process::UPID& pid); protected: virtual void initialize(); virtual void finalize(); virtual void exited(const UPID& pid); - void _registerFramework(const FrameworkInfo& frameworkInfo, const UPID& pid); - - void _reregisterFramework( - const FrameworkInfo& frameworkInfo, - bool failover, - const UPID& pid); - void deactivate(Framework* framework); // 'promise' is used to signal finish of authentication. http://git-wip-us.apache.org/repos/asf/mesos/blob/1657a845/src/sasl/authenticatee.hpp ---------------------------------------------------------------------- diff --git a/src/sasl/authenticatee.hpp b/src/sasl/authenticatee.hpp index 09ef018..f1a677f 100644 --- a/src/sasl/authenticatee.hpp +++ b/src/sasl/authenticatee.hpp @@ -242,7 +242,7 @@ protected: message.set_mechanism(mechanism); message.set_data(output, length); - send(from, message); + reply(message); status = STEPPING; } @@ -279,7 +279,7 @@ protected: if (output != NULL && length > 0) { message.set_data(output, length); } - send(from, message); + reply(message); } else { status = ERROR; std::string error(sasl_errdetail(connection)); http://git-wip-us.apache.org/repos/asf/mesos/blob/1657a845/src/sched/sched.cpp ---------------------------------------------------------------------- diff --git a/src/sched/sched.cpp b/src/sched/sched.cpp index 824b4b7..3049096 100644 --- a/src/sched/sched.cpp +++ b/src/sched/sched.cpp @@ -554,7 +554,7 @@ protected: send(pid, message); } - void lostSlave(const SlaveID& slaveId) + void lostSlave(const UPID& from, const SlaveID& slaveId) { if (aborted) { VLOG(1) << "Ignoring lost slave message because the driver is aborted!"; @@ -563,7 +563,7 @@ protected: if (from != master) { LOG(WARNING) << "Ignoring lost slave message from " << from - << "because it is not from the registered master (" + << " because it is not from the registered master (" << master << ")"; return; } http://git-wip-us.apache.org/repos/asf/mesos/blob/1657a845/src/slave/slave.cpp ---------------------------------------------------------------------- diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp index 5272600..243a830 100644 --- a/src/slave/slave.cpp +++ b/src/slave/slave.cpp @@ -402,7 +402,7 @@ void Slave::finalize() // checkpointing. This is because slave recovery tests terminate // the slave to simulate slave restart. if (!frameworks[frameworkId]->info.checkpoint()) { - shutdownFramework(frameworkId); + shutdownFramework(UPID(), frameworkId); } } @@ -421,7 +421,7 @@ void Slave::finalize() } -void Slave::shutdown() +void Slave::shutdown(const UPID& from) { // Allow shutdown message only if // 1) Its a message received from the registered master or @@ -451,7 +451,7 @@ void Slave::shutdown() // NOTE: We use 'frameworks.keys()' here because 'shutdownFramework' // can potentially remove a framework from 'frameworks'. foreach (const FrameworkID& frameworkId, frameworks.keys()) { - shutdownFramework(frameworkId); + shutdownFramework(from, frameworkId); } } } @@ -534,7 +534,7 @@ void Slave::noMasterDetected() } -void Slave::registered(const SlaveID& slaveId) +void Slave::registered(const UPID& from, const SlaveID& slaveId) { if (from != master) { LOG(WARNING) << "Ignoring registration message from " << from @@ -581,7 +581,7 @@ void Slave::registered(const SlaveID& slaveId) } -void Slave::reregistered(const SlaveID& slaveId) +void Slave::reregistered(const UPID& from, const SlaveID& slaveId) { if (from != master) { LOG(WARNING) << "Ignoring re-registration message from " << from @@ -1102,7 +1102,9 @@ void Slave::killTask(const FrameworkID& frameworkId, const TaskID& taskId) // sending back a shut down acknowledgement, because otherwise you // could get into a state where a shut down was sent, dropped, and // therefore never processed. -void Slave::shutdownFramework(const FrameworkID& frameworkId) +void Slave::shutdownFramework( + const UPID& from, + const FrameworkID& frameworkId) { // Allow shutdownFramework() only if // its called directly (e.g. Slave::finalize()) or @@ -1394,6 +1396,7 @@ void Slave::_statusUpdateAcknowledgement( void Slave::registerExecutor( + const UPID& from, const FrameworkID& frameworkId, const ExecutorID& executorId) { @@ -1545,6 +1548,7 @@ void Slave::registerExecutor( void Slave::reregisterExecutor( + const UPID& from, const FrameworkID& frameworkId, const ExecutorID& executorId, const vector<TaskInfo>& tasks, http://git-wip-us.apache.org/repos/asf/mesos/blob/1657a845/src/slave/slave.hpp ---------------------------------------------------------------------- diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp index 5211b46..b9f9b04 100644 --- a/src/slave/slave.hpp +++ b/src/slave/slave.hpp @@ -82,13 +82,13 @@ public: virtual ~Slave(); - void shutdown(); + void shutdown(const process::UPID& from); void newMasterDetected(const UPID& pid); void noMasterDetected(); void masterDetectionFailure(); - void registered(const SlaveID& slaveId); - void reregistered(const SlaveID& slaveId); + void registered(const process::UPID& from, const SlaveID& slaveId); + void reregistered(const process::UPID& from, const SlaveID& slaveId); void doReliableRegistration(); void runTask( @@ -108,7 +108,9 @@ public: void killTask(const FrameworkID& frameworkId, const TaskID& taskId); - void shutdownFramework(const FrameworkID& frameworkId); + void shutdownFramework( + const process::UPID& from, + const FrameworkID& frameworkId); void schedulerMessage( const SlaveID& slaveId, @@ -119,6 +121,7 @@ public: void updateFramework(const FrameworkID& frameworkId, const std::string& pid); void registerExecutor( + const process::UPID& from, const FrameworkID& frameworkId, const ExecutorID& executorId); @@ -127,6 +130,7 @@ public: // driver never received an ACK for.) // 'updates' : Unacknowledged updates. void reregisterExecutor( + const process::UPID& from, const FrameworkID& frameworkId, const ExecutorID& executorId, const std::vector<TaskInfo>& tasks, http://git-wip-us.apache.org/repos/asf/mesos/blob/1657a845/src/tests/cluster.hpp ---------------------------------------------------------------------- diff --git a/src/tests/cluster.hpp b/src/tests/cluster.hpp index f743bb3..5621139 100644 --- a/src/tests/cluster.hpp +++ b/src/tests/cluster.hpp @@ -379,7 +379,7 @@ inline Try<Nothing> Cluster::Slaves::stop( Slave slave = slaves[pid]; if (shutdown) { - process::dispatch(slave.slave, &slave::Slave::shutdown); + process::dispatch(slave.slave, &slave::Slave::shutdown, process::UPID()); } else { process::terminate(slave.slave); }