Updated the master to send the TASK_UNREACHABLE task state. When a task launched by a partition-aware framework (one that has opted in to the PARTITION_AWARE capability) is running on an agent that becomes partitioned from the master, the framework will now receive TASK_UNREACHABLE instead of TASK_LOST.
Similarly, when a partition-aware framework does explicit reconciliation for an agent ID that appears in the "unreachable" list in the registry, the master will now return TASK_UNREACHABLE rather than TASK_LOST. Review: https://reviews.apache.org/r/51805/ Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/fac32e4d Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/fac32e4d Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/fac32e4d Branch: refs/heads/master Commit: fac32e4d7f430d612b9ffc985e4c55a0ae3312fc Parents: 81fe779 Author: Neil Conway <neil.con...@gmail.com> Authored: Mon Sep 19 15:48:42 2016 -0700 Committer: Vinod Kone <vinodk...@gmail.com> Committed: Mon Sep 19 15:48:42 2016 -0700 ---------------------------------------------------------------------- src/master/master.cpp | 37 +++++++++++++------- src/tests/partition_tests.cpp | 62 +++++++++++++++++++-------------- src/tests/reconciliation_tests.cpp | 11 ++++-- 3 files changed, 69 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/fac32e4d/src/master/master.cpp ---------------------------------------------------------------------- diff --git a/src/master/master.cpp b/src/master/master.cpp index 38ca425..beea5ff 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -5858,9 +5858,9 @@ void Master::markUnreachable(const SlaveID& slaveId) TimeInfo unreachableTime = protobuf::getCurrentTime(); // Update the registry to move this slave from the list of admitted - // slaves to the list of unreachable slaves. After this is - // completed, we can update the master's in-memory state to remove - // the slave and send TASK_LOST status updates to the frameworks. + // slaves to the list of unreachable slaves. 
After this is complete, + // we can remove the slave from the master's in-memory state and + // send TASK_UNREACHABLE / TASK_LOST updates to the frameworks. registrar->apply(Owned<Operation>( new MarkSlaveUnreachable(slave->info, unreachableTime))) .onAny(defer(self(), @@ -5909,19 +5909,25 @@ void Master::_markUnreachable( // the slave is already removed. allocator->removeSlave(slave->id); - // Transition the tasks to TASK_LOST and remove them. - // TODO(neilc): Update this to send TASK_UNREACHABLE for - // PARTITION_AWARE frameworks. + // Transition tasks to TASK_UNREACHABLE / TASK_LOST and remove them. + // We only use TASK_UNREACHABLE if the framework has opted in to the + // PARTITION_AWARE capability. foreachkey (const FrameworkID& frameworkId, utils::copy(slave->tasks)) { Framework* framework = getFramework(frameworkId); CHECK_NOTNULL(framework); + TaskState newTaskState = TASK_UNREACHABLE; + if (!protobuf::frameworkHasCapability( + framework->info, FrameworkInfo::Capability::PARTITION_AWARE)) { + newTaskState = TASK_LOST; + } + foreachvalue (Task* task, utils::copy(slave->tasks[frameworkId])) { const StatusUpdate& update = protobuf::createStatusUpdate( task->framework_id(), task->slave_id(), task->task_id(), - TASK_LOST, + newTaskState, TaskStatus::SOURCE_MASTER, None(), "Slave " + slave->info.hostname() + " is unreachable", @@ -6120,7 +6126,7 @@ void Master::_reconcileTasks( // (2) Task is known: send the latest state. // (3) Task is unknown, slave is registered: TASK_LOST. // (4) Task is unknown, slave is transitioning: no-op. - // (5) Task is unknown, slave is unreachable: TASK_LOST. + // (5) Task is unknown, slave is unreachable: TASK_UNREACHABLE. // (6) Task is unknown, slave is unknown: TASK_LOST. 
// // When using a non-strict registry, case (6) may result in @@ -6190,16 +6196,23 @@ void Master::_reconcileTasks( << " for framework " << *framework << " because there are transitional agents"; } else if (slaveId.isSome() && slaves.unreachable.contains(slaveId.get())) { - // (5) Slave is unreachable: TASK_LOST. The status update - // includes the time at which the slave was marked as - // unreachable. + // (5) Slave is unreachable: TASK_UNREACHABLE. If the framework + // does not have the PARTITION_AWARE capability, send TASK_LOST + // for backward compatibility. In either case, the status update + // also includes the time when the slave was marked unreachable. TimeInfo unreachableTime = slaves.unreachable[slaveId.get()]; + TaskState taskState = TASK_UNREACHABLE; + if (!protobuf::frameworkHasCapability( + framework->info, FrameworkInfo::Capability::PARTITION_AWARE)) { + taskState = TASK_LOST; + } + update = protobuf::createStatusUpdate( framework->id(), slaveId.get(), status.task_id(), - TASK_LOST, + taskState, TaskStatus::SOURCE_MASTER, None(), "Reconciliation: Task is unreachable", http://git-wip-us.apache.org/repos/asf/mesos/blob/fac32e4d/src/tests/partition_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/partition_tests.cpp b/src/tests/partition_tests.cpp index d2fce57..c860d2a 100644 --- a/src/tests/partition_tests.cpp +++ b/src/tests/partition_tests.cpp @@ -262,9 +262,8 @@ TEST_P(PartitionTest, ReregisterSlavePartitionAware) Clock::advance(masterFlags.agent_ping_timeout); - // TODO(neilc): Update this when TASK_UNREACHABLE is introduced. 
AWAIT_READY(unreachableStatus); - EXPECT_EQ(TASK_LOST, unreachableStatus.get().state()); + EXPECT_EQ(TASK_UNREACHABLE, unreachableStatus.get().state()); EXPECT_EQ(TaskStatus::REASON_SLAVE_REMOVED, unreachableStatus.get().reason()); EXPECT_EQ(task.task_id(), unreachableStatus.get().task_id()); EXPECT_EQ(slaveId, unreachableStatus.get().slave_id()); @@ -272,8 +271,8 @@ TEST_P(PartitionTest, ReregisterSlavePartitionAware) AWAIT_READY(slaveLost); JSON::Object stats = Metrics(); - EXPECT_EQ(1, stats.values["master/tasks_lost"]); - EXPECT_EQ(0, stats.values["master/tasks_unreachable"]); + EXPECT_EQ(0, stats.values["master/tasks_lost"]); + EXPECT_EQ(1, stats.values["master/tasks_unreachable"]); EXPECT_EQ(1, stats.values["master/slave_unreachable_scheduled"]); EXPECT_EQ(1, stats.values["master/slave_unreachable_completed"]); EXPECT_EQ(1, stats.values["master/slave_removals"]); @@ -642,10 +641,9 @@ TEST_P(PartitionTest, PartitionedSlaveReregistrationMasterFailover) EXPECT_EQ(slaveId, lostStatus.get().slave_id()); EXPECT_EQ(partitionTime, lostStatus.get().unreachable_time()); - // `sched2` should see TASK_LOST. - // TODO(neilc): Update this to expect TASK_UNREACHABLE. + // `sched2` should see TASK_UNREACHABLE. AWAIT_READY(unreachableStatus); - EXPECT_EQ(TASK_LOST, unreachableStatus.get().state()); + EXPECT_EQ(TASK_UNREACHABLE, unreachableStatus.get().state()); EXPECT_EQ(TaskStatus::REASON_SLAVE_REMOVED, unreachableStatus.get().reason()); EXPECT_EQ(task2.task_id(), unreachableStatus.get().task_id()); EXPECT_EQ(slaveId, unreachableStatus.get().slave_id()); @@ -805,9 +803,9 @@ TEST_P(PartitionTest, PartitionedSlaveOrphanedTask) // Now, induce a partition of the slave by having the master // timeout the slave. 
- Future<TaskStatus> lostStatus; + Future<TaskStatus> unreachableStatus; EXPECT_CALL(sched, statusUpdate(&driver, _)) - .WillOnce(FutureArg<1>(&lostStatus)); + .WillOnce(FutureArg<1>(&unreachableStatus)); Future<Nothing> slaveLost; EXPECT_CALL(sched, slaveLost(&driver, _)) @@ -835,13 +833,12 @@ TEST_P(PartitionTest, PartitionedSlaveOrphanedTask) Clock::advance(Milliseconds(100)); - // TODO(neilc): Update this to expect `TASK_UNREACHABLE`. - AWAIT_READY(lostStatus); - EXPECT_EQ(TASK_LOST, lostStatus.get().state()); - EXPECT_EQ(TaskStatus::REASON_SLAVE_REMOVED, lostStatus.get().reason()); - EXPECT_EQ(task.task_id(), lostStatus.get().task_id()); - EXPECT_EQ(slaveId, lostStatus.get().slave_id()); - EXPECT_EQ(partitionTime, lostStatus.get().unreachable_time()); + AWAIT_READY(unreachableStatus); + EXPECT_EQ(TASK_UNREACHABLE, unreachableStatus.get().state()); + EXPECT_EQ(TaskStatus::REASON_SLAVE_REMOVED, unreachableStatus.get().reason()); + EXPECT_EQ(task.task_id(), unreachableStatus.get().task_id()); + EXPECT_EQ(slaveId, unreachableStatus.get().slave_id()); + EXPECT_EQ(partitionTime, unreachableStatus.get().unreachable_time()); AWAIT_READY(slaveLost); @@ -1325,9 +1322,13 @@ TEST_P(PartitionTest, RegistryGcByCount) AWAIT_READY(slaveRegisteredMessage1); const SlaveID slaveId1 = slaveRegisteredMessage1.get().slave_id(); + FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO; + frameworkInfo.add_capabilities()->set_type( + FrameworkInfo::Capability::PARTITION_AWARE); + MockScheduler sched; MesosSchedulerDriver driver( - &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL); + &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL); EXPECT_CALL(sched, registered(&driver, _, _)); @@ -1443,7 +1444,7 @@ TEST_P(PartitionTest, RegistryGcByCount) driver.reconcileTasks({status1}); AWAIT_READY(reconcileUpdate1); - EXPECT_EQ(TASK_LOST, reconcileUpdate1.get().state()); + EXPECT_EQ(TASK_UNREACHABLE, reconcileUpdate1.get().state()); 
EXPECT_EQ(TaskStatus::REASON_RECONCILIATION, reconcileUpdate1.get().reason()); EXPECT_EQ(partitionTime1, reconcileUpdate1.get().unreachable_time()); @@ -1485,7 +1486,7 @@ TEST_P(PartitionTest, RegistryGcByCount) driver.reconcileTasks({status3}); AWAIT_READY(reconcileUpdate3); - EXPECT_EQ(TASK_LOST, reconcileUpdate3.get().state()); + EXPECT_EQ(TASK_UNREACHABLE, reconcileUpdate3.get().state()); EXPECT_EQ(TaskStatus::REASON_RECONCILIATION, reconcileUpdate3.get().reason()); EXPECT_EQ(partitionTime2, reconcileUpdate3.get().unreachable_time()); @@ -1558,10 +1559,15 @@ TEST_P(PartitionTest, RegistryGcByCountManySlaves) Clock::advance(masterFlags.registry_gc_interval); Clock::settle(); - // Start a scheduler: we verify GC behavior by doing reconciliation. + // Start a partition-aware scheduler. We verify GC behavior by doing + // reconciliation. + FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO; + frameworkInfo.add_capabilities()->set_type( + FrameworkInfo::Capability::PARTITION_AWARE); + MockScheduler sched; MesosSchedulerDriver driver( - &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL); + &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL); Future<Nothing> registered; EXPECT_CALL(sched, registered(&driver, _, _)) @@ -1589,7 +1595,7 @@ TEST_P(PartitionTest, RegistryGcByCountManySlaves) driver.reconcileTasks({status1}); AWAIT_READY(reconcileUpdate1); - EXPECT_EQ(TASK_LOST, reconcileUpdate1.get().state()); + EXPECT_EQ(TASK_UNREACHABLE, reconcileUpdate1.get().state()); EXPECT_EQ(TaskStatus::REASON_RECONCILIATION, reconcileUpdate1.get().reason()); EXPECT_EQ(unreachableTime, reconcileUpdate1.get().unreachable_time()); @@ -1660,9 +1666,13 @@ TEST_P(PartitionTest, RegistryGcByAge) AWAIT_READY(slaveRegisteredMessage1); const SlaveID slaveId1 = slaveRegisteredMessage1.get().slave_id(); + FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO; + frameworkInfo.add_capabilities()->set_type( + FrameworkInfo::Capability::PARTITION_AWARE); + 
MockScheduler sched; MesosSchedulerDriver driver( - &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL); + &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL); EXPECT_CALL(sched, registered(&driver, _, _)); @@ -1782,7 +1792,7 @@ TEST_P(PartitionTest, RegistryGcByAge) driver.reconcileTasks({status1}); AWAIT_READY(reconcileUpdate1); - EXPECT_EQ(TASK_LOST, reconcileUpdate1.get().state()); + EXPECT_EQ(TASK_UNREACHABLE, reconcileUpdate1.get().state()); EXPECT_EQ(TaskStatus::REASON_RECONCILIATION, reconcileUpdate1.get().reason()); EXPECT_EQ(partitionTime1, reconcileUpdate1.get().unreachable_time()); @@ -1798,7 +1808,7 @@ TEST_P(PartitionTest, RegistryGcByAge) driver.reconcileTasks({status2}); AWAIT_READY(reconcileUpdate2); - EXPECT_EQ(TASK_LOST, reconcileUpdate2.get().state()); + EXPECT_EQ(TASK_UNREACHABLE, reconcileUpdate2.get().state()); EXPECT_EQ(TaskStatus::REASON_RECONCILIATION, reconcileUpdate2.get().reason()); EXPECT_EQ(partitionTime2, reconcileUpdate2.get().unreachable_time()); @@ -1837,7 +1847,7 @@ TEST_P(PartitionTest, RegistryGcByAge) driver.reconcileTasks({status4}); AWAIT_READY(reconcileUpdate4); - EXPECT_EQ(TASK_LOST, reconcileUpdate4.get().state()); + EXPECT_EQ(TASK_UNREACHABLE, reconcileUpdate4.get().state()); EXPECT_EQ(TaskStatus::REASON_RECONCILIATION, reconcileUpdate4.get().reason()); EXPECT_EQ(partitionTime2, reconcileUpdate4.get().unreachable_time()); http://git-wip-us.apache.org/repos/asf/mesos/blob/fac32e4d/src/tests/reconciliation_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/reconciliation_tests.cpp b/src/tests/reconciliation_tests.cpp index 9828ab9..1412090 100644 --- a/src/tests/reconciliation_tests.cpp +++ b/src/tests/reconciliation_tests.cpp @@ -230,15 +230,20 @@ TEST_F(ReconciliationTest, TaskStateMatch) // This test verifies that reconciliation of a task that belongs to an -// unknown slave results in TASK_LOST. 
+// unknown slave results in TASK_LOST, even if the framework has +// enabled the PARTITION_AWARE capability. TEST_F(ReconciliationTest, UnknownSlave) { Try<Owned<cluster::Master>> master = StartMaster(); ASSERT_SOME(master); + FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO; + frameworkInfo.add_capabilities()->set_type( + FrameworkInfo::Capability::PARTITION_AWARE); + MockScheduler sched; MesosSchedulerDriver driver( - &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL); + &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL); Future<FrameworkID> frameworkId; EXPECT_CALL(sched, registered(&driver, _, _)) @@ -253,7 +258,7 @@ TEST_F(ReconciliationTest, UnknownSlave) EXPECT_CALL(sched, statusUpdate(&driver, _)) .WillOnce(FutureArg<1>(&update)); - // Create a task status with a random slave id (and task id). + // Create a task status with a random slave id and task id. TaskStatus status; status.mutable_task_id()->set_value(UUID::random().toString()); status.mutable_slave_id()->set_value(UUID::random().toString());