Updated users of ProtobufProcess::from.

Review: https://reviews.apache.org/r/14901


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/1657a845
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/1657a845
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/1657a845

Branch: refs/heads/master
Commit: 1657a84569e8455e7c39b6529a70a248dbab08ac
Parents: f6793de
Author: Benjamin Mahler <bmah...@twitter.com>
Authored: Wed Oct 23 20:29:00 2013 -0700
Committer: Benjamin Mahler <bmah...@twitter.com>
Committed: Thu Oct 24 14:31:07 2013 -0700

----------------------------------------------------------------------
 src/exec/exec.cpp          |   2 +-
 src/master/master.cpp      |  93 +++++++++++++--------------------
 src/master/master.hpp      | 110 ++++++++++++++++++++++++----------------
 src/sasl/authenticatee.hpp |   4 +-
 src/sched/sched.cpp        |   4 +-
 src/slave/slave.cpp        |  16 +++---
 src/slave/slave.hpp        |  12 +++--
 src/tests/cluster.hpp      |   2 +-
 8 files changed, 127 insertions(+), 116 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/1657a845/src/exec/exec.cpp
----------------------------------------------------------------------
diff --git a/src/exec/exec.cpp b/src/exec/exec.cpp
index 7ca21fa..4a598f5 100644
--- a/src/exec/exec.cpp
+++ b/src/exec/exec.cpp
@@ -233,7 +233,7 @@ protected:
     VLOG(1) << "Executor::reregistered took " << stopwatch.elapsed();
   }
 
-  void reconnect(const SlaveID& slaveId)
+  void reconnect(const UPID& from, const SlaveID& slaveId)
   {
     if (aborted) {
       VLOG(1) << "Ignoring reconnect message from slave " << slaveId

http://git-wip-us.apache.org/repos/asf/mesos/blob/1657a845/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index f838a9d..1147cc6 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -714,37 +714,24 @@ void Master::noMasterDetected()
 }
 
 
-void Master::registerFramework(const FrameworkInfo& frameworkInfo)
+void Master::registerFramework(
+    const UPID& from,
+    const FrameworkInfo& frameworkInfo)
 {
   if (authenticating.contains(from)) {
     LOG(INFO) << "Queuing up registration request from " << from
               << " because authentication is still in progress";
 
     authenticating[from]
-      .onReady(defer(self(), &Self::_registerFramework, frameworkInfo, from));
-  } else {
-    _registerFramework(frameworkInfo, from);
+      .onReady(defer(self(), &Self::registerFramework, from, frameworkInfo));
+    return;
   }
-}
-
 
-void Master::_registerFramework(
-    const FrameworkInfo& frameworkInfo,
-    const UPID& from)
-{
   if (!elected) {
     LOG(WARNING) << "Ignoring register framework message since not elected 
yet";
     return;
   }
 
-  if (authenticating.contains(from)) {
-    // This could happen if another authentication request came
-    // through before we are here.
-    LOG(WARNING) << "Ignoring registration request from " << from
-                 << " because authentication is back in progress";
-    return;
-  }
-
   if (flags.authenticate && !authenticated.contains(from)) {
     // This could happen if another authentication request came
     // through before we are here or if a framework tried to register
@@ -754,14 +741,14 @@ void Master::_registerFramework(
     FrameworkErrorMessage message;
     message.set_message("Framework at " + stringify(from) +
                         " is not authenticated.");
-    reply(message);
+    send(from, message);
     return;
   }
 
   if (!roles.contains(frameworkInfo.role())) {
     FrameworkErrorMessage message;
     message.set_message("Role '" + frameworkInfo.role() + "' is not valid.");
-    reply(message);
+    send(from, message);
     return;
   }
 
@@ -775,7 +762,7 @@ void Master::_registerFramework(
       FrameworkRegisteredMessage message;
       message.mutable_framework_id()->MergeFrom(framework->id);
       message.mutable_master_info()->MergeFrom(info);
-      reply(message);
+      send(from, message);
       return;
     }
   }
@@ -792,7 +779,7 @@ void Master::_registerFramework(
               << "root submissions are disabled on this cluster";
     FrameworkErrorMessage message;
     message.set_message("User 'root' is not allowed to run frameworks");
-    reply(message);
+    send(from, message);
     delete framework;
     return;
   }
@@ -802,6 +789,7 @@ void Master::_registerFramework(
 
 
 void Master::reregisterFramework(
+    const UPID& from,
     const FrameworkInfo& frameworkInfo,
     bool failover)
 {
@@ -811,21 +799,13 @@ void Master::reregisterFramework(
 
     authenticating[from]
       .onReady(defer(self(),
-                     &Self::_reregisterFramework,
+                     &Self::reregisterFramework,
+                     from,
                      frameworkInfo,
-                     failover,
-                     from));
-  } else {
-    _reregisterFramework(frameworkInfo, failover, from);
+                     failover));
+    return;
   }
-}
-
 
-void Master::_reregisterFramework(
-    const FrameworkInfo& frameworkInfo,
-    bool failover,
-    const UPID& from)
-{
   if (!elected) {
     LOG(WARNING) << "Ignoring re-register framework message since "
                  << "not elected yet";
@@ -836,16 +816,7 @@ void Master::_reregisterFramework(
     LOG(ERROR) << "Framework re-registering without an id!";
     FrameworkErrorMessage message;
     message.set_message("Framework reregistered without a framework id");
-    reply(message);
-    return;
-  }
-
-  if (authenticating.contains(from)) {
-    // This could happen if another authentication request came
-    // through before we are here.
-    LOG(WARNING)
-      << "Ignoring re-registration request for framework " << 
frameworkInfo.id()
-      << " from " << from << " because authentication is back in progress";
+    send(from, message);
     return;
   }
 
@@ -858,14 +829,14 @@ void Master::_reregisterFramework(
     FrameworkErrorMessage message;
     message.set_message("Framework '" + frameworkInfo.id().value() + "' at " +
                         stringify(from) + " is not authenticated.");
-    reply(message);
+    send(from, message);
     return;
   }
 
   if (!roles.contains(frameworkInfo.role())) {
     FrameworkErrorMessage message;
     message.set_message("Role '" + frameworkInfo.role() + "' is not valid.");
-    reply(message);
+    send(from, message);
     return;
   }
 
@@ -915,7 +886,7 @@ void Master::_reregisterFramework(
       FrameworkReregisteredMessage message;
       message.mutable_framework_id()->MergeFrom(frameworkInfo.id());
       message.mutable_master_info()->MergeFrom(info);
-      reply(message);
+      send(from, message);
       return;
     }
   } else {
@@ -975,7 +946,9 @@ void Master::_reregisterFramework(
 }
 
 
-void Master::unregisterFramework(const FrameworkID& frameworkId)
+void Master::unregisterFramework(
+    const UPID& from,
+    const FrameworkID& frameworkId)
 {
   LOG(INFO) << "Asked to unregister framework " << frameworkId;
 
@@ -991,7 +964,9 @@ void Master::unregisterFramework(const FrameworkID& 
frameworkId)
 }
 
 
-void Master::deactivateFramework(const FrameworkID& frameworkId)
+void Master::deactivateFramework(
+    const UPID& from,
+    const FrameworkID& frameworkId)
 {
   Framework* framework = getFramework(frameworkId);
 
@@ -1211,7 +1186,7 @@ void Master::schedulerMessage(const SlaveID& slaveId,
 }
 
 
-void Master::registerSlave(const SlaveInfo& slaveInfo)
+void Master::registerSlave(const UPID& from, const SlaveInfo& slaveInfo)
 {
   if (!elected) {
     LOG(WARNING) << "Ignoring register slave message from "
@@ -1266,10 +1241,12 @@ void Master::registerSlave(const SlaveInfo& slaveInfo)
 }
 
 
-void Master::reregisterSlave(const SlaveID& slaveId,
-                             const SlaveInfo& slaveInfo,
-                             const vector<ExecutorInfo>& executorInfos,
-                             const vector<Task>& tasks)
+void Master::reregisterSlave(
+    const UPID& from,
+    const SlaveID& slaveId,
+    const SlaveInfo& slaveInfo,
+    const vector<ExecutorInfo>& executorInfos,
+    const vector<Task>& tasks)
 {
   if (!elected) {
     LOG(WARNING) << "Ignoring re-register slave message from "
@@ -1460,6 +1437,7 @@ void Master::statusUpdate(const StatusUpdate& update, 
const UPID& pid)
 
 
 void Master::exitedExecutor(
+    const UPID& from,
     const SlaveID& slaveId,
     const FrameworkID& frameworkId,
     const ExecutorID& executorId,
@@ -1488,7 +1466,7 @@ void Master::exitedExecutor(
 
   CHECK(!deactivatedSlaves.contains(from))
     << "Received exited message for executor " << executorId << " from " << 
from
-    << " which is deactivated slave " << slaveId
+    << " which is the deactivated slave " << slaveId
     << "(" << slave->info.hostname() << ")";
 
   // Tell the allocator about the recovered resources.
@@ -1545,6 +1523,7 @@ void Master::deactivateSlave(const SlaveID& slaveId)
 
 
 void Master::reconcileTasks(
+    const UPID& from,
     const FrameworkID& frameworkId,
     const std::vector<TaskStatus>& statuses)
 {
@@ -1693,7 +1672,7 @@ void Master::offer(const FrameworkID& frameworkId,
 }
 
 
-void Master::authenticate(const UPID& pid)
+void Master::authenticate(const UPID& from, const UPID& pid)
 {
   // Deactivate the framework if it's already registered.
   foreachvalue (Framework* framework, frameworks) {
@@ -1715,7 +1694,7 @@ void Master::authenticate(const UPID& pid)
 
     // Retry after the current authenticator finishes.
     authenticating[pid]
-      .onAny(defer(self(), &Self::authenticate, pid));
+      .onAny(defer(self(), &Self::authenticate, from, pid));
 
     return;
   }

http://git-wip-us.apache.org/repos/asf/mesos/blob/1657a845/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 9f5e25b..1eba03f 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -89,67 +89,91 @@ public:
 
   virtual ~Master();
 
-  void submitScheduler(const std::string& name);
-  void newMasterDetected(const UPID& pid);
+  void submitScheduler(
+      const std::string& name);
+  void newMasterDetected(
+      const UPID& pid);
   void noMasterDetected();
   void masterDetectionFailure();
-  void registerFramework(const FrameworkInfo& frameworkInfo);
-  void reregisterFramework(const FrameworkInfo& frameworkInfo,
-                           bool failover);
-  void unregisterFramework(const FrameworkID& frameworkId);
-  void deactivateFramework(const FrameworkID& frameworkId);
-  void resourceRequest(const FrameworkID& frameworkId,
-                       const std::vector<Request>& requests);
-  void launchTasks(const FrameworkID& frameworkId,
-                   const OfferID& offerId,
-                   const std::vector<TaskInfo>& tasks,
-                   const Filters& filters);
-  void reviveOffers(const FrameworkID& frameworkId);
-  void killTask(const FrameworkID& frameworkId, const TaskID& taskId);
-  void schedulerMessage(const SlaveID& slaveId,
-                        const FrameworkID& frameworkId,
-                        const ExecutorID& executorId,
-                        const std::string& data);
-  void registerSlave(const SlaveInfo& slaveInfo);
-  void reregisterSlave(const SlaveID& slaveId,
-                       const SlaveInfo& slaveInfo,
-                       const std::vector<ExecutorInfo>& executorInfos,
-                       const std::vector<Task>& tasks);
-  void unregisterSlave(const SlaveID& slaveId);
-  void statusUpdate(const StatusUpdate& update, const UPID& pid);
-  void exitedExecutor(const SlaveID& slaveId,
-                      const FrameworkID& frameworkId,
-                      const ExecutorID& executorId,
-                      int32_t status);
-  void deactivateSlave(const SlaveID& slaveId);
+  void registerFramework(
+      const process::UPID& from,
+      const FrameworkInfo& frameworkInfo);
+  void reregisterFramework(
+      const process::UPID& from,
+      const FrameworkInfo& frameworkInfo,
+      bool failover);
+  void unregisterFramework(
+      const process::UPID& from,
+      const FrameworkID& frameworkId);
+  void deactivateFramework(
+      const process::UPID& from,
+      const FrameworkID& frameworkId);
+  void resourceRequest(
+      const FrameworkID& frameworkId,
+      const std::vector<Request>& requests);
+  void launchTasks(
+      const FrameworkID& frameworkId,
+      const OfferID& offerId,
+      const std::vector<TaskInfo>& tasks,
+      const Filters& filters);
+  void reviveOffers(
+      const FrameworkID& frameworkId);
+  void killTask(
+      const FrameworkID& frameworkId,
+      const TaskID& taskId);
+  void schedulerMessage(
+      const SlaveID& slaveId,
+      const FrameworkID& frameworkId,
+      const ExecutorID& executorId,
+      const std::string& data);
+  void registerSlave(
+      const process::UPID& from,
+      const SlaveInfo& slaveInfo);
+  void reregisterSlave(
+      const process::UPID& from,
+      const SlaveID& slaveId,
+      const SlaveInfo& slaveInfo,
+      const std::vector<ExecutorInfo>& executorInfos,
+      const std::vector<Task>& tasks);
+  void unregisterSlave(
+      const SlaveID& slaveId);
+  void statusUpdate(
+      const StatusUpdate& update,
+      const UPID& pid);
+  void exitedExecutor(
+      const process::UPID& from,
+      const SlaveID& slaveId,
+      const FrameworkID& frameworkId,
+      const ExecutorID& executorId,
+      int32_t status);
+  void deactivateSlave(
+      const SlaveID& slaveId);
 
   // TODO(bmahler): It would be preferred to use a unique libprocess
   // Process identifier (PID is not sufficient) for identifying the
   // framework instance, rather than relying on re-registration time.
-  void frameworkFailoverTimeout(const FrameworkID& frameworkId,
-                                const Time& reregisteredTime);
+  void frameworkFailoverTimeout(
+      const FrameworkID& frameworkId,
+      const Time& reregisteredTime);
 
-  void offer(const FrameworkID& framework,
-             const hashmap<SlaveID, Resources>& resources);
+  void offer(
+      const FrameworkID& framework,
+      const hashmap<SlaveID, Resources>& resources);
 
   void reconcileTasks(
+      const process::UPID& from,
       const FrameworkID& frameworkId,
       const std::vector<TaskStatus>& statuses);
 
-  void authenticate(const UPID& pid);
+  void authenticate(
+      const process::UPID& from,
+      const process::UPID& pid);
 
 protected:
   virtual void initialize();
   virtual void finalize();
   virtual void exited(const UPID& pid);
 
-  void _registerFramework(const FrameworkInfo& frameworkInfo, const UPID& pid);
-
-  void _reregisterFramework(
-      const FrameworkInfo& frameworkInfo,
-      bool failover,
-      const UPID& pid);
-
   void deactivate(Framework* framework);
 
   // 'promise' is used to signal finish of authentication.

http://git-wip-us.apache.org/repos/asf/mesos/blob/1657a845/src/sasl/authenticatee.hpp
----------------------------------------------------------------------
diff --git a/src/sasl/authenticatee.hpp b/src/sasl/authenticatee.hpp
index 09ef018..f1a677f 100644
--- a/src/sasl/authenticatee.hpp
+++ b/src/sasl/authenticatee.hpp
@@ -242,7 +242,7 @@ protected:
     message.set_mechanism(mechanism);
     message.set_data(output, length);
 
-    send(from, message);
+    reply(message);
 
     status = STEPPING;
   }
@@ -279,7 +279,7 @@ protected:
       if (output != NULL && length > 0) {
         message.set_data(output, length);
       }
-      send(from, message);
+      reply(message);
     } else {
       status = ERROR;
       std::string error(sasl_errdetail(connection));

http://git-wip-us.apache.org/repos/asf/mesos/blob/1657a845/src/sched/sched.cpp
----------------------------------------------------------------------
diff --git a/src/sched/sched.cpp b/src/sched/sched.cpp
index 824b4b7..3049096 100644
--- a/src/sched/sched.cpp
+++ b/src/sched/sched.cpp
@@ -554,7 +554,7 @@ protected:
     send(pid, message);
   }
 
-  void lostSlave(const SlaveID& slaveId)
+  void lostSlave(const UPID& from, const SlaveID& slaveId)
   {
     if (aborted) {
       VLOG(1) << "Ignoring lost slave message because the driver is aborted!";
@@ -563,7 +563,7 @@ protected:
 
     if (from != master) {
       LOG(WARNING) << "Ignoring lost slave message from " << from
-                   << "because it is not from the registered master ("
+                   << " because it is not from the registered master ("
                    << master << ")";
       return;
     }

http://git-wip-us.apache.org/repos/asf/mesos/blob/1657a845/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 5272600..243a830 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -402,7 +402,7 @@ void Slave::finalize()
     // checkpointing. This is because slave recovery tests terminate
     // the slave to simulate slave restart.
     if (!frameworks[frameworkId]->info.checkpoint()) {
-      shutdownFramework(frameworkId);
+      shutdownFramework(UPID(), frameworkId);
     }
   }
 
@@ -421,7 +421,7 @@ void Slave::finalize()
 }
 
 
-void Slave::shutdown()
+void Slave::shutdown(const UPID& from)
 {
   // Allow shutdown message only if
   // 1) Its a message received from the registered master or
@@ -451,7 +451,7 @@ void Slave::shutdown()
     // NOTE: We use 'frameworks.keys()' here because 'shutdownFramework'
     // can potentially remove a framework from 'frameworks'.
     foreach (const FrameworkID& frameworkId, frameworks.keys()) {
-      shutdownFramework(frameworkId);
+      shutdownFramework(from, frameworkId);
     }
   }
 }
@@ -534,7 +534,7 @@ void Slave::noMasterDetected()
 }
 
 
-void Slave::registered(const SlaveID& slaveId)
+void Slave::registered(const UPID& from, const SlaveID& slaveId)
 {
   if (from != master) {
     LOG(WARNING) << "Ignoring registration message from " << from
@@ -581,7 +581,7 @@ void Slave::registered(const SlaveID& slaveId)
 }
 
 
-void Slave::reregistered(const SlaveID& slaveId)
+void Slave::reregistered(const UPID& from, const SlaveID& slaveId)
 {
   if (from != master) {
     LOG(WARNING) << "Ignoring re-registration message from " << from
@@ -1102,7 +1102,9 @@ void Slave::killTask(const FrameworkID& frameworkId, 
const TaskID& taskId)
 // sending back a shut down acknowledgement, because otherwise you
 // could get into a state where a shut down was sent, dropped, and
 // therefore never processed.
-void Slave::shutdownFramework(const FrameworkID& frameworkId)
+void Slave::shutdownFramework(
+    const UPID& from,
+    const FrameworkID& frameworkId)
 {
   // Allow shutdownFramework() only if
   // its called directly (e.g. Slave::finalize()) or
@@ -1394,6 +1396,7 @@ void Slave::_statusUpdateAcknowledgement(
 
 
 void Slave::registerExecutor(
+    const UPID& from,
     const FrameworkID& frameworkId,
     const ExecutorID& executorId)
 {
@@ -1545,6 +1548,7 @@ void Slave::registerExecutor(
 
 
 void Slave::reregisterExecutor(
+    const UPID& from,
     const FrameworkID& frameworkId,
     const ExecutorID& executorId,
     const vector<TaskInfo>& tasks,

http://git-wip-us.apache.org/repos/asf/mesos/blob/1657a845/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 5211b46..b9f9b04 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -82,13 +82,13 @@ public:
 
   virtual ~Slave();
 
-  void shutdown();
+  void shutdown(const process::UPID& from);
 
   void newMasterDetected(const UPID& pid);
   void noMasterDetected();
   void masterDetectionFailure();
-  void registered(const SlaveID& slaveId);
-  void reregistered(const SlaveID& slaveId);
+  void registered(const process::UPID& from, const SlaveID& slaveId);
+  void reregistered(const process::UPID& from, const SlaveID& slaveId);
   void doReliableRegistration();
 
   void runTask(
@@ -108,7 +108,9 @@ public:
 
   void killTask(const FrameworkID& frameworkId, const TaskID& taskId);
 
-  void shutdownFramework(const FrameworkID& frameworkId);
+  void shutdownFramework(
+      const process::UPID& from,
+      const FrameworkID& frameworkId);
 
   void schedulerMessage(
       const SlaveID& slaveId,
@@ -119,6 +121,7 @@ public:
   void updateFramework(const FrameworkID& frameworkId, const std::string& pid);
 
   void registerExecutor(
+      const process::UPID& from,
       const FrameworkID& frameworkId,
       const ExecutorID& executorId);
 
@@ -127,6 +130,7 @@ public:
   //           driver never received an ACK for.)
   // 'updates' : Unacknowledged updates.
   void reregisterExecutor(
+      const process::UPID& from,
       const FrameworkID& frameworkId,
       const ExecutorID& executorId,
       const std::vector<TaskInfo>& tasks,

http://git-wip-us.apache.org/repos/asf/mesos/blob/1657a845/src/tests/cluster.hpp
----------------------------------------------------------------------
diff --git a/src/tests/cluster.hpp b/src/tests/cluster.hpp
index f743bb3..5621139 100644
--- a/src/tests/cluster.hpp
+++ b/src/tests/cluster.hpp
@@ -379,7 +379,7 @@ inline Try<Nothing> Cluster::Slaves::stop(
   Slave slave = slaves[pid];
 
   if (shutdown) {
-    process::dispatch(slave.slave, &slave::Slave::shutdown);
+    process::dispatch(slave.slave, &slave::Slave::shutdown, process::UPID());
   } else {
     process::terminate(slave.slave);
   }

Reply via email to