This is an automated email from the ASF dual-hosted git repository. asekretenko pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mesos.git
The following commit(s) were added to refs/heads/master by this push: new dcb5585 Fixed performance of tracking resource totals in allocator's roles tree. dcb5585 is described below commit dcb558597767aba9bcdcbc23e1a0a2674f653161 Author: Andrei Sekretenko <asekrete...@apache.org> AuthorDate: Wed May 13 18:15:13 2020 +0200 Fixed performance of tracking resource totals in allocator's roles tree. Before this patch, the roles tree was tracking total resources offered/allocated to a role as a single `Resources` object. In the case when each agent has a limited number of unique resources (for example, a single persistent volume), this resulted in poor asymptotic complexity of allocation with respect to the number of agents (O(N^2)) that was clearly observable in `HierarchicalAllocations_BENCHMARK_Test.PersistentVolumes`. In addition, the role tree code was violating the convention that `Resources` belonging to different agents should never be added. This patch implements per-agent tracking of `Resources` in the roles tree, thus improving the performance of allocation (and getting rid of the potentially problematic O(N^2) asymptotic behavior) in the case of many agents with a limited number of unique resources each. 
Review: https://reviews.apache.org/r/72508 --- src/master/allocator/mesos/hierarchical.cpp | 139 ++++++++++++++++++++-------- src/master/allocator/mesos/hierarchical.hpp | 69 ++++++++++---- 2 files changed, 152 insertions(+), 56 deletions(-) diff --git a/src/master/allocator/mesos/hierarchical.cpp b/src/master/allocator/mesos/hierarchical.cpp index 5fe9ffc..9e50799 100644 --- a/src/master/allocator/mesos/hierarchical.cpp +++ b/src/master/allocator/mesos/hierarchical.cpp @@ -202,6 +202,46 @@ static hashmap<string, vector<ResourceQuantities>> unpackFrameworkOfferFilters( } +void ScalarResourceTotals::add( + const SlaveID& slaveID, + const Resources& resources) +{ + if (resources.scalars().empty()) { + // In this case, we avoid adding an entry to `scalars` to maintain the + // invariant that `scalars` doesn't track agents with empty resources. + return; + } + + scalarsTotal -= ResourceQuantities::fromScalarResources(scalars[slaveID]); + scalars.at(slaveID) += resources.scalars(); + scalarsTotal += ResourceQuantities::fromScalarResources(scalars.at(slaveID)); +} + + +void ScalarResourceTotals::subtract( + const SlaveID& slaveID, + const Resources& resources) +{ + if (resources.scalars().empty()) { + // `scalars` does not track agents with empty resources, thus subtracting + // empty resources from an agent is valid regardless of whether its + // resources are tracked in `scalars`. 
+ return; + } + + CHECK_CONTAINS(scalars, slaveID); + CHECK_CONTAINS(scalars.at(slaveID), resources.scalars()); + + scalarsTotal -= ResourceQuantities::fromScalarResources(scalars.at(slaveID)); + scalars.at(slaveID) -= resources.scalars(); + scalarsTotal += ResourceQuantities::fromScalarResources(scalars.at(slaveID)); + + if (scalars.at(slaveID).empty()) { + scalars.erase(slaveID); + } +} + + Role::Role(const string& _role, Role* _parent) : role(_role), basename(strings::split(role, "/").back()), @@ -315,10 +355,10 @@ bool RoleTree::tryRemove(const std::string& role) break; } - CHECK(current->allocatedScalars_.empty()) + CHECK(current->allocatedUnreservedNonRevocable.empty()) << "An empty role " << current->role << " has non-empty allocated scalar resources: " - << current->allocatedScalars_; + << current->allocatedUnreservedNonRevocable.quantities(); Role* parent = CHECK_NOTNULL(current->parent); @@ -328,9 +368,17 @@ bool RoleTree::tryRemove(const std::string& role) (*metrics)->removeRole(current->role); } - CHECK(current->offeredOrAllocatedScalars_.empty()) - << " role: " << role - << " offeredOrAllocated: " << current->offeredOrAllocatedScalars_; + CHECK(current->offeredOrAllocatedUnreservedNonRevocable.empty()) + << "An empty role " << current->role + << " has non-empty offered or allocated" + << " unreserved non-revocable scalar resources: " + << current->offeredOrAllocatedUnreservedNonRevocable.quantities(); + + CHECK(current->offeredOrAllocatedReserved.empty()) + << "An empty role " << current->role + << " has non-empty offered or allocated reserved scalar resources: " + << current->offeredOrAllocatedReserved.quantities(); + roles_.erase(current->role); current = parent; @@ -385,28 +433,32 @@ void RoleTree::untrackReservations(const Resources& resources) } -void RoleTree::trackAllocated(const Resources& resources_) +void RoleTree::trackAllocated( + const SlaveID& slaveId, + const Resources& resources_) { foreachpair ( const string& role, const Resources& 
resources, - resources_.scalars().allocations()) { + resources_.scalars().unreserved().nonRevocable().allocations()) { applyToRoleAndAncestors(CHECK_NOTNONE(get_(role)), [&](Role* current) { - current->allocatedScalars_ += resources; + current->allocatedUnreservedNonRevocable.add(slaveId, resources); updateQuotaConsumedMetric(current); }); } } -void RoleTree::untrackAllocated(const Resources& resources_) +void RoleTree::untrackAllocated( + const SlaveID& slaveId, + const Resources& resources_) { foreachpair ( const string& role, const Resources& resources, - resources_.scalars().allocations()) { + resources_.scalars().unreserved().nonRevocable().allocations()) { applyToRoleAndAncestors(CHECK_NOTNONE(get_(role)), [&](Role* current) { - current->allocatedScalars_ -= resources; + current->allocatedUnreservedNonRevocable.subtract(slaveId, resources); updateQuotaConsumedMetric(current); }); } @@ -453,7 +505,9 @@ void RoleTree::updateWeight(const string& role, double weight) } -void RoleTree::trackOfferedOrAllocated(const Resources& resources_) +void RoleTree::trackOfferedOrAllocated( + const SlaveID& slaveId, + const Resources& resources_) { // TODO(mzhu): avoid building a map by traversing `resources` // and look for the allocation role of individual resource. 
@@ -464,14 +518,20 @@ void RoleTree::trackOfferedOrAllocated(const Resources& resources_) const Resources& resources, resources_.scalars().allocations()) { applyToRoleAndAncestors( - CHECK_NOTNONE(get_(role)), [&resources](Role* current) { - current->offeredOrAllocatedScalars_ += resources; - }); + CHECK_NOTNONE(get_(role)), [&resources, &slaveId](Role* current) { + current->offeredOrAllocatedReserved.add( + slaveId, resources.reserved()); + + current->offeredOrAllocatedUnreservedNonRevocable.add( + slaveId, resources.unreserved().nonRevocable()); + }); } } -void RoleTree::untrackOfferedOrAllocated(const Resources& resources_) +void RoleTree::untrackOfferedOrAllocated( + const SlaveID& slaveId, + const Resources& resources_) { // TODO(mzhu): avoid building a map by traversing `resources` // and look for the allocation role of individual resource. @@ -482,12 +542,13 @@ void RoleTree::untrackOfferedOrAllocated(const Resources& resources_) const Resources& resources, resources_.scalars().allocations()) { applyToRoleAndAncestors( - CHECK_NOTNONE(get_(role)), [&resources](Role* current) { - CHECK_CONTAINS(current->offeredOrAllocatedScalars_, resources) - << " Role: " << current->role - << " offeredOrAllocated: " << current->offeredOrAllocatedScalars_; - current->offeredOrAllocatedScalars_ -= resources; - }); + CHECK_NOTNONE(get_(role)), [&resources, &slaveId](Role* current) { + current->offeredOrAllocatedReserved.subtract( + slaveId, resources.reserved()); + + current->offeredOrAllocatedUnreservedNonRevocable.subtract( + slaveId, resources.unreserved().nonRevocable()); + }); } } @@ -503,8 +564,14 @@ std::string RoleTree::toJSON() const writer->field("limits", role->quota_.limits); writer->field( "reservation_quantities", role->reservationScalarQuantities_); + + writer->field( + "offered_or_allocated_reserved_quantities", + role->offeredOrAllocatedReserved.quantities()); + writer->field( - "offered_or_allocated_scalars", role->offeredOrAllocatedScalars_); + 
"offered_or_allocated_unreserved_nonrevocable_quantities", + role->offeredOrAllocatedUnreservedNonRevocable.quantities()); writer->field("frameworks", [&](JSON::ArrayWriter* writer) { foreach (const FrameworkID& id, role->frameworks_) { @@ -694,7 +761,7 @@ void HierarchicalAllocatorProcess::addFramework( // resources, so we only need to track them in the sorters. trackAllocatedResources(slaveId, frameworkId, resources); - roleTree.trackAllocated(resources); + roleTree.trackAllocated(slaveId, resources); } LOG(INFO) << "Added framework " << frameworkId; @@ -935,7 +1002,7 @@ void HierarchicalAllocatorProcess::addSlave( trackAllocatedResources(slaveId, frameworkId, allocation); - roleTree.trackAllocated(allocation); + roleTree.trackAllocated(slaveId, allocation); } // If we have just a number of recovered agents, we cannot distinguish @@ -975,7 +1042,7 @@ void HierarchicalAllocatorProcess::removeSlave( // untrackAllocatedResources() potentially removes allocation roles, thus // we need to untrack actually allocated resources in the roles tree first. - roleTree.untrackAllocated(slave.totalAllocated); + roleTree.untrackAllocated(slaveId, slave.totalAllocated); // Untrack resources in roleTree and sorter. foreachpair ( @@ -1233,8 +1300,8 @@ void HierarchicalAllocatorProcess::updateAllocation( slave.increaseAvailable(frameworkId, offeredResources); slave.decreaseAvailable(frameworkId, updatedOfferedResources); - roleTree.untrackOfferedOrAllocated(offeredResources); - roleTree.trackOfferedOrAllocated(updatedOfferedResources); + roleTree.untrackOfferedOrAllocated(slaveId, offeredResources); + roleTree.trackOfferedOrAllocated(slaveId, updatedOfferedResources); // Update the allocation in the framework sorter. 
frameworkSorter->update( @@ -1512,7 +1579,7 @@ void HierarchicalAllocatorProcess::transitionOfferedToAllocated( const Resources& resources) { CHECK_NOTNONE(getSlave(slaveId))->totalAllocated += resources; - roleTree.trackAllocated(resources); + roleTree.trackAllocated(slaveId, resources); } @@ -1534,7 +1601,7 @@ void HierarchicalAllocatorProcess::recoverResources( if (isAllocated && slave.isSome()) { CHECK_CONTAINS((*slave)->totalAllocated, resources); (*slave)->totalAllocated -= resources; - roleTree.untrackAllocated(resources); + roleTree.untrackAllocated(slaveId, resources); } Option<Framework*> framework = getFramework(frameworkId); @@ -1939,10 +2006,7 @@ void HierarchicalAllocatorProcess::__generateOffers() // these as metrics. if (r->quota() != DEFAULT_QUOTA) { logHeadroomInfo = true; - rolesConsumedQuota[r->role] += - r->reservationScalarQuantities() + - ResourceQuantities::fromScalarResources( - r->offeredOrAllocatedScalars().unreserved().nonRevocable()); + rolesConsumedQuota[r->role] += r->quotaOfferedOrConsumed(); } } @@ -1988,8 +2052,7 @@ void HierarchicalAllocatorProcess::__generateOffers() // unallocated reservations = total reservations - allocated reservations availableHeadroom -= roleTree.root()->reservationScalarQuantities() - - ResourceQuantities::fromScalarResources( - roleTree.root()->offeredOrAllocatedScalars().reserved()); + roleTree.root()->offeredOrAllocatedReservedScalarQuantities(); // Subtract revocable resources. 
foreachvalue (const Slave& slave, slaves) { @@ -3066,7 +3129,7 @@ void HierarchicalAllocatorProcess::trackAllocatedResources( CHECK_CONTAINS(*frameworkSorter, frameworkId.value()) << " for role " << role; - roleTree.trackOfferedOrAllocated(allocation); + roleTree.trackOfferedOrAllocated(slaveId, allocation); roleSorter->allocated(role, slaveId, allocation); frameworkSorter->allocated( @@ -3100,7 +3163,7 @@ void HierarchicalAllocatorProcess::untrackAllocatedResources( CHECK_CONTAINS(*frameworkSorter, frameworkId.value()) << "for role " << role; - roleTree.untrackOfferedOrAllocated(allocation); + roleTree.untrackOfferedOrAllocated(slaveId, allocation); frameworkSorter->unallocated(frameworkId.value(), slaveId, allocation); diff --git a/src/master/allocator/mesos/hierarchical.hpp b/src/master/allocator/mesos/hierarchical.hpp index 6454cda..e444e47 100644 --- a/src/master/allocator/mesos/hierarchical.hpp +++ b/src/master/allocator/mesos/hierarchical.hpp @@ -112,6 +112,28 @@ struct Framework }; +// Helper for tracking cross-agent scalar resource totals. +// Needed because directly summing Resources across agents has +// prohibitively expensive time complexity: O(N^2) vs the number of agents, +// and also violates the convention that Resources belonging to different agents +// should not be added. +class ScalarResourceTotals +{ +public: + // These methods implicitly filter out non-scalars from the inputs, thus + // the caller is not obliged to ensure that `resources` contains only scalars. 
+ void add(const SlaveID& slaveID, const Resources& resources); + void subtract(const SlaveID& slaveID, const Resources& resources); + + bool empty() const { return scalars.empty(); } + ResourceQuantities quantities() const { return scalarsTotal; } + +private: + hashmap<SlaveID, Resources> scalars; + ResourceQuantities scalarsTotal; +}; + + class Role { public: @@ -122,19 +144,24 @@ public: return reservationScalarQuantities_; } - const Resources& offeredOrAllocatedScalars() const + ResourceQuantities offeredOrAllocatedReservedScalarQuantities() const { - return offeredOrAllocatedScalars_; + return offeredOrAllocatedReserved.quantities(); } const hashset<FrameworkID>& frameworks() const { return frameworks_; } const Quota& quota() const { return quota_; } + ResourceQuantities quotaOfferedOrConsumed() const + { + return offeredOrAllocatedUnreservedNonRevocable.quantities() + + reservationScalarQuantities_; + } + ResourceQuantities quotaConsumed() const { - return ResourceQuantities::fromScalarResources( - allocatedScalars_.unreserved().nonRevocable()) + + return allocatedUnreservedNonRevocable.quantities() + reservationScalarQuantities_; } @@ -187,12 +214,13 @@ private: // resources offered or allocated to this role. hashset<FrameworkID> frameworks_; - // Total allocated or offered scalar resources to this role, including - // meta data. This field dose not affect role's lifecycle. However, since - // any offered or allocated resources should be tied to a framework, - // an empty role (that has no registered framework) must have - // empty offeredOrAllocated resources. - Resources offeredOrAllocatedScalars_; + // Totals tracker for unreserved non-revocable offered/allocated resources. + // Note that since any offered or allocated resources should be tied to + // a framework, an empty role (that has no registered framework) must have + // this total empty. 
+ ScalarResourceTotals offeredOrAllocatedUnreservedNonRevocable; + + ScalarResourceTotals offeredOrAllocatedReserved; // Aggregated reserved scalar resource quantities on all agents tied to this // role, if any. This includes both its own reservations as well as @@ -200,9 +228,9 @@ private: // Note that non-scalar resources, such as ports, are excluded. ResourceQuantities reservationScalarQuantities_; - // Scalar resources actually allocated (i.e. used for launching tasks) to this - // role and any of its subroles, both reserved and unreserved, on all agents. - Resources allocatedScalars_; + // Totals tracker for unreserved non-revocable resources actually allocated + // (i.e. used for launching tasks) to this role and any of its subroles. + ScalarResourceTotals allocatedUnreservedNonRevocable; hashmap<std::string, Role*> children_; }; @@ -240,9 +268,9 @@ public: void trackReservations(const Resources& resources); void untrackReservations(const Resources& resources); - // We keep track of allocated resources which are actially used by frameworks. - void trackAllocated(const Resources& resources); - void untrackAllocated(const Resources& resources); + // We keep track of allocated resources which are actually used by frameworks. + void trackAllocated(const SlaveID& slaveId, const Resources& resources); + void untrackAllocated(const SlaveID& slaveId, const Resources& resources); void trackFramework( const FrameworkID& frameworkId, const std::string& role); @@ -253,8 +281,13 @@ public: void updateWeight(const std::string& role, double weight); - void trackOfferedOrAllocated(const Resources& resources); - void untrackOfferedOrAllocated(const Resources& resources); + void trackOfferedOrAllocated( + const SlaveID& slaveId, + const Resources& resources); + + void untrackOfferedOrAllocated( + const SlaveID& slaveId, + const Resources& resources); // Dump the role tree state in JSON format for debugging. std::string toJSON() const;