Stopped shutting down the whole default executor on task launch failure.

The default executor would be completely shut down on a
`LAUNCH_NESTED_CONTAINER` failure.

This patch makes the executor kill only the affected task group instead
of shutting itself down and killing all task groups.
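
The idea can be sketched roughly as follows (a minimal, simplified model,
not the actual patch; `Container` and `launchError` mirror the fields in
the diff below, while the map of simulated launch results is purely
hypothetical):

    #include <iostream>
    #include <map>
    #include <optional>
    #include <string>

    // Simplified per-task bookkeeping: record the launch error instead of
    // tearing down the whole executor.
    struct Container {
      bool launched = false;
      std::optional<std::string> launchError;
    };

    int main() {
      // Simulated `LAUNCH_NESTED_CONTAINER` results for a three-task group:
      // task "b" fails to launch, the others succeed.
      std::map<std::string, std::optional<std::string>> launchResults = {
          {"a", std::nullopt},
          {"b", std::string("container runtime error")},
          {"c", std::nullopt}};

      std::map<std::string, Container> containers;

      for (const auto& [taskId, error] : launchResults) {
        Container& container = containers[taskId];
        if (error) {
          // Old behavior: shut down the executor, killing every task group.
          // New behavior: remember the error and fail only this task.
          container.launchError = error;
          std::cout << "TASK_FAILED " << taskId << ": " << *error << "\n";
          continue;
        }
        container.launched = true;
        std::cout << "TASK_RUNNING " << taskId << "\n";
      }
    }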

Review: https://reviews.apache.org/r/65551/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e5afcbec
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e5afcbec
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e5afcbec

Branch: refs/heads/1.5.x
Commit: e5afcbec307c10416f885fedc450f023273371e4
Parents: 24b8b43
Author: Gaston Kleiman <gas...@mesosphere.io>
Authored: Wed Feb 14 14:35:11 2018 +0800
Committer: Qian Zhang <zhq527...@gmail.com>
Committed: Wed Feb 14 21:03:19 2018 +0800

----------------------------------------------------------------------
 src/launcher/default_executor.cpp | 167 ++++++++++++++++++++-------------
 1 file changed, 104 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/e5afcbec/src/launcher/default_executor.cpp
----------------------------------------------------------------------
diff --git a/src/launcher/default_executor.cpp b/src/launcher/default_executor.cpp
index ea9b2d4..16977b5 100644
--- a/src/launcher/default_executor.cpp
+++ b/src/launcher/default_executor.cpp
@@ -108,6 +108,9 @@ private:
     // `WAIT_NESTED_CONTAINER` call has not been established yet.
     Option<Connection> waiting;
 
+    // Error returned by the agent while trying to launch the container.
+    Option<string> launchError;
+
     // TODO(bennoe): Create a real state machine instead of adding
     // more and more ad-hoc boolean values.
 
@@ -420,6 +423,7 @@ protected:
         None(),
         None(),
         None(),
+        None(),
         false,
         false,
         false,
@@ -522,28 +526,32 @@ protected:
       return;
     }
 
-    // Check if we received a 200 OK response for all the
-    // `LAUNCH_NESTED_CONTAINER` calls. Shutdown the executor
-    // if this is not the case.
-    foreach (const Response& response, responses.get()) {
-      if (response.code != process::http::Status::OK) {
-        LOG(ERROR) << "Received '" << response.status << "' ("
-                   << response.body << ") while launching child container";
-        _shutdown();
-        return;
-      }
-    }
-
     CHECK(launched);
     CHECK_EQ(containerIds.size(), (size_t) taskGroup.tasks().size());
+    CHECK_EQ(containerIds.size(), responses->size());
 
-    size_t index = 0;
+    int index = 0;
+    auto responseIterator = responses->begin();
     foreach (const ContainerID& containerId, containerIds) {
       const TaskInfo& task = taskGroup.tasks().Get(index++);
       const TaskID& taskId = task.task_id();
+      const Response& response = *(responseIterator++);
 
       CHECK(containers.contains(taskId));
-      containers.at(taskId)->launched = true;
+      Container* container = containers.at(taskId).get();
+
+      // Check if we received a 200 OK response for the
+      // `LAUNCH_NESTED_CONTAINER` call. Skip the rest of the container
+      // initialization if this is not the case.
+      if (response.code != process::http::Status::OK) {
+        LOG(ERROR) << "Received '" << response.status << "' (" << response.body
+                   << ") while launching child container " << containerId
+                   << " of task '" << taskId << "'";
+        container->launchError = response.body;
+        continue;
+      }
+
+      container->launched = true;
 
       if (task.has_check()) {
         Try<Owned<checks::Checker>> checker =
@@ -563,7 +571,7 @@ protected:
           return;
         }
 
-        containers.at(taskId)->checker = checker.get();
+        container->checker = checker.get();
       }
 
       if (task.has_health_check()) {
@@ -585,7 +593,7 @@ protected:
           return;
         }
 
-        containers.at(taskId)->healthChecker = healthChecker.get();
+        container->healthChecker = healthChecker.get();
       }
 
       // Currently, the Mesos agent does not expose the mapping from
@@ -611,13 +619,8 @@ protected:
                    << containerId << " of task '" << taskId << "' due to: "
                    << symlink.error();
       }
-    }
 
-    // Send a TASK_RUNNING status update now that the task group has
-    // been successfully launched.
-    foreach (const TaskInfo& task, taskGroup.tasks()) {
-      const TaskStatus status = createTaskStatus(task.task_id(), TASK_RUNNING);
-      forward(status);
+      forward(createTaskStatus(task.task_id(), TASK_RUNNING));
     }
 
     auto taskIds = [&taskGroup]() {
@@ -629,7 +632,7 @@ protected:
     };
 
     LOG(INFO)
-      << "Successfully launched tasks "
+      << "Finished launching tasks "
       << stringify(taskIds()) << " in child containers "
       << stringify(containerIds);
 
@@ -786,9 +789,12 @@ protected:
       return;
     }
 
-    // Check if we receive a 200 OK response for the `WAIT_NESTED_CONTAINER`
-    // calls. Shutdown the executor otherwise.
-    if (response->code != process::http::Status::OK) {
+    // Shutdown the executor if the agent responded to the
+    // `WAIT_NESTED_CONTAINER` call with an error. Note that several race
+    // conditions can cause a 404 NOT FOUND response, which shouldn't be
+    // treated as an error.
+    if (response->code != process::http::Status::NOT_FOUND &&
+        response->code != process::http::Status::OK) {
       LOG(ERROR) << "Received '" << response->status << "' ("
                  << response->body << ") waiting on child container "
                  << container->containerId << " of task '" << taskId << "'";
@@ -796,10 +802,6 @@ protected:
       return;
     }
 
-    Try<agent::Response> waitResponse =
-      deserialize<agent::Response>(contentType, response->body);
-    CHECK_SOME(waitResponse);
-
     // If the task is checked, pause the associated checker to avoid
     // sending check updates after a terminal status update.
     if (container->checker.isSome()) {
@@ -821,52 +823,82 @@ protected:
     Option<TaskStatus::Reason> reason;
     Option<TaskResourceLimitation> limitation;
 
-    if (!waitResponse->wait_nested_container().has_exit_status()) {
-      taskState = TASK_FAILED;
-      message = "Command terminated with unknown status";
-    } else {
-      int status = waitResponse->wait_nested_container().exit_status();
-
-      CHECK(WIFEXITED(status) || WIFSIGNALED(status))
-        << "Unexpected wait status " << status;
+    if (response->code == process::http::Status::NOT_FOUND) {
+      // The agent can respond with 404 NOT FOUND due to a failed container
+      // launch or due to a race condition.
 
       if (container->killing) {
         // Send TASK_KILLED if the task was killed as a result of
         // `killTask()` or `shutdown()`.
         taskState = TASK_KILLED;
-      } else if (WSUCCEEDED(status)) {
-        taskState = TASK_FINISHED;
+      } else if (container->launchError.isSome()) {
+        // Send TASK_FAILED if we know that `LAUNCH_NESTED_CONTAINER` returned
+        // an error.
+        taskState = TASK_FAILED;
+        message = container->launchError;
       } else {
+        // We don't know exactly why `WAIT_NESTED_CONTAINER` returned 404 NOT
+        // FOUND, so we'll assume that the task failed.
         taskState = TASK_FAILED;
+        message = "Unable to retrieve command's termination information";
       }
+    } else {
+      Try<agent::Response> waitResponse =
+        deserialize<agent::Response>(contentType, response->body);
+      CHECK_SOME(waitResponse);
 
-      message = "Command " + WSTRINGIFY(status);
-    }
+      if (!waitResponse->wait_nested_container().has_exit_status()) {
+        taskState = TASK_FAILED;
 
-    // Note that we always prefer the task state and reason from the
-    // agent response over what we can determine ourselves because
-    // in general, the agent has more specific information about why
-    // the container exited (e.g. this might be a container resource
-    // limitation).
-    if (waitResponse->wait_nested_container().has_state()) {
-      taskState = waitResponse->wait_nested_container().state();
-    }
+        if (container->launchError.isSome()) {
+          message = container->launchError;
+        } else {
+          message = "Command terminated with unknown status";
+        }
+      } else {
+        int status = waitResponse->wait_nested_container().exit_status();
+
+        CHECK(WIFEXITED(status) || WIFSIGNALED(status))
+          << "Unexpected wait status " << status;
+
+        if (container->killing) {
+          // Send TASK_KILLED if the task was killed as a result of
+          // `killTask()` or `shutdown()`.
+          taskState = TASK_KILLED;
+        } else if (WSUCCEEDED(status)) {
+          taskState = TASK_FINISHED;
+        } else {
+          taskState = TASK_FAILED;
+        }
 
-    if (waitResponse->wait_nested_container().has_reason()) {
-      reason = waitResponse->wait_nested_container().reason();
-    }
+        message = "Command " + WSTRINGIFY(status);
+      }
 
-    if (waitResponse->wait_nested_container().has_message()) {
-      if (message.isSome()) {
-        message->append(
-            ": " +  waitResponse->wait_nested_container().message());
-      } else {
-        message = waitResponse->wait_nested_container().message();
+      // Note that we always prefer the task state and reason from the
+      // agent response over what we can determine ourselves because
+      // in general, the agent has more specific information about why
+      // the container exited (e.g. this might be a container resource
+      // limitation).
+      if (waitResponse->wait_nested_container().has_state()) {
+        taskState = waitResponse->wait_nested_container().state();
       }
-    }
 
-    if (waitResponse->wait_nested_container().has_limitation()) {
-      limitation = waitResponse->wait_nested_container().limitation();
+      if (waitResponse->wait_nested_container().has_reason()) {
+        reason = waitResponse->wait_nested_container().reason();
+      }
+
+      if (waitResponse->wait_nested_container().has_message()) {
+        if (message.isSome()) {
+          message->append(
+              ": " +  waitResponse->wait_nested_container().message());
+        } else {
+          message = waitResponse->wait_nested_container().message();
+        }
+      }
+
+      if (waitResponse->wait_nested_container().has_limitation()) {
+        limitation = waitResponse->wait_nested_container().limitation();
+      }
     }
 
     TaskStatus taskStatus = createTaskStatus(
@@ -877,6 +909,9 @@ protected:
         limitation);
 
     // Indicate that a task has been unhealthy upon termination.
+    //
+    // TODO(gkleiman): We should do this if this task or another task that
+    // belongs to the same task group is unhealthy. See MESOS-8543.
     if (unhealthy) {
       // TODO(abudnik): Consider specifying appropriate status update reason,
       // saying that the task was killed due to a failing health check.
@@ -1032,6 +1067,12 @@ protected:
   {
     CHECK_EQ(SUBSCRIBED, state);
 
+    if (!container->launched) {
+      // We can get here if we're killing a task group for which multiple
+      // containers failed to launch.
+      return Nothing();
+    }
+
     CHECK(!container->killing);
     container->killing = true;
 
@@ -1438,7 +1479,7 @@ private:
 
     CHECK_EQ(SUBSCRIBED, state);
     CHECK_SOME(connectionId);
-    CHECK(containers.contains(taskId) && containers.at(taskId)->launched);
+    CHECK(containers.contains(taskId));
 
     const Owned<Container>& container = containers.at(taskId);
 
