Added SlaveRecoveryTest.MultipleFrameworks test.

From: Jiang Yan Xu <y...@jxu.me>
Review: https://reviews.apache.org/r/13786
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/9a866c1a
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/9a866c1a
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/9a866c1a

Branch: refs/heads/master
Commit: 9a866c1a05be10d821df8b569f2af3722986745b
Parents: 9a9daa4
Author: Vinod Kone <vi...@twitter.com>
Authored: Mon Aug 26 12:18:20 2013 -0400
Committer: Vinod Kone <vi...@twitter.com>
Committed: Mon Aug 26 12:18:20 2013 -0400

----------------------------------------------------------------------
 src/tests/slave_recovery_tests.cpp | 157 ++++++++++++++++++++++++++++++++
 1 file changed, 157 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/9a866c1a/src/tests/slave_recovery_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_recovery_tests.cpp b/src/tests/slave_recovery_tests.cpp
index c2c3ce0..c0f130e 100644
--- a/src/tests/slave_recovery_tests.cpp
+++ b/src/tests/slave_recovery_tests.cpp
@@ -2105,3 +2105,160 @@ TYPED_TEST(SlaveRecoveryTest, MasterFailover)
 
   this->Shutdown(); // Shutdown before isolator(s) get deallocated.
 }
+
+
+// In this test there are two frameworks and one slave. Each
+// framework launches a task before the slave goes down. We verify
+// that the two frameworks and their tasks are recovered after the
+// slave restarts.
+TYPED_TEST(SlaveRecoveryTest, MultipleFrameworks)
+{
+  Try<PID<Master> > master = this->StartMaster();
+  ASSERT_SOME(master);
+
+  TypeParam isolator1;
+
+  slave::Flags flags = this->CreateSlaveFlags();
+
+  Try<PID<Slave> > slave = this->StartSlave(&isolator1, flags);
+  ASSERT_SOME(slave);
+
+  // Framework 1.
+  MockScheduler sched1;
+
+  // Enable checkpointing for the framework.
+  FrameworkInfo frameworkInfo1;
+  frameworkInfo1.CopyFrom(DEFAULT_FRAMEWORK_INFO);
+  frameworkInfo1.set_checkpoint(true);
+
+  MesosSchedulerDriver driver1(&sched1, frameworkInfo1, master.get());
+
+  EXPECT_CALL(sched1, registered(_, _, _));
+
+  Future<vector<Offer> > offers1;
+  EXPECT_CALL(sched1, resourceOffers(_, _))
+    .WillOnce(FutureArg<1>(&offers1))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver1.start();
+
+  AWAIT_READY(offers1);
+  ASSERT_NE(0u, offers1.get().size());
+
+  // Use part of the resources in the offer so that the rest can be
+  // offered to framework 2.
+  Offer offer1 = offers1.get()[0];
+  offer1.mutable_resources()->CopyFrom(
+      Resources::parse("cpus:1;mem:512").get());
+
+  // Framework 1 launches a task.
+  TaskInfo task1 = createTask(offer1, "sleep 1000");
+  vector<TaskInfo> tasks1;
+  tasks1.push_back(task1); // Long-running task
+
+  EXPECT_CALL(sched1, statusUpdate(_, _));
+
+  Future<Nothing> _statusUpdateAcknowledgement1 =
+    FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+
+  driver1.launchTasks(offer1.id(), tasks1);
+
+  // Wait for the ACK to be checkpointed.
+  AWAIT_READY(_statusUpdateAcknowledgement1);
+
+  // Framework 2.
+  MockScheduler sched2;
+
+  // Enable checkpointing for the framework.
+  FrameworkInfo frameworkInfo2;
+  frameworkInfo2.CopyFrom(DEFAULT_FRAMEWORK_INFO);
+  frameworkInfo2.set_checkpoint(true);
+
+  MesosSchedulerDriver driver2(&sched2, frameworkInfo2, master.get());
+
+  EXPECT_CALL(sched2, registered(_, _, _));
+
+  Future<vector<Offer> > offers2;
+  EXPECT_CALL(sched2, resourceOffers(_, _))
+    .WillOnce(FutureArg<1>(&offers2))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver2.start();
+
+  AWAIT_READY(offers2);
+  ASSERT_NE(0u, offers2.get().size());
+
+  // Framework 2 launches a task.
+  TaskInfo task2 = createTask(offers2.get()[0], "sleep 1000");
+
+  vector<TaskInfo> tasks2;
+  tasks2.push_back(task2); // Long-running task
+
+  EXPECT_CALL(sched2, statusUpdate(_, _));
+
+  Future<Nothing> _statusUpdateAcknowledgement2 =
+    FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+  driver2.launchTasks(offers2.get()[0].id(), tasks2);
+
+  // Wait for the ACK to be checkpointed.
+  AWAIT_READY(_statusUpdateAcknowledgement2);
+
+  this->Stop(slave.get());
+
+  Future<Nothing> _recover = FUTURE_DISPATCH(_, &Slave::_recover);
+
+  Future<ReregisterSlaveMessage> reregisterSlaveMessage =
+    FUTURE_PROTOBUF(ReregisterSlaveMessage(), _, _);
+
+  // Restart the slave (use same flags) with a new isolator.
+  TypeParam isolator2;
+
+  slave = this->StartSlave(&isolator2, flags);
+  ASSERT_SOME(slave);
+
+  Clock::pause();
+
+  AWAIT_READY(_recover);
+
+  Clock::settle(); // Wait for slave to schedule reregister timeout.
+
+  Clock::advance(EXECUTOR_REREGISTER_TIMEOUT);
+
+  // Wait for the slave to re-register.
+  AWAIT_READY(reregisterSlaveMessage);
+
+  Clock::resume();
+
+  // Expectations for the status changes as a result of killing the
+  // tasks.
+  Future<TaskStatus> status1;
+  EXPECT_CALL(sched1, statusUpdate(_, _))
+    .WillOnce(FutureArg<1>(&status1))
+    .WillRepeatedly(Return()); // Ignore subsequent updates.
+
+  Future<TaskStatus> status2;
+  EXPECT_CALL(sched2, statusUpdate(_, _))
+    .WillOnce(FutureArg<1>(&status2))
+    .WillRepeatedly(Return()); // Ignore subsequent updates.
+
+  // Kill task 1.
+  driver1.killTask(task1.task_id());
+
+  // Wait for TASK_KILLED update.
+  AWAIT_READY(status1);
+  ASSERT_EQ(TASK_KILLED, status1.get().state());
+
+  // Kill task 2.
+  driver2.killTask(task2.task_id());
+
+  // Wait for TASK_KILLED update.
+  AWAIT_READY(status2);
+  ASSERT_EQ(TASK_KILLED, status2.get().state());
+
+  driver1.stop();
+  driver1.join();
+  driver2.stop();
+  driver2.join();
+
+  this->Shutdown(); // Shutdown before isolator(s) get deallocated.
+}