Added GC of unreachable agent metadata from the registry.

Information about agents that have been marked unreachable is stored
in the registry. Since the number of unreachable agents can grow without
bound, this commit implements a garbage collection scheme. The current
leading master will periodically examine the registry and discard
information about unreachable agents according to two criteria,
controlled by `registry_max_agent_age` and `registry_max_agent_count`.
How often the master examines the registry is controlled by a third
parameter, `registry_gc_interval`.
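
For illustration, the per-agent retention rule these parameters express
can be sketched as follows (the function and parameter names here are
illustrative, not part of this commit):

    // Sketch of the GC decision for a single unreachable agent.
    // `age` is how long the agent has been unreachable; `retained` is
    // how many unreachable agents would remain if this one were kept.
    #include <cstddef>

    #include <stout/duration.hpp>

    bool shouldDiscard(
        const Duration& age,   // checked against registry_max_agent_age
        size_t retained,       // checked against registry_max_agent_count
        const Duration& maxAge,
        size_t maxCount)
    {
      return retained > maxCount || age > maxAge;
    }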

Review: https://reviews.apache.org/r/51021/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/9dad0b03
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/9dad0b03
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/9dad0b03

Branch: refs/heads/master
Commit: 9dad0b034c99403cab43cab0a6ea8a55e99e71fa
Parents: 592da1e
Author: Neil Conway <neil.con...@gmail.com>
Authored: Mon Sep 19 15:48:16 2016 -0700
Committer: Vinod Kone <vinodk...@gmail.com>
Committed: Mon Sep 19 15:48:16 2016 -0700

----------------------------------------------------------------------
 src/master/constants.hpp      |   6 +
 src/master/flags.cpp          |  31 ++
 src/master/flags.hpp          |   3 +
 src/master/master.cpp         | 104 +++++++
 src/master/master.hpp         |  66 ++++-
 src/master/registry.proto     |   6 +
 src/tests/partition_tests.cpp | 590 +++++++++++++++++++++++++++++++++++++
 src/tests/registrar_tests.cpp |  46 +++
 8 files changed, 848 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/9dad0b03/src/master/constants.hpp
----------------------------------------------------------------------
diff --git a/src/master/constants.hpp b/src/master/constants.hpp
index cd80dac..5dd0667 100644
--- a/src/master/constants.hpp
+++ b/src/master/constants.hpp
@@ -91,6 +91,12 @@ constexpr Duration WHITELIST_WATCH_INTERVAL = Seconds(5);
 // Default number of tasks (limit) for /master/tasks endpoint.
 constexpr size_t TASK_LIMIT = 100;
 
+constexpr Duration DEFAULT_REGISTRY_GC_INTERVAL = Minutes(15);
+
+constexpr Duration DEFAULT_REGISTRY_MAX_AGENT_AGE = Weeks(2);
+
+constexpr size_t DEFAULT_REGISTRY_MAX_AGENT_COUNT = 100 * 1024;
+
 /**
  * Label used by the Leader Contender and Detector.
  *

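As a back-of-envelope check on these defaults (relying on stout's
`Duration` accessors), an agent marked unreachable survives roughly
2 weeks / 15 minutes = 1344 GC passes before the age criterion removes
it:

    #include <iostream>

    #include <stout/duration.hpp>

    int main()
    {
      // Weeks(2).mins() == 20160; 20160 / 15 == 1344 GC passes.
      std::cout << Weeks(2).mins() / Minutes(15).mins() << std::endl;
      return 0;
    }
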
http://git-wip-us.apache.org/repos/asf/mesos/blob/9dad0b03/src/master/flags.cpp
----------------------------------------------------------------------
diff --git a/src/master/flags.cpp b/src/master/flags.cpp
index 19ae6c1..aea8bd4 100644
--- a/src/master/flags.cpp
+++ b/src/master/flags.cpp
@@ -557,4 +557,35 @@ mesos::internal::master::Flags::Flags()
       "should exist in a module specified through the --modules flag.\n"
       "Cannot be used in conjunction with --zk.\n"
       "Must be used in conjunction with --master_contender.");
+
+  add(&Flags::registry_gc_interval,
+      "registry_gc_interval",
+      "How often to garbage collect the registry. The current leading\n"
+      "master will periodically discard information from the registry.\n"
+      "How long registry state is retained is controlled by other\n"
+      "parameters (e.g., registry_max_agent_age, registry_max_agent_count);\n"
+      "this parameter controls how often the master will examine the\n"
+      "registry to see if data should be discarded.",
+      DEFAULT_REGISTRY_GC_INTERVAL);
+
+  add(&Flags::registry_max_agent_age,
+      "registry_max_agent_age",
+      "Maximum length of time to store information in the registry about\n"
+      "agents that are not currently connected to the cluster. This\n"
+      "information allows frameworks to determine the status of unreachable\n"
+      "and removed agents. Note that the registry always stores information\n"
+      "on all connected agents. If there are more than\n"
+      "`registry_max_agent_count` partitioned or removed agents, agent\n"
+      "information may be discarded from the registry sooner than indicated\n"
+      "by this parameter.",
+      DEFAULT_REGISTRY_MAX_AGENT_AGE);
+
+  add(&Flags::registry_max_agent_count,
+      "registry_max_agent_count",
+      "Maximum number of disconnected agents to store in the registry.\n"
+      "This informtion allows frameworks to determine the status of\n"
+      "disconnected agents. Note that the registry always stores\n"
+      "information about all connected agents. See also the\n"
+      "`registry_max_agent_age` flag.",
+      DEFAULT_REGISTRY_MAX_AGENT_COUNT);
 }

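These flags are exercised by the tests later in this commit; for
reference, a test-local override (mirroring `partition_tests.cpp`
below, with arbitrary example values) looks like:

    // Sketch: tightening the GC policy in a test. CreateMasterFlags()
    // is the test helper used in partition_tests.cpp; the values here
    // are arbitrary examples, not recommended settings.
    master::Flags masterFlags = CreateMasterFlags();
    masterFlags.registry_gc_interval = Minutes(30);
    masterFlags.registry_max_agent_age = Days(1);
    masterFlags.registry_max_agent_count = 1000;
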
http://git-wip-us.apache.org/repos/asf/mesos/blob/9dad0b03/src/master/flags.hpp
----------------------------------------------------------------------
diff --git a/src/master/flags.hpp b/src/master/flags.hpp
index c6e8530..708a629 100644
--- a/src/master/flags.hpp
+++ b/src/master/flags.hpp
@@ -89,6 +89,9 @@ public:
   size_t max_completed_tasks_per_framework;
   Option<std::string> master_contender;
   Option<std::string> master_detector;
+  Duration registry_gc_interval;
+  Duration registry_max_agent_age;
+  size_t registry_max_agent_count;
 
 #ifdef WITH_NETWORK_ISOLATOR
   Option<size_t> max_executors_per_agent;

http://git-wip-us.apache.org/repos/asf/mesos/blob/9dad0b03/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 71caea6..5089b99 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -1201,6 +1201,10 @@ void Master::finalize()
     Clock::cancel(slaves.recoveredTimer.get());
   }
 
+  if (registryGcTimer.isSome()) {
+    Clock::cancel(registryGcTimer.get());
+  }
+
   terminate(whitelistWatcher);
   wait(whitelistWatcher);
   delete whitelistWatcher;
@@ -1574,6 +1578,9 @@ Future<Nothing> Master::_recover(const Registry& registry)
     slaves.unreachable[unreachable.id()] = unreachable.timestamp();
   }
 
+  // Set up a timer for age-based registry GC.
+  scheduleRegistryGc();
+
   // Set up a timeout for slaves to re-register.
   slaves.recoveredTimer =
     delay(flags.agent_reregister_timeout,
@@ -1675,6 +1682,103 @@ Future<Nothing> Master::_recover(const Registry& registry)
 }
 
 
+void Master::scheduleRegistryGc()
+{
+  registryGcTimer = delay(flags.registry_gc_interval,
+                          self(),
+                          &Self::doRegistryGc);
+}
+
+
+void Master::doRegistryGc()
+{
+  // Schedule next periodic GC.
+  scheduleRegistryGc();
+
+  // Determine which unreachable agents to GC from the registry, if
+  // any. We do this by examining the master's in-memory copy of the
+  // unreachable list and checking two criteria, "age" and "count". To
+  // check the "count" criteria, we remove elements from the beginning
+  // of the list until it contains at most "registry_max_agent_count"
+  // elements (note that `slaves.unreachable` is a `LinkedHashMap`,
+  // which provides iteration over keys in insertion-order). To check
+  // the "age" criteria, we remove any element in the list whose age
+  // is more than "registry_max_agent_age". Note that for the latter,
+  // we check the entire list, not just the beginning: this avoids
+  // requiring that the list be kept sorted by timestamp.
+  //
+  // We build a candidate list of SlaveIDs to remove. We then try to
+  // remove this list from the registry. Note that all the slaveIDs we
+  // want to remove might not be found in the registrar's copy of the
+  // unreachable list; this can occur if there is a concurrent write
+  // (e.g., an unreachable agent we want to GC reregisters
+  // concurrently). In this situation, we skip removing any elements
+  // we don't find.
+
+  size_t unreachableCount = slaves.unreachable.size();
+  TimeInfo currentTime = protobuf::getCurrentTime();
+  hashset<SlaveID> toRemove;
+
+  foreach (const SlaveID& slave, slaves.unreachable.keys()) {
+    // Count-based GC.
+    CHECK(toRemove.size() <= unreachableCount);
+
+    size_t liveCount = unreachableCount - toRemove.size();
+    if (liveCount > flags.registry_max_agent_count) {
+      toRemove.insert(slave);
+      continue;
+    }
+
+    // Age-based GC.
+    const TimeInfo& unreachableTime = slaves.unreachable[slave];
+    Duration age = Nanoseconds(
+        currentTime.nanoseconds() - unreachableTime.nanoseconds());
+
+    if (age > flags.registry_max_agent_age) {
+      toRemove.insert(slave);
+    }
+  }
+
+  if (toRemove.empty()) {
+    VLOG(1) << "Skipping periodic registry garbage collection: "
+            << "no agents qualify for removal";
+    return;
+  }
+
+  VLOG(1) << "Attempting to remove " << toRemove.size()
+          << " unreachable agents from the registry";
+
+  registrar->apply(Owned<Operation>(new PruneUnreachable(toRemove)))
+    .onAny(defer(self(),
+                 &Self::_doRegistryGc,
+                 toRemove,
+                 lambda::_1));
+}
+
+
+void Master::_doRegistryGc(
+    const hashset<SlaveID>& toRemove,
+    const Future<bool>& registrarResult)
+{
+  CHECK(!registrarResult.isDiscarded());
+  CHECK(!registrarResult.isFailed());
+
+  // `PruneUnreachable` registry operation should never fail.
+  CHECK(registrarResult.get());
+
+  // TODO(neilc): Add a metric for # of agents discarded from the registry?
+  LOG(INFO) << "Garbage collected " << toRemove.size()
+            << " unreachable agents from the registry";
+
+  // Update in-memory state to be consistent with registry changes.
+  foreach (const SlaveID& slave, toRemove) {
+    // NOTE: `slave` might not appear in `slaves.unreachable` if there
+    // have been concurrent changes.
+    slaves.unreachable.erase(slave);
+  }
+}
+
+
 void Master::recoveredSlavesTimeout(const Registry& registry)
 {
   CHECK(elected());

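The count-based pass in `doRegistryGc` depends on iteration order. A
small sketch of the `LinkedHashMap` property it relies on (the keys
here are illustrative):

    #include <string>

    #include <stout/linkedhashmap.hpp>

    // LinkedHashMap iterates keys in insertion order, so the first
    // keys visited by doRegistryGc() are the agents that were marked
    // unreachable earliest, which are exactly the ones count-based GC
    // should discard first.
    LinkedHashMap<std::string, int> unreachable;
    unreachable["agent-b"] = 2;  // inserted first
    unreachable["agent-a"] = 1;  // inserted second
    // unreachable.keys() yields {"agent-b", "agent-a"}: insertion
    // order, not key order.
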
http://git-wip-us.apache.org/repos/asf/mesos/blob/9dad0b03/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 370e0f0..4ba8e53 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -56,6 +56,7 @@
 #include <stout/foreach.hpp>
 #include <stout/hashmap.hpp>
 #include <stout/hashset.hpp>
+#include <stout/linkedhashmap.hpp>
 #include <stout/multihashmap.hpp>
 #include <stout/option.hpp>
 #include <stout/recordio.hpp>
@@ -964,6 +965,14 @@ private:
     return leader.isSome() && leader.get() == info_;
   }
 
+  void scheduleRegistryGc();
+
+  void doRegistryGc();
+
+  void _doRegistryGc(
+      const hashset<SlaveID>& toRemove,
+      const process::Future<bool>& registrarResult);
+
   process::Future<bool> authorizeLogAccess(
       const Option<std::string>& principal);
 
@@ -1595,6 +1604,10 @@ private:
   // master is elected as a leader.
   Option<process::Future<Nothing>> recovered;
 
+  // If this is the leading master, we periodically check whether we
+  // should GC some information from the registry.
+  Option<process::Timer> registryGcTimer;
+
   struct Slaves
   {
     Slaves() : removed(MAX_REMOVED_SLAVES) {}
@@ -1696,10 +1709,13 @@ private:
     // TODO(bmahler): Ideally we could use a cache with set semantics.
     Cache<SlaveID, Nothing> removed;
 
-    // Slaves that have been marked unreachable. We recover this from the
-    // registry, so it includes slaves marked as unreachable by other
-    // instances of the master.
-    hashmap<SlaveID, TimeInfo> unreachable;
+    // Slaves that have been marked unreachable. We recover this from
+    // the registry, so it includes slaves marked as unreachable by
+    // other instances of the master. Note that we use a linkedhashmap
+    // to ensure the order of elements here matches the order in the
+    // registry's unreachable list, which matches the order in which
+    // agents are marked unreachable.
+    LinkedHashMap<SlaveID, TimeInfo> unreachable;
 
     // This rate limiter is used to limit the removal of slaves failing
     // health checks.
@@ -2051,6 +2067,48 @@ private:
 };
 
 
+class PruneUnreachable : public Operation
+{
+public:
+  explicit PruneUnreachable(const hashset<SlaveID>& _toRemove)
+    : toRemove(_toRemove) {}
+
+protected:
+  virtual Try<bool> perform(Registry* registry, hashset<SlaveID>*, bool)
+  {
+    // Attempt to remove the SlaveIDs in `toRemove` from the
+    // unreachable list. Some SlaveIDs in `toRemove` might not appear
+    // in the registry; this is possible if there was a concurrent
+    // registry operation.
+    //
+    // TODO(neilc): This has quadratic worst-case behavior, because
+    // `DeleteSubrange` for a `repeated` object takes linear time.
+    bool mutate = false;
+    int i = 0;
+    while (i < registry->unreachable().slaves().size()) {
+      const Registry::UnreachableSlave& slave =
+        registry->unreachable().slaves(i);
+
+      if (toRemove.contains(slave.id())) {
+        Registry::UnreachableSlaves* unreachable =
+          registry->mutable_unreachable();
+
+        unreachable->mutable_slaves()->DeleteSubrange(i, 1);
+        mutate = true;
+        continue;
+      }
+
+      i++;
+    }
+
+    return mutate;
+  }
+
+private:
+  const hashset<SlaveID> toRemove;
+};
+
+
 // Implementation of slave removal Registrar operation.
 class RemoveSlave : public Operation
 {

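Regarding the quadratic-behavior TODO in `PruneUnreachable::perform`
above: a linear-time alternative would be to rebuild the surviving
list in one pass and swap it in, which also preserves insertion order.
A sketch (not part of this commit):

    // Copy survivors into a scratch field, then swap it into place.
    // Assumes the same `registry` and `toRemove` as in perform().
    google::protobuf::RepeatedPtrField<Registry::UnreachableSlave> kept;

    foreach (const Registry::UnreachableSlave& slave,
             registry->unreachable().slaves()) {
      if (!toRemove.contains(slave.id())) {
        kept.Add()->CopyFrom(slave);
      }
    }

    registry->mutable_unreachable()->mutable_slaves()->Swap(&kept);
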
http://git-wip-us.apache.org/repos/asf/mesos/blob/9dad0b03/src/master/registry.proto
----------------------------------------------------------------------
diff --git a/src/master/registry.proto b/src/master/registry.proto
index 03c896c..eab9821 100644
--- a/src/master/registry.proto
+++ b/src/master/registry.proto
@@ -76,6 +76,12 @@ message Registry {
 
   // Slaves that have failed health checks. They may or may not still
   // be running.
+  //
+  // New entries are added to the end of this list; hence the first
+  // element of the list was added first (although if there is clock
+  // drift, it might not necessarily have the smallest timestamp). The
+  // size of this list is limited by the `registry_max_agent_age` and
+  // `registry_max_agent_count` flags.
   optional UnreachableSlaves unreachable = 7;
 
   // Holds a list of machines and some status information about each.

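For completeness, a hedged sketch of walking this list oldest-first,
using only accessors that appear elsewhere in this commit (`registry`
is assumed to be a recovered `Registry` instance):

    foreach (const Registry::UnreachableSlave& slave,
             registry.unreachable().slaves()) {
      LOG(INFO) << "Agent " << slave.id().value()
                << " marked unreachable at "
                << slave.timestamp().nanoseconds() << " ns";
    }
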
http://git-wip-us.apache.org/repos/asf/mesos/blob/9dad0b03/src/tests/partition_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/partition_tests.cpp b/src/tests/partition_tests.cpp
index 997eda3..54d07b7 100644
--- a/src/tests/partition_tests.cpp
+++ b/src/tests/partition_tests.cpp
@@ -58,6 +58,7 @@ using process::Future;
 using process::Message;
 using process::Owned;
 using process::PID;
+using process::Time;
 
 using process::http::OK;
 using process::http::Response;
@@ -1279,6 +1280,595 @@ TEST_P(PartitionTest, PartitionedSlaveExitedExecutor)
 }
 
 
+// This test checks that the master correctly garbage collects
+// information about unreachable agents from the registry using the
+// count-based GC criterion.
+TEST_P(PartitionTest, RegistryGcByCount)
+{
+  // Configure GC to only keep the most recent partitioned agent in
+  // the unreachable list.
+  master::Flags masterFlags = CreateMasterFlags();
+  masterFlags.registry_strict = GetParam();
+  masterFlags.registry_max_agent_count = 1;
+
+  // Test logic assumes that two agents can be marked unreachable in
+  // sequence without triggering GC.
+  const Duration healthCheckDuration =
+    masterFlags.agent_ping_timeout * masterFlags.max_agent_ping_timeouts;
+
+  CHECK(masterFlags.registry_gc_interval >= healthCheckDuration * 2);
+
+  // Pause the clock before starting the master. This ensures that we
+  // know precisely when the GC timer will fire.
+  Clock::pause();
+
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  // Set these expectations up before we spawn the slave so that we
+  // don't miss the first PING.
+  Future<Message> ping = FUTURE_MESSAGE(
+      Eq(PingSlaveMessage().GetTypeName()), _, _);
+
+  // Drop all the PONGs to simulate slave partition.
+  DROP_PROTOBUFS(PongSlaveMessage(), _, _);
+
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage1 =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+  Owned<MasterDetector> slaveDetector1 = master.get()->createDetector();
+  Try<Owned<cluster::Slave>> slave1 = StartSlave(slaveDetector1.get());
+  ASSERT_SOME(slave1);
+
+  AWAIT_READY(slaveRegisteredMessage1);
+  const SlaveID slaveId1 = slaveRegisteredMessage1.get().slave_id();
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  Future<Nothing> resourceOffers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureSatisfy(&resourceOffers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  // Need to make sure the framework AND slave have registered with
+  // master. Waiting for resource offers should accomplish both.
+  AWAIT_READY(resourceOffers);
+
+  EXPECT_CALL(sched, offerRescinded(&driver, _))
+    .WillRepeatedly(Return());
+
+  Future<Nothing> slaveLost1;
+  EXPECT_CALL(sched, slaveLost(&driver, _))
+    .WillOnce(FutureSatisfy(&slaveLost1));
+
+  // Simulate the first slave becoming partitioned from the master.
+  size_t pings = 0;
+  while (true) {
+    AWAIT_READY(ping);
+    pings++;
+    if (pings == masterFlags.max_agent_ping_timeouts) {
+      break;
+    }
+    ping = FUTURE_MESSAGE(Eq(PingSlaveMessage().GetTypeName()), _, _);
+    Clock::advance(masterFlags.agent_ping_timeout);
+  }
+
+  Clock::advance(masterFlags.agent_ping_timeout);
+  Clock::settle();
+
+  // Record the time at which we expect the master to have marked the
+  // agent as unreachable. We then advance the clock -- this shouldn't
+  // do anything, but it ensures that the `unreachable_time` we check
+  // below is computed at the right time.
+  TimeInfo partitionTime1 = protobuf::getCurrentTime();
+
+  Clock::advance(Milliseconds(100));
+
+  AWAIT_READY(slaveLost1);
+
+  // Shut down the first slave. This is necessary because we only drop
+  // PONG messages; after advancing the clock below, the slave would
+  // try to reregister and would succeed. Hence, stop the slave first.
+  slave1.get()->terminate();
+  slave1->reset();
+
+  // Start another slave.
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage2 =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+  Owned<MasterDetector> slaveDetector2 = master.get()->createDetector();
+  Try<Owned<cluster::Slave>> slave2 = StartSlave(slaveDetector2.get());
+  ASSERT_SOME(slave2);
+
+  AWAIT_READY(slaveRegisteredMessage2);
+  const SlaveID slaveId2 = slaveRegisteredMessage2.get().slave_id();
+
+  Future<Nothing> slaveLost2;
+  EXPECT_CALL(sched, slaveLost(&driver, _))
+    .WillOnce(FutureSatisfy(&slaveLost2));
+
+  // Simulate the second slave becoming partitioned from the master.
+  pings = 0;
+  while (true) {
+    AWAIT_READY(ping);
+    pings++;
+    if (pings == masterFlags.max_agent_ping_timeouts) {
+      break;
+    }
+    ping = FUTURE_MESSAGE(Eq(PingSlaveMessage().GetTypeName()), _, _);
+    Clock::advance(masterFlags.agent_ping_timeout);
+  }
+
+  Clock::advance(masterFlags.agent_ping_timeout);
+  Clock::settle();
+
+  // Record the time at which we expect the master to have marked the
+  // agent as unreachable. We then advance the clock -- this shouldn't
+  // do anything, but it ensures that the `unreachable_time` we check
+  // below is computed at the right time.
+  TimeInfo partitionTime2 = protobuf::getCurrentTime();
+
+  Clock::advance(Milliseconds(100));
+
+  AWAIT_READY(slaveLost2);
+
+  // Shut down the second slave. This is necessary because we only drop
+  // PONG messages; after advancing the clock below, the slave would
+  // try to reregister and would succeed. Hence, stop the slave first.
+  slave2.get()->terminate();
+  slave2->reset();
+
+  // Do explicit reconciliation for a random task ID on `slave1`. GC
+  // has not occurred yet (since `registry_gc_interval` has not
+  // elapsed since the master was started), so the slave should be in
+  // the unreachable list; hence `unreachable_time` should be set on
+  // the result of the reconciliation request.
+  TaskStatus status1;
+  status1.mutable_task_id()->set_value(UUID::random().toString());
+  status1.mutable_slave_id()->CopyFrom(slaveId1);
+  status1.set_state(TASK_STAGING); // Dummy value.
+
+  Future<TaskStatus> reconcileUpdate1;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&reconcileUpdate1));
+
+  driver.reconcileTasks({status1});
+
+  AWAIT_READY(reconcileUpdate1);
+  EXPECT_EQ(TASK_LOST, reconcileUpdate1.get().state());
+  EXPECT_EQ(TaskStatus::REASON_RECONCILIATION, reconcileUpdate1.get().reason());
+  EXPECT_EQ(partitionTime1, reconcileUpdate1.get().unreachable_time());
+
+  // Advance the clock to cause GC to be performed.
+  Clock::advance(masterFlags.registry_gc_interval);
+  Clock::settle();
+
+  // Do explicit reconciliation for a random task ID on `slave1`.
+  // Because the agent has been removed from the unreachable list in
+  // the registry, `unreachable_time` should NOT be set.
+  TaskStatus status2;
+  status2.mutable_task_id()->set_value(UUID::random().toString());
+  status2.mutable_slave_id()->CopyFrom(slaveId1);
+  status2.set_state(TASK_STAGING); // Dummy value.
+
+  Future<TaskStatus> reconcileUpdate2;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&reconcileUpdate2));
+
+  driver.reconcileTasks({status2});
+
+  AWAIT_READY(reconcileUpdate2);
+  EXPECT_EQ(TASK_LOST, reconcileUpdate2.get().state());
+  EXPECT_EQ(TaskStatus::REASON_RECONCILIATION, reconcileUpdate2.get().reason());
+  EXPECT_FALSE(reconcileUpdate2.get().has_unreachable_time());
+
+  // Do explicit reconciliation for a random task ID on the second
+  // partitioned slave. Because the agent is still in the unreachable
+  // list in the registry, `unreachable_time` should be set.
+  TaskStatus status3;
+  status3.mutable_task_id()->set_value(UUID::random().toString());
+  status3.mutable_slave_id()->CopyFrom(slaveId2);
+  status3.set_state(TASK_STAGING); // Dummy value.
+
+  Future<TaskStatus> reconcileUpdate3;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&reconcileUpdate3));
+
+  driver.reconcileTasks({status3});
+
+  AWAIT_READY(reconcileUpdate3);
+  EXPECT_EQ(TASK_LOST, reconcileUpdate3.get().state());
+  EXPECT_EQ(TaskStatus::REASON_RECONCILIATION, reconcileUpdate3.get().reason());
+  EXPECT_EQ(partitionTime2, reconcileUpdate3.get().unreachable_time());
+
+  JSON::Object stats = Metrics();
+  EXPECT_EQ(2, stats.values["master/slave_unreachable_scheduled"]);
+  EXPECT_EQ(2, stats.values["master/slave_unreachable_completed"]);
+  EXPECT_EQ(2, stats.values["master/slave_removals"]);
+  EXPECT_EQ(2, stats.values["master/slave_removals/reason_unhealthy"]);
+
+  driver.stop();
+  driver.join();
+
+  Clock::resume();
+}
+
+
+// This test ensures that when using the count-based criterion to
+// garbage collect unreachable agents from the registry, the agents
+// that were marked unreachable first are the ones that are
+// removed. This requires creating a large unreachable list, which
+// would be annoying to do by creating slaves and simulating network
+// partitions; instead we add agents to the unreachable list by
+// directly applying registry operations.
+TEST_P(PartitionTest, RegistryGcByCountManySlaves)
+{
+  // Configure GC to only keep the most recent partitioned agent in
+  // the unreachable list.
+  master::Flags masterFlags = CreateMasterFlags();
+  masterFlags.registry_strict = GetParam();
+  masterFlags.registry_max_agent_count = 1;
+
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  Clock::pause();
+
+  TimeInfo unreachableTime = protobuf::getCurrentTime();
+
+  vector<SlaveID> slaveIDs;
+  for (int i = 0; i < 50; i++) {
+    SlaveID slaveID;
+    slaveID.set_value(UUID::random().toString());
+    slaveIDs.push_back(slaveID);
+
+    SlaveInfo slaveInfo;
+    slaveInfo.set_hostname("localhost");
+    slaveInfo.mutable_id()->CopyFrom(slaveID);
+
+    Future<bool> admitApply =
+      master.get()->registrar->unmocked_apply(
+          Owned<master::Operation>(
+              new master::AdmitSlave(slaveInfo)));
+
+    AWAIT_EXPECT_TRUE(admitApply);
+
+    Future<bool> unreachableApply =
+      master.get()->registrar->unmocked_apply(
+          Owned<master::Operation>(
+              new master::MarkSlaveUnreachable(slaveInfo, unreachableTime)));
+
+    AWAIT_EXPECT_TRUE(unreachableApply);
+  }
+
+  // Restart the master to recover from updated registry.
+  master->reset();
+  master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  // Advance the clock to cause GC to be performed.
+  Clock::advance(masterFlags.registry_gc_interval);
+  Clock::settle();
+
+  // Start a scheduler: we verify GC behavior by doing reconciliation.
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  Future<Nothing> registered;
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .WillOnce(FutureSatisfy(&registered));
+
+  driver.start();
+
+  AWAIT_READY(registered);
+
+  // We expect that only the most-recently inserted SlaveID will have
+  // survived GC. We also check that the second-most-recent SlaveID
+  // has been GC'd.
+  SlaveID keptSlaveID = slaveIDs.back();
+  SlaveID removedSlaveID = *(slaveIDs.crbegin() + 1);
+
+  TaskStatus status1;
+  status1.mutable_task_id()->set_value(UUID::random().toString());
+  status1.mutable_slave_id()->CopyFrom(keptSlaveID);
+  status1.set_state(TASK_STAGING); // Dummy value.
+
+  Future<TaskStatus> reconcileUpdate1;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&reconcileUpdate1));
+
+  driver.reconcileTasks({status1});
+
+  AWAIT_READY(reconcileUpdate1);
+  EXPECT_EQ(TASK_LOST, reconcileUpdate1.get().state());
+  EXPECT_EQ(TaskStatus::REASON_RECONCILIATION, reconcileUpdate1.get().reason());
+  EXPECT_EQ(unreachableTime, reconcileUpdate1.get().unreachable_time());
+
+  TaskStatus status2;
+  status2.mutable_task_id()->set_value(UUID::random().toString());
+  status2.mutable_slave_id()->CopyFrom(removedSlaveID);
+  status2.set_state(TASK_STAGING); // Dummy value.
+
+  Future<TaskStatus> reconcileUpdate2;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&reconcileUpdate2));
+
+  driver.reconcileTasks({status2});
+
+  AWAIT_READY(reconcileUpdate2);
+  EXPECT_EQ(TASK_LOST, reconcileUpdate2.get().state());
+  EXPECT_EQ(TaskStatus::REASON_RECONCILIATION, reconcileUpdate2.get().reason());
+  EXPECT_FALSE(reconcileUpdate2.get().has_unreachable_time());
+
+  driver.stop();
+  driver.join();
+
+  Clock::resume();
+}
+
+
+// This test checks that the master correctly garbage collects
+// information about unreachable agents from the registry using the
+// age-based GC criterion. We configure GC to discard agents after 20
+// minutes; GC occurs every 15 mins. We test the following schedule:
+//
+// 75 sec:              slave1 is marked unreachable
+// 720 secs (12 mins):  slave2 is marked unreachable
+// 900 secs (15 mins):  GC runs, nothing discarded
+// 1800 secs (30 mins): GC runs, slave1 is discarded
+// 2700 secs (45 mins): GC runs, slave2 is discarded
+TEST_P(PartitionTest, RegistryGcByAge)
+{
+  master::Flags masterFlags = CreateMasterFlags();
+  masterFlags.registry_gc_interval = Minutes(15);
+  masterFlags.registry_max_agent_age = Minutes(20);
+  masterFlags.registry_strict = GetParam();
+
+  // Pause the clock before starting the master. This ensures that we
+  // know precisely when the GC timer will fire.
+  Clock::pause();
+
+  Time startTime = Clock::now();
+
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  // Set these expectations up before we spawn the slave so that we
+  // don't miss the first PING.
+  Future<Message> ping = FUTURE_MESSAGE(
+      Eq(PingSlaveMessage().GetTypeName()), _, _);
+
+  // Drop all the PONGs to simulate slave partition.
+  DROP_PROTOBUFS(PongSlaveMessage(), _, _);
+
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage1 =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+  Owned<MasterDetector> slaveDetector1 = master.get()->createDetector();
+  Try<Owned<cluster::Slave>> slave1 = StartSlave(slaveDetector1.get());
+  ASSERT_SOME(slave1);
+
+  AWAIT_READY(slaveRegisteredMessage1);
+  const SlaveID slaveId1 = slaveRegisteredMessage1.get().slave_id();
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  Future<Nothing> resourceOffers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureSatisfy(&resourceOffers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  // Need to make sure the framework AND slave have registered with
+  // master. Waiting for resource offers should accomplish both.
+  AWAIT_READY(resourceOffers);
+
+  EXPECT_CALL(sched, offerRescinded(&driver, _))
+    .WillRepeatedly(Return());
+
+  Future<Nothing> slaveLost1;
+  EXPECT_CALL(sched, slaveLost(&driver, _))
+    .WillOnce(FutureSatisfy(&slaveLost1));
+
+  // Simulate the first slave becoming partitioned from the master.
+  size_t pings = 0;
+  while (true) {
+    AWAIT_READY(ping);
+    pings++;
+    if (pings == masterFlags.max_agent_ping_timeouts) {
+      break;
+    }
+    ping = FUTURE_MESSAGE(Eq(PingSlaveMessage().GetTypeName()), _, _);
+    Clock::advance(masterFlags.agent_ping_timeout);
+  }
+
+  Clock::advance(masterFlags.agent_ping_timeout);
+  Clock::settle();
+
+  // Record the time at which we expect the master to have marked the
+  // agent as unreachable.
+  TimeInfo partitionTime1 = protobuf::getCurrentTime();
+
+  AWAIT_READY(slaveLost1);
+
+  // Shut down the first slave. This is necessary because we only drop
+  // PONG messages; after advancing the clock below, the slave would
+  // try to reregister and would succeed. Hence, stop the slave first.
+  slave1.get()->terminate();
+  slave1->reset();
+
+  // Per the schedule above, the second slave should be partitioned
+  // at the 720-second mark: 75 secs have elapsed and its ping timeouts
+  // take another 75, so we advance the clock by 570 (75 + 570 + 75).
+  EXPECT_EQ(Seconds(75), Clock::now() - startTime);
+
+  Clock::advance(Seconds(570));
+
+  // Start another slave.
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage2 =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+  Owned<MasterDetector> slaveDetector2 = master.get()->createDetector();
+  Try<Owned<cluster::Slave>> slave2 = StartSlave(slaveDetector2.get());
+  ASSERT_SOME(slave2);
+
+  AWAIT_READY(slaveRegisteredMessage2);
+  const SlaveID slaveId2 = slaveRegisteredMessage2.get().slave_id();
+
+  Future<Nothing> slaveLost2;
+  EXPECT_CALL(sched, slaveLost(&driver, _))
+    .WillOnce(FutureSatisfy(&slaveLost2));
+
+  // Simulate the second slave becoming partitioned from the master.
+  pings = 0;
+  while (true) {
+    AWAIT_READY(ping);
+    pings++;
+    if (pings == masterFlags.max_agent_ping_timeouts) {
+      break;
+    }
+    ping = FUTURE_MESSAGE(Eq(PingSlaveMessage().GetTypeName()), _, _);
+    Clock::advance(masterFlags.agent_ping_timeout);
+  }
+
+  Clock::advance(masterFlags.agent_ping_timeout);
+  Clock::settle();
+
+  // Record the time at which we expect the master to have marked the
+  // agent as unreachable.
+  TimeInfo partitionTime2 = protobuf::getCurrentTime();
+
+  AWAIT_READY(slaveLost2);
+
+  // Shut down the second slave. This is necessary because we only drop
+  // PONG messages; after advancing the clock below, the slave would
+  // try to reregister and would succeed. Hence, stop the slave first.
+  slave2.get()->terminate();
+  slave2->reset();
+
+  EXPECT_EQ(Seconds(720), Clock::now() - startTime);
+
+  // Advance the clock to trigger a GC. The first GC occurs at 900
+  // elapsed seconds, so we advance by 180 seconds.
+  Clock::advance(Seconds(180));
+  Clock::settle();
+
+  // Do explicit reconciliation for random task IDs on both slaves.
+  // Since neither slave has exceeded the age-based GC bound, we
+  // expect to find both slaves (i.e., `unreachable_time` will be set).
+  TaskStatus status1;
+  status1.mutable_task_id()->set_value(UUID::random().toString());
+  status1.mutable_slave_id()->CopyFrom(slaveId1);
+  status1.set_state(TASK_STAGING); // Dummy value.
+
+  Future<TaskStatus> reconcileUpdate1;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&reconcileUpdate1));
+
+  driver.reconcileTasks({status1});
+
+  AWAIT_READY(reconcileUpdate1);
+  EXPECT_EQ(TASK_LOST, reconcileUpdate1.get().state());
+  EXPECT_EQ(TaskStatus::REASON_RECONCILIATION, reconcileUpdate1.get().reason());
+  EXPECT_EQ(partitionTime1, reconcileUpdate1.get().unreachable_time());
+
+  TaskStatus status2;
+  status2.mutable_task_id()->set_value(UUID::random().toString());
+  status2.mutable_slave_id()->CopyFrom(slaveId2);
+  status2.set_state(TASK_STAGING); // Dummy value.
+
+  Future<TaskStatus> reconcileUpdate2;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&reconcileUpdate2));
+
+  driver.reconcileTasks({status2});
+
+  AWAIT_READY(reconcileUpdate2);
+  EXPECT_EQ(TASK_LOST, reconcileUpdate2.get().state());
+  EXPECT_EQ(TaskStatus::REASON_RECONCILIATION, reconcileUpdate2.get().reason());
+  EXPECT_EQ(partitionTime2, reconcileUpdate2.get().unreachable_time());
+
+  // Advance the clock to cause GC to be performed.
+  Clock::advance(Minutes(15));
+  Clock::settle();
+
+  // Do explicit reconciliation for random task IDs on both slaves.
+  // We expect `slave1` to have been garbage collected, but `slave2`
+  // should still be present in the registry.
+  TaskStatus status3;
+  status3.mutable_task_id()->set_value(UUID::random().toString());
+  status3.mutable_slave_id()->CopyFrom(slaveId1);
+  status3.set_state(TASK_STAGING); // Dummy value.
+
+  Future<TaskStatus> reconcileUpdate3;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&reconcileUpdate3));
+
+  driver.reconcileTasks({status3});
+
+  AWAIT_READY(reconcileUpdate3);
+  EXPECT_EQ(TASK_LOST, reconcileUpdate3.get().state());
+  EXPECT_EQ(TaskStatus::REASON_RECONCILIATION, reconcileUpdate3.get().reason());
+  EXPECT_FALSE(reconcileUpdate3.get().has_unreachable_time());
+
+  TaskStatus status4;
+  status4.mutable_task_id()->set_value(UUID::random().toString());
+  status4.mutable_slave_id()->CopyFrom(slaveId2);
+  status4.set_state(TASK_STAGING); // Dummy value.
+
+  Future<TaskStatus> reconcileUpdate4;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&reconcileUpdate4));
+
+  driver.reconcileTasks({status4});
+
+  AWAIT_READY(reconcileUpdate4);
+  EXPECT_EQ(TASK_LOST, reconcileUpdate4.get().state());
+  EXPECT_EQ(TaskStatus::REASON_RECONCILIATION, reconcileUpdate4.get().reason());
+  EXPECT_EQ(partitionTime2, reconcileUpdate4.get().unreachable_time());
+
+  // Advance the clock to cause GC to be performed.
+  Clock::advance(Minutes(15));
+  Clock::settle();
+
+  // Do explicit reconciliation for a random task ID on `slave2`. We
+  // expect that it has been garbage collected, which means
+  // `unreachable_time` will not be set.
+  TaskStatus status5;
+  status5.mutable_task_id()->set_value(UUID::random().toString());
+  status5.mutable_slave_id()->CopyFrom(slaveId2);
+  status5.set_state(TASK_STAGING); // Dummy value.
+
+  Future<TaskStatus> reconcileUpdate5;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&reconcileUpdate5));
+
+  driver.reconcileTasks({status5});
+
+  AWAIT_READY(reconcileUpdate5);
+  EXPECT_EQ(TASK_LOST, reconcileUpdate5.get().state());
+  EXPECT_EQ(TaskStatus::REASON_RECONCILIATION, reconcileUpdate5.get().reason());
+  EXPECT_FALSE(reconcileUpdate5.get().has_unreachable_time());
+
+  driver.stop();
+  driver.join();
+
+  Clock::resume();
+}
+
+
 class OneWayPartitionTest : public MesosTest {};
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/9dad0b03/src/tests/registrar_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/registrar_tests.cpp b/src/tests/registrar_tests.cpp
index 7ea00df..745cded 100644
--- a/src/tests/registrar_tests.cpp
+++ b/src/tests/registrar_tests.cpp
@@ -358,6 +358,52 @@ TEST_P(RegistrarTest, MarkUnreachable)
 }
 
 
+TEST_P(RegistrarTest, PruneUnreachable)
+{
+  Registrar registrar(flags, state);
+  AWAIT_READY(registrar.recover(master));
+
+  SlaveInfo info1;
+  info1.set_hostname("localhost");
+
+  SlaveID id1;
+  id1.set_value("1");
+  info1.mutable_id()->CopyFrom(id1);
+
+  SlaveID id2;
+  id2.set_value("2");
+
+  SlaveInfo info2;
+  info2.set_hostname("localhost");
+  info2.mutable_id()->CopyFrom(id2);
+
+  AWAIT_TRUE(registrar.apply(Owned<Operation>(new AdmitSlave(info1))));
+  AWAIT_TRUE(registrar.apply(Owned<Operation>(new AdmitSlave(info2))));
+
+  AWAIT_TRUE(
+      registrar.apply(
+          Owned<Operation>(
+              new MarkSlaveUnreachable(info1, protobuf::getCurrentTime()))));
+
+  AWAIT_TRUE(
+      registrar.apply(
+          Owned<Operation>(
+              new MarkSlaveUnreachable(info2, protobuf::getCurrentTime()))));
+
+  AWAIT_TRUE(registrar.apply(Owned<Operation>(new PruneUnreachable({id1}))));
+  AWAIT_TRUE(registrar.apply(Owned<Operation>(new PruneUnreachable({id2}))));
+
+  AWAIT_TRUE(registrar.apply(Owned<Operation>(new AdmitSlave(info1))));
+
+  AWAIT_TRUE(
+      registrar.apply(
+          Owned<Operation>(
+              new MarkSlaveUnreachable(info1, protobuf::getCurrentTime()))));
+
+  AWAIT_TRUE(registrar.apply(Owned<Operation>(new PruneUnreachable({id1}))));
+}
+
+
 TEST_P(RegistrarTest, Remove)
 {
   Registrar registrar(flags, state);
