Fixed a bug with resource accounting in reconciliation.

Review: https://reviews.apache.org/r/14435


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/7a38b74f
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/7a38b74f
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/7a38b74f

Branch: refs/heads/master
Commit: 7a38b74f021132fe79f1f4d0ac8940239880ea74
Parents: 1bd6f42
Author: Benjamin Mahler <bmah...@twitter.com>
Authored: Tue Oct 1 13:51:16 2013 -0700
Committer: Benjamin Mahler <bmah...@twitter.com>
Committed: Wed Oct 2 12:20:32 2013 -0700

----------------------------------------------------------------------
 src/master/allocator.hpp                      | 32 +++++++++++++++++
 src/master/hierarchical_allocator_process.hpp | 42 +++++++++++++++++++++-
 src/master/master.cpp                         | 39 ++++++++++----------
 src/tests/mesos.hpp                           | 26 ++++++++++++++
 4 files changed, 117 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/7a38b74f/src/master/allocator.hpp
----------------------------------------------------------------------
diff --git a/src/master/allocator.hpp b/src/master/allocator.hpp
index 46a7370..85ed214 100644
--- a/src/master/allocator.hpp
+++ b/src/master/allocator.hpp
@@ -85,6 +85,14 @@ public:
   virtual void slaveRemoved(
       const SlaveID& slaveId) = 0;
 
+  // No longer offers resources for the disconnected slave.
+  virtual void slaveDisconnected(
+      const SlaveID& slaveId) = 0;
+
+  // Resumes resource offers for the reconnected slave.
+  virtual void slaveReconnected(
+      const SlaveID& slaveId) = 0;
+
   virtual void updateWhitelist(
       const Option<hashset<std::string> >& whitelist) = 0;
 
@@ -156,6 +164,12 @@ public:
   void slaveRemoved(
       const SlaveID& slaveId);
 
+  void slaveDisconnected(
+      const SlaveID& slaveId);
+
+  void slaveReconnected(
+      const SlaveID& slaveId);
+
   void updateWhitelist(
       const Option<hashset<std::string> >& whitelist);
 
@@ -282,6 +296,24 @@ inline void Allocator::slaveRemoved(const SlaveID& slaveId)
 }
 
 
+inline void Allocator::slaveDisconnected(const SlaveID& slaveId)
+{
+  process::dispatch(
+      process,
+      &AllocatorProcess::slaveDisconnected,
+      slaveId);
+}
+
+
+inline void Allocator::slaveReconnected(const SlaveID& slaveId)
+{
+  process::dispatch(
+      process,
+      &AllocatorProcess::slaveReconnected,
+      slaveId);
+}
+
+
 inline void Allocator::updateWhitelist(
     const Option<hashset<std::string> >& whitelist)
 {

http://git-wip-us.apache.org/repos/asf/mesos/blob/7a38b74f/src/master/hierarchical_allocator_process.hpp
----------------------------------------------------------------------
diff --git a/src/master/hierarchical_allocator_process.hpp b/src/master/hierarchical_allocator_process.hpp
index 183b205..b86a5a2 100644
--- a/src/master/hierarchical_allocator_process.hpp
+++ b/src/master/hierarchical_allocator_process.hpp
@@ -60,6 +60,7 @@ struct Slave
 
   Slave(const SlaveInfo& _info)
     : available(_info.resources()),
+      connected(true),
       whitelisted(false),
       checkpoint(_info.checkpoint()),
       info(_info) {}
@@ -71,6 +72,10 @@ struct Slave
   // Contains all of the resources currently free on this slave.
   Resources available;
 
+  // Whether the slave is connected. Resources are not offered for
+  // disconnected slaves until they reconnect.
+  bool connected;
+
   // Indicates if the resources on this slave should be offered to
   // frameworks.
   bool whitelisted;
@@ -140,6 +145,12 @@ public:
   void slaveRemoved(
       const SlaveID& slaveId);
 
+  void slaveDisconnected(
+      const SlaveID& slaveId);
+
+  void slaveReconnected(
+      const SlaveID& slaveId);
+
   void updateWhitelist(
       const Option<hashset<std::string> >& whitelist);
 
@@ -456,7 +467,35 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::slaveRemoved(
   // HierarchicalAllocatorProcess::expire gets invoked (or the framework
   // that applied the filters gets removed).
 
-  LOG(INFO)<< "Removed slave " << slaveId;
+  LOG(INFO) << "Removed slave " << slaveId;
+}
+
+
+template <class RoleSorter, class FrameworkSorter>
+void
+HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::slaveDisconnected(
+    const SlaveID& slaveId)
+{
+  CHECK(initialized);
+  CHECK(slaves.contains(slaveId));
+
+  slaves[slaveId].connected = false;
+
+  LOG(INFO) << "Slave " << slaveId << " disconnected";
+}
+
+
+template <class RoleSorter, class FrameworkSorter>
+void
+HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::slaveReconnected(
+    const SlaveID& slaveId)
+{
+  CHECK(initialized);
+  CHECK(slaves.contains(slaveId));
+
+  slaves[slaveId].connected = true;
+
+  LOG(INFO)<< "Slave " << slaveId << " reconnected";
 }
 
 
@@ -707,6 +746,7 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::allocate(
         bool filtered = isFiltered(frameworkId, slaveId, resources);
 
         if (!filtered &&
+            slaves[slaveId].connected &&
             slaves[slaveId].whitelisted &&
             allocatable(resources)) {
           VLOG(1)

http://git-wip-us.apache.org/repos/asf/mesos/blob/7a38b74f/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index ffbb53d..6f6d66c 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -564,9 +564,7 @@ void Master::exited(const UPID& pid)
         // Mark the slave as disconnected and remove it from the allocator.
         slave->disconnected = true;
 
-        // TODO(vinod/Thomas): Instead of removing the slave, we should
-        // have 'Allocator::slave{Reconnected, Disconnected}'.
-        allocator->slaveRemoved(slave->id);
+        allocator->slaveDisconnected(slave->id);
 
         // If a slave is checkpointing, remove all non-checkpointing
         // frameworks from the slave.
@@ -592,8 +590,6 @@ void Master::exited(const UPID& pid)
         }
 
         foreach (Offer* offer, utils::copy(slave->offers)) {
-          // TODO(vinod): We don't need to call 'Allocator::resourcesRecovered'
-          // once MESOS-621 is fixed.
           allocator->resourcesRecovered(
               offer->framework_id(), slave->id, offer->resources());
 
@@ -1117,6 +1113,7 @@ void Master::reregisterSlave(const SlaveID& slaveId,
     reply(ShutdownMessage());
   } else {
     Slave* slave = getSlave(slaveId);
+
     if (slave != NULL) {
       slave->reregisteredTime = Clock::now();
 
@@ -1131,22 +1128,13 @@ void Master::reregisterSlave(const SlaveID& slaveId,
                    << ") is being allowed to re-register with an already"
                    << " in use id (" << slaveId << ")";
 
-      // If this is a disconnected slave, add it back to the allocator.
-      if (slave->disconnected) {
-        slave->disconnected = false; // Reset the flag.
-
-        hashmap<FrameworkID, Resources> resources;
-        foreach (const ExecutorInfo& executorInfo, executorInfos) {
-          resources[executorInfo.framework_id()] += executorInfo.resources();
-        }
-        foreach (const Task& task, tasks) {
-          // Ignore tasks that have reached terminal state.
-          if (!protobuf::isTerminalState(task.state())) {
-            resources[task.framework_id()] += task.resources();
-          }
-        }
-        allocator->slaveAdded(slaveId, slaveInfo, resources);
-      }
+      // TODO(bmahler): There's an implicit assumption here that when
+      // the master already knows about this slave, the slave cannot
+      // have tasks unknown to the master. This _should_ be the case
+      // since the causal relationship is:
+      //   slave removes task -> master removes task
+      // We should enforce this via a CHECK (dangerous), or by shutting
+      // down slaves that are found to violate this assumption.
 
       SlaveReregisteredMessage message;
       message.mutable_slave_id()->MergeFrom(slave->id);
@@ -1166,6 +1154,15 @@ void Master::reregisterSlave(const SlaveID& slaveId,
       // NOTE: This needs to be done after the registration message is
       // sent to the slave and the new pid is linked.
       reconcile(slave, executorInfos, tasks);
+
+      // If this is a disconnected slave, tell the allocator it has
+      // reconnected so its resources are offered again. This is done
+      // after reconciliation so that the first offers for this slave
+      // already include any resources recovered during reconciliation.
+      if (slave->disconnected) {
+        slave->disconnected = false; // Reset the flag.
+        allocator->slaveReconnected(slaveId);
+      }
     } else {
       // NOTE: This handles the case when the slave tries to
       // re-register with a failed over master.

http://git-wip-us.apache.org/repos/asf/mesos/blob/7a38b74f/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index 8fbd56c..bc365fb 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -335,6 +335,12 @@ public:
     ON_CALL(*this, slaveRemoved(_))
       .WillByDefault(InvokeSlaveRemoved(this));
 
+    ON_CALL(*this, slaveDisconnected(_))
+      .WillByDefault(InvokeSlaveDisconnected(this));
+
+    ON_CALL(*this, slaveReconnected(_))
+      .WillByDefault(InvokeSlaveReconnected(this));
+
     ON_CALL(*this, updateWhitelist(_))
       .WillByDefault(InvokeUpdateWhitelist(this));
 
@@ -371,6 +377,8 @@ public:
                                 const SlaveInfo&,
                                 const hashmap<FrameworkID, Resources>&));
   MOCK_METHOD1(slaveRemoved, void(const SlaveID&));
+  MOCK_METHOD1(slaveDisconnected, void(const SlaveID&));
+  MOCK_METHOD1(slaveReconnected, void(const SlaveID&));
   MOCK_METHOD1(updateWhitelist, void(const Option<hashset<std::string> >&));
   MOCK_METHOD2(resourcesRequested, void(const FrameworkID&,
                                         const std::vector<Request>&));
@@ -466,6 +474,24 @@ ACTION_P(InvokeSlaveRemoved, allocator)
 }
 
 
+ACTION_P(InvokeSlaveDisconnected, allocator)
+{
+  process::dispatch(
+      allocator->real,
+      &master::allocator::AllocatorProcess::slaveDisconnected,
+      arg0);
+}
+
+
+ACTION_P(InvokeSlaveReconnected, allocator)
+{
+  process::dispatch(
+      allocator->real,
+      &master::allocator::AllocatorProcess::slaveReconnected,
+      arg0);
+}
+
+
 ACTION_P(InvokeUpdateWhitelist, allocator)
 {
   process::dispatch(

Reply via email to