Repository: mesos
Updated Branches:
  refs/heads/master 34e1240d5 -> 29236068f


Changed master to allow partitioned slaves to reregister.

The previous behavior was to shutdown partitioned agents that attempt to
reregister---unless the master has failed over, in which case the
reregistration is allowed (when running in "non-strict" mode).

The new behavior is always to allow partitioned agents to reregister.
This is part of a longer-term project to allow frameworks to define
their own policies for handling tasks running on partitioned agents.

In particular, if a framework has the PARTITION_AWARE capability, any
tasks running on the partitioned agent will continue to run after
reregistration. If the framework is not PARTITION_AWARE, any tasks that
were running on such an agent will be killed after the agent reregisters
(unless the master has failed over). This is for backward compatibility
with the previous ("non-strict") behavior. Note that regardless of the
PARTITION_AWARE capability, the agent will not be shutdown, which is a
change from the previous Mesos behavior.

This commit also changes the master so that if an agent is removed and
then the master receives a message from that agent, the master will no
longer attempt to shutdown the agent. This is consistent with the goal
of getting the master out of the business of shutting down agents that
we suspect are unhealthy. Such an agent will eventually realize it is
not registered with the master (e.g., because it won't receive any pings
from the master), which will cause it to reregister.

Review: https://reviews.apache.org/r/50705/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/937c85f2
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/937c85f2
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/937c85f2

Branch: refs/heads/master
Commit: 937c85f2f6528d1ac56ea9a7aa174ca0bd371d0c
Parents: 34e1240
Author: Neil Conway <neil.con...@gmail.com>
Authored: Mon Sep 19 15:47:01 2016 -0700
Committer: Vinod Kone <vinodk...@gmail.com>
Committed: Mon Sep 19 15:47:01 2016 -0700

----------------------------------------------------------------------
 src/master/master.cpp         | 471 ++++++++++++++++++++++++-------------
 src/master/master.hpp         |  64 ++---
 src/tests/master_tests.cpp    |  16 +-
 src/tests/partition_tests.cpp | 460 ++++++++++++++++++++++++------------
 4 files changed, 642 insertions(+), 369 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/937c85f2/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index b88472f..7e501ec 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -190,10 +190,10 @@ protected:
     timeouts = 0;
     pinged = false;
 
-    // Cancel any pending shutdown.
-    if (shuttingDown.isSome()) {
+    // Cancel any pending unreachable transitions.
+    if (markingUnreachable.isSome()) {
       // Need a copy for non-const access.
-      Future<Nothing> future = shuttingDown.get();
+      Future<Nothing> future = markingUnreachable.get();
       future.discard();
     }
   }
@@ -205,64 +205,64 @@ protected:
       if (timeouts >= maxSlavePingTimeouts) {
         // No pong has been received for the last
         // 'maxSlavePingTimeouts' pings.
-        shutdown();
+        markUnreachable();
       }
     }
 
-    // NOTE: We keep pinging even if we schedule a shutdown. This is
-    // because if the slave eventually responds to a ping, we can
-    // cancel the shutdown.
+    // NOTE: We keep pinging even if we schedule a transition to
+    // UNREACHABLE. This is because if the slave eventually responds
+    // to a ping, we can cancel the UNREACHABLE transition.
     ping();
   }
 
-  // NOTE: The shutdown of the slave is rate limited and can be
-  // canceled if a pong was received before the actual shutdown is
-  // called.
-  void shutdown()
+  // Marking slaves unreachable is rate-limited and can be canceled if
+  // a pong is received before `_markUnreachable` is called.
+  //
+  // TODO(neilc): Using a rate-limit when marking slaves unreachable
+  // is only necessary for frameworks that are not PARTITION_AWARE.
+  // For such frameworks, we shutdown their tasks when an unreachable
+  // agent reregisters, so a rate-limit is a useful safety
+  // precaution. Once all frameworks are PARTITION_AWARE, we can
+  // likely remove the rate-limit (MESOS-5948).
+  void markUnreachable()
   {
-    if (shuttingDown.isSome()) {
-      return;  // Shutdown is already in progress.
+    if (markingUnreachable.isSome()) {
+      return; // Unreachable transition is already in progress.
     }
 
     Future<Nothing> acquire = Nothing();
 
     if (limiter.isSome()) {
-      LOG(INFO) << "Scheduling shutdown of agent " << slaveId
-                << " due to health check timeout";
+      LOG(INFO) << "Scheduling transition of agent " << slaveId
+                << " to UNREACHABLE because of health check timeout";
 
       acquire = limiter.get()->acquire();
     }
 
-    shuttingDown = acquire.onAny(defer(self(), &Self::_shutdown));
+    markingUnreachable = acquire.onAny(defer(self(), &Self::_markUnreachable));
     ++metrics->slave_unreachable_scheduled;
   }
 
-  void _shutdown()
+  void _markUnreachable()
   {
-    CHECK_SOME(shuttingDown);
+    CHECK_SOME(markingUnreachable);
 
-    const Future<Nothing>& future = shuttingDown.get();
+    const Future<Nothing>& future = markingUnreachable.get();
 
     CHECK(!future.isFailed());
 
     if (future.isReady()) {
-      LOG(INFO) << "Shutting down agent " << slaveId
-                << " due to health check timeout";
-
       ++metrics->slave_unreachable_completed;
 
-      dispatch(master,
-               &Master::shutdownSlave,
-               slaveId,
-               "health check timed out");
+      dispatch(master, &Master::markUnreachable, slaveId);
     } else if (future.isDiscarded()) {
-      LOG(INFO) << "Canceling shutdown of agent " << slaveId
-                << " since a pong is received!";
+      LOG(INFO) << "Canceling transition of agent " << slaveId
+                << " to UNREACHABLE because a pong was received!";
 
       ++metrics->slave_unreachable_canceled;
     }
 
-    shuttingDown = None();
+    markingUnreachable = None();
   }
 
 private:
@@ -272,7 +272,7 @@ private:
   const PID<Master> master;
   const Option<shared_ptr<RateLimiter>> limiter;
   shared_ptr<Metrics> metrics;
-  Option<Future<Nothing>> shuttingDown;
+  Option<Future<Nothing>> markingUnreachable;
   const Duration slavePingTimeout;
   const size_t maxSlavePingTimeouts;
   uint32_t timeouts;
@@ -1725,7 +1725,7 @@ void Master::recoveredSlavesTimeout(const Registry& 
registry)
     const string failure = "Agent removal rate limit acquisition failed";
 
     acquire
-      .then(defer(self(), &Self::removeSlave, slave))
+      .then(defer(self(), &Self::markUnreachableAfterFailover, slave))
       .onFailed(lambda::bind(fail, failure, lambda::_1))
       .onDiscarded(lambda::bind(fail, failure, "discarded"));
 
@@ -1734,13 +1734,13 @@ void Master::recoveredSlavesTimeout(const Registry& 
registry)
 }
 
 
-Nothing Master::removeSlave(const Registry::Slave& slave)
+Nothing Master::markUnreachableAfterFailover(const Registry::Slave& slave)
 {
   // The slave is removed from 'recovered' when it re-registers.
   if (!slaves.recovered.contains(slave.info().id())) {
-    LOG(INFO) << "Canceling removal of agent "
+    LOG(INFO) << "Canceling transition of agent "
               << slave.info().id() << " (" << slave.info().hostname() << ")"
-              << " since it re-registered!";
+              << " to unreachable because it re-registered";
 
     ++metrics->slave_unreachable_canceled;
     return Nothing();
@@ -1749,7 +1749,7 @@ Nothing Master::removeSlave(const Registry::Slave& slave)
   LOG(WARNING) << "Agent " << slave.info().id()
                << " (" << slave.info().hostname() << ") did not re-register"
                << " within " << flags.agent_reregister_timeout
-               << " after master failover; removing it from the registrar";
+               << " after master failover; marking it unreachable";
 
   ++metrics->slave_unreachable_completed;
   ++metrics->recovery_slave_removals;
@@ -1757,25 +1757,24 @@ Nothing Master::removeSlave(const Registry::Slave& 
slave)
   slaves.recovered.erase(slave.info().id());
 
   if (flags.registry_strict) {
-    slaves.removing.insert(slave.info().id());
+    slaves.markingUnreachable.insert(slave.info().id());
 
-    registrar->apply(Owned<Operation>(new RemoveSlave(slave.info())))
+    registrar->apply(Owned<Operation>(new MarkSlaveUnreachable(slave.info())))
       .onAny(defer(self(),
-                   &Self::_removeSlave,
+                   &Self::_markUnreachable,
                    slave.info(),
                    vector<StatusUpdate>(), // No TASK_LOST updates to send.
                    lambda::_1,
-                   "did not re-register after master failover",
-                   metrics->slave_removals_reason_unhealthy));
+                   "did not re-register after master failover"));
   } else {
     // When a non-strict registry is in use, we want to ensure the
     // registry is used in a write-only manner. Therefore we remove
     // the slave from the registry but we do not inform the
     // framework.
     const string& message =
-      "Failed to remove agent " + stringify(slave.info().id());
+      "Failed to mark agent " + stringify(slave.info().id()) + " unreachable";
 
-    registrar->apply(Owned<Operation>(new RemoveSlave(slave.info())))
+    registrar->apply(Owned<Operation>(new MarkSlaveUnreachable(slave.info())))
       .onFailed(lambda::bind(fail, message, lambda::_1));
   }
 
@@ -4688,18 +4687,14 @@ void Master::executorMessage(
   metrics->messages_executor_to_framework++;
 
   if (slaves.removed.get(slaveId).isSome()) {
-    // If the slave is removed, we have already informed
-    // frameworks that its tasks were LOST, so the slave
-    // should shut down.
+    // If the slave has been removed, drop the executor message. The
+    // master is no longer trying to health check this slave; when the
+    // slave realizes it hasn't received any pings from the master, it
+    // will eventually try to reregister.
     LOG(WARNING) << "Ignoring executor message"
                  << " from executor" << " '" << executorId << "'"
                  << " of framework " << frameworkId
-                 << " on removed agent " << slaveId
-                 << " ; asking agent to shutdown";
-
-    ShutdownMessage message;
-    message.set_message("Executor message from unknown agent");
-    reply(message);
+                 << " on removed agent " << slaveId;
     metrics->invalid_executor_to_framework_messages++;
     return;
   }
@@ -5019,22 +5014,6 @@ void Master::reregisterSlave(
     return;
   }
 
-  if (slaves.removed.get(slaveInfo.id()).isSome()) {
-    // To compensate for the case where a non-strict registrar is
-    // being used, we explicitly deny removed slaves from
-    // re-registering. This is because a non-strict registrar cannot
-    // enforce this. We've already told frameworks the tasks were
-    // lost so it's important to deny the slave from re-registering.
-    LOG(WARNING) << "Agent " << slaveInfo.id() << " at " << from
-                 << " (" << slaveInfo.hostname() << ") attempted to "
-                 << "re-register after removal; shutting it down";
-
-    ShutdownMessage message;
-    message.set_message("Agent attempted to re-register after removal");
-    send(from, message);
-    return;
-  }
-
   Slave* slave = slaves.registered.get(slaveInfo.id());
 
   if (slave != nullptr) {
@@ -5126,10 +5105,13 @@ void Master::reregisterSlave(
 
   slaves.reregistering.insert(slaveInfo.id());
 
-  // This handles the case when the slave tries to re-register with
-  // a failed over master, in which case we must consult the
-  // registrar.
-  registrar->apply(Owned<Operation>(new ReadmitSlave(slaveInfo)))
+  // Consult the registry to determine whether to readmit the
+  // slave. In the common case, the slave has been marked unreachable
+  // by the master, so we move the slave to the reachable list and
+  // readmit it. If the slave isn't in the unreachable list (which
+  // might occur if the slave's entry in the unreachable list is
+  // GC'd), we admit the slave anyway.
+  registrar->apply(Owned<Operation>(new MarkSlaveReachable(slaveInfo)))
     .onAny(defer(self(),
                  &Self::_reregisterSlave,
                  slaveInfo,
@@ -5158,60 +5140,85 @@ void Master::_reregisterSlave(
   CHECK(slaves.reregistering.contains(slaveInfo.id()));
   slaves.reregistering.erase(slaveInfo.id());
 
-  CHECK(!readmit.isDiscarded());
-
   if (readmit.isFailed()) {
     LOG(FATAL) << "Failed to readmit agent " << slaveInfo.id() << " at " << pid
                << " (" << slaveInfo.hostname() << "): " << readmit.failure();
-  } else if (!readmit.get()) {
-    LOG(WARNING) << "The agent " << slaveInfo.id() << " at "
-                 << pid << " (" << slaveInfo.hostname() << ") could not be"
-                 << " readmitted; shutting it down";
-    slaves.removed.put(slaveInfo.id(), Nothing());
+  }
 
-    ShutdownMessage message;
-    message.set_message(
-        "Agent attempted to re-register with unknown agent id " +
-        stringify(slaveInfo.id()));
-    send(pid, message);
-  } else {
-    // Re-admission succeeded.
-    MachineID machineId;
-    machineId.set_hostname(slaveInfo.hostname());
-    machineId.set_ip(stringify(pid.address.ip));
+  CHECK(!readmit.isDiscarded());
 
-    Slave* slave = new Slave(
-        this,
-        slaveInfo,
-        pid,
-        machineId,
-        version,
-        Clock::now(),
-        checkpointedResources,
-        executorInfos,
-        tasks);
+  // `MarkSlaveReachable` registry operation should never fail.
+  CHECK(readmit.get());
 
-    slave->reregisteredTime = Clock::now();
+  // Re-admission succeeded.
+  MachineID machineId;
+  machineId.set_hostname(slaveInfo.hostname());
+  machineId.set_ip(stringify(pid.address.ip));
 
-    ++metrics->slave_reregistrations;
+  Slave* slave = new Slave(
+      this,
+      slaveInfo,
+      pid,
+      machineId,
+      version,
+      Clock::now(),
+      checkpointedResources,
+      executorInfos,
+      tasks);
 
-    addSlave(slave, completedFrameworks);
+  slave->reregisteredTime = Clock::now();
 
-    Duration pingTimeout =
-      flags.agent_ping_timeout * flags.max_agent_ping_timeouts;
-    MasterSlaveConnection connection;
-    connection.set_total_ping_timeout_seconds(pingTimeout.secs());
+  ++metrics->slave_reregistrations;
 
-    SlaveReregisteredMessage message;
-    message.mutable_slave_id()->CopyFrom(slave->id);
-    message.mutable_connection()->CopyFrom(connection);
-    send(slave->pid, message);
+  // Check whether this master was the one that removed the
+  // reregistering agent from the cluster originally. This is false
+  // if the master has failed over since the agent was removed, for
+  // example.
+  bool slaveWasRemoved = slaves.removed.get(slave->id).isSome();
 
-    LOG(INFO) << "Re-registered agent " << *slave
-              << " with " << slave->info.resources();
+  addSlave(slave, completedFrameworks);
 
-    __reregisterSlave(slave, tasks, frameworks);
+  Duration pingTimeout =
+    flags.agent_ping_timeout * flags.max_agent_ping_timeouts;
+  MasterSlaveConnection connection;
+  connection.set_total_ping_timeout_seconds(pingTimeout.secs());
+
+  SlaveReregisteredMessage message;
+  message.mutable_slave_id()->CopyFrom(slave->id);
+  message.mutable_connection()->CopyFrom(connection);
+  send(slave->pid, message);
+
+  LOG(INFO) << "Re-registered agent " << *slave
+            << " with " << slave->info.resources();
+
+  // Shutdown any frameworks running on the slave that don't have the
+  // PARTITION_AWARE capability, provided that this instance of the
+  // master previously added the slave to the `slaves.removed`
+  // collection. This matches the Mesos 1.0 "non-strict" semantics for
+  // frameworks that are not partition-aware: when a previously
+  // unreachable slave reregisters, its tasks are only shutdown if the
+  // master has not failed over.
+  if (slaveWasRemoved) {
+    foreach (const FrameworkInfo& framework, frameworks) {
+      if (!protobuf::frameworkHasCapability(
+              framework, FrameworkInfo::Capability::PARTITION_AWARE)) {
+        LOG(INFO) << "Shutting down framework " << framework.id()
+                  << " at reregistered agent " << *slave
+                  << " because the framework is not partition-aware";
+
+        ShutdownFrameworkMessage message;
+        message.mutable_framework_id()->MergeFrom(framework.id());
+        send(slave->pid, message);
+
+        // Remove the framework's tasks from the master's in-memory state.
+        foreachvalue (Task* task, utils::copy(slave->tasks[framework.id()])) {
+          removeTask(task);
+        }
+      }
+    }
   }
+
+  __reregisterSlave(slave, tasks, frameworks);
 }
 
 
@@ -5314,17 +5321,13 @@ void Master::updateSlave(
   ++metrics->messages_update_slave;
 
   if (slaves.removed.get(slaveId).isSome()) {
-    // If the slave is removed, we have already informed
-    // frameworks that its tasks were LOST, so the slave should
-    // shut down.
+    // If the slave has been removed, drop the status update. The
+    // master is no longer trying to health check this slave; when the
+    // slave realizes it hasn't received any pings from the master, it
+    // will eventually try to reregister.
     LOG(WARNING)
       << "Ignoring update of agent with total oversubscribed resources "
-      << oversubscribedResources << " on removed agent " << slaveId
-      << " ; asking agent to shutdown";
-
-    ShutdownMessage message;
-    message.set_message("Update agent message from unknown agent");
-    reply(message);
+      << oversubscribedResources << " on removed agent " << slaveId;
     return;
   }
 
@@ -5446,18 +5449,13 @@ void Master::statusUpdate(StatusUpdate update, const 
UPID& pid)
   ++metrics->messages_status_update;
 
   if (slaves.removed.get(update.slave_id()).isSome()) {
-    // If the slave is removed, we have already informed
-    // frameworks that its tasks were LOST, so the slave should
-    // shut down.
+    // If the slave has been removed, drop the status update. The
+    // master is no longer trying to health check this slave; when the
+    // slave realizes it hasn't received any pings from the master, it
+    // will eventually try to reregister.
     LOG(WARNING) << "Ignoring status update " << update
                  << " from removed agent " << pid
-                 << " with id " << update.slave_id() << " ; asking agent "
-                 << " to shutdown";
-
-    ShutdownMessage message;
-    message.set_message("Status update from unknown agent");
-    send(pid, message);
-
+                 << " with id " << update.slave_id();
     metrics->invalid_status_updates++;
     return;
   }
@@ -5566,17 +5564,13 @@ void Master::exitedExecutor(
   ++metrics->messages_exited_executor;
 
   if (slaves.removed.get(slaveId).isSome()) {
-    // If the slave is removed, we have already informed
-    // frameworks that its tasks were LOST, so the slave should
-    // shut down.
+    // If the slave has been removed, drop the executor message. The
+    // master is no longer trying to health check this slave; when the
+    // slave realizes it hasn't received any pings from the master, it
+    // will eventually try to reregister.
     LOG(WARNING) << "Ignoring exited executor '" << executorId
                  << "' of framework " << frameworkId
-                 << " on removed agent " << slaveId
-                 << " ; asking agent to shutdown";
-
-    ShutdownMessage message;
-    message.set_message("Executor exited message from unknown agent");
-    reply(message);
+                 << " on removed agent " << slaveId;
     return;
   }
 
@@ -5653,27 +5647,182 @@ void Master::shutdown(
 }
 
 
-void Master::shutdownSlave(const SlaveID& slaveId, const string& message)
+// TODO(neilc): Refactor to reduce code duplication with
+// `Master::removeSlave`.
+void Master::markUnreachable(const SlaveID& slaveId)
 {
-  if (!slaves.registered.contains(slaveId)) {
-    // Possible when the SlaveObserver dispatches a message to
-    // shutdown a slave but the slave is concurrently removed for
-    // another reason (e.g., `UnregisterSlaveMessage` is received).
-    LOG(WARNING) << "Unable to shutdown unknown agent " << slaveId;
+  Slave* slave = slaves.registered.get(slaveId);
+
+  if (slave == nullptr) {
+    // Possible when the `SlaveObserver` dispatches a message to mark an
+    // unhealthy slave as unreachable, but the slave is concurrently
+    // removed for another reason (e.g., `UnregisterSlaveMessage` is
+    // received).
+    LOG(WARNING) << "Unable to mark unknown agent "
+                 << slaveId << " unreachable";
     return;
   }
 
-  Slave* slave = slaves.registered.get(slaveId);
-  CHECK_NOTNULL(slave);
+  LOG(INFO) << "Marking agent " << *slave
+            << " unreachable: health check timed out";
 
-  LOG(WARNING) << "Shutting down agent " << *slave << " with message '"
-               << message << "'";
+  // We want to remove the slave first, to avoid the allocator
+  // re-allocating the recovered resources.
+  //
+  // NOTE: Removing the slave is not sufficient for recovering the
+  // resources in the allocator, because the "Sorters" are updated
+  // only within recoverResources() (see MESOS-621). The calls to
+  // recoverResources() below are therefore required, even though
+  // the slave is already removed.
+  allocator->removeSlave(slave->id);
 
-  ShutdownMessage message_;
-  message_.set_message(message);
-  send(slave->pid, message_);
+  // Transition the tasks to TASK_LOST and remove them, BUT do not
+  // send updates yet. Rather, build up the updates so that we can can
+  // send them after the slave has been moved to the unreachable list
+  // in the registry.
+  // TODO(neilc): Update this to send TASK_UNREACHABLE for
+  // partition-aware frameworks.
+  vector<StatusUpdate> updates;
+  foreachkey (const FrameworkID& frameworkId, utils::copy(slave->tasks)) {
+    Framework* framework = getFramework(frameworkId);
+    CHECK_NOTNULL(framework);
+
+    foreachvalue (Task* task, utils::copy(slave->tasks[frameworkId])) {
+      const StatusUpdate& update = protobuf::createStatusUpdate(
+          task->framework_id(),
+          task->slave_id(),
+          task->task_id(),
+          TASK_LOST,
+          TaskStatus::SOURCE_MASTER,
+          None(),
+          "Slave " + slave->info.hostname() + " is unreachable",
+          TaskStatus::REASON_SLAVE_REMOVED,
+          (task->has_executor_id() ?
+              Option<ExecutorID>(task->executor_id()) : None()));
+
+      updateTask(task, update);
+      removeTask(task);
+
+      updates.push_back(update);
+    }
+  }
+
+  // Remove executors from the slave for proper resource accounting.
+  foreachkey (const FrameworkID& frameworkId, utils::copy(slave->executors)) {
+    foreachkey (const ExecutorID& executorId,
+                utils::copy(slave->executors[frameworkId])) {
+      removeExecutor(slave, frameworkId, executorId);
+    }
+  }
+
+  foreach (Offer* offer, utils::copy(slave->offers)) {
+    // TODO(vinod): We don't need to call 'Allocator::recoverResources'
+    // once MESOS-621 is fixed.
+    allocator->recoverResources(
+        offer->framework_id(), slave->id, offer->resources(), None());
+
+    // Remove and rescind offers.
+    removeOffer(offer, true); // Rescind!
+  }
+
+  // Remove inverse offers because sending them for a slave that is
+  // unreachable doesn't make sense.
+  foreach (InverseOffer* inverseOffer, utils::copy(slave->inverseOffers)) {
+    // We don't need to update the allocator because we've already called
+    // `RemoveSlave()`.
+    // Remove and rescind inverse offers.
+    removeInverseOffer(inverseOffer, true); // Rescind!
+  }
+
+  // Mark the slave as being unreachable.
+  slaves.registered.remove(slave);
+  slaves.removed.put(slave->id, Nothing());
+  slaves.markingUnreachable.insert(slave->id);
+  authenticated.erase(slave->pid);
+
+  // Remove the slave from the `machines` mapping.
+  CHECK(machines.contains(slave->machineId));
+  CHECK(machines[slave->machineId].slaves.contains(slave->id));
+  machines[slave->machineId].slaves.erase(slave->id);
+
+  // Kill the slave observer.
+  terminate(slave->observer);
+  wait(slave->observer);
+  delete slave->observer;
+
+  // TODO(benh): unlink(slave->pid);
 
-  removeSlave(slave, message, metrics->slave_removals_reason_unhealthy);
+  // Update the registry to move this slave from the list of admitted
+  // slaves to the list of unreachable slaves. Once this is completed,
+  // we can forward the TASK_LOST updates to the frameworks.
+  registrar->apply(Owned<Operation>(new MarkSlaveUnreachable(slave->info)))
+    .onAny(defer(self(),
+                 &Self::_markUnreachable,
+                 slave->info,
+                 updates,
+                 lambda::_1,
+                 "health check timed out"));
+
+  delete slave;
+}
+
+
+void Master::_markUnreachable(
+    const SlaveInfo& slaveInfo,
+    const vector<StatusUpdate>& updates,
+    const Future<bool>& registrarResult,
+    const string& unreachableCause)
+{
+  CHECK(slaves.markingUnreachable.contains(slaveInfo.id()));
+  slaves.markingUnreachable.erase(slaveInfo.id());
+
+  if (registrarResult.isFailed()) {
+    LOG(FATAL) << "Failed to mark agent " << slaveInfo.id()
+               << " (" << slaveInfo.hostname() << ")"
+               << " unreachable in the registry: "
+               << registrarResult.failure();
+  }
+
+  CHECK(!registrarResult.isDiscarded());
+
+  // `MarkSlaveUnreachable` registry operation should never fail.
+  CHECK(registrarResult.get());
+
+  LOG(INFO) << "Marked agent " << slaveInfo.id() << " ("
+            << slaveInfo.hostname() << ") unreachable: "
+            << unreachableCause;
+
+  // TODO(neilc): Consider renaming these metrics or adding new
+  // metrics for the new PARTITION_AWARE semantics.
+  ++metrics->slave_removals;
+  ++metrics->slave_removals_reason_unhealthy;
+
+  // Forward the TASK_LOST updates on to the frameworks.
+  foreach (const StatusUpdate& update, updates) {
+    Framework* framework = getFramework(update.framework_id());
+
+    if (framework == nullptr) {
+      LOG(WARNING) << "Dropping update " << update << " from unknown framework 
"
+                   << update.framework_id();
+    } else {
+      forward(update, UPID(), framework);
+    }
+  }
+
+  // Notify all frameworks of the lost slave.
+  foreachvalue (Framework* framework, frameworks.registered) {
+    LOG(INFO) << "Notifying framework " << *framework << " of lost agent "
+              << slaveInfo.id() << " (" << slaveInfo.hostname() << ") "
+              << "after recovering";
+    LostSlaveMessage message;
+    message.mutable_slave_id()->MergeFrom(slaveInfo.id());
+    framework->send(message);
+  }
+
+  // Finally, notify the `SlaveLost` hooks.
+  if (HookManager::hooksAvailable()) {
+    HookManager::masterSlaveLostHook(slaveInfo);
+  }
 }
 
 
@@ -7057,27 +7206,27 @@ void Master::removeSlave(
 void Master::_removeSlave(
     const SlaveInfo& slaveInfo,
     const vector<StatusUpdate>& updates,
-    const Future<bool>& removed,
-    const string& message,
+    const Future<bool>& registrarResult,
+    const string& removalCause,
     Option<Counter> reason)
 {
   CHECK(slaves.removing.contains(slaveInfo.id()));
   slaves.removing.erase(slaveInfo.id());
 
-  CHECK(!removed.isDiscarded());
+  CHECK(!registrarResult.isDiscarded());
 
-  if (removed.isFailed()) {
+  if (registrarResult.isFailed()) {
     LOG(FATAL) << "Failed to remove agent " << slaveInfo.id()
                << " (" << slaveInfo.hostname() << ")"
-               << " from the registrar: " << removed.failure();
+               << " from the registrar: " << registrarResult.failure();
   }
 
-  CHECK(removed.get())
+  CHECK(registrarResult.get())
     << "Agent " << slaveInfo.id() << " (" << slaveInfo.hostname() << ") "
     << "already removed from the registrar";
 
   LOG(INFO) << "Removed agent " << slaveInfo.id() << " ("
-            << slaveInfo.hostname() << "): " << message;
+            << slaveInfo.hostname() << "): " << removalCause;
 
   ++metrics->slave_removals;
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/937c85f2/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 4992ab0..b092de4 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -522,9 +522,7 @@ public:
       const MachineID& machineId,
       const Option<Unavailability>& unavailability);
 
-  void shutdownSlave(
-      const SlaveID& slaveId,
-      const std::string& message);
+  void markUnreachable(const SlaveID& slaveId);
 
   void authenticate(
       const process::UPID& from,
@@ -686,9 +684,15 @@ protected:
       const std::vector<Archive::Framework>& completedFrameworks =
         std::vector<Archive::Framework>());
 
-  // Remove the slave from the registrar. Called when the slave
+  void _markUnreachable(
+      const SlaveInfo& slaveInfo,
+      const std::vector<StatusUpdate>& updates,
+      const process::Future<bool>& registrarResult,
+      const std::string& unreachableCause);
+
+  // Mark a slave as unreachable in the registry. Called when the slave
   // does not re-register in time after a master failover.
-  Nothing removeSlave(const Registry::Slave& slave);
+  Nothing markUnreachableAfterFailover(const Registry::Slave& slave);
 
   // Remove the slave from the registrar and from the master's state.
   //
@@ -701,8 +705,8 @@ protected:
   void _removeSlave(
       const SlaveInfo& slaveInfo,
       const std::vector<StatusUpdate>& updates,
-      const process::Future<bool>& removed,
-      const std::string& message,
+      const process::Future<bool>& registrarResult,
+      const std::string& removalCause,
       Option<process::metrics::Counter> reason = None());
 
   // Validates that the framework is authenticated, if required.
@@ -1684,12 +1688,12 @@ private:
     // from the registry.
     hashset<SlaveID> removing;
 
-    // We track removed slaves to preserve the consistency
-    // semantics of the pre-registrar code when a non-strict registrar
-    // is being used. That is, if we remove a slave, we must make
-    // an effort to prevent it from (re-)registering, sending updates,
-    // etc. We keep a cache here to prevent this from growing in an
-    // unbounded manner.
+    // Slaves that are in the process of being marked unreachable.
+    hashset<SlaveID> markingUnreachable;
+
+    // This collection includes agents that have gracefully shutdown,
+    // as well as those that have been marked unreachable. We keep a
+    // cache here to prevent this from growing in an unbounded manner.
     // TODO(bmahler): Ideally we could use a cache with set semantics.
     Cache<SlaveID, Nothing> removed;
 
@@ -2044,40 +2048,6 @@ private:
 };
 
 
-// Implementation of slave readmission Registrar operation.
-class ReadmitSlave : public Operation
-{
-public:
-  explicit ReadmitSlave(const SlaveInfo& _info) : info(_info)
-  {
-    CHECK(info.has_id()) << "SlaveInfo is missing the 'id' field";
-  }
-
-protected:
-  virtual Try<bool> perform(
-      Registry* registry,
-      hashset<SlaveID>* slaveIDs,
-      bool strict)
-  {
-    if (slaveIDs->contains(info.id())) {
-      return false; // No mutation.
-    }
-
-    if (strict) {
-      return Error("Agent not yet admitted");
-    } else {
-      Registry::Slave* slave = registry->mutable_slaves()->add_slaves();
-      slave->mutable_info()->CopyFrom(info);
-      slaveIDs->insert(info.id());
-      return true; // Mutation.
-    }
-  }
-
-private:
-  const SlaveInfo info;
-};
-
-
 // Implementation of slave removal Registrar operation.
 class RemoveSlave : public Operation
 {

http://git-wip-us.apache.org/repos/asf/mesos/blob/937c85f2/src/tests/master_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_tests.cpp b/src/tests/master_tests.cpp
index 6cde15f..514c1d9 100644
--- a/src/tests/master_tests.cpp
+++ b/src/tests/master_tests.cpp
@@ -1761,10 +1761,10 @@ TEST_F(MasterTest, SlavesEndpointTwoSlaves)
 
 
 // This test ensures that when a slave is recovered from the registry
-// but does not re-register with the master, it is removed from the
-// registry and the framework is informed that the slave is lost, and
-// the slave is refused re-registration.
-TEST_F(MasterTest, RecoveredSlaveDoesNotReregister)
+// but does not re-register with the master, it is marked unreachable
+// in the registry, the framework is informed that the slave is lost,
+// and the slave is allowed to re-register.
+TEST_F(MasterTest, RecoveredSlaveCanReregister)
 {
   // Step 1: Start a master.
   master::Flags masterFlags = CreateMasterFlags();
@@ -1826,15 +1826,15 @@ TEST_F(MasterTest, RecoveredSlaveDoesNotReregister)
 
   Clock::resume();
 
-  // Step 7: Ensure the slave cannot re-register!
-  Future<ShutdownMessage> shutdownMessage =
-    FUTURE_PROTOBUF(ShutdownMessage(), master.get()->pid, _);
+  // Step 7: Ensure the slave can re-register.
+  Future<SlaveReregisteredMessage> slaveReregisteredMessage =
+    FUTURE_PROTOBUF(SlaveReregisteredMessage(), master.get()->pid, _);
 
   detector = master.get()->createDetector();
   slave = StartSlave(detector.get(), slaveFlags);
   ASSERT_SOME(slave);
 
-  AWAIT_READY(shutdownMessage);
+  AWAIT_READY(slaveReregisteredMessage);
 
   driver.stop();
   driver.join();

http://git-wip-us.apache.org/repos/asf/mesos/blob/937c85f2/src/tests/partition_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/partition_tests.cpp b/src/tests/partition_tests.cpp
index 8e3827a..34622b3 100644
--- a/src/tests/partition_tests.cpp
+++ b/src/tests/partition_tests.cpp
@@ -21,6 +21,7 @@
 #include <process/clock.hpp>
 #include <process/future.hpp>
 #include <process/gmock.hpp>
+#include <process/http.hpp>
 #include <process/owned.hpp>
 #include <process/pid.hpp>
 
@@ -58,6 +59,9 @@ using process::Message;
 using process::Owned;
 using process::PID;
 
+using process::http::OK;
+using process::http::Response;
+
 using std::vector;
 
 using testing::_;
@@ -160,17 +164,13 @@ TEST_P(PartitionTest, PartitionedSlave)
 }
 
 
-// The purpose of this test is to ensure that when slaves are removed
-// from the master, and then attempt to re-register, we deny the
-// re-registration by sending a ShutdownMessage to the slave.
-// Why? Because during a network partition, the master will remove a
-// partitioned slave, thus sending its tasks to LOST. At this point,
-// when the partition is removed, the slave will attempt to
-// re-register with its running tasks. We've already notified
-// frameworks that these tasks were LOST, so we have to have the
-// slave shut down.
-TEST_P(PartitionTest, PartitionedSlaveReregistration)
+// This test checks that a slave can reregister with the master after
+// a partition, and that PARTITION_AWARE tasks running on the slave
+// continue to run.
+TEST_P(PartitionTest, ReregisterSlavePartitionAware)
 {
+  Clock::pause();
+
   master::Flags masterFlags = CreateMasterFlags();
   masterFlags.registry_strict = GetParam();
 
@@ -186,47 +186,37 @@ TEST_P(PartitionTest, PartitionedSlaveReregistration)
 
   DROP_PROTOBUFS(PongSlaveMessage(), _, _);
 
-  MockExecutor exec(DEFAULT_EXECUTOR_ID);
-  TestContainerizer containerizer(&exec);
-
   StandaloneMasterDetector detector(master.get()->pid);
 
-  Try<Owned<cluster::Slave>> slave = StartSlave(&detector, &containerizer);
+  Try<Owned<cluster::Slave>> slave = StartSlave(&detector);
   ASSERT_SOME(slave);
 
+  // Start a scheduler. The scheduler has the PARTITION_AWARE
+  // capability, so we expect its tasks to continue running when the
+  // partitioned agent reregisters.
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.add_capabilities()->set_type(
+      FrameworkInfo::Capability::PARTITION_AWARE);
+
   MockScheduler sched;
   MesosSchedulerDriver driver(
-      &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
+      &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
 
   EXPECT_CALL(sched, registered(&driver, _, _));
 
   Future<vector<Offer>> offers;
   EXPECT_CALL(sched, resourceOffers(&driver, _))
     .WillOnce(FutureArg<1>(&offers))
-    .WillRepeatedly(Return());
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
 
   driver.start();
 
   AWAIT_READY(offers);
-  ASSERT_NE(0u, offers.get().size());
+  ASSERT_FALSE(offers.get().empty());
 
-  // Launch a task. This is to ensure the task is killed by the slave,
-  // during shutdown.
-  TaskID taskId;
-  taskId.set_value("1");
+  Offer offer = offers.get()[0];
 
-  TaskInfo task;
-  task.set_name("");
-  task.mutable_task_id()->MergeFrom(taskId);
-  task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
-  task.mutable_resources()->MergeFrom(offers.get()[0].resources());
-  task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);
-  task.mutable_executor()->mutable_command()->set_value("sleep 60");
-
-  // Set up the expectations for launching the task.
-  EXPECT_CALL(exec, registered(_, _, _, _));
-  EXPECT_CALL(exec, launchTask(_, _))
-    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+  TaskInfo task = createTask(offer, "sleep 60");
 
   Future<TaskStatus> runningStatus;
   EXPECT_CALL(sched, statusUpdate(&driver, _))
@@ -235,33 +225,171 @@ TEST_P(PartitionTest, PartitionedSlaveReregistration)
   Future<Nothing> statusUpdateAck = FUTURE_DISPATCH(
       slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
 
-  driver.launchTasks(offers.get()[0].id(), {task});
+  driver.launchTasks(offer.id(), {task});
 
   AWAIT_READY(runningStatus);
   EXPECT_EQ(TASK_RUNNING, runningStatus.get().state());
+  EXPECT_EQ(task.task_id(), runningStatus.get().task_id());
 
-  // Wait for the slave to have handled the acknowledgment prior
-  // to pausing the clock.
-  AWAIT_READY(statusUpdateAck);
+  const SlaveID slaveId = runningStatus.get().slave_id();
 
-  // Drop the first shutdown message from the master (simulated
-  // partition), allow the second shutdown message to pass when
-  // the slave re-registers.
-  Future<ShutdownMessage> shutdownMessage =
-    DROP_PROTOBUF(ShutdownMessage(), _, slave.get()->pid);
+  AWAIT_READY(statusUpdateAck);
 
-  Future<TaskStatus> lostStatus;
+  // Now, induce a partition of the slave by having the master
+  // timeout the slave.
+  Future<TaskStatus> unreachableStatus;
   EXPECT_CALL(sched, statusUpdate(&driver, _))
-    .WillOnce(FutureArg<1>(&lostStatus));
+    .WillOnce(FutureArg<1>(&unreachableStatus));
 
   Future<Nothing> slaveLost;
   EXPECT_CALL(sched, slaveLost(&driver, _))
     .WillOnce(FutureSatisfy(&slaveLost));
 
+  size_t pings = 0;
+  while (true) {
+    AWAIT_READY(ping);
+    pings++;
+    if (pings == masterFlags.max_agent_ping_timeouts) {
+      break;
+    }
+    ping = FUTURE_MESSAGE(Eq(PingSlaveMessage().GetTypeName()), _, _);
+    Clock::advance(masterFlags.agent_ping_timeout);
+  }
+
+  Clock::advance(masterFlags.agent_ping_timeout);
+
+  // TODO(neilc): Update this when TASK_UNREACHABLE is introduced.
+  AWAIT_READY(unreachableStatus);
+  EXPECT_EQ(TASK_LOST, unreachableStatus.get().state());
+  EXPECT_EQ(TaskStatus::REASON_SLAVE_REMOVED, 
unreachableStatus.get().reason());
+  EXPECT_EQ(task.task_id(), unreachableStatus.get().task_id());
+  EXPECT_EQ(slaveId, unreachableStatus.get().slave_id());
+
+  AWAIT_READY(slaveLost);
+
+  JSON::Object stats = Metrics();
+  EXPECT_EQ(1, stats.values["master/tasks_lost"]);
+  EXPECT_EQ(0, stats.values["master/tasks_unreachable"]);
+  EXPECT_EQ(1, stats.values["master/slave_unreachable_scheduled"]);
+  EXPECT_EQ(1, stats.values["master/slave_unreachable_completed"]);
+  EXPECT_EQ(1, stats.values["master/slave_removals"]);
+  EXPECT_EQ(1, stats.values["master/slave_removals/reason_unhealthy"]);
+
+  // We now complete the partition on the slave side as well. We
+  // simulate a master loss event, which would normally happen during
+  // a network partition. The slave should then reregister with the
+  // master.
+  detector.appoint(None());
+
+  Future<SlaveReregisteredMessage> slaveReregistered = FUTURE_PROTOBUF(
+      SlaveReregisteredMessage(), master.get()->pid, slave.get()->pid);
+
+  detector.appoint(master.get()->pid);
+
+  AWAIT_READY(slaveReregistered);
+
+  // Perform explicit reconciliation; the task should still be running.
+  TaskStatus status;
+  status.mutable_task_id()->CopyFrom(task.task_id());
+  status.mutable_slave_id()->CopyFrom(slaveId);
+  status.set_state(TASK_STAGING); // Dummy value.
+
+  Future<TaskStatus> reconcileUpdate;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&reconcileUpdate));
+
+  driver.reconcileTasks({status});
+
+  AWAIT_READY(reconcileUpdate);
+  EXPECT_EQ(TASK_RUNNING, reconcileUpdate.get().state());
+  EXPECT_EQ(TaskStatus::REASON_RECONCILIATION, reconcileUpdate.get().reason());
+
+  Clock::resume();
+
+  driver.stop();
+  driver.join();
+}
+
+
+// This test checks that a slave can reregister with the master after
+// a partition, and that non-PARTITION_AWARE tasks running on the
+// slave are shutdown.
+TEST_P(PartitionTest, ReregisterSlaveNotPartitionAware)
+{
   Clock::pause();
 
+  master::Flags masterFlags = CreateMasterFlags();
+  masterFlags.registry_strict = GetParam();
+
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  // Allow the master to PING the slave, but drop all PONG messages
+  // from the slave. Note that we don't match on the master / slave
+  // PIDs because it's actually the SlaveObserver Process that sends
+  // the pings.
+  Future<Message> ping = FUTURE_MESSAGE(
+      Eq(PingSlaveMessage().GetTypeName()), _, _);
+
+  DROP_PROTOBUFS(PongSlaveMessage(), _, _);
+
+  StandaloneMasterDetector detector(master.get()->pid);
+
+  Try<Owned<cluster::Slave>> slave = StartSlave(&detector);
+  ASSERT_SOME(slave);
+
+  // Start a scheduler. The scheduler is not PARTITION_AWARE, so we
+  // expect its tasks to be shutdown when the partitioned agent
+  // reregisters.
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  ASSERT_FALSE(offers.get().empty());
+
+  Offer offer = offers.get()[0];
+
+  TaskInfo task = createTask(offer, "sleep 60");
+
+  Future<TaskStatus> runningStatus;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&runningStatus));
+
+  Future<Nothing> statusUpdateAck = FUTURE_DISPATCH(
+      slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
+
+  driver.launchTasks(offer.id(), {task});
+
+  AWAIT_READY(runningStatus);
+  EXPECT_EQ(TASK_RUNNING, runningStatus.get().state());
+  EXPECT_EQ(task.task_id(), runningStatus.get().task_id());
+
+  const SlaveID slaveId = runningStatus.get().slave_id();
+
+  AWAIT_READY(statusUpdateAck);
+
   // Now, induce a partition of the slave by having the master
   // timeout the slave.
+  Future<TaskStatus> lostStatus;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&lostStatus));
+
+  // Note that we expect to get `slaveLost` callbacks in both
+  // schedulers, regardless of PARTITION_AWARE.
+  Future<Nothing> slaveLost;
+  EXPECT_CALL(sched, slaveLost(&driver, _))
+    .WillOnce(FutureSatisfy(&slaveLost));
+
   size_t pings = 0;
   while (true) {
     AWAIT_READY(ping);
@@ -275,71 +403,82 @@ TEST_P(PartitionTest, PartitionedSlaveReregistration)
 
   Clock::advance(masterFlags.agent_ping_timeout);
 
-  // The master will have notified the framework of the lost task.
+  // The scheduler should see TASK_LOST because it is not
+  // PARTITION_AWARE.
   AWAIT_READY(lostStatus);
   EXPECT_EQ(TASK_LOST, lostStatus.get().state());
   EXPECT_EQ(TaskStatus::REASON_SLAVE_REMOVED, lostStatus.get().reason());
+  EXPECT_EQ(task.task_id(), lostStatus.get().task_id());
+  EXPECT_EQ(slaveId, lostStatus.get().slave_id());
 
-  // Wait for the master to attempt to shut down the slave.
-  AWAIT_READY(shutdownMessage);
-
-  // The master will notify the framework that the slave was lost.
   AWAIT_READY(slaveLost);
 
   JSON::Object stats = Metrics();
   EXPECT_EQ(1, stats.values["master/tasks_lost"]);
+  EXPECT_EQ(0, stats.values["master/tasks_unreachable"]);
+  EXPECT_EQ(1, stats.values["master/slave_unreachable_scheduled"]);
+  EXPECT_EQ(1, stats.values["master/slave_unreachable_completed"]);
   EXPECT_EQ(1, stats.values["master/slave_removals"]);
   EXPECT_EQ(1, stats.values["master/slave_removals/reason_unhealthy"]);
 
-  Clock::resume();
-
-  // We now complete the partition on the slave side as well. This
-  // is done by simulating a master loss event which would normally
-  // occur during a network partition.
+  // We now complete the partition on the slave side as well. We
+  // simulate a master loss event, which would normally happen during
+  // a network partition. The slave should then reregister with the
+  // master.
   detector.appoint(None());
 
-  Future<Nothing> shutdown;
-  EXPECT_CALL(exec, shutdown(_))
-    .WillOnce(FutureSatisfy(&shutdown));
-
-  shutdownMessage = FUTURE_PROTOBUF(ShutdownMessage(), _, slave.get()->pid);
+  Future<SlaveReregisteredMessage> slaveReregistered = FUTURE_PROTOBUF(
+      SlaveReregisteredMessage(), master.get()->pid, slave.get()->pid);
 
-  // Have the slave re-register with the master.
   detector.appoint(master.get()->pid);
 
-  // Upon re-registration, the master will shutdown the slave.
-  // The slave will then shut down the executor.
-  AWAIT_READY(shutdownMessage);
-  AWAIT_READY(shutdown);
+  AWAIT_READY(slaveReregistered);
+
+  // Perform explicit reconciliation. The task should not be running
+  // (TASK_LOST) because the framework is not PARTITION_AWARE.
+  TaskStatus status;
+  status.mutable_task_id()->CopyFrom(task.task_id());
+  status.mutable_slave_id()->CopyFrom(slaveId);
+  status.set_state(TASK_STAGING); // Dummy value.
+
+  Future<TaskStatus> reconcileUpdate;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&reconcileUpdate));
+
+  driver.reconcileTasks({status});
+
+  AWAIT_READY(reconcileUpdate);
+  EXPECT_EQ(TASK_LOST, reconcileUpdate.get().state());
+  EXPECT_EQ(TaskStatus::REASON_RECONCILIATION, reconcileUpdate.get().reason());
+
+  Clock::resume();
 
   driver.stop();
   driver.join();
 }
 
 
-// The purpose of this test is to ensure that when slaves are removed
-// from the master, and then attempt to send status updates, we send
-// a ShutdownMessage to the slave. Why? Because during a network
-// partition, the master will remove a partitioned slave, thus sending
-// its tasks to LOST. At this point, when the partition is removed,
-// the slave may attempt to send updates if it was unaware that the
-// master removed it. We've already notified frameworks that these
-// tasks were LOST, so we have to have the slave shut down.
+// This test checks how Mesos behaves when a slave is removed because
+// it fails health checks, and then the slave sends a status update
+// (because it does not realize that it is partitioned from the
+// master's POV). In prior Mesos versions, the master would shutdown
+// the slave in this situation. In Mesos >= 1.1, the master will drop
+// the status update; the slave will eventually try to reregister.
 TEST_P(PartitionTest, PartitionedSlaveStatusUpdates)
 {
+  Clock::pause();
+
   master::Flags masterFlags = CreateMasterFlags();
   masterFlags.registry_strict = GetParam();
 
   Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
   ASSERT_SOME(master);
 
-  // Allow the master to PING the slave, but drop all PONG messages
-  // from the slave. Note that we don't match on the master / slave
-  // PIDs because it's actually the SlaveObserver Process that sends
-  // the pings.
-  Future<Message> ping = FUTURE_MESSAGE(
-      Eq(PingSlaveMessage().GetTypeName()), _, _);
-
+  // Drop both PINGs from master to slave and PONGs from slave to
+  // master. Note that we don't match on the master / slave PIDs
+  // because it's actually the SlaveObserver Process that sends pings
+  // and receives pongs.
+  DROP_PROTOBUFS(PingSlaveMessage(), _, _);
   DROP_PROTOBUFS(PongSlaveMessage(), _, _);
 
   Future<SlaveRegisteredMessage> slaveRegisteredMessage =
@@ -370,12 +509,6 @@ TEST_P(PartitionTest, PartitionedSlaveStatusUpdates)
 
   AWAIT_READY(frameworkId);
 
-  // Drop the first shutdown message from the master (simulated
-  // partition), allow the second shutdown message to pass when
-  // the slave sends an update.
-  Future<ShutdownMessage> shutdownMessage =
-    DROP_PROTOBUF(ShutdownMessage(), _, slave.get()->pid);
-
   EXPECT_CALL(sched, offerRescinded(&driver, _))
     .WillRepeatedly(Return());
 
@@ -383,72 +516,107 @@ TEST_P(PartitionTest, PartitionedSlaveStatusUpdates)
   EXPECT_CALL(sched, slaveLost(&driver, _))
     .WillOnce(FutureSatisfy(&slaveLost));
 
-  Clock::pause();
+  // Now, induce a partition of the slave by having the master timeout
+  // the slave. The master will remove the slave; the slave will also
+  // realize that it hasn't seen any pings from the master and try to
+  // reregister. We don't want to let the slave reregister yet, so we
+  // drop the first message in the reregistration protocol, which is
+  // AuthenticateMessage since agent auth is enabled.
+  Future<AuthenticateMessage> authenticateMessage =
+    DROP_PROTOBUF(AuthenticateMessage(), _, _);
 
-  // Now, induce a partition of the slave by having the master
-  // timeout the slave.
-  size_t pings = 0;
-  while (true) {
-    AWAIT_READY(ping);
-    pings++;
-    if (pings == masterFlags.max_agent_ping_timeouts) {
-      break;
-    }
-    ping = FUTURE_MESSAGE(Eq(PingSlaveMessage().GetTypeName()), _, _);
+  for (size_t i = 0; i < masterFlags.max_agent_ping_timeouts; i++) {
     Clock::advance(masterFlags.agent_ping_timeout);
+    Clock::settle();
   }
 
-  Clock::advance(masterFlags.agent_ping_timeout);
-
-  // Wait for the master to attempt to shut down the slave.
-  AWAIT_READY(shutdownMessage);
-
   // The master will notify the framework that the slave was lost.
   AWAIT_READY(slaveLost);
 
+  // Slave will try to authenticate for reregistration; message dropped.
+  AWAIT_READY(authenticateMessage);
+
   JSON::Object stats = Metrics();
+  EXPECT_EQ(1, stats.values["master/slave_unreachable_scheduled"]);
+  EXPECT_EQ(1, stats.values["master/slave_unreachable_completed"]);
   EXPECT_EQ(1, stats.values["master/slave_removals"]);
   EXPECT_EQ(1, stats.values["master/slave_removals/reason_unhealthy"]);
 
-  shutdownMessage = FUTURE_PROTOBUF(ShutdownMessage(), _, slave.get()->pid);
-
   // At this point, the slave still thinks it's registered, so we
   // simulate a status update coming from the slave.
-  TaskID taskId;
-  taskId.set_value("task_id");
-  const StatusUpdate& update = protobuf::createStatusUpdate(
+  TaskID taskId1;
+  taskId1.set_value("task_id1");
+
+  const StatusUpdate& update1 = protobuf::createStatusUpdate(
       frameworkId.get(),
       slaveId,
-      taskId,
+      taskId1,
       TASK_RUNNING,
       TaskStatus::SOURCE_SLAVE,
       UUID::random());
 
-  StatusUpdateMessage message;
-  message.mutable_update()->CopyFrom(update);
-  message.set_pid(stringify(slave.get()->pid));
+  StatusUpdateMessage message1;
+  message1.mutable_update()->CopyFrom(update1);
+  message1.set_pid(stringify(slave.get()->pid));
+
+  // The scheduler should not receive the status update.
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .Times(0);
 
-  process::post(master.get()->pid, message);
+  process::post(master.get()->pid, message1);
+  Clock::settle();
 
-  // The master should shutdown the slave upon receiving the update.
-  AWAIT_READY(shutdownMessage);
+  // Advance the clock so that the slaves notices that it still hasn't
+  // seen any pings from the master, which will cause it to try to
+  // reregister again. This time reregistration should succeed.
+  Future<SlaveReregisteredMessage> slaveReregistered = FUTURE_PROTOBUF(
+      SlaveReregisteredMessage(), master.get()->pid, slave.get()->pid);
 
-  Clock::resume();
+  for (size_t i = 0; i < masterFlags.max_agent_ping_timeouts; i++) {
+    Clock::advance(masterFlags.agent_ping_timeout);
+    Clock::settle();
+  }
+
+  AWAIT_READY(slaveReregistered);
+
+  // Since the slave has reregistered, a status update from the slave
+  // should now be forwarded to the scheduler.
+  Future<StatusUpdateMessage> statusUpdate =
+    DROP_PROTOBUF(StatusUpdateMessage(), master.get()->pid, _);
+
+  TaskID taskId2;
+  taskId2.set_value("task_id2");
+
+  const StatusUpdate& update2 = protobuf::createStatusUpdate(
+      frameworkId.get(),
+      slaveId,
+      taskId2,
+      TASK_RUNNING,
+      TaskStatus::SOURCE_SLAVE,
+      UUID::random());
+
+  StatusUpdateMessage message2;
+  message2.mutable_update()->CopyFrom(update2);
+  message2.set_pid(stringify(slave.get()->pid));
+
+  process::post(master.get()->pid, message2);
+
+  AWAIT_READY(statusUpdate);
+  EXPECT_EQ(taskId2, statusUpdate->update().status().task_id());
 
   driver.stop();
   driver.join();
+
+  Clock::resume();
 }
 
 
-// The purpose of this test is to ensure that when slaves are removed
-// from the master, and then attempt to send exited executor messages,
-// we send a ShutdownMessage to the slave. Why? Because during a
-// network partition, the master will remove a partitioned slave, thus
-// sending its tasks to LOST. At this point, when the partition is
-// removed, the slave may attempt to send exited executor messages if
-// it was unaware that the master removed it. We've already
-// notified frameworks that the tasks under the executors were LOST,
-// so we have to have the slave shut down.
+// This test checks how Mesos behaves when a slave is removed, and
+// then the slave sends an ExitedExecutorMessage (because it does not
+// realize it is partitioned from the master's POV). In prior Mesos
+// versions, the master would shutdown the slave in this situation. In
+// Mesos >= 1.1, the master will drop the message; the slave will
+// eventually try to reregister.
 TEST_P(PartitionTest, PartitionedSlaveExitedExecutor)
 {
   master::Flags masterFlags = CreateMasterFlags();
@@ -494,16 +662,7 @@ TEST_P(PartitionTest, PartitionedSlaveExitedExecutor)
 
   // Launch a task. This allows us to have the slave send an
   // ExitedExecutorMessage.
-  TaskID taskId;
-  taskId.set_value("1");
-
-  TaskInfo task;
-  task.set_name("");
-  task.mutable_task_id()->MergeFrom(taskId);
-  task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
-  task.mutable_resources()->MergeFrom(offers.get()[0].resources());
-  task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);
-  task.mutable_executor()->mutable_command()->set_value("sleep 60");
+  TaskInfo task = createTask(offers.get()[0], "sleep 60", DEFAULT_EXECUTOR_ID);
 
   // Set up the expectations for launching the task.
   EXPECT_CALL(exec, registered(_, _, _, _));
@@ -511,19 +670,11 @@ TEST_P(PartitionTest, PartitionedSlaveExitedExecutor)
   EXPECT_CALL(exec, launchTask(_, _))
     .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
 
-  // Drop all the status updates from the slave, so that we can
-  // ensure the ExitedExecutorMessage is what triggers the slave
-  // shutdown.
+  // Drop all the status updates from the slave.
   DROP_PROTOBUFS(StatusUpdateMessage(), _, master.get()->pid);
 
   driver.launchTasks(offers.get()[0].id(), {task});
 
-  // Drop the first shutdown message from the master (simulated
-  // partition) and allow the second shutdown message to pass when
-  // triggered by the ExitedExecutorMessage.
-  Future<ShutdownMessage> shutdownMessage =
-    DROP_PROTOBUF(ShutdownMessage(), _, slave.get()->pid);
-
   Future<TaskStatus> lostStatus;
   EXPECT_CALL(sched, statusUpdate(&driver, _))
     .WillOnce(FutureArg<1>(&lostStatus));
@@ -549,31 +700,34 @@ TEST_P(PartitionTest, PartitionedSlaveExitedExecutor)
 
   Clock::advance(masterFlags.agent_ping_timeout);
 
-  // The master will have notified the framework of the lost task.
+  // The master will notify the framework of the lost task.
   AWAIT_READY(lostStatus);
   EXPECT_EQ(TASK_LOST, lostStatus.get().state());
   EXPECT_EQ(TaskStatus::REASON_SLAVE_REMOVED, lostStatus.get().reason());
 
-  // Wait for the master to attempt to shut down the slave.
-  AWAIT_READY(shutdownMessage);
-
   // The master will notify the framework that the slave was lost.
   AWAIT_READY(slaveLost);
 
   JSON::Object stats = Metrics();
   EXPECT_EQ(1, stats.values["master/tasks_lost"]);
+  EXPECT_EQ(1, stats.values["master/slave_unreachable_scheduled"]);
+  EXPECT_EQ(1, stats.values["master/slave_unreachable_completed"]);
   EXPECT_EQ(1, stats.values["master/slave_removals"]);
   EXPECT_EQ(1, stats.values["master/slave_removals/reason_unhealthy"]);
 
-  shutdownMessage = FUTURE_PROTOBUF(ShutdownMessage(), _, slave.get()->pid);
+  EXPECT_CALL(sched, executorLost(&driver, _, _, _))
+    .Times(0);
 
   // Induce an ExitedExecutorMessage from the slave.
-  containerizer.destroy(
-      frameworkId.get(), DEFAULT_EXECUTOR_INFO.executor_id());
-
-  // Upon receiving the message, the master will shutdown the slave.
-  AWAIT_READY(shutdownMessage);
-
+  containerizer.destroy(frameworkId.get(), DEFAULT_EXECUTOR_ID);
+
+  // The master will drop the ExitedExecutorMessage. We do not
+  // currently support reliable delivery of ExitedExecutorMessages, so
+  // the message will not be delivered if/when the slave reregisters.
+  //
+  // TODO(neilc): Update this test to check for reliable delivery once
+  // MESOS-4308 is fixed.
+  Clock::settle();
   Clock::resume();
 
   driver.stop();

Reply via email to