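Fixed a bug with resource accounting in reconciliation: disconnected slaves are now kept in the allocator (via the new Allocator::slaveDisconnected/slaveReconnected) instead of being removed and re-added. Review: https://reviews.apache.org/r/14435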
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/7a38b74f Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/7a38b74f Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/7a38b74f Branch: refs/heads/master Commit: 7a38b74f021132fe79f1f4d0ac8940239880ea74 Parents: 1bd6f42 Author: Benjamin Mahler <bmah...@twitter.com> Authored: Tue Oct 1 13:51:16 2013 -0700 Committer: Benjamin Mahler <bmah...@twitter.com> Committed: Wed Oct 2 12:20:32 2013 -0700 ---------------------------------------------------------------------- src/master/allocator.hpp | 32 +++++++++++++++++ src/master/hierarchical_allocator_process.hpp | 42 +++++++++++++++++++++- src/master/master.cpp | 39 ++++++++++---------- src/tests/mesos.hpp | 26 ++++++++++++++ 4 files changed, 117 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/7a38b74f/src/master/allocator.hpp ---------------------------------------------------------------------- diff --git a/src/master/allocator.hpp b/src/master/allocator.hpp index 46a7370..85ed214 100644 --- a/src/master/allocator.hpp +++ b/src/master/allocator.hpp @@ -85,6 +85,14 @@ public: virtual void slaveRemoved( const SlaveID& slaveId) = 0; + // No longer offers resources for the disconnected slave. + virtual void slaveDisconnected( + const SlaveID& slaveId) = 0; + + // Resumes resource offers for the reconnected slave. 
+ virtual void slaveReconnected( + const SlaveID& slaveId) = 0; + virtual void updateWhitelist( const Option<hashset<std::string> >& whitelist) = 0; @@ -156,6 +164,12 @@ public: void slaveRemoved( const SlaveID& slaveId); + void slaveDisconnected( + const SlaveID& slaveId); + + void slaveReconnected( + const SlaveID& slaveId); + void updateWhitelist( const Option<hashset<std::string> >& whitelist); @@ -282,6 +296,24 @@ inline void Allocator::slaveRemoved(const SlaveID& slaveId) } +inline void Allocator::slaveDisconnected(const SlaveID& slaveId) +{ + process::dispatch( + process, + &AllocatorProcess::slaveDisconnected, + slaveId); +} + + +inline void Allocator::slaveReconnected(const SlaveID& slaveId) +{ + process::dispatch( + process, + &AllocatorProcess::slaveReconnected, + slaveId); +} + + inline void Allocator::updateWhitelist( const Option<hashset<std::string> >& whitelist) { http://git-wip-us.apache.org/repos/asf/mesos/blob/7a38b74f/src/master/hierarchical_allocator_process.hpp ---------------------------------------------------------------------- diff --git a/src/master/hierarchical_allocator_process.hpp b/src/master/hierarchical_allocator_process.hpp index 183b205..b86a5a2 100644 --- a/src/master/hierarchical_allocator_process.hpp +++ b/src/master/hierarchical_allocator_process.hpp @@ -60,6 +60,7 @@ struct Slave Slave(const SlaveInfo& _info) : available(_info.resources()), + connected(true), whitelisted(false), checkpoint(_info.checkpoint()), info(_info) {} @@ -71,6 +72,10 @@ struct Slave // Contains all of the resources currently free on this slave. Resources available; + // Whether the slave is connected. Resources are not offered for + // disconnected slaves until they reconnect. + bool connected; + // Indicates if the resources on this slave should be offered to // frameworks. 
bool whitelisted; @@ -140,6 +145,12 @@ public: void slaveRemoved( const SlaveID& slaveId); + void slaveDisconnected( + const SlaveID& slaveId); + + void slaveReconnected( + const SlaveID& slaveId); + void updateWhitelist( const Option<hashset<std::string> >& whitelist); @@ -456,7 +467,35 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::slaveRemoved( // HierarchicalAllocatorProcess::expire gets invoked (or the framework // that applied the filters gets removed). - LOG(INFO)<< "Removed slave " << slaveId; + LOG(INFO) << "Removed slave " << slaveId; +} + + +template <class RoleSorter, class FrameworkSorter> +void +HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::slaveDisconnected( + const SlaveID& slaveId) +{ + CHECK(initialized); + CHECK(slaves.contains(slaveId)); + + slaves[slaveId].connected = false; + + LOG(INFO) << "Slave " << slaveId << " disconnected"; +} + + +template <class RoleSorter, class FrameworkSorter> +void +HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::slaveReconnected( + const SlaveID& slaveId) +{ + CHECK(initialized); + CHECK(slaves.contains(slaveId)); + + slaves[slaveId].connected = true; + + LOG(INFO)<< "Slave " << slaveId << " reconnected"; } @@ -707,6 +746,7 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::allocate( bool filtered = isFiltered(frameworkId, slaveId, resources); if (!filtered && + slaves[slaveId].connected && slaves[slaveId].whitelisted && allocatable(resources)) { VLOG(1) http://git-wip-us.apache.org/repos/asf/mesos/blob/7a38b74f/src/master/master.cpp ---------------------------------------------------------------------- diff --git a/src/master/master.cpp b/src/master/master.cpp index ffbb53d..6f6d66c 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -564,9 +564,7 @@ void Master::exited(const UPID& pid) // Mark the slave as disconnected and remove it from the allocator. 
slave->disconnected = true; - // TODO(vinod/Thomas): Instead of removing the slave, we should - // have 'Allocator::slave{Reconnected, Disconnected}'. - allocator->slaveRemoved(slave->id); + allocator->slaveDisconnected(slave->id); // If a slave is checkpointing, remove all non-checkpointing // frameworks from the slave. @@ -592,8 +590,6 @@ void Master::exited(const UPID& pid) } foreach (Offer* offer, utils::copy(slave->offers)) { - // TODO(vinod): We don't need to call 'Allocator::resourcesRecovered' - // once MESOS-621 is fixed. allocator->resourcesRecovered( offer->framework_id(), slave->id, offer->resources()); @@ -1117,6 +1113,7 @@ void Master::reregisterSlave(const SlaveID& slaveId, reply(ShutdownMessage()); } else { Slave* slave = getSlave(slaveId); + if (slave != NULL) { slave->reregisteredTime = Clock::now(); @@ -1131,22 +1128,13 @@ void Master::reregisterSlave(const SlaveID& slaveId, << ") is being allowed to re-register with an already" << " in use id (" << slaveId << ")"; - // If this is a disconnected slave, add it back to the allocator. - if (slave->disconnected) { - slave->disconnected = false; // Reset the flag. - - hashmap<FrameworkID, Resources> resources; - foreach (const ExecutorInfo& executorInfo, executorInfos) { - resources[executorInfo.framework_id()] += executorInfo.resources(); - } - foreach (const Task& task, tasks) { - // Ignore tasks that have reached terminal state. - if (!protobuf::isTerminalState(task.state())) { - resources[task.framework_id()] += task.resources(); - } - } - allocator->slaveAdded(slaveId, slaveInfo, resources); - } + // TODO(bmahler): There's an implicit assumption here that when + // the master already knows about this slave, the slave cannot + // have tasks unknown to the master. 
This _should_ be the case + // since the causal relationship is: + // slave removes task -> master removes task + // We should enforce this via a CHECK (dangerous), or by shutting + // down slaves that are found to violate this assumption. SlaveReregisteredMessage message; message.mutable_slave_id()->MergeFrom(slave->id); @@ -1166,6 +1154,15 @@ void Master::reregisterSlave(const SlaveID& slaveId, // NOTE: This needs to be done after the registration message is // sent to the slave and the new pid is linked. reconcile(slave, executorInfos, tasks); + + // If this is a disconnected slave, notify the allocator that it + // has reconnected. This is done after reconciliation to ensure + // the allocator's initial offers for this slave include any + // resources recovered during reconciliation. + if (slave->disconnected) { + slave->disconnected = false; // Reset the flag. + allocator->slaveReconnected(slaveId); + } } else { // NOTE: This handles the case when the slave tries to // re-register with a failed over master. http://git-wip-us.apache.org/repos/asf/mesos/blob/7a38b74f/src/tests/mesos.hpp ---------------------------------------------------------------------- diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp index 8fbd56c..bc365fb 100644 --- a/src/tests/mesos.hpp +++ b/src/tests/mesos.hpp @@ -335,6 +335,12 @@ public: ON_CALL(*this, slaveRemoved(_)) .WillByDefault(InvokeSlaveRemoved(this)); + ON_CALL(*this, slaveDisconnected(_)) + .WillByDefault(InvokeSlaveDisconnected(this)); + + ON_CALL(*this, slaveReconnected(_)) + .WillByDefault(InvokeSlaveReconnected(this)); + ON_CALL(*this, updateWhitelist(_)) .WillByDefault(InvokeUpdateWhitelist(this)); @@ -371,6 +377,8 @@ public: const SlaveInfo&, const hashmap<FrameworkID, Resources>&)); MOCK_METHOD1(slaveRemoved, void(const SlaveID&)); + MOCK_METHOD1(slaveDisconnected, void(const SlaveID&)); + MOCK_METHOD1(slaveReconnected, void(const SlaveID&)); MOCK_METHOD1(updateWhitelist, void(const Option<hashset<std::string> >&)); MOCK_METHOD2(resourcesRequested,
void(const FrameworkID&, const std::vector<Request>&)); @@ -466,6 +474,24 @@ ACTION_P(InvokeSlaveRemoved, allocator) } +ACTION_P(InvokeSlaveDisconnected, allocator) +{ + process::dispatch( + allocator->real, + &master::allocator::AllocatorProcess::slaveDisconnected, + arg0); +} + + +ACTION_P(InvokeSlaveReconnected, allocator) +{ + process::dispatch( + allocator->real, + &master::allocator::AllocatorProcess::slaveReconnected, + arg0); +} + + ACTION_P(InvokeUpdateWhitelist, allocator) { process::dispatch(