Added a task reconciliation test.

Review: https://reviews.apache.org/r/14436


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/6d303397
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/6d303397
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/6d303397

Branch: refs/heads/master
Commit: 6d3033977fd1be9cbab04b2eacfd884710fe00df
Parents: 7a38b74
Author: Benjamin Mahler <bmah...@twitter.com>
Authored: Tue Oct 1 15:40:28 2013 -0700
Committer: Benjamin Mahler <bmah...@twitter.com>
Committed: Wed Oct 2 12:20:32 2013 -0700

----------------------------------------------------------------------
 src/tests/slave_recovery_tests.cpp | 140 ++++++++++++++++++++++++++++++++
 1 file changed, 140 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/6d303397/src/tests/slave_recovery_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_recovery_tests.cpp b/src/tests/slave_recovery_tests.cpp
index 416b9c6..097d43c 100644
--- a/src/tests/slave_recovery_tests.cpp
+++ b/src/tests/slave_recovery_tests.cpp
@@ -1848,6 +1848,146 @@ TYPED_TEST(SlaveRecoveryTest, ReconcileShutdownFramework)
   this->Shutdown(); // Shutdown before isolator(s) get deallocated.
 }
 
+
+// This test ensures that reconciliation correctly handles tasks
+// that are known to the master but missing from the slave. Notably:
+//   1. The tasks are transitioned to TASK_LOST.
+//   2. The task resources are recovered.
+// TODO(bmahler): Ensure the executor resources are recovered by
+// using an explicit executor.
+TYPED_TEST(SlaveRecoveryTest, ReconcileTasksMissingFromSlave)
+{
+  MockAllocatorProcess<master::allocator::HierarchicalDRFAllocatorProcess>
+    allocator;
+
+  EXPECT_CALL(allocator, initialize(_, _, _));
+
+  Try<PID<Master> > master = this->StartMaster(&allocator);
+  ASSERT_SOME(master);
+
+  TypeParam isolator1;
+
+  slave::Flags flags = this->CreateSlaveFlags();
+
+  EXPECT_CALL(allocator, slaveAdded(_, _, _));
+
+  Try<PID<Slave> > slave = this->StartSlave(&isolator1, flags);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+
+  // Enable checkpointing for the framework.
+  FrameworkInfo frameworkInfo;
+  frameworkInfo.CopyFrom(DEFAULT_FRAMEWORK_INFO);
+  frameworkInfo.set_checkpoint(true);
+
+  MesosSchedulerDriver driver(&sched, frameworkInfo, master.get());
+
+  EXPECT_CALL(allocator, frameworkAdded(_, _, _));
+
+  EXPECT_CALL(sched, registered(_, _, _));
+
+  Future<vector<Offer> > offers1;
+  EXPECT_CALL(sched, resourceOffers(_, _))
+    .WillOnce(FutureArg<1>(&offers1))
+    .WillRepeatedly(Return());        // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers1);
+  EXPECT_NE(0u, offers1.get().size());
+
+  // Start a task on the slave so that the master has knowledge of it.
+  // We'll ensure the slave does not have this task when it
+  // re-registers by wiping the relevant meta directory.
+  TaskInfo task = createTask(offers1.get()[0], "sleep 10");
+  vector<TaskInfo> tasks;
+  tasks.push_back(task); // Long-running task.
+
+  EXPECT_CALL(sched, statusUpdate(_, _));
+
+  Future<Nothing> _statusUpdateAcknowledgement =
+    FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+
+  driver.launchTasks(offers1.get()[0].id(), tasks);
+
+  // Wait for the ACK to be checkpointed.
+  AWAIT_READY(_statusUpdateAcknowledgement);
+
+  EXPECT_CALL(allocator, slaveDisconnected(_));
+
+  this->Stop(slave.get());
+
+  // Construct the framework meta directory that needs wiping.
+  string frameworkPath = paths::getFrameworkPath(
+      paths::getMetaRootDir(flags.work_dir),
+      offers1.get()[0].slave_id(),
+      frameworkInfo.id());
+
+  // Remove the framework meta directory, so that the slave will not
+  // recover the task.
+  ASSERT_SOME(os::rmdir(frameworkPath, true));
+
+  Future<Nothing> recover = FUTURE_DISPATCH(_, &Slave::_recover);
+
+  Future<ReregisterSlaveMessage> reregisterSlave =
+    FUTURE_PROTOBUF(ReregisterSlaveMessage(), _, _);
+
+  EXPECT_CALL(allocator, slaveReconnected(_));
+  EXPECT_CALL(allocator, resourcesRecovered(_, _, _));
+
+  // Restart the slave (use same flags) with a new isolator.
+  TypeParam isolator2;
+
+  slave = this->StartSlave(&isolator2, flags);
+  ASSERT_SOME(slave);
+
+  Clock::pause();
+
+  AWAIT_READY(recover);
+
+  Clock::settle(); // Wait for slave to schedule reregister timeout.
+
+  Clock::advance(EXECUTOR_REREGISTER_TIMEOUT);
+
+  // Wait for the slave to re-register.
+  AWAIT_READY(reregisterSlave);
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(_, _))
+    .WillOnce(FutureArg<1>(&status))
+    .WillRepeatedly(Return());        // Ignore subsequent updates.
+
+  Future<vector<Offer> > offers2;
+  EXPECT_CALL(sched, resourceOffers(_, _))
+    .WillOnce(FutureArg<1>(&offers2))
+    .WillRepeatedly(Return());        // Ignore subsequent offers.
+
+  // Wait for TASK_LOST update.
+  AWAIT_READY(status);
+  ASSERT_EQ(TASK_LOST, status.get().state());
+
+  // Advance the clock until the allocator allocates
+  // the recovered resources.
+  while (offers2.isPending()) {
+    Clock::advance(Seconds(1));
+    Clock::settle();
+  }
+
+  // Make sure all slave resources are reoffered.
+  AWAIT_READY(offers2);
+  ASSERT_EQ(Resources(offers1.get()[0].resources()),
+            Resources(offers2.get()[0].resources()));
+
+  Clock::resume();
+
+  driver.stop();
+  driver.join();
+
+  this->Shutdown(); // Shutdown before isolator(s) get deallocated.
+}
+
+
 // Scheduler asks a restarted slave to kill a task that has been
 // running before the slave restarted. A scheduler failover happens
 // when the slave is down. This test verifies that a scheduler

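For reference, the TASK_LOST transition exercised by this test is what a
framework observes through the Scheduler::statusUpdate callback. The sketch
below is not part of this commit: the class name and the reactions in the
comments are illustrative assumptions, while the callback signatures come
from the public C++ API in mesos/scheduler.hpp.

----------------------------------------------------------------------
#include <string>
#include <vector>

#include <mesos/scheduler.hpp>

using namespace mesos;

// Hypothetical scheduler illustrating how a framework would observe
// the TASK_LOST updates produced when the master reconciles tasks
// that are missing from a re-registered slave.
class ReconcileAwareScheduler : public Scheduler
{
public:
  virtual void registered(SchedulerDriver* driver,
                          const FrameworkID& frameworkId,
                          const MasterInfo& masterInfo) {}

  virtual void reregistered(SchedulerDriver* driver,
                            const MasterInfo& masterInfo) {}

  virtual void disconnected(SchedulerDriver* driver) {}

  virtual void resourceOffers(SchedulerDriver* driver,
                              const std::vector<Offer>& offers)
  {
    // The resources recovered from lost tasks are eventually
    // re-offered here, so this is where a framework would
    // relaunch any work it still wants to run.
  }

  virtual void offerRescinded(SchedulerDriver* driver,
                              const OfferID& offerId) {}

  virtual void statusUpdate(SchedulerDriver* driver,
                            const TaskStatus& status)
  {
    if (status.state() == TASK_LOST) {
      // The master no longer knows of a running task with this ID;
      // forget it and wait for the recovered resources to be offered.
    }
  }

  virtual void frameworkMessage(SchedulerDriver* driver,
                                const ExecutorID& executorId,
                                const SlaveID& slaveId,
                                const std::string& data) {}

  virtual void slaveLost(SchedulerDriver* driver,
                         const SlaveID& slaveId) {}

  virtual void executorLost(SchedulerDriver* driver,
                            const ExecutorID& executorId,
                            const SlaveID& slaveId,
                            int status) {}

  virtual void error(SchedulerDriver* driver,
                     const std::string& message) {}
};
----------------------------------------------------------------------

The test above makes the same observation with gmock expectations on a
MockScheduler rather than a concrete Scheduler subclass.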