Added a task reconciliation test. Review: https://reviews.apache.org/r/14436
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/6d303397
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/6d303397
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/6d303397

Branch: refs/heads/master
Commit: 6d3033977fd1be9cbab04b2eacfd884710fe00df
Parents: 7a38b74
Author: Benjamin Mahler <bmah...@twitter.com>
Authored: Tue Oct 1 15:40:28 2013 -0700
Committer: Benjamin Mahler <bmah...@twitter.com>
Committed: Wed Oct 2 12:20:32 2013 -0700

----------------------------------------------------------------------
 src/tests/slave_recovery_tests.cpp | 140 ++++++++++++++++++++++++++++++++
 1 file changed, 140 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/mesos/blob/6d303397/src/tests/slave_recovery_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_recovery_tests.cpp b/src/tests/slave_recovery_tests.cpp
index 416b9c6..097d43c 100644
--- a/src/tests/slave_recovery_tests.cpp
+++ b/src/tests/slave_recovery_tests.cpp
@@ -1848,6 +1848,146 @@ TYPED_TEST(SlaveRecoveryTest, ReconcileShutdownFramework)
   this->Shutdown(); // Shutdown before isolator(s) get deallocated.
 }
+
+// This ensures that reconciliation properly deals with tasks
+// present in the master and missing from the slave. Notably:
+//   1. The tasks are sent to LOST.
+//   2. The task resources are recovered.
+// TODO(bmahler): Ensure the executor resources are recovered by
+// using an explicit executor.
+TYPED_TEST(SlaveRecoveryTest, ReconcileTasksMissingFromSlave)
+{
+  // Outline: launch a task for a checkpointing framework, stop the
+  // slave, delete the framework's meta directory, restart the slave
+  // with the same flags, then expect a TASK_LOST update and a new
+  // offer whose resources equal those of the first offer.
+  MockAllocatorProcess<master::allocator::HierarchicalDRFAllocatorProcess>
+    allocator;
+
+  EXPECT_CALL(allocator, initialize(_, _, _));
+
+  Try<PID<Master> > master = this->StartMaster(&allocator);
+  ASSERT_SOME(master);
+
+  TypeParam isolator1;
+
+  slave::Flags flags = this->CreateSlaveFlags();
+
+  EXPECT_CALL(allocator, slaveAdded(_, _, _));
+
+  Try<PID<Slave> > slave = this->StartSlave(&isolator1, flags);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+
+  // Enable checkpointing for the framework.
+  FrameworkInfo frameworkInfo;
+  frameworkInfo.CopyFrom(DEFAULT_FRAMEWORK_INFO);
+  frameworkInfo.set_checkpoint(true);
+
+  MesosSchedulerDriver driver(&sched, frameworkInfo, master.get());
+
+  EXPECT_CALL(allocator, frameworkAdded(_, _, _));
+
+  EXPECT_CALL(sched, registered(_, _, _));
+
+  Future<vector<Offer> > offers1;
+  EXPECT_CALL(sched, resourceOffers(_, _))
+    .WillOnce(FutureArg<1>(&offers1))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers1);
+  // NOTE(review): EXPECT_NE is non-fatal, so the offers1.get()[0]
+  // accesses below are still evaluated if the offer list is empty.
+  EXPECT_NE(0u, offers1.get().size());
+
+  // Start a task on the slave so that the master has knowledge of it.
+  // We'll ensure the slave does not have this task when it
+  // re-registers by wiping the relevant meta directory.
+  TaskInfo task = createTask(offers1.get()[0], "sleep 10");
+  vector<TaskInfo> tasks;
+  tasks.push_back(task); // Long-running task
+
+  // No cardinality or action is given, so gmock expects exactly one
+  // statusUpdate call against this expectation (presumably the
+  // update for the task launched below -- confirm).
+  EXPECT_CALL(sched, statusUpdate(_, _));
+
+  Future<Nothing> _statusUpdateAcknowledgement =
+    FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+
+  driver.launchTasks(offers1.get()[0].id(), tasks);
+
+  // Wait for the ACK to be checkpointed.
+  AWAIT_READY(_statusUpdateAcknowledgement);
+
+  EXPECT_CALL(allocator, slaveDisconnected(_));
+
+  this->Stop(slave.get());
+
+  // Construct the framework meta directory that needs wiping.
+  string frameworkPath = paths::getFrameworkPath(
+      paths::getMetaRootDir(flags.work_dir),
+      offers1.get()[0].slave_id(),
+      frameworkInfo.id());
+
+  // Remove the framework meta directory, so that the slave will not
+  // recover the task.
+  ASSERT_SOME(os::rmdir(frameworkPath, true));
+
+  // Both futures are created before the slave is restarted so that
+  // the recovery dispatch and the re-registration message are not
+  // missed.
+  Future<Nothing> recover = FUTURE_DISPATCH(_, &Slave::_recover);
+
+  Future<ReregisterSlaveMessage> reregisterSlave =
+    FUTURE_PROTOBUF(ReregisterSlaveMessage(), _, _);
+
+  EXPECT_CALL(allocator, slaveReconnected(_));
+  EXPECT_CALL(allocator, resourcesRecovered(_, _, _));
+
+  // Restart the slave (use same flags) with a new isolator.
+  TypeParam isolator2;
+
+  slave = this->StartSlave(&isolator2, flags);
+  ASSERT_SOME(slave);
+
+  // From here until Clock::resume() the test moves time explicitly
+  // through Clock::advance().
+  Clock::pause();
+
+  AWAIT_READY(recover);
+
+  Clock::settle(); // Wait for slave to schedule reregister timeout.
+
+  Clock::advance(EXECUTOR_REREGISTER_TIMEOUT);
+
+  // Wait for the slave to re-register.
+  AWAIT_READY(reregisterSlave);
+
+  // NOTE(review): the two scheduler expectations below are installed
+  // only after the re-registration message has been observed. If the
+  // resulting status update can reach the scheduler before this
+  // point it would be matched against the earlier single-call
+  // statusUpdate expectation instead -- confirm this ordering is
+  // guaranteed, or install these before restarting the slave.
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(_, _))
+    .WillOnce(FutureArg<1>(&status))
+    .WillRepeatedly(Return()); // Ignore subsequent updates.
+
+  Future<vector<Offer> > offers2;
+  EXPECT_CALL(sched, resourceOffers(_, _))
+    .WillOnce(FutureArg<1>(&offers2))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  // Wait for TASK_LOST update.
+  AWAIT_READY(status);
+  ASSERT_EQ(TASK_LOST, status.get().state());
+
+  // Advance the clock until the allocator allocates
+  // the recovered resources.
+  while (offers2.isPending()) {
+    Clock::advance(Seconds(1));
+    Clock::settle();
+  }
+
+  // Make sure all slave resources are reoffered.
+  AWAIT_READY(offers2);
+  ASSERT_EQ(Resources(offers1.get()[0].resources()),
+            Resources(offers2.get()[0].resources()));
+
+  Clock::resume();
+
+  driver.stop();
+  driver.join();
+
+  this->Shutdown(); // Shutdown before isolator(s) get deallocated.
+}
+
+
 // Scheduler asks a restarted slave to kill a task that has been
 // running before the slave restarted.
A scheduler failover happens // when the slave is down. This test verifies that a scheduler