Updated master to send TASK_UNREACHABLE task state.

When a task launched by a partition-aware framework is running on an
agent that becomes partitioned from the master, the framework will now
receive TASK_UNREACHABLE, not TASK_LOST.

Similarly, when a partition-aware framework performs explicit
reconciliation for a task whose agent ID appears in the "unreachable"
list in the registry, the master will now return TASK_UNREACHABLE
rather than TASK_LOST.

Review: https://reviews.apache.org/r/51805/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/fac32e4d
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/fac32e4d
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/fac32e4d

Branch: refs/heads/master
Commit: fac32e4d7f430d612b9ffc985e4c55a0ae3312fc
Parents: 81fe779
Author: Neil Conway <neil.con...@gmail.com>
Authored: Mon Sep 19 15:48:42 2016 -0700
Committer: Vinod Kone <vinodk...@gmail.com>
Committed: Mon Sep 19 15:48:42 2016 -0700

----------------------------------------------------------------------
 src/master/master.cpp              | 37 +++++++++++++-------
 src/tests/partition_tests.cpp      | 62 +++++++++++++++++++--------------
 src/tests/reconciliation_tests.cpp | 11 ++++--
 3 files changed, 69 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/fac32e4d/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 38ca425..beea5ff 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -5858,9 +5858,9 @@ void Master::markUnreachable(const SlaveID& slaveId)
   TimeInfo unreachableTime = protobuf::getCurrentTime();
 
   // Update the registry to move this slave from the list of admitted
-  // slaves to the list of unreachable slaves. After this is
-  // completed, we can update the master's in-memory state to remove
-  // the slave and send TASK_LOST status updates to the frameworks.
+  // slaves to the list of unreachable slaves. After this is complete,
+  // we can remove the slave from the master's in-memory state and
+  // send TASK_UNREACHABLE / TASK_LOST updates to the frameworks.
   registrar->apply(Owned<Operation>(
           new MarkSlaveUnreachable(slave->info, unreachableTime)))
     .onAny(defer(self(),
@@ -5909,19 +5909,25 @@ void Master::_markUnreachable(
   // the slave is already removed.
   allocator->removeSlave(slave->id);
 
-  // Transition the tasks to TASK_LOST and remove them.
-  // TODO(neilc): Update this to send TASK_UNREACHABLE for
-  // PARTITION_AWARE frameworks.
+  // Transition tasks to TASK_UNREACHABLE / TASK_LOST and remove them.
+  // We only use TASK_UNREACHABLE if the framework has opted in to the
+  // PARTITION_AWARE capability.
   foreachkey (const FrameworkID& frameworkId, utils::copy(slave->tasks)) {
     Framework* framework = getFramework(frameworkId);
     CHECK_NOTNULL(framework);
 
+    TaskState newTaskState = TASK_UNREACHABLE;
+    if (!protobuf::frameworkHasCapability(
+            framework->info, FrameworkInfo::Capability::PARTITION_AWARE)) {
+      newTaskState = TASK_LOST;
+    }
+
     foreachvalue (Task* task, utils::copy(slave->tasks[frameworkId])) {
       const StatusUpdate& update = protobuf::createStatusUpdate(
           task->framework_id(),
           task->slave_id(),
           task->task_id(),
-          TASK_LOST,
+          newTaskState,
           TaskStatus::SOURCE_MASTER,
           None(),
           "Slave " + slave->info.hostname() + " is unreachable",
@@ -6120,7 +6126,7 @@ void Master::_reconcileTasks(
   //   (2) Task is known: send the latest state.
   //   (3) Task is unknown, slave is registered: TASK_LOST.
   //   (4) Task is unknown, slave is transitioning: no-op.
-  //   (5) Task is unknown, slave is unreachable: TASK_LOST.
+  //   (5) Task is unknown, slave is unreachable: TASK_UNREACHABLE.
   //   (6) Task is unknown, slave is unknown: TASK_LOST.
   //
   // When using a non-strict registry, case (6) may result in
@@ -6190,16 +6196,23 @@ void Master::_reconcileTasks(
                 << " for framework " << *framework
                 << " because there are transitional agents";
     } else if (slaveId.isSome() && slaves.unreachable.contains(slaveId.get())) 
{
-      // (5) Slave is unreachable: TASK_LOST. The status update
-      // includes the time at which the slave was marked as
-      // unreachable.
+      // (5) Slave is unreachable: TASK_UNREACHABLE. If the framework
+      // does not have the PARTITION_AWARE capability, send TASK_LOST
+      // for backward compatibility. In either case, the status update
+      // also includes the time when the slave was marked unreachable.
       TimeInfo unreachableTime = slaves.unreachable[slaveId.get()];
 
+      TaskState taskState = TASK_UNREACHABLE;
+      if (!protobuf::frameworkHasCapability(
+              framework->info, FrameworkInfo::Capability::PARTITION_AWARE)) {
+        taskState = TASK_LOST;
+      }
+
       update = protobuf::createStatusUpdate(
           framework->id(),
           slaveId.get(),
           status.task_id(),
-          TASK_LOST,
+          taskState,
           TaskStatus::SOURCE_MASTER,
           None(),
           "Reconciliation: Task is unreachable",

http://git-wip-us.apache.org/repos/asf/mesos/blob/fac32e4d/src/tests/partition_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/partition_tests.cpp b/src/tests/partition_tests.cpp
index d2fce57..c860d2a 100644
--- a/src/tests/partition_tests.cpp
+++ b/src/tests/partition_tests.cpp
@@ -262,9 +262,8 @@ TEST_P(PartitionTest, ReregisterSlavePartitionAware)
 
   Clock::advance(masterFlags.agent_ping_timeout);
 
-  // TODO(neilc): Update this when TASK_UNREACHABLE is introduced.
   AWAIT_READY(unreachableStatus);
-  EXPECT_EQ(TASK_LOST, unreachableStatus.get().state());
+  EXPECT_EQ(TASK_UNREACHABLE, unreachableStatus.get().state());
   EXPECT_EQ(TaskStatus::REASON_SLAVE_REMOVED, 
unreachableStatus.get().reason());
   EXPECT_EQ(task.task_id(), unreachableStatus.get().task_id());
   EXPECT_EQ(slaveId, unreachableStatus.get().slave_id());
@@ -272,8 +271,8 @@ TEST_P(PartitionTest, ReregisterSlavePartitionAware)
   AWAIT_READY(slaveLost);
 
   JSON::Object stats = Metrics();
-  EXPECT_EQ(1, stats.values["master/tasks_lost"]);
-  EXPECT_EQ(0, stats.values["master/tasks_unreachable"]);
+  EXPECT_EQ(0, stats.values["master/tasks_lost"]);
+  EXPECT_EQ(1, stats.values["master/tasks_unreachable"]);
   EXPECT_EQ(1, stats.values["master/slave_unreachable_scheduled"]);
   EXPECT_EQ(1, stats.values["master/slave_unreachable_completed"]);
   EXPECT_EQ(1, stats.values["master/slave_removals"]);
@@ -642,10 +641,9 @@ TEST_P(PartitionTest, 
PartitionedSlaveReregistrationMasterFailover)
   EXPECT_EQ(slaveId, lostStatus.get().slave_id());
   EXPECT_EQ(partitionTime, lostStatus.get().unreachable_time());
 
-  // `sched2` should see TASK_LOST.
-  // TODO(neilc): Update this to expect TASK_UNREACHABLE.
+  // `sched2` should see TASK_UNREACHABLE.
   AWAIT_READY(unreachableStatus);
-  EXPECT_EQ(TASK_LOST, unreachableStatus.get().state());
+  EXPECT_EQ(TASK_UNREACHABLE, unreachableStatus.get().state());
   EXPECT_EQ(TaskStatus::REASON_SLAVE_REMOVED, 
unreachableStatus.get().reason());
   EXPECT_EQ(task2.task_id(), unreachableStatus.get().task_id());
   EXPECT_EQ(slaveId, unreachableStatus.get().slave_id());
@@ -805,9 +803,9 @@ TEST_P(PartitionTest, PartitionedSlaveOrphanedTask)
 
   // Now, induce a partition of the slave by having the master
   // timeout the slave.
-  Future<TaskStatus> lostStatus;
+  Future<TaskStatus> unreachableStatus;
   EXPECT_CALL(sched, statusUpdate(&driver, _))
-    .WillOnce(FutureArg<1>(&lostStatus));
+    .WillOnce(FutureArg<1>(&unreachableStatus));
 
   Future<Nothing> slaveLost;
   EXPECT_CALL(sched, slaveLost(&driver, _))
@@ -835,13 +833,12 @@ TEST_P(PartitionTest, PartitionedSlaveOrphanedTask)
 
   Clock::advance(Milliseconds(100));
 
-  // TODO(neilc): Update this to expect `TASK_UNREACHABLE`.
-  AWAIT_READY(lostStatus);
-  EXPECT_EQ(TASK_LOST, lostStatus.get().state());
-  EXPECT_EQ(TaskStatus::REASON_SLAVE_REMOVED, lostStatus.get().reason());
-  EXPECT_EQ(task.task_id(), lostStatus.get().task_id());
-  EXPECT_EQ(slaveId, lostStatus.get().slave_id());
-  EXPECT_EQ(partitionTime, lostStatus.get().unreachable_time());
+  AWAIT_READY(unreachableStatus);
+  EXPECT_EQ(TASK_UNREACHABLE, unreachableStatus.get().state());
+  EXPECT_EQ(TaskStatus::REASON_SLAVE_REMOVED, 
unreachableStatus.get().reason());
+  EXPECT_EQ(task.task_id(), unreachableStatus.get().task_id());
+  EXPECT_EQ(slaveId, unreachableStatus.get().slave_id());
+  EXPECT_EQ(partitionTime, unreachableStatus.get().unreachable_time());
 
   AWAIT_READY(slaveLost);
 
@@ -1325,9 +1322,13 @@ TEST_P(PartitionTest, RegistryGcByCount)
   AWAIT_READY(slaveRegisteredMessage1);
   const SlaveID slaveId1 = slaveRegisteredMessage1.get().slave_id();
 
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.add_capabilities()->set_type(
+      FrameworkInfo::Capability::PARTITION_AWARE);
+
   MockScheduler sched;
   MesosSchedulerDriver driver(
-      &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
+      &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
 
   EXPECT_CALL(sched, registered(&driver, _, _));
 
@@ -1443,7 +1444,7 @@ TEST_P(PartitionTest, RegistryGcByCount)
   driver.reconcileTasks({status1});
 
   AWAIT_READY(reconcileUpdate1);
-  EXPECT_EQ(TASK_LOST, reconcileUpdate1.get().state());
+  EXPECT_EQ(TASK_UNREACHABLE, reconcileUpdate1.get().state());
   EXPECT_EQ(TaskStatus::REASON_RECONCILIATION, 
reconcileUpdate1.get().reason());
   EXPECT_EQ(partitionTime1, reconcileUpdate1.get().unreachable_time());
 
@@ -1485,7 +1486,7 @@ TEST_P(PartitionTest, RegistryGcByCount)
   driver.reconcileTasks({status3});
 
   AWAIT_READY(reconcileUpdate3);
-  EXPECT_EQ(TASK_LOST, reconcileUpdate3.get().state());
+  EXPECT_EQ(TASK_UNREACHABLE, reconcileUpdate3.get().state());
   EXPECT_EQ(TaskStatus::REASON_RECONCILIATION, 
reconcileUpdate3.get().reason());
   EXPECT_EQ(partitionTime2, reconcileUpdate3.get().unreachable_time());
 
@@ -1558,10 +1559,15 @@ TEST_P(PartitionTest, RegistryGcByCountManySlaves)
   Clock::advance(masterFlags.registry_gc_interval);
   Clock::settle();
 
-  // Start a scheduler: we verify GC behavior by doing reconciliation.
+  // Start a partition-aware scheduler. We verify GC behavior by doing
+  // reconciliation.
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.add_capabilities()->set_type(
+      FrameworkInfo::Capability::PARTITION_AWARE);
+
   MockScheduler sched;
   MesosSchedulerDriver driver(
-      &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
+      &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
 
   Future<Nothing> registered;
   EXPECT_CALL(sched, registered(&driver, _, _))
@@ -1589,7 +1595,7 @@ TEST_P(PartitionTest, RegistryGcByCountManySlaves)
   driver.reconcileTasks({status1});
 
   AWAIT_READY(reconcileUpdate1);
-  EXPECT_EQ(TASK_LOST, reconcileUpdate1.get().state());
+  EXPECT_EQ(TASK_UNREACHABLE, reconcileUpdate1.get().state());
   EXPECT_EQ(TaskStatus::REASON_RECONCILIATION, 
reconcileUpdate1.get().reason());
   EXPECT_EQ(unreachableTime, reconcileUpdate1.get().unreachable_time());
 
@@ -1660,9 +1666,13 @@ TEST_P(PartitionTest, RegistryGcByAge)
   AWAIT_READY(slaveRegisteredMessage1);
   const SlaveID slaveId1 = slaveRegisteredMessage1.get().slave_id();
 
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.add_capabilities()->set_type(
+      FrameworkInfo::Capability::PARTITION_AWARE);
+
   MockScheduler sched;
   MesosSchedulerDriver driver(
-      &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
+      &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
 
   EXPECT_CALL(sched, registered(&driver, _, _));
 
@@ -1782,7 +1792,7 @@ TEST_P(PartitionTest, RegistryGcByAge)
   driver.reconcileTasks({status1});
 
   AWAIT_READY(reconcileUpdate1);
-  EXPECT_EQ(TASK_LOST, reconcileUpdate1.get().state());
+  EXPECT_EQ(TASK_UNREACHABLE, reconcileUpdate1.get().state());
   EXPECT_EQ(TaskStatus::REASON_RECONCILIATION, 
reconcileUpdate1.get().reason());
   EXPECT_EQ(partitionTime1, reconcileUpdate1.get().unreachable_time());
 
@@ -1798,7 +1808,7 @@ TEST_P(PartitionTest, RegistryGcByAge)
   driver.reconcileTasks({status2});
 
   AWAIT_READY(reconcileUpdate2);
-  EXPECT_EQ(TASK_LOST, reconcileUpdate2.get().state());
+  EXPECT_EQ(TASK_UNREACHABLE, reconcileUpdate2.get().state());
   EXPECT_EQ(TaskStatus::REASON_RECONCILIATION, 
reconcileUpdate2.get().reason());
   EXPECT_EQ(partitionTime2, reconcileUpdate2.get().unreachable_time());
 
@@ -1837,7 +1847,7 @@ TEST_P(PartitionTest, RegistryGcByAge)
   driver.reconcileTasks({status4});
 
   AWAIT_READY(reconcileUpdate4);
-  EXPECT_EQ(TASK_LOST, reconcileUpdate4.get().state());
+  EXPECT_EQ(TASK_UNREACHABLE, reconcileUpdate4.get().state());
   EXPECT_EQ(TaskStatus::REASON_RECONCILIATION, 
reconcileUpdate4.get().reason());
   EXPECT_EQ(partitionTime2, reconcileUpdate4.get().unreachable_time());
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/fac32e4d/src/tests/reconciliation_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/reconciliation_tests.cpp 
b/src/tests/reconciliation_tests.cpp
index 9828ab9..1412090 100644
--- a/src/tests/reconciliation_tests.cpp
+++ b/src/tests/reconciliation_tests.cpp
@@ -230,15 +230,20 @@ TEST_F(ReconciliationTest, TaskStateMatch)
 
 
 // This test verifies that reconciliation of a task that belongs to an
-// unknown slave results in TASK_LOST.
+// unknown slave results in TASK_LOST, even if the framework has
+// enabled the PARTITION_AWARE capability.
 TEST_F(ReconciliationTest, UnknownSlave)
 {
   Try<Owned<cluster::Master>> master = StartMaster();
   ASSERT_SOME(master);
 
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.add_capabilities()->set_type(
+      FrameworkInfo::Capability::PARTITION_AWARE);
+
   MockScheduler sched;
   MesosSchedulerDriver driver(
-    &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
+    &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
 
   Future<FrameworkID> frameworkId;
   EXPECT_CALL(sched, registered(&driver, _, _))
@@ -253,7 +258,7 @@ TEST_F(ReconciliationTest, UnknownSlave)
   EXPECT_CALL(sched, statusUpdate(&driver, _))
     .WillOnce(FutureArg<1>(&update));
 
-  // Create a task status with a random slave id (and task id).
+  // Create a task status with a random slave id and task id.
   TaskStatus status;
   status.mutable_task_id()->set_value(UUID::random().toString());
   status.mutable_slave_id()->set_value(UUID::random().toString());

Reply via email to