[mesos] 02/03: Fixed out-of-order processing of terminal status updates in agent.

2019-08-26 Thread abudnik
This is an automated email from the ASF dual-hosted git repository.

abudnik pushed a commit to branch 1.8.x
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 4bbb0376cd584a4160a2c5c2f0ac4f3ecaa5e622
Author: Andrei Budnik 
AuthorDate: Tue Aug 20 19:24:44 2019 +0200

Fixed out-of-order processing of terminal status updates in agent.

Previously, the Mesos agent could send a TASK_FAILED status update on
executor termination while processing of a TASK_FINISHED status update
was still in progress. Processing of task status updates involves
sending requests to the containerizer, which might complete these
requests out of order, e.g. `MesosContainerizer::status`. Also, the
agent does not overwrite the status of a terminal task once it has been
stored in `terminatedTasks`. Hence, there was a race condition between
the two terminal status updates.

Note that V1 Executors are not affected by this problem because they
wait for an acknowledgement of the terminal status update by the agent
before terminating.

This patch introduces a new data structure, `pendingStatusUpdates`,
which holds the status updates that are currently being processed.
This data structure allows the agent to validate the order in which
status updates are processed.

Review: https://reviews.apache.org/r/71343
---
 src/slave/slave.cpp | 62 ++---
 src/slave/slave.hpp |  6 ++
 2 files changed, 65 insertions(+), 3 deletions(-)

diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 50a7d68..8d8cef3 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -5727,6 +5727,8 @@ void Slave::statusUpdate(StatusUpdate update, const Option<UPID>& pid)
 
   metrics.valid_status_updates++;
 
+  executor->addPendingTaskStatus(status);
+
   // Before sending update, we need to retrieve the container status
   // if the task reached the executor. For tasks that are queued, we
   // do not need to send the container status and we must
@@ -5938,6 +5940,17 @@ void Slave::___statusUpdate(
   VLOG(1) << "Task status update manager successfully handled status update "
   << update;
 
+  const TaskStatus& status = update.status();
+
+  Executor* executor = nullptr;
+  Framework* framework = getFramework(update.framework_id());
+  if (framework != nullptr) {
+executor = framework->getExecutor(status.task_id());
+if (executor != nullptr) {
+  executor->removePendingTaskStatus(status);
+}
+  }
+
   if (pid == UPID()) {
 return;
   }
@@ -5945,7 +5958,7 @@ void Slave::___statusUpdate(
   StatusUpdateAcknowledgementMessage message;
   message.mutable_framework_id()->MergeFrom(update.framework_id());
   message.mutable_slave_id()->MergeFrom(update.slave_id());
-  message.mutable_task_id()->MergeFrom(update.status().task_id());
+  message.mutable_task_id()->MergeFrom(status.task_id());
   message.set_uuid(update.uuid());
 
   // Task status update manager successfully handled the status update.
@@ -5957,14 +5970,12 @@ void Slave::___statusUpdate(
 send(pid.get(), message);
   } else {
 // Acknowledge the HTTP based executor.
-Framework* framework = getFramework(update.framework_id());
 if (framework == nullptr) {
   LOG(WARNING) << "Ignoring sending acknowledgement for status update "
<< update << " of unknown framework";
   return;
 }
 
-Executor* executor = framework->getExecutor(update.status().task_id());
 if (executor == nullptr) {
   // Refer to the comments in 'statusUpdate()' on when this can
   // happen.
@@ -10520,6 +10531,33 @@ void Executor::recoverTask(const TaskState& state, bool recheckpointTask)
 }
 
 
+void Executor::addPendingTaskStatus(const TaskStatus& status)
+{
+  auto uuid = id::UUID::fromBytes(status.uuid()).get();
+  pendingStatusUpdates[status.task_id()][uuid] = status;
+}
+
+
+void Executor::removePendingTaskStatus(const TaskStatus& status)
+{
+  const TaskID& taskId = status.task_id();
+
+  auto uuid = id::UUID::fromBytes(status.uuid()).get();
+
+  if (!pendingStatusUpdates.contains(taskId) ||
+  !pendingStatusUpdates[taskId].contains(uuid)) {
+LOG(WARNING) << "Unknown pending status update (uuid: " << uuid << ")";
+return;
+  }
+
+  pendingStatusUpdates[taskId].erase(uuid);
+
+  if (pendingStatusUpdates[taskId].empty()) {
+pendingStatusUpdates.erase(taskId);
+  }
+}
+
+
 Try Executor::updateTaskState(const TaskStatus& status)
 {
   bool terminal = protobuf::isTerminalState(status.state());
@@ -10543,6 +10581,24 @@ Try Executor::updateTaskState(const TaskStatus& status)
 task = launchedTasks.at(status.task_id());
 
 if (terminal) {
+  if (pendingStatusUpdates.contains(status.task_id())) {
+auto statusUpdates = pendingStatusUpdates[status.task_id()].values();
+
+auto firstTerminal = std::find_if(
+statusUpdates.begin(),
+statusUpdates.end(),
+   

[mesos] 02/03: Fixed out-of-order processing of terminal status updates in agent.

2019-08-26 Thread abudnik
This is an automated email from the ASF dual-hosted git repository.

abudnik pushed a commit to branch 1.7.x
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit b7dcc984476904d6d17f7bf699295dfa9ac8a66e
Author: Andrei Budnik 
AuthorDate: Tue Aug 20 19:24:44 2019 +0200

Fixed out-of-order processing of terminal status updates in agent.

Previously, the Mesos agent could send a TASK_FAILED status update on
executor termination while processing of a TASK_FINISHED status update
was still in progress. Processing of task status updates involves
sending requests to the containerizer, which might complete these
requests out of order, e.g. `MesosContainerizer::status`. Also, the
agent does not overwrite the status of a terminal task once it has been
stored in `terminatedTasks`. Hence, there was a race condition between
the two terminal status updates.

Note that V1 Executors are not affected by this problem because they
wait for an acknowledgement of the terminal status update by the agent
before terminating.

This patch introduces a new data structure, `pendingStatusUpdates`,
which holds the status updates that are currently being processed.
This data structure allows the agent to validate the order in which
status updates are processed.

Review: https://reviews.apache.org/r/71343
---
 src/slave/slave.cpp | 62 ++---
 src/slave/slave.hpp |  6 ++
 2 files changed, 65 insertions(+), 3 deletions(-)

diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index edfe3d0..f10aac2 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -5486,6 +5486,8 @@ void Slave::statusUpdate(StatusUpdate update, const Option<UPID>& pid)
 
   metrics.valid_status_updates++;
 
+  executor->addPendingTaskStatus(status);
+
   // Before sending update, we need to retrieve the container status
   // if the task reached the executor. For tasks that are queued, we
   // do not need to send the container status and we must
@@ -5697,6 +5699,17 @@ void Slave::___statusUpdate(
   VLOG(1) << "Task status update manager successfully handled status update "
   << update;
 
+  const TaskStatus& status = update.status();
+
+  Executor* executor = nullptr;
+  Framework* framework = getFramework(update.framework_id());
+  if (framework != nullptr) {
+executor = framework->getExecutor(status.task_id());
+if (executor != nullptr) {
+  executor->removePendingTaskStatus(status);
+}
+  }
+
   if (pid == UPID()) {
 return;
   }
@@ -5704,7 +5717,7 @@ void Slave::___statusUpdate(
   StatusUpdateAcknowledgementMessage message;
   message.mutable_framework_id()->MergeFrom(update.framework_id());
   message.mutable_slave_id()->MergeFrom(update.slave_id());
-  message.mutable_task_id()->MergeFrom(update.status().task_id());
+  message.mutable_task_id()->MergeFrom(status.task_id());
   message.set_uuid(update.uuid());
 
   // Task status update manager successfully handled the status update.
@@ -5716,14 +5729,12 @@ void Slave::___statusUpdate(
 send(pid.get(), message);
   } else {
 // Acknowledge the HTTP based executor.
-Framework* framework = getFramework(update.framework_id());
 if (framework == nullptr) {
   LOG(WARNING) << "Ignoring sending acknowledgement for status update "
<< update << " of unknown framework";
   return;
 }
 
-Executor* executor = framework->getExecutor(update.status().task_id());
 if (executor == nullptr) {
   // Refer to the comments in 'statusUpdate()' on when this can
   // happen.
@@ -9861,6 +9872,33 @@ void Executor::recoverTask(const TaskState& state, bool recheckpointTask)
 }
 
 
+void Executor::addPendingTaskStatus(const TaskStatus& status)
+{
+  auto uuid = id::UUID::fromBytes(status.uuid()).get();
+  pendingStatusUpdates[status.task_id()][uuid] = status;
+}
+
+
+void Executor::removePendingTaskStatus(const TaskStatus& status)
+{
+  const TaskID& taskId = status.task_id();
+
+  auto uuid = id::UUID::fromBytes(status.uuid()).get();
+
+  if (!pendingStatusUpdates.contains(taskId) ||
+  !pendingStatusUpdates[taskId].contains(uuid)) {
+LOG(WARNING) << "Unknown pending status update (uuid: " << uuid << ")";
+return;
+  }
+
+  pendingStatusUpdates[taskId].erase(uuid);
+
+  if (pendingStatusUpdates[taskId].empty()) {
+pendingStatusUpdates.erase(taskId);
+  }
+}
+
+
 Try Executor::updateTaskState(const TaskStatus& status)
 {
   bool terminal = protobuf::isTerminalState(status.state());
@@ -9884,6 +9922,24 @@ Try Executor::updateTaskState(const TaskStatus& status)
 task = launchedTasks.at(status.task_id());
 
 if (terminal) {
+  if (pendingStatusUpdates.contains(status.task_id())) {
+auto statusUpdates = pendingStatusUpdates[status.task_id()].values();
+
+auto firstTerminal = std::find_if(
+statusUpdates.begin(),
+statusUpdates.end(),
+   

[mesos] 02/03: Fixed out-of-order processing of terminal status updates in agent.

2019-08-26 Thread abudnik
This is an automated email from the ASF dual-hosted git repository.

abudnik pushed a commit to branch 1.6.x
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 3ad802ebbe34565a2fa995d834ba4928c20e5e62
Author: Andrei Budnik 
AuthorDate: Tue Aug 20 19:24:44 2019 +0200

Fixed out-of-order processing of terminal status updates in agent.

Previously, the Mesos agent could send a TASK_FAILED status update on
executor termination while processing of a TASK_FINISHED status update
was still in progress. Processing of task status updates involves
sending requests to the containerizer, which might complete these
requests out of order, e.g. `MesosContainerizer::status`. Also, the
agent does not overwrite the status of a terminal task once it has been
stored in `terminatedTasks`. Hence, there was a race condition between
the two terminal status updates.

Note that V1 Executors are not affected by this problem because they
wait for an acknowledgement of the terminal status update by the agent
before terminating.

This patch introduces a new data structure, `pendingStatusUpdates`,
which holds the status updates that are currently being processed.
This data structure allows the agent to validate the order in which
status updates are processed.

Review: https://reviews.apache.org/r/71343
---
 src/slave/slave.cpp | 62 ++---
 src/slave/slave.hpp |  6 ++
 2 files changed, 65 insertions(+), 3 deletions(-)

diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 176d3fb..0861ac2 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -5456,6 +5456,8 @@ void Slave::statusUpdate(StatusUpdate update, const Option<UPID>& pid)
 
   metrics.valid_status_updates++;
 
+  executor->addPendingTaskStatus(status);
+
   // Before sending update, we need to retrieve the container status
   // if the task reached the executor. For tasks that are queued, we
   // do not need to send the container status and we must
@@ -5667,6 +5669,17 @@ void Slave::___statusUpdate(
   VLOG(1) << "Task status update manager successfully handled status update "
   << update;
 
+  const TaskStatus& status = update.status();
+
+  Executor* executor = nullptr;
+  Framework* framework = getFramework(update.framework_id());
+  if (framework != nullptr) {
+executor = framework->getExecutor(status.task_id());
+if (executor != nullptr) {
+  executor->removePendingTaskStatus(status);
+}
+  }
+
   if (pid == UPID()) {
 return;
   }
@@ -5674,7 +5687,7 @@ void Slave::___statusUpdate(
   StatusUpdateAcknowledgementMessage message;
   message.mutable_framework_id()->MergeFrom(update.framework_id());
   message.mutable_slave_id()->MergeFrom(update.slave_id());
-  message.mutable_task_id()->MergeFrom(update.status().task_id());
+  message.mutable_task_id()->MergeFrom(status.task_id());
   message.set_uuid(update.uuid());
 
   // Task status update manager successfully handled the status update.
@@ -5686,14 +5699,12 @@ void Slave::___statusUpdate(
 send(pid.get(), message);
   } else {
 // Acknowledge the HTTP based executor.
-Framework* framework = getFramework(update.framework_id());
 if (framework == nullptr) {
   LOG(WARNING) << "Ignoring sending acknowledgement for status update "
<< update << " of unknown framework";
   return;
 }
 
-Executor* executor = framework->getExecutor(update.status().task_id());
 if (executor == nullptr) {
   // Refer to the comments in 'statusUpdate()' on when this can
   // happen.
@@ -9759,6 +9770,33 @@ void Executor::recoverTask(const TaskState& state, bool recheckpointTask)
 }
 
 
+void Executor::addPendingTaskStatus(const TaskStatus& status)
+{
+  auto uuid = id::UUID::fromBytes(status.uuid()).get();
+  pendingStatusUpdates[status.task_id()][uuid] = status;
+}
+
+
+void Executor::removePendingTaskStatus(const TaskStatus& status)
+{
+  const TaskID& taskId = status.task_id();
+
+  auto uuid = id::UUID::fromBytes(status.uuid()).get();
+
+  if (!pendingStatusUpdates.contains(taskId) ||
+  !pendingStatusUpdates[taskId].contains(uuid)) {
+LOG(WARNING) << "Unknown pending status update (uuid: " << uuid << ")";
+return;
+  }
+
+  pendingStatusUpdates[taskId].erase(uuid);
+
+  if (pendingStatusUpdates[taskId].empty()) {
+pendingStatusUpdates.erase(taskId);
+  }
+}
+
+
 Try Executor::updateTaskState(const TaskStatus& status)
 {
   bool terminal = protobuf::isTerminalState(status.state());
@@ -9782,6 +9820,24 @@ Try Executor::updateTaskState(const TaskStatus& status)
 task = launchedTasks.at(status.task_id());
 
 if (terminal) {
+  if (pendingStatusUpdates.contains(status.task_id())) {
+auto statusUpdates = pendingStatusUpdates[status.task_id()].values();
+
+auto firstTerminal = std::find_if(
+statusUpdates.begin(),
+statusUpdates.end(),
+