This is an automated email from the ASF dual-hosted git repository.

abudnik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit f0be23765531b05661ed7f1b124faf96744aa80b
Author: Andrei Budnik <abud...@apache.org>
AuthorDate: Tue Aug 20 19:24:44 2019 +0200

    Fixed out-of-order processing of terminal status updates in agent.
    
    Previously, the Mesos agent could send a TASK_FAILED status update on
    executor termination while processing of a TASK_FINISHED status update
    was still in progress. Processing a task status update involves
    sending requests to the containerizer, which might complete these
    requests out of order (e.g., `MesosContainerizer::status`). Also, once
    a task is stored in `terminatedTasks`, the agent does not overwrite
    its terminal status. Hence, there was a race condition between the two
    terminal status updates.
    
    Note that V1 executors are not affected by this problem, because they
    wait for the agent to acknowledge the terminal status update before
    terminating.
    
    This patch introduces a new data structure, `pendingStatusUpdates`,
    which holds the status updates that are currently being processed.
    It allows the agent to validate the order in which status updates
    are processed.
    
    Review: https://reviews.apache.org/r/71343
---
 src/slave/slave.cpp | 62 ++++++++++++++++++++++++++++++++++++++++++++++++++---
 src/slave/slave.hpp |  6 ++++++
 2 files changed, 65 insertions(+), 3 deletions(-)

diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 45f1584..4e93656 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -5947,6 +5947,8 @@ void Slave::statusUpdate(StatusUpdate update, const Option<UPID>& pid)
 
   metrics.valid_status_updates++;
 
+  executor->addPendingTaskStatus(status);
+
   // Before sending update, we need to retrieve the container status
   // if the task reached the executor. For tasks that are queued, we
   // do not need to send the container status and we must
@@ -6158,6 +6160,17 @@ void Slave::___statusUpdate(
   VLOG(1) << "Task status update manager successfully handled status update "
           << update;
 
+  const TaskStatus& status = update.status();
+
+  Executor* executor = nullptr;
+  Framework* framework = getFramework(update.framework_id());
+  if (framework != nullptr) {
+    executor = framework->getExecutor(status.task_id());
+    if (executor != nullptr) {
+      executor->removePendingTaskStatus(status);
+    }
+  }
+
   if (pid == UPID()) {
     return;
   }
@@ -6165,7 +6178,7 @@ void Slave::___statusUpdate(
   StatusUpdateAcknowledgementMessage message;
   message.mutable_framework_id()->MergeFrom(update.framework_id());
   message.mutable_slave_id()->MergeFrom(update.slave_id());
-  message.mutable_task_id()->MergeFrom(update.status().task_id());
+  message.mutable_task_id()->MergeFrom(status.task_id());
   message.set_uuid(update.uuid());
 
   // Task status update manager successfully handled the status update.
@@ -6177,14 +6190,12 @@ void Slave::___statusUpdate(
     send(pid.get(), message);
   } else {
     // Acknowledge the HTTP based executor.
-    Framework* framework = getFramework(update.framework_id());
     if (framework == nullptr) {
       LOG(WARNING) << "Ignoring sending acknowledgement for status update "
                    << update << " of unknown framework";
       return;
     }
 
-    Executor* executor = framework->getExecutor(update.status().task_id());
     if (executor == nullptr) {
       // Refer to the comments in 'statusUpdate()' on when this can
       // happen.
@@ -10795,6 +10806,33 @@ void Executor::recoverTask(const TaskState& state, bool recheckpointTask)
 }
 
 
+void Executor::addPendingTaskStatus(const TaskStatus& status)
+{
+  auto uuid = id::UUID::fromBytes(status.uuid()).get();
+  pendingStatusUpdates[status.task_id()][uuid] = status;
+}
+
+
+void Executor::removePendingTaskStatus(const TaskStatus& status)
+{
+  const TaskID& taskId = status.task_id();
+
+  auto uuid = id::UUID::fromBytes(status.uuid()).get();
+
+  if (!pendingStatusUpdates.contains(taskId) ||
+      !pendingStatusUpdates[taskId].contains(uuid)) {
+    LOG(WARNING) << "Unknown pending status update (uuid: " << uuid << ")";
+    return;
+  }
+
+  pendingStatusUpdates[taskId].erase(uuid);
+
+  if (pendingStatusUpdates[taskId].empty()) {
+    pendingStatusUpdates.erase(taskId);
+  }
+}
+
+
 Try<Nothing> Executor::updateTaskState(const TaskStatus& status)
 {
   bool terminal = protobuf::isTerminalState(status.state());
@@ -10818,6 +10856,24 @@ Try<Nothing> Executor::updateTaskState(const TaskStatus& status)
     task = launchedTasks.at(status.task_id());
 
     if (terminal) {
+      if (pendingStatusUpdates.contains(status.task_id())) {
+        auto statusUpdates = pendingStatusUpdates[status.task_id()].values();
+
+        auto firstTerminal = std::find_if(
+            statusUpdates.begin(),
+            statusUpdates.end(),
+            [](const TaskStatus& status) {
+              return protobuf::isTerminalState(status.state());
+            });
+
+        CHECK(firstTerminal != statusUpdates.end());
+
+        if (firstTerminal->uuid() != status.uuid()) {
+          return Error("Unexpected terminal status update after first status"
+                       " update " + stringify(firstTerminal->state()));
+        }
+      }
+
       launchedTasks.erase(taskId);
     }
   } else if (terminatedTasks.contains(taskId)) {
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index a17bbee..77b5bc0 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -955,6 +955,9 @@ public:
 
   void recoverTask(const state::TaskState& state, bool recheckpointTask);
 
+  void addPendingTaskStatus(const TaskStatus& status);
+  void removePendingTaskStatus(const TaskStatus& status);
+
   Try<Nothing> updateTaskState(const TaskStatus& status);
 
   // Returns true if there are any queued/launched/terminated tasks.
@@ -1099,6 +1102,9 @@ public:
   // non-terminal tasks.
   Option<mesos::slave::ContainerTermination> pendingTermination;
 
+  // Task status updates that are being processed by the agent.
+  hashmap<TaskID, LinkedHashMap<id::UUID, TaskStatus>> pendingStatusUpdates;
+
 private:
   Executor(const Executor&) = delete;
   Executor& operator=(const Executor&) = delete;

Reply via email to