This is an automated email from the ASF dual-hosted git repository.

asekretenko pushed a commit to branch 1.10.x
in repository https://gitbox.apache.org/repos/asf/mesos.git


The following commit(s) were added to refs/heads/1.10.x by this push:
     new 1ff2fcd  Fixed performance of tracking resource totals in allocator's roles tree.
1ff2fcd is described below

commit 1ff2fcd90eabd98786531748869b8596120f7dfe
Author: Andrei Sekretenko <asekrete...@apache.org>
AuthorDate: Wed May 13 18:15:13 2020 +0200

    Fixed performance of tracking resource totals in allocator's roles tree.
    
    Before this patch, the roles tree was tracking total resources
    offered/allocated to a role as a single `Resources` object.
    In the case when each agent has a limited number of unique resources
    (for example, a single persistent volume), this resulted in poor
    asymptotic complexity of allocation versus the number of agents
    (O(N^2)) that was clearly observable in
    `HierarchicalAllocations_BENCHMARK_Test.PersistentVolumes`.
    In addition, the role tree code was violating the convention that
    `Resources` belonging to different agents should never be added.
    
    This patch implements per-agent tracking of `Resources` in the roles
    tree, thus improving the performance of allocation (and getting rid of
    the potentially problematic O(N^2) asymptotic behavior) in the case of many
    agents with a limited number of unique resources each.
    
    Review: https://reviews.apache.org/r/72508
---
 src/master/allocator/mesos/hierarchical.cpp | 139 ++++++++++++++++++++--------
 src/master/allocator/mesos/hierarchical.hpp |  69 ++++++++++----
 2 files changed, 152 insertions(+), 56 deletions(-)

diff --git a/src/master/allocator/mesos/hierarchical.cpp b/src/master/allocator/mesos/hierarchical.cpp
index 5fe9ffc..9e50799 100644
--- a/src/master/allocator/mesos/hierarchical.cpp
+++ b/src/master/allocator/mesos/hierarchical.cpp
@@ -202,6 +202,46 @@ static hashmap<string, vector<ResourceQuantities>> unpackFrameworkOfferFilters(
 }
 
 
+void ScalarResourceTotals::add(
+    const SlaveID& slaveID,
+    const Resources& resources)
+{
+  if (resources.scalars().empty()) {
+    // Keeps the invariant that `scalars` has no entries with empty resources.
+    return;
+  }
+
+  Resources& agentScalars = scalars[slaveID];  // Inserts if absent.
+  scalarsTotal -= ResourceQuantities::fromScalarResources(agentScalars);
+  agentScalars += resources.scalars();
+  scalarsTotal += ResourceQuantities::fromScalarResources(agentScalars);
+}
+
+
+void ScalarResourceTotals::subtract(
+    const SlaveID& slaveID,
+    const Resources& resources)
+{
+  if (resources.scalars().empty()) {
+    // `scalars` never tracks agents with empty resources, so subtracting empty
+    // resources is valid whether or not `slaveID` currently has an entry.
+    return;
+  }
+
+  CHECK_CONTAINS(scalars, slaveID);
+  CHECK_CONTAINS(scalars.at(slaveID), resources.scalars());
+
+  Resources& agentScalars = scalars.at(slaveID);
+  scalarsTotal -= ResourceQuantities::fromScalarResources(agentScalars);
+  agentScalars -= resources.scalars();
+  scalarsTotal += ResourceQuantities::fromScalarResources(agentScalars);
+
+  if (agentScalars.empty()) {
+    scalars.erase(slaveID);  // Preserve the no-empty-entries invariant.
+  }
+}
+
+
 Role::Role(const string& _role, Role* _parent)
   : role(_role),
     basename(strings::split(role, "/").back()),
@@ -315,10 +355,10 @@ bool RoleTree::tryRemove(const std::string& role)
       break;
     }
 
-    CHECK(current->allocatedScalars_.empty())
+    CHECK(current->allocatedUnreservedNonRevocable.empty())
       << "An empty role " << current->role
       << " has non-empty allocated scalar resources: "
-      << current->allocatedScalars_;
+      << current->allocatedUnreservedNonRevocable.quantities();
 
     Role* parent = CHECK_NOTNULL(current->parent);
 
@@ -328,9 +368,17 @@ bool RoleTree::tryRemove(const std::string& role)
       (*metrics)->removeRole(current->role);
     }
 
-    CHECK(current->offeredOrAllocatedScalars_.empty())
-      << " role: " << role
-      << " offeredOrAllocated: " << current->offeredOrAllocatedScalars_;
+    CHECK(current->offeredOrAllocatedUnreservedNonRevocable.empty())
+      << "An empty role " << current->role
+      << " has non-empty offered or allocated"
+      << " unreserved non-revocable scalar resources: "
+      << current->offeredOrAllocatedUnreservedNonRevocable.quantities();
+
+    CHECK(current->offeredOrAllocatedReserved.empty())
+      << "An empty role " << current->role
+      << " has non-empty offered or allocated reserved scalar resources: "
+      << current->offeredOrAllocatedReserved.quantities();
+
     roles_.erase(current->role);
 
     current = parent;
@@ -385,28 +433,32 @@ void RoleTree::untrackReservations(const Resources& resources)
 }
 
 
-void RoleTree::trackAllocated(const Resources& resources_)
+void RoleTree::trackAllocated(
+    const SlaveID& slaveId,
+    const Resources& resources_)
 {
   foreachpair (
       const string& role,
       const Resources& resources,
-      resources_.scalars().allocations()) {
+      resources_.scalars().unreserved().nonRevocable().allocations()) {
     applyToRoleAndAncestors(CHECK_NOTNONE(get_(role)), [&](Role* current) {
-      current->allocatedScalars_ += resources;
+      current->allocatedUnreservedNonRevocable.add(slaveId, resources);
       updateQuotaConsumedMetric(current);
     });
   }
 }
 
 
-void RoleTree::untrackAllocated(const Resources& resources_)
+void RoleTree::untrackAllocated(
+    const SlaveID& slaveId,
+    const Resources& resources_)
 {
   foreachpair (
       const string& role,
       const Resources& resources,
-      resources_.scalars().allocations()) {
+      resources_.scalars().unreserved().nonRevocable().allocations()) {
     applyToRoleAndAncestors(CHECK_NOTNONE(get_(role)), [&](Role* current) {
-      current->allocatedScalars_ -= resources;
+      current->allocatedUnreservedNonRevocable.subtract(slaveId, resources);
       updateQuotaConsumedMetric(current);
     });
   }
@@ -453,7 +505,9 @@ void RoleTree::updateWeight(const string& role, double weight)
 }
 
 
-void RoleTree::trackOfferedOrAllocated(const Resources& resources_)
+void RoleTree::trackOfferedOrAllocated(
+    const SlaveID& slaveId,
+    const Resources& resources_)
 {
   // TODO(mzhu): avoid building a map by traversing `resources`
   // and look for the allocation role of individual resource.
@@ -464,14 +518,20 @@ void RoleTree::trackOfferedOrAllocated(const Resources& resources_)
       const Resources& resources,
       resources_.scalars().allocations()) {
     applyToRoleAndAncestors(
-        CHECK_NOTNONE(get_(role)), [&resources](Role* current) {
-      current->offeredOrAllocatedScalars_ += resources;
-    });
+        CHECK_NOTNONE(get_(role)), [&resources, &slaveId](Role* current) {
+          current->offeredOrAllocatedReserved.add(
+              slaveId, resources.reserved());
+
+          current->offeredOrAllocatedUnreservedNonRevocable.add(
+              slaveId, resources.unreserved().nonRevocable());
+        });
   }
 }
 
 
-void RoleTree::untrackOfferedOrAllocated(const Resources& resources_)
+void RoleTree::untrackOfferedOrAllocated(
+    const SlaveID& slaveId,
+    const Resources& resources_)
 {
   // TODO(mzhu): avoid building a map by traversing `resources`
   // and look for the allocation role of individual resource.
@@ -482,12 +542,13 @@ void RoleTree::untrackOfferedOrAllocated(const Resources& resources_)
       const Resources& resources,
       resources_.scalars().allocations()) {
     applyToRoleAndAncestors(
-        CHECK_NOTNONE(get_(role)), [&resources](Role* current) {
-      CHECK_CONTAINS(current->offeredOrAllocatedScalars_, resources)
-        << " Role: " << current->role
-        << " offeredOrAllocated: " << current->offeredOrAllocatedScalars_;
-      current->offeredOrAllocatedScalars_ -= resources;
-    });
+        CHECK_NOTNONE(get_(role)), [&resources, &slaveId](Role* current) {
+          current->offeredOrAllocatedReserved.subtract(
+              slaveId, resources.reserved());
+
+          current->offeredOrAllocatedUnreservedNonRevocable.subtract(
+              slaveId, resources.unreserved().nonRevocable());
+        });
   }
 }
 
@@ -503,8 +564,14 @@ std::string RoleTree::toJSON() const
       writer->field("limits", role->quota_.limits);
       writer->field(
           "reservation_quantities", role->reservationScalarQuantities_);
+
+      writer->field(
+          "offered_or_allocated_reserved_quantities",
+          role->offeredOrAllocatedReserved.quantities());
+
       writer->field(
-          "offered_or_allocated_scalars", role->offeredOrAllocatedScalars_);
+          "offered_or_allocated_unreserved_nonrevocable_quantities",
+          role->offeredOrAllocatedUnreservedNonRevocable.quantities());
 
       writer->field("frameworks", [&](JSON::ArrayWriter* writer) {
         foreach (const FrameworkID& id, role->frameworks_) {
@@ -694,7 +761,7 @@ void HierarchicalAllocatorProcess::addFramework(
     // resources, so we only need to track them in the sorters.
     trackAllocatedResources(slaveId, frameworkId, resources);
 
-    roleTree.trackAllocated(resources);
+    roleTree.trackAllocated(slaveId, resources);
   }
 
   LOG(INFO) << "Added framework " << frameworkId;
@@ -935,7 +1002,7 @@ void HierarchicalAllocatorProcess::addSlave(
 
     trackAllocatedResources(slaveId, frameworkId, allocation);
 
-    roleTree.trackAllocated(allocation);
+    roleTree.trackAllocated(slaveId, allocation);
   }
 
   // If we have just a number of recovered agents, we cannot distinguish
@@ -975,7 +1042,7 @@ void HierarchicalAllocatorProcess::removeSlave(
 
     // untrackAllocatedResources() potentially removes allocation roles, thus
     // we need to untrack actually allocated resources in the roles tree first.
-    roleTree.untrackAllocated(slave.totalAllocated);
+    roleTree.untrackAllocated(slaveId, slave.totalAllocated);
 
     // Untrack resources in roleTree and sorter.
     foreachpair (
@@ -1233,8 +1300,8 @@ void HierarchicalAllocatorProcess::updateAllocation(
   slave.increaseAvailable(frameworkId, offeredResources);
   slave.decreaseAvailable(frameworkId, updatedOfferedResources);
 
-  roleTree.untrackOfferedOrAllocated(offeredResources);
-  roleTree.trackOfferedOrAllocated(updatedOfferedResources);
+  roleTree.untrackOfferedOrAllocated(slaveId, offeredResources);
+  roleTree.trackOfferedOrAllocated(slaveId, updatedOfferedResources);
 
   // Update the allocation in the framework sorter.
   frameworkSorter->update(
@@ -1512,7 +1579,7 @@ void HierarchicalAllocatorProcess::transitionOfferedToAllocated(
     const Resources& resources)
 {
   CHECK_NOTNONE(getSlave(slaveId))->totalAllocated += resources;
-  roleTree.trackAllocated(resources);
+  roleTree.trackAllocated(slaveId, resources);
 }
 
 
@@ -1534,7 +1601,7 @@ void HierarchicalAllocatorProcess::recoverResources(
   if (isAllocated && slave.isSome()) {
     CHECK_CONTAINS((*slave)->totalAllocated, resources);
     (*slave)->totalAllocated -= resources;
-    roleTree.untrackAllocated(resources);
+    roleTree.untrackAllocated(slaveId, resources);
   }
 
   Option<Framework*> framework = getFramework(frameworkId);
@@ -1939,10 +2006,7 @@ void HierarchicalAllocatorProcess::__generateOffers()
     // these as metrics.
     if (r->quota() != DEFAULT_QUOTA) {
       logHeadroomInfo = true;
-      rolesConsumedQuota[r->role] +=
-        r->reservationScalarQuantities() +
-        ResourceQuantities::fromScalarResources(
-            r->offeredOrAllocatedScalars().unreserved().nonRevocable());
+      rolesConsumedQuota[r->role] += r->quotaOfferedOrConsumed();
     }
   }
 
@@ -1988,8 +2052,7 @@ void HierarchicalAllocatorProcess::__generateOffers()
   // unallocated reservations = total reservations - allocated reservations
   availableHeadroom -=
     roleTree.root()->reservationScalarQuantities() -
-    ResourceQuantities::fromScalarResources(
-        roleTree.root()->offeredOrAllocatedScalars().reserved());
+    roleTree.root()->offeredOrAllocatedReservedScalarQuantities();
 
   // Subtract revocable resources.
   foreachvalue (const Slave& slave, slaves) {
@@ -3066,7 +3129,7 @@ void HierarchicalAllocatorProcess::trackAllocatedResources(
     CHECK_CONTAINS(*frameworkSorter, frameworkId.value())
       << " for role " << role;
 
-    roleTree.trackOfferedOrAllocated(allocation);
+    roleTree.trackOfferedOrAllocated(slaveId, allocation);
 
     roleSorter->allocated(role, slaveId, allocation);
     frameworkSorter->allocated(
@@ -3100,7 +3163,7 @@ void HierarchicalAllocatorProcess::untrackAllocatedResources(
     CHECK_CONTAINS(*frameworkSorter, frameworkId.value())
       << "for role " << role;
 
-    roleTree.untrackOfferedOrAllocated(allocation);
+    roleTree.untrackOfferedOrAllocated(slaveId, allocation);
 
     frameworkSorter->unallocated(frameworkId.value(), slaveId, allocation);
 
diff --git a/src/master/allocator/mesos/hierarchical.hpp b/src/master/allocator/mesos/hierarchical.hpp
index 6454cda..e444e47 100644
--- a/src/master/allocator/mesos/hierarchical.hpp
+++ b/src/master/allocator/mesos/hierarchical.hpp
@@ -112,6 +112,28 @@ struct Framework
 };
 
 
+// Tracks scalar resource totals across agents, keeping `Resources` per agent
+// (agents with empty resources are never stored). Summing `Resources` across
+// agents directly costs O(N^2) in the number of agents, and also violates the
+// convention that `Resources` of different agents should never be added.
+// `quantities()` returns the cross-agent total, kept up to date incrementally.
+class ScalarResourceTotals
+{
+public:
+  // These methods implicitly filter out non-scalars from the inputs, thus
+  // the caller is not obliged to ensure that `resources` contains only scalars.
+  void add(const SlaveID& slaveID, const Resources& resources);
+  void subtract(const SlaveID& slaveID, const Resources& resources);
+
+  bool empty() const { return scalars.empty(); }
+  const ResourceQuantities& quantities() const { return scalarsTotal; }
+
+private:
+  hashmap<SlaveID, Resources> scalars;
+  ResourceQuantities scalarsTotal;
+};
+
+
 class Role
 {
 public:
@@ -122,19 +144,24 @@ public:
     return reservationScalarQuantities_;
   }
 
-  const Resources& offeredOrAllocatedScalars() const
+  ResourceQuantities offeredOrAllocatedReservedScalarQuantities() const
   {
-    return offeredOrAllocatedScalars_;
+    return offeredOrAllocatedReserved.quantities();
   }
 
   const hashset<FrameworkID>& frameworks() const { return frameworks_; }
 
   const Quota& quota() const { return quota_; }
 
+  ResourceQuantities quotaOfferedOrConsumed() const
+  {
+    return reservationScalarQuantities_ +
+           offeredOrAllocatedUnreservedNonRevocable.quantities();
+  }
+
   ResourceQuantities quotaConsumed() const
   {
-    return ResourceQuantities::fromScalarResources(
-               allocatedScalars_.unreserved().nonRevocable()) +
+    return allocatedUnreservedNonRevocable.quantities() +
            reservationScalarQuantities_;
   }
 
@@ -187,12 +214,13 @@ private:
   // resources offered or allocated to this role.
   hashset<FrameworkID> frameworks_;
 
-  // Total allocated or offered scalar resources to this role, including
-  // meta data. This field dose not affect role's lifecycle. However, since
-  // any offered or allocated resources should be tied to a framework,
-  // an empty role (that has no registered framework) must have
-  // empty offeredOrAllocated resources.
-  Resources offeredOrAllocatedScalars_;
+  // Totals tracker for unreserved non-revocable offered/allocated resources.
+  // Note that since any offered or allocated resources should be tied to
+  // a framework, an empty role (that has no registered framework) must have
+  // this total empty.
+  ScalarResourceTotals offeredOrAllocatedUnreservedNonRevocable;
+
+  ScalarResourceTotals offeredOrAllocatedReserved;
 
   // Aggregated reserved scalar resource quantities on all agents tied to this
   // role, if any. This includes both its own reservations as well as
@@ -200,9 +228,9 @@ private:
   // Note that non-scalar resources, such as ports, are excluded.
   ResourceQuantities reservationScalarQuantities_;
 
-  // Scalar resources actually allocated (i.e. used for launching tasks) to this
-  // role and any of its subroles, both reserved and unreserved, on all agents.
-  Resources allocatedScalars_;
+  // Totals tracker for unreserved non-revocable resources actually allocated
+  // (i.e. used for launching tasks) to this role and any of its subroles.
+  ScalarResourceTotals allocatedUnreservedNonRevocable;
 
   hashmap<std::string, Role*> children_;
 };
@@ -240,9 +268,9 @@ public:
   void trackReservations(const Resources& resources);
   void untrackReservations(const Resources& resources);
 
-  // We keep track of allocated resources which are actially used by frameworks.
-  void trackAllocated(const Resources& resources);
-  void untrackAllocated(const Resources& resources);
+  // We keep track of allocated resources which are actually used by frameworks.
+  void trackAllocated(const SlaveID& slaveId, const Resources& resources);
+  void untrackAllocated(const SlaveID& slaveId, const Resources& resources);
 
   void trackFramework(
     const FrameworkID& frameworkId, const std::string& role);
@@ -253,8 +281,13 @@ public:
 
   void updateWeight(const std::string& role, double weight);
 
-  void trackOfferedOrAllocated(const Resources& resources);
-  void untrackOfferedOrAllocated(const Resources& resources);
+  void trackOfferedOrAllocated(
+      const SlaveID& slaveId,
+      const Resources& resources);
+
+  void untrackOfferedOrAllocated(
+      const SlaveID& slaveId,
+      const Resources& resources);
 
   // Dump the role tree state in JSON format for debugging.
   std::string toJSON() const;

Reply via email to