Repository: mesos
Updated Branches:
  refs/heads/master de06435a6 -> 9cb85e955


Updated master ACCEPT handler to disallow offer operation feedback.

This patch updates the master's ACCEPT call code path to fail
offer operations when their `id` field is set. Since protobufs
have already been updated for offer operation feedback, but the
feature is not fully implemented, we will disallow the setting
of this field for now.

Review: https://reviews.apache.org/r/64142/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/328c1c11
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/328c1c11
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/328c1c11

Branch: refs/heads/master
Commit: 328c1c11690dc112c4af4a5cc79ca0d688ebfb19
Parents: de06435
Author: Greg Mann <g...@mesosphere.io>
Authored: Thu Dec 7 11:35:55 2017 -0800
Committer: Greg Mann <gregorywm...@gmail.com>
Committed: Thu Dec 7 11:35:55 2017 -0800

----------------------------------------------------------------------
 src/master/master.cpp | 113 ++++++++++++++++++++++++++++++++-------------
 1 file changed, 81 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/328c1c11/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 2fd66c0..c11713f 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -4057,9 +4057,39 @@ void Master::accept(
   // If a RESERVE, UNRESERVE, CREATE, or DESTROY operation
   // contains invalid resources, we just drop the operation.
   //
-  // If a LAUNCH or LAUNCH_GROUP operation contains invalid
-  // resources, we send a TASK_ERROR status update per task.
+  // If a LAUNCH or LAUNCH_GROUP operation contains invalid resources,
+  // we drop the operation and send a TASK_ERROR status update per task.
   {
+    // Used to send TASK_ERROR status updates for tasks in invalid LAUNCH
+    // and LAUNCH_GROUP operations. Note that we don't need to recover
+    // the resources here because we always continue onto `_accept`
+    // which recovers the unused resources at the end.
+    //
+    // TODO(mpark): Consider pulling this out in a more reusable manner.
+    auto sendStatusUpdates = [&](
+        const RepeatedPtrField<TaskInfo>& tasks,
+        TaskStatus::Reason reason,
+        const string& message) {
+      foreach (const TaskInfo& task, tasks) {
+        const StatusUpdate& update = protobuf::createStatusUpdate(
+            framework->id(),
+            task.slave_id(),
+            task.task_id(),
+            TASK_ERROR,
+            TaskStatus::SOURCE_MASTER,
+            None(),
+            message,
+            reason);
+
+        metrics->tasks_error++;
+
+        metrics->incrementTasksStates(
+            TASK_ERROR, TaskStatus::SOURCE_MASTER, reason);
+
+        forward(update, UPID(), framework);
+      }
+    };
+
     // We move out the `accept.operations`, and re-insert the operations
     // with the resources validated and upgraded.
     RepeatedPtrField<Offer::Operation> operations = accept.operations();
@@ -4068,34 +4098,6 @@ void Master::accept(
     foreach (Offer::Operation& operation, operations) {
       Option<Error> error = validateAndNormalizeResources(&operation);
       if (error.isSome()) {
-        // We send TASK_ERROR status updates for tasks in an invalid LAUNCH
-        // and LAUNCH_GROUP operations. Note that we don't need to recover
-        // the resources here because we always continue onto `_accept`
-        // which recovers the unused resources at the end.
-        // TODO(mpark): Consider pulling this out in a more reusable manner.
-        auto sendStatusUpdates = [&](
-            const RepeatedPtrField<TaskInfo>& tasks,
-            TaskStatus::Reason reason) {
-          foreach (const TaskInfo& task, tasks) {
-            const StatusUpdate& update = protobuf::createStatusUpdate(
-                framework->id(),
-                task.slave_id(),
-                task.task_id(),
-                TASK_ERROR,
-                TaskStatus::SOURCE_MASTER,
-                None(),
-                error->message,
-                reason);
-
-            metrics->tasks_error++;
-
-            metrics->incrementTasksStates(
-                TASK_ERROR, TaskStatus::SOURCE_MASTER, reason);
-
-            forward(update, UPID(), framework);
-          }
-        };
-
         switch (operation.type()) {
           case Offer::Operation::RESERVE:
           case Offer::Operation::UNRESERVE:
@@ -4111,17 +4113,64 @@ void Master::accept(
           case Offer::Operation::LAUNCH: {
             sendStatusUpdates(
                 operation.launch().task_infos(),
-                TaskStatus::REASON_TASK_INVALID);
+                TaskStatus::REASON_TASK_INVALID,
+                error->message);
 
             break;
           }
           case Offer::Operation::LAUNCH_GROUP: {
             sendStatusUpdates(
                 operation.launch_group().task_group().tasks(),
-                TaskStatus::REASON_TASK_GROUP_INVALID);
+                TaskStatus::REASON_TASK_GROUP_INVALID,
+                error->message);
+
+            break;
+          }
+          case Offer::Operation::UNKNOWN: {
+            LOG(WARNING) << "Ignoring unknown offer operation";
+            break;
+          }
+        }
+
+        continue;
+      } else if (operation.has_id()) {
+        // If any operation has the `id` field set we drop it, and in the case
+        // of LAUNCH or LAUNCH_GROUP we send task status updates as well.
+        //
+        // TODO(greggomann): Remove this once offer operation feedback is
+        // implemented. See MESOS-8054.
+        const string message =
+          "The `id` field was set on this operation, but offer operation"
+          " status updates are not yet supported";
+
+        switch (operation.type()) {
+          case Offer::Operation::LAUNCH: {
+            sendStatusUpdates(
+                operation.launch().task_infos(),
+                TaskStatus::REASON_TASK_INVALID,
+                message);
 
             break;
           }
+          case Offer::Operation::LAUNCH_GROUP: {
+            sendStatusUpdates(
+                operation.launch_group().task_group().tasks(),
+                TaskStatus::REASON_TASK_GROUP_INVALID,
+                message);
+
+            break;
+          }
+          case Offer::Operation::RESERVE:
+          case Offer::Operation::UNRESERVE:
+          case Offer::Operation::CREATE:
+          case Offer::Operation::DESTROY:
+          case Offer::Operation::CREATE_VOLUME:
+          case Offer::Operation::DESTROY_VOLUME:
+          case Offer::Operation::CREATE_BLOCK:
+          case Offer::Operation::DESTROY_BLOCK: {
+            drop(framework, operation, message);
+            break;
+          }
           case Offer::Operation::UNKNOWN: {
             LOG(WARNING) << "Ignoring unknown offer operation";
             break;

Reply via email to