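Fixed a crash in the master during framework re-registration.

The master CHECK-failed in reregisterFramework() when a re-registering
framework had a task whose executor the master no longer knew about on
that slave. This can happen when the ExitedExecutorMessage reaches the
master before the task's terminal status update (MESOS-420).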

Review: https://reviews.apache.org/r/14548


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/8077ad40
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/8077ad40
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/8077ad40

Branch: refs/heads/master
Commit: 8077ad40f2e2ec1912425c3cfc2a64e32b86881d
Parents: 0eb61a3
Author: Benjamin Mahler <bmah...@twitter.com>
Authored: Tue Oct 8 16:47:56 2013 -0700
Committer: Benjamin Mahler <bmah...@twitter.com>
Committed: Tue Oct 8 18:36:02 2013 -0700

----------------------------------------------------------------------
 src/master/master.cpp               |  22 +++----
 src/slave/slave.cpp                 |  45 ++++++-------
 src/tests/fault_tolerance_tests.cpp | 109 ++++++++++++++++++++++++++++++-
 3 files changed, 137 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/8077ad40/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index cdfae1d..2fd48a6 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -794,18 +794,16 @@ void Master::reregisterFramework(const FrameworkInfo& frameworkInfo,
         foreachvalue (Task* task, slave->tasks[frameworkId]) {
           if (framework->id == task->framework_id()) {
             framework->addTask(task);
-            // Also add the task's executor for resource accounting.
-            if (task->has_executor_id()) {
-              if (!framework->hasExecutor(slave->id, task->executor_id())) {
-                CHECK(slave->hasExecutor(framework->id, task->executor_id()))
-                  << "Unknown executor " << task->executor_id()
-                  << " of framework " << framework->id
-                  << " for the task " << task->task_id();
-
-                const ExecutorInfo& executorInfo =
-                  slave->executors[framework->id][task->executor_id()];
-                framework->addExecutor(slave->id, executorInfo);
-              }
+
+            // Also add the task's executor for resource accounting
+            // if it's still alive on the slave and we've not yet
+            // added it to the framework.
+            if (task->has_executor_id() &&
+                slave->hasExecutor(framework->id, task->executor_id()) &&
+                !framework->hasExecutor(slave->id, task->executor_id())) {
+              const ExecutorInfo& executorInfo =
+                slave->executors[framework->id][task->executor_id()];
+              framework->addExecutor(slave->id, executorInfo);
             }
           }
         }

http://git-wip-us.apache.org/repos/asf/mesos/blob/8077ad40/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 1dc2189..debb2f4 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -723,9 +723,28 @@ void Slave::doReliableRegistration()
           continue;
         }
 
+        // Add launched, terminated, and queued tasks.
+        foreach (Task* task, executor->launchedTasks.values()) {
+          message.add_tasks()->CopyFrom(*task);
+        }
+        foreach (Task* task, executor->terminatedTasks.values()) {
+          message.add_tasks()->CopyFrom(*task);
+        }
+        foreach (const TaskInfo& task, executor->queuedTasks.values()) {
+          message.add_tasks()->CopyFrom(protobuf::createTask(
+              task, TASK_STAGING, executor->id, framework->id));
+        }
+
         // Do not re-register with Command Executors because the
         // master doesn't store them; they are generated by the slave.
-        if (!executor->commandExecutor) {
+        if (executor->commandExecutor) {
+          // NOTE: We have to unset the executor id here for the task
+          // because the master uses the absence of task.executor_id()
+          // to detect command executors.
+          for (int i = 0; i < message.tasks_size(); ++i) {
+            message.mutable_tasks(i)->clear_executor_id();
+          }
+        } else {
           ExecutorInfo* executorInfo = message.add_executor_infos();
           executorInfo->MergeFrom(executor->info);
 
@@ -735,30 +754,6 @@ void Slave::doReliableRegistration()
           // it a required field.
           executorInfo->mutable_framework_id()->MergeFrom(framework->id);
         }
-
-        // Add launched tasks.
-        // TODO(vinod): Use foreachvalue instead once LinkedHashmap
-        // supports it.
-        foreach (Task* task, executor->launchedTasks.values()) {
-          message.add_tasks()->CopyFrom(*task);
-        }
-
-        // Add queued tasks.
-        // TODO(vinod): Use foreachvalue instead once LinkedHashmap
-        // supports it.
-        foreach (const TaskInfo& task, executor->queuedTasks.values()) {
-          const Task& t = protobuf::createTask(
-              task, TASK_STAGING, executor->id, framework->id);
-
-          message.add_tasks()->CopyFrom(t);
-        }
-
-        // Add terminated tasks.
-        // TODO(vinod): Use foreachvalue instead once LinkedHashmap
-        // supports it.
-        foreach (Task* task, executor->terminatedTasks.values()) {
-          message.add_tasks()->CopyFrom(*task);
-        }
       }
     }
     send(master, message);

http://git-wip-us.apache.org/repos/asf/mesos/blob/8077ad40/src/tests/fault_tolerance_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/fault_tolerance_tests.cpp b/src/tests/fault_tolerance_tests.cpp
index 254eae4..35b743a 100644
--- a/src/tests/fault_tolerance_tests.cpp
+++ b/src/tests/fault_tolerance_tests.cpp
@@ -998,8 +998,7 @@ TEST_F(FaultToleranceTest, SchedulerFailoverStatusUpdate)
   vector<TaskInfo> tasks;
   tasks.push_back(task);
 
-  EXPECT_CALL(exec, registered(_, _, _, _))
-    .Times(1);
+  EXPECT_CALL(exec, registered(_, _, _, _));
 
   EXPECT_CALL(exec, launchTask(_, _))
     .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
@@ -1061,6 +1060,112 @@ TEST_F(FaultToleranceTest, SchedulerFailoverStatusUpdate)
 }
 
 
+// Regression test for MESOS-420: the master must correctly handle a
+// re-registering framework that still has non-terminal tasks whose
+// executor has already exited. This is possible because the
+// ExitedExecutorMessage can reach the master before the terminal
+// status update(s) of the executor's task(s).
+TEST_F(FaultToleranceTest, ReregisterFrameworkExitedExecutor)
+{
+  // First we'll start a master and slave, then register a framework
+  // so we can launch a task.
+  Try<PID<Master> > master = StartMaster();
+  ASSERT_SOME(master);
+
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+  TestingIsolator isolator(&exec);
+  Try<PID<Slave> > slave = StartSlave(&isolator);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(&sched, DEFAULT_FRAMEWORK_INFO, master.get());
+
+  Future<process::Message> frameworkRegisteredMessage =
+    FUTURE_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()), _, _);
+
+  FrameworkID frameworkId;
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .WillOnce(SaveArg<1>(&frameworkId));
+
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(LaunchTasks(1, 1, 16, "*"));
+
+  Future<Nothing> statusUpdate;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureSatisfy(&statusUpdate));    // TASK_RUNNING.
+
+  EXPECT_CALL(exec, registered(_, _, _, _));
+
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+  driver.start();
+
+  AWAIT_READY(frameworkRegisteredMessage);
+
+  // Wait until the scheduler receives TASK_RUNNING for the task.
+  AWAIT_READY(statusUpdate);
+
+  EXPECT_CALL(sched, disconnected(&driver));
+
+  // Now that the task is launched, we need to induce the following:
+  //   1. ExitedExecutorMessage received by the master prior to a
+  //      terminal status update for the corresponding task. This
+  //      means we need to drop the status update coming from the
+  //      slave.
+  //   2. Framework re-registration.
+  //
+  // To achieve this, we need to:
+  //   1. Restart the master (the slave / framework will not detect
+  //      the new master automatically using the BasicMasterDetector).
+  //   2. Notify the slave of the new master.
+  //   3. Kill the executor.
+  //   4. Drop the status update, but allow the ExitedExecutorMessage.
+  //   5. Notify the framework of the new master.
+  Stop(master.get());
+  master = StartMaster();
+  ASSERT_SOME(master);
+
+  // Deliver a NewMasterDetectedMessage for the new master to the slave.
+  NewMasterDetectedMessage newMasterDetectedMsg;
+  newMasterDetectedMsg.set_pid(master.get());
+
+  Future<SlaveReregisteredMessage> slaveReregisteredMessage =
+    FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
+
+  process::post(slave.get(), newMasterDetectedMsg);
+
+  // Wait for the slave to re-register.
+  AWAIT_READY(slaveReregisteredMessage);
+
+  // Let the ExitedExecutorMessage through, but drop the status update.
+  Future<ExitedExecutorMessage> executorExitedMessage =
+    FUTURE_PROTOBUF(ExitedExecutorMessage(), _, _);
+  Future<StatusUpdateMessage> statusUpdateMessage =
+    DROP_PROTOBUF(StatusUpdateMessage(), _, _);
+
+  // Now kill the executor through the isolator.
+  dispatch(isolator, &Isolator::killExecutor, frameworkId, DEFAULT_EXECUTOR_ID);
+
+  AWAIT_READY(executorExitedMessage);
+  AWAIT_READY(statusUpdateMessage);
+
+  // Now notify the framework of the new master; it should re-register.
+  Future<FrameworkRegisteredMessage> frameworkRegisteredMessage2 =
+    FUTURE_PROTOBUF(FrameworkRegisteredMessage(), _, _);
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  process::post(frameworkRegisteredMessage.get().to, newMasterDetectedMsg);
+
+  AWAIT_READY(frameworkRegisteredMessage2);
+
+  driver.stop();
+  driver.join();
+  Shutdown();
+}
+
+
 TEST_F(FaultToleranceTest, ForwardStatusUpdateUnknownExecutor)
 {
   Try<PID<Master> > master = StartMaster();

Reply via email to