This is an automated email from the ASF dual-hosted git repository.

alexr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git


The following commit(s) were added to refs/heads/master by this push:
     new 2f4d9ae  Batch '/state' requests on Master.
2f4d9ae is described below

commit 2f4d9ae0335b988f8a0bdab319ebd02f34519f3b
Author: Alexander Rukletsov <al...@apache.org>
AuthorDate: Thu Jul 19 12:56:46 2018 +0200

    Batch '/state' requests on Master.
    
    With this patch handlers for '/state' requests are not scheduled
    directly after authorization, but are accumulated and then scheduled
    for later parallel processing.
    
    This approach allows, if there are N '/state' requests in the Master's
    mailbox and T is the request response time, to block the Master actor
    only once for time O(T) instead of blocking it for time N*T prior to
    this patch.
    
    This batching technique reduces both the time the Master spends
    answering '/state' requests and the average request response time
    in the presence of multiple requests in the Master's mailbox. However,
    for infrequent '/state' requests that don't accumulate in the Master's
    mailbox, the response time might increase due to an added trip
    through the mailbox.
    
    The change preserves the read-your-writes consistency model.
    
    Review: https://reviews.apache.org/r/68132
---
 src/master/http.cpp   | 301 +++++++++++++++++++++++++++++++-------------------
 src/master/master.hpp |  27 ++++-
 2 files changed, 212 insertions(+), 116 deletions(-)

diff --git a/src/master/http.cpp b/src/master/http.cpp
index d43fbd6..e2773ed 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -36,8 +36,10 @@
 
 #include <mesos/v1/master/master.hpp>
 
+#include <process/async.hpp>
 #include <process/collect.hpp>
 #include <process/defer.hpp>
+#include <process/future.hpp>
 #include <process/help.hpp>
 #include <process/logging.hpp>
 
@@ -96,6 +98,7 @@ using process::Failure;
 using process::Future;
 using process::HELP;
 using process::Logging;
+using process::Promise;
 using process::TLDR;
 
 using process::http::Accepted;
@@ -2813,7 +2816,7 @@ string Master::Http::STATE_HELP()
 
 Future<Response> Master::Http::state(
     const Request& request,
-    const Option<Principal>& principal) const
+    const Option<Principal>& principal)
 {
   // TODO(greggomann): Remove this check once the `Principal` type is used in
   // `ReservationInfo`, `DiskInfo`, and within the master's `principals` map.
@@ -2829,150 +2832,218 @@ Future<Response> Master::Http::state(
     return redirect(request);
   }
 
+  // TODO(alexr): De-duplicate response processing when the principal is
+  // identical, e.g., if "bob" asks for state three times in one batch,
+  // ideally we only compute the response for "bob" once since they're all
+  // identical within a principal.
   return ObjectApprovers::create(
       master->authorizer,
       principal,
       {VIEW_ROLE, VIEW_FRAMEWORK, VIEW_TASK, VIEW_EXECUTOR, VIEW_FLAGS})
     .then(defer(
         master->self(),
-        [this, request](const Owned<ObjectApprovers>& approvers) -> Response {
-          // This lambda is consumed before the outer lambda
-          // returns, hence capture by reference is fine here.
-          auto state = [this, &approvers](JSON::ObjectWriter* writer) {
-            writer->field("version", MESOS_VERSION);
+        [this, request](const Owned<ObjectApprovers>& approvers) {
+          return deferStateRequest(request, approvers);
+        }));
+}
 
-            if (build::GIT_SHA.isSome()) {
-              writer->field("git_sha", build::GIT_SHA.get());
-            }
 
-            if (build::GIT_BRANCH.isSome()) {
-              writer->field("git_branch", build::GIT_BRANCH.get());
-            }
+Future<Response> Master::Http::deferStateRequest(
+    const Request& request,
+    const Owned<ObjectApprovers>& approvers)
+{
+  bool scheduleBatch = batchedStateRequests.empty();
+
+  // Add an element to the batched state requests.
+  Promise<Response> promise;
+  Future<Response> future = promise.future();
+  batchedStateRequests.push_back(
+      BatchedStateRequest{request, approvers, std::move(promise)});
+
+  // Schedule processing of batched requests if not yet scheduled.
+  if (scheduleBatch) {
+    dispatch(master->self(), [this]() {
+      processStateRequestsBatch();
+    });
+  }
 
-            if (build::GIT_TAG.isSome()) {
-              writer->field("git_tag", build::GIT_TAG.get());
-            }
+  return future;
+}
 
-            writer->field("build_date", build::DATE);
-            writer->field("build_time", build::TIME);
-            writer->field("build_user", build::USER);
-            writer->field("start_time", master->startTime.secs());
 
-            if (master->electedTime.isSome()) {
-              writer->field("elected_time", master->electedTime->secs());
-            }
+void Master::Http::processStateRequestsBatch()
+{
+  CHECK(!batchedStateRequests.empty())
+    << "Bug in state batching logic: No requests to process";
+
+  // This lambda is consumed before the enclosing function returns,
+  // hence capturing `this` is fine here.
+  auto produceResponse = [this](
+      const Request& request,
+      const Owned<ObjectApprovers>& approvers) -> Response {
+    // This lambda is consumed before the outer lambda returns,
+    // hence capturing a reference is fine here.
+    auto calculateState = [this, &approvers](JSON::ObjectWriter* writer) {
+      writer->field("version", MESOS_VERSION);
+
+      if (build::GIT_SHA.isSome()) {
+        writer->field("git_sha", build::GIT_SHA.get());
+      }
 
-            writer->field("id", master->info().id());
-            writer->field("pid", string(master->self()));
-            writer->field("hostname", master->info().hostname());
-            writer->field("capabilities", master->info().capabilities());
-            writer->field("activated_slaves", master->_slaves_active());
-            writer->field("deactivated_slaves", master->_slaves_inactive());
-            writer->field("unreachable_slaves", master->_slaves_unreachable());
+      if (build::GIT_BRANCH.isSome()) {
+        writer->field("git_branch", build::GIT_BRANCH.get());
+      }
 
-            if (master->info().has_domain()) {
-              writer->field("domain", master->info().domain());
-            }
+      if (build::GIT_TAG.isSome()) {
+        writer->field("git_tag", build::GIT_TAG.get());
+      }
 
-            // TODO(haosdent): Deprecated this in favor of `leader_info` below.
-            if (master->leader.isSome()) {
-              writer->field("leader", master->leader->pid());
-            }
+      writer->field("build_date", build::DATE);
+      writer->field("build_time", build::TIME);
+      writer->field("build_user", build::USER);
+      writer->field("start_time", master->startTime.secs());
 
-            if (master->leader.isSome()) {
-              writer->field("leader_info", [this](JSON::ObjectWriter* writer) {
-                json(writer, master->leader.get());
-              });
-            }
+      if (master->electedTime.isSome()) {
+        writer->field("elected_time", master->electedTime->secs());
+      }
 
-            if (approvers->approved<VIEW_FLAGS>()) {
-              if (master->flags.cluster.isSome()) {
-                writer->field("cluster", master->flags.cluster.get());
-              }
+      writer->field("id", master->info().id());
+      writer->field("pid", string(master->self()));
+      writer->field("hostname", master->info().hostname());
+      writer->field("capabilities", master->info().capabilities());
+      writer->field("activated_slaves", master->_slaves_active());
+      writer->field("deactivated_slaves", master->_slaves_inactive());
+      writer->field("unreachable_slaves", master->_slaves_unreachable());
 
-              if (master->flags.log_dir.isSome()) {
-                writer->field("log_dir", master->flags.log_dir.get());
-              }
+      if (master->info().has_domain()) {
+        writer->field("domain", master->info().domain());
+      }
 
-              if (master->flags.external_log_file.isSome()) {
-                writer->field("external_log_file",
-                              master->flags.external_log_file.get());
+      // TODO(haosdent): Deprecated this in favor of `leader_info` below.
+      if (master->leader.isSome()) {
+        writer->field("leader", master->leader->pid());
+      }
+
+      if (master->leader.isSome()) {
+        writer->field("leader_info", [this](JSON::ObjectWriter* writer) {
+          json(writer, master->leader.get());
+        });
+      }
+
+      if (approvers->approved<VIEW_FLAGS>()) {
+        if (master->flags.cluster.isSome()) {
+          writer->field("cluster", master->flags.cluster.get());
+        }
+
+        if (master->flags.log_dir.isSome()) {
+          writer->field("log_dir", master->flags.log_dir.get());
+        }
+
+        if (master->flags.external_log_file.isSome()) {
+          writer->field("external_log_file",
+                        master->flags.external_log_file.get());
+        }
+
+        writer->field("flags", [this](JSON::ObjectWriter* writer) {
+            foreachvalue (const flags::Flag& flag, master->flags) {
+              Option<string> value = flag.stringify(master->flags);
+              if (value.isSome()) {
+                writer->field(flag.effective_name().value, value.get());
               }
+            }
+          });
+      }
 
-              writer->field("flags", [this](JSON::ObjectWriter* writer) {
-                  foreachvalue (const flags::Flag& flag, master->flags) {
-                    Option<string> value = flag.stringify(master->flags);
-                    if (value.isSome()) {
-                      writer->field(flag.effective_name().value, value.get());
-                    }
-                  }
-                });
+      // Model all of the registered slaves.
+      writer->field(
+          "slaves",
+          [this, &approvers](JSON::ArrayWriter* writer) {
+            foreachvalue (Slave* slave, master->slaves.registered) {
+              writer->element(SlaveWriter(*slave, approvers));
             }
+          });
 
-            // Model all of the registered slaves.
-            writer->field(
-                "slaves",
-                [this, &approvers](JSON::ArrayWriter* writer) {
-                  foreachvalue (Slave* slave, master->slaves.registered) {
-                    writer->element(SlaveWriter(*slave, approvers));
-                  }
-                });
+      // Model all of the recovered slaves.
+      writer->field(
+          "recovered_slaves",
+          [this](JSON::ArrayWriter* writer) {
+            foreachvalue (
+                const SlaveInfo& slaveInfo, master->slaves.recovered) {
+              writer->element([&slaveInfo](JSON::ObjectWriter* writer) {
+                json(writer, slaveInfo);
+              });
+            }
+          });
 
-            // Model all of the recovered slaves.
-            writer->field(
-                "recovered_slaves",
-                [this](JSON::ArrayWriter* writer) {
-                  foreachvalue (
-                      const SlaveInfo& slaveInfo, master->slaves.recovered) {
-                    writer->element([&slaveInfo](JSON::ObjectWriter* writer) {
-                      json(writer, slaveInfo);
-                    });
-                  }
-                });
+      // Model all of the frameworks.
+      writer->field(
+          "frameworks",
+          [this, &approvers](JSON::ArrayWriter* writer) {
+            foreachvalue (
+                Framework* framework, master->frameworks.registered) {
+              // Skip unauthorized frameworks.
+              if (!approvers->approved<VIEW_FRAMEWORK>(framework->info)) {
+                continue;
+              }
 
-            // Model all of the frameworks.
-            writer->field(
-                "frameworks",
-                [this, &approvers](JSON::ArrayWriter* writer) {
-                  foreachvalue (
-                      Framework* framework, master->frameworks.registered) {
-                    // Skip unauthorized frameworks.
-                    if (!approvers->approved<VIEW_FRAMEWORK>(framework->info)) {
-                      continue;
-                    }
+              writer->element(FullFrameworkWriter(approvers, framework));
+            }
+          });
 
-                    writer->element(FullFrameworkWriter(approvers, framework));
-                  }
-                });
+      // Model all of the completed frameworks.
+      writer->field(
+          "completed_frameworks",
+          [this, &approvers](JSON::ArrayWriter* writer) {
+            foreachvalue (
+                const Owned<Framework>& framework,
+                master->frameworks.completed) {
+              // Skip unauthorized frameworks.
+              if (!approvers->approved<VIEW_FRAMEWORK>(framework->info)) {
+                continue;
+              }
 
-            // Model all of the completed frameworks.
-            writer->field(
-                "completed_frameworks",
-                [this, &approvers](JSON::ArrayWriter* writer) {
-                  foreachvalue (
-                      const Owned<Framework>& framework,
-                      master->frameworks.completed) {
-                    // Skip unauthorized frameworks.
-                    if (!approvers->approved<VIEW_FRAMEWORK>(framework->info)) {
-                      continue;
-                    }
+              writer->element(
+                  FullFrameworkWriter(approvers, framework.get()));
+            }
+          });
 
-                    writer->element(
-                        FullFrameworkWriter(approvers, framework.get()));
-                  }
-                });
+      // Orphan tasks are no longer possible. We emit an empty array
+      // for the sake of backward compatibility.
+      writer->field("orphan_tasks", [](JSON::ArrayWriter*) {});
 
-            // Orphan tasks are no longer possible. We emit an empty array
-            // for the sake of backward compatibility.
-            writer->field("orphan_tasks", [](JSON::ArrayWriter*) {});
+      // Unregistered frameworks are no longer possible. We emit an
+      // empty array for the sake of backward compatibility.
+      writer->field("unregistered_frameworks", [](JSON::ArrayWriter*) {});
+    };
 
-            // Unregistered frameworks are no longer possible. We emit an
-            // empty array for the sake of backward compatibility.
-            writer->field("unregistered_frameworks", [](JSON::ArrayWriter*) {});
-          };
+    return OK(jsonify(calculateState), request.url.query.get("jsonp"));
+  };
 
-          return OK(jsonify(state), request.url.query.get("jsonp"));
-        }));
+  // Produce the responses in parallel.
+  //
+  // TODO(alexr): Consider abstracting this into `parallel_async` or
+  // `foreach_parallel`, see MESOS-8587.
+  //
+  // TODO(alexr): Consider moving `BatchedStateRequest`'s fields into
+  // `process::async` once it supports moving.
+  foreach (BatchedStateRequest& request, batchedStateRequests) {
+    request.promise.associate(process::async(
+        produceResponse, request.request, request.approvers));
+  }
+
+  // Block the master actor until all workers have generated state responses.
+  // It is crucial not to allow the master actor to continue and possibly
+  // modify its state while a worker is reading it.
+  //
+  // NOTE: There is the potential for deadlock since we are blocking 1 working
+  // thread here, see MESOS-8256.
+  vector<Future<Response>> responses;
+  foreach (const BatchedStateRequest& request, batchedStateRequests) {
+    responses.push_back(request.promise.future());
+  }
+  process::await(responses).await();
+
+  batchedStateRequests.clear();
 }
 
 
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 209b998..85ef14c 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -44,6 +44,7 @@
 
 #include <mesos/scheduler/scheduler.hpp>
 
+#include <process/future.hpp>
 #include <process/limiter.hpp>
 #include <process/http.hpp>
 #include <process/owned.hpp>
@@ -1461,10 +1462,12 @@ private:
             principal) const;
 
     // /master/state
+    //
+    // NOTE: Requests to this endpoint are batched.
     process::Future<process::http::Response> state(
         const process::http::Request& request,
         const Option<process::http::authentication::Principal>&
-            principal) const;
+            principal);
 
     // /master/state-summary
     process::Future<process::http::Response> stateSummary(
@@ -1552,6 +1555,17 @@ private:
         const Option<process::http::authentication::Principal>&
             principal) const;
 
+    // A continuation for `state()`. Schedules request processing in a batch
+    // of other '/state' requests.
+    process::Future<process::http::Response> deferStateRequest(
+        const process::http::Request& request,
+        const process::Owned<ObjectApprovers>& approvers);
+
+    // A helper that responds to batched, i.e., accumulated, '/state'
+    // requests in parallel, i.e., a continuation for `deferStateRequest()`.
+    // See also `BatchedStateRequest`.
+    void processStateRequestsBatch();
+
     process::Future<std::vector<const Task*>> _tasks(
         const size_t limit,
         const size_t offset,
@@ -1830,6 +1844,17 @@ private:
     // NOTE: The weights specific pieces of the Operator API are factored
     // out into this separate class.
     WeightsHandler weightsHandler;
+
+    // TODO(alexr): Consider adding a `type` or `handler` field to expand
+    // batching to other heavy read-only requests, e.g., '/state-summary'.
+    struct BatchedStateRequest
+    {
+      process::http::Request request;
+      process::Owned<ObjectApprovers> approvers;
+      process::Promise<process::http::Response> promise;
+    };
+
+    std::vector<BatchedStateRequest> batchedStateRequests;
   };
 
   Master(const Master&);              // No copying.

Reply via email to