This is an automated email from the ASF dual-hosted git repository. alexr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mesos.git
The following commit(s) were added to refs/heads/master by this push: new 2f4d9ae Batch '/state' requests on Master. 2f4d9ae is described below commit 2f4d9ae0335b988f8a0bdab319ebd02f34519f3b Author: Alexander Rukletsov <al...@apache.org> AuthorDate: Thu Jul 19 12:56:46 2018 +0200 Batch '/state' requests on Master. With this patch, handlers for '/state' requests are not scheduled directly after authorization; instead, they are accumulated and then scheduled for later parallel processing. If there are N '/state' requests in the Master's mailbox and T is the request response time, this approach blocks the Master actor only once, for time O(T), instead of blocking it for a total of N*T as before this patch. This batching technique reduces both the time the Master spends answering '/state' requests and the average request response time when multiple requests are present in the Master's mailbox. However, for infrequent '/state' requests that don't accumulate in the Master's mailbox, the response time might increase due to an added trip through the mailbox. The change preserves the read-your-writes consistency model. 
Review: https://reviews.apache.org/r/68132 --- src/master/http.cpp | 301 +++++++++++++++++++++++++++++++------------------- src/master/master.hpp | 27 ++++- 2 files changed, 212 insertions(+), 116 deletions(-) diff --git a/src/master/http.cpp b/src/master/http.cpp index d43fbd6..e2773ed 100644 --- a/src/master/http.cpp +++ b/src/master/http.cpp @@ -36,8 +36,10 @@ #include <mesos/v1/master/master.hpp> +#include <process/async.hpp> #include <process/collect.hpp> #include <process/defer.hpp> +#include <process/future.hpp> #include <process/help.hpp> #include <process/logging.hpp> @@ -96,6 +98,7 @@ using process::Failure; using process::Future; using process::HELP; using process::Logging; +using process::Promise; using process::TLDR; using process::http::Accepted; @@ -2813,7 +2816,7 @@ string Master::Http::STATE_HELP() Future<Response> Master::Http::state( const Request& request, - const Option<Principal>& principal) const + const Option<Principal>& principal) { // TODO(greggomann): Remove this check once the `Principal` type is used in // `ReservationInfo`, `DiskInfo`, and within the master's `principals` map. @@ -2829,150 +2832,218 @@ Future<Response> Master::Http::state( return redirect(request); } + // TODO(alexr): De-duplicate response processing when the principal is + // identical, e.g., if "bob" asks for state three times in one batch, + // ideally we only compute the response for "bob" once since they're all + // identical within a principal. return ObjectApprovers::create( master->authorizer, principal, {VIEW_ROLE, VIEW_FRAMEWORK, VIEW_TASK, VIEW_EXECUTOR, VIEW_FLAGS}) .then(defer( master->self(), - [this, request](const Owned<ObjectApprovers>& approvers) -> Response { - // This lambda is consumed before the outer lambda - // returns, hence capture by reference is fine here. 
- auto state = [this, &approvers](JSON::ObjectWriter* writer) { - writer->field("version", MESOS_VERSION); + [this, request](const Owned<ObjectApprovers>& approvers) { + return deferStateRequest(request, approvers); + })); +} - if (build::GIT_SHA.isSome()) { - writer->field("git_sha", build::GIT_SHA.get()); - } - if (build::GIT_BRANCH.isSome()) { - writer->field("git_branch", build::GIT_BRANCH.get()); - } +Future<Response> Master::Http::deferStateRequest( + const Request& request, + const Owned<ObjectApprovers>& approvers) +{ + bool scheduleBatch = batchedStateRequests.empty(); + + // Add an element to the batched state requests. + Promise<Response> promise; + Future<Response> future = promise.future(); + batchedStateRequests.push_back( + BatchedStateRequest{request, approvers, std::move(promise)}); + + // Schedule processing of batched requests if not yet scheduled. + if (scheduleBatch) { + dispatch(master->self(), [this]() { + processStateRequestsBatch(); + }); + } - if (build::GIT_TAG.isSome()) { - writer->field("git_tag", build::GIT_TAG.get()); - } + return future; +} - writer->field("build_date", build::DATE); - writer->field("build_time", build::TIME); - writer->field("build_user", build::USER); - writer->field("start_time", master->startTime.secs()); - if (master->electedTime.isSome()) { - writer->field("elected_time", master->electedTime->secs()); - } +void Master::Http::processStateRequestsBatch() +{ + CHECK(!batchedStateRequests.empty()) + << "Bug in state batching logic: No requests to process"; + + // This lambda is consumed before the enclosed function returns, + // hence capturing `this` is fine here. + auto produceResponse = [this]( + const Request& request, + const Owned<ObjectApprovers>& approvers) -> Response { + // This lambda is consumed before the outer lambda returns, + // hence capturing a reference is fine here. 
+ auto calculateState = [this, &approvers](JSON::ObjectWriter* writer) { + writer->field("version", MESOS_VERSION); + + if (build::GIT_SHA.isSome()) { + writer->field("git_sha", build::GIT_SHA.get()); + } - writer->field("id", master->info().id()); - writer->field("pid", string(master->self())); - writer->field("hostname", master->info().hostname()); - writer->field("capabilities", master->info().capabilities()); - writer->field("activated_slaves", master->_slaves_active()); - writer->field("deactivated_slaves", master->_slaves_inactive()); - writer->field("unreachable_slaves", master->_slaves_unreachable()); + if (build::GIT_BRANCH.isSome()) { + writer->field("git_branch", build::GIT_BRANCH.get()); + } - if (master->info().has_domain()) { - writer->field("domain", master->info().domain()); - } + if (build::GIT_TAG.isSome()) { + writer->field("git_tag", build::GIT_TAG.get()); + } - // TODO(haosdent): Deprecated this in favor of `leader_info` below. - if (master->leader.isSome()) { - writer->field("leader", master->leader->pid()); - } + writer->field("build_date", build::DATE); + writer->field("build_time", build::TIME); + writer->field("build_user", build::USER); + writer->field("start_time", master->startTime.secs()); - if (master->leader.isSome()) { - writer->field("leader_info", [this](JSON::ObjectWriter* writer) { - json(writer, master->leader.get()); - }); - } + if (master->electedTime.isSome()) { + writer->field("elected_time", master->electedTime->secs()); + } - if (approvers->approved<VIEW_FLAGS>()) { - if (master->flags.cluster.isSome()) { - writer->field("cluster", master->flags.cluster.get()); - } + writer->field("id", master->info().id()); + writer->field("pid", string(master->self())); + writer->field("hostname", master->info().hostname()); + writer->field("capabilities", master->info().capabilities()); + writer->field("activated_slaves", master->_slaves_active()); + writer->field("deactivated_slaves", master->_slaves_inactive()); + 
writer->field("unreachable_slaves", master->_slaves_unreachable()); - if (master->flags.log_dir.isSome()) { - writer->field("log_dir", master->flags.log_dir.get()); - } + if (master->info().has_domain()) { + writer->field("domain", master->info().domain()); + } - if (master->flags.external_log_file.isSome()) { - writer->field("external_log_file", - master->flags.external_log_file.get()); + // TODO(haosdent): Deprecated this in favor of `leader_info` below. + if (master->leader.isSome()) { + writer->field("leader", master->leader->pid()); + } + + if (master->leader.isSome()) { + writer->field("leader_info", [this](JSON::ObjectWriter* writer) { + json(writer, master->leader.get()); + }); + } + + if (approvers->approved<VIEW_FLAGS>()) { + if (master->flags.cluster.isSome()) { + writer->field("cluster", master->flags.cluster.get()); + } + + if (master->flags.log_dir.isSome()) { + writer->field("log_dir", master->flags.log_dir.get()); + } + + if (master->flags.external_log_file.isSome()) { + writer->field("external_log_file", + master->flags.external_log_file.get()); + } + + writer->field("flags", [this](JSON::ObjectWriter* writer) { + foreachvalue (const flags::Flag& flag, master->flags) { + Option<string> value = flag.stringify(master->flags); + if (value.isSome()) { + writer->field(flag.effective_name().value, value.get()); } + } + }); + } - writer->field("flags", [this](JSON::ObjectWriter* writer) { - foreachvalue (const flags::Flag& flag, master->flags) { - Option<string> value = flag.stringify(master->flags); - if (value.isSome()) { - writer->field(flag.effective_name().value, value.get()); - } - } - }); + // Model all of the registered slaves. + writer->field( + "slaves", + [this, &approvers](JSON::ArrayWriter* writer) { + foreachvalue (Slave* slave, master->slaves.registered) { + writer->element(SlaveWriter(*slave, approvers)); } + }); - // Model all of the registered slaves. 
- writer->field( - "slaves", - [this, &approvers](JSON::ArrayWriter* writer) { - foreachvalue (Slave* slave, master->slaves.registered) { - writer->element(SlaveWriter(*slave, approvers)); - } - }); + // Model all of the recovered slaves. + writer->field( + "recovered_slaves", + [this](JSON::ArrayWriter* writer) { + foreachvalue ( + const SlaveInfo& slaveInfo, master->slaves.recovered) { + writer->element([&slaveInfo](JSON::ObjectWriter* writer) { + json(writer, slaveInfo); + }); + } + }); - // Model all of the recovered slaves. - writer->field( - "recovered_slaves", - [this](JSON::ArrayWriter* writer) { - foreachvalue ( - const SlaveInfo& slaveInfo, master->slaves.recovered) { - writer->element([&slaveInfo](JSON::ObjectWriter* writer) { - json(writer, slaveInfo); - }); - } - }); + // Model all of the frameworks. + writer->field( + "frameworks", + [this, &approvers](JSON::ArrayWriter* writer) { + foreachvalue ( + Framework* framework, master->frameworks.registered) { + // Skip unauthorized frameworks. + if (!approvers->approved<VIEW_FRAMEWORK>(framework->info)) { + continue; + } - // Model all of the frameworks. - writer->field( - "frameworks", - [this, &approvers](JSON::ArrayWriter* writer) { - foreachvalue ( - Framework* framework, master->frameworks.registered) { - // Skip unauthorized frameworks. - if (!approvers->approved<VIEW_FRAMEWORK>(framework->info)) { - continue; - } + writer->element(FullFrameworkWriter(approvers, framework)); + } + }); - writer->element(FullFrameworkWriter(approvers, framework)); - } - }); + // Model all of the completed frameworks. + writer->field( + "completed_frameworks", + [this, &approvers](JSON::ArrayWriter* writer) { + foreachvalue ( + const Owned<Framework>& framework, + master->frameworks.completed) { + // Skip unauthorized frameworks. + if (!approvers->approved<VIEW_FRAMEWORK>(framework->info)) { + continue; + } - // Model all of the completed frameworks. 
- writer->field( - "completed_frameworks", - [this, &approvers](JSON::ArrayWriter* writer) { - foreachvalue ( - const Owned<Framework>& framework, - master->frameworks.completed) { - // Skip unauthorized frameworks. - if (!approvers->approved<VIEW_FRAMEWORK>(framework->info)) { - continue; - } + writer->element( + FullFrameworkWriter(approvers, framework.get())); + } + }); - writer->element( - FullFrameworkWriter(approvers, framework.get())); - } - }); + // Orphan tasks are no longer possible. We emit an empty array + // for the sake of backward compatibility. + writer->field("orphan_tasks", [](JSON::ArrayWriter*) {}); - // Orphan tasks are no longer possible. We emit an empty array - // for the sake of backward compatibility. - writer->field("orphan_tasks", [](JSON::ArrayWriter*) {}); + // Unregistered frameworks are no longer possible. We emit an + // empty array for the sake of backward compatibility. + writer->field("unregistered_frameworks", [](JSON::ArrayWriter*) {}); + }; - // Unregistered frameworks are no longer possible. We emit an - // empty array for the sake of backward compatibility. - writer->field("unregistered_frameworks", [](JSON::ArrayWriter*) {}); - }; + return OK(jsonify(calculateState), request.url.query.get("jsonp")); + }; - return OK(jsonify(state), request.url.query.get("jsonp")); - })); + // Produce the responses in parallel. + // + // TODO(alexr): Consider abstracting this into `parallel_async` or + // `foreach_parallel`, see MESOS-8587. + // + // TODO(alexr): Consider moving `BatchedStateRequest`'s fields into + // `process::async` once it supports moving. + foreach (BatchedStateRequest& request, batchedStateRequests) { + request.promise.associate(process::async( + produceResponse, request.request, request.approvers)); + } + + // Block the master actor until all workers have generated state responses. + // It is crucial not to allow the master actor to continue and possibly + // modify its state while a worker is reading it. 
+ // + // NOTE: There is the potential for deadlock since we are blocking 1 working + // thread here, see MESOS-8256. + vector<Future<Response>> responses; + foreach (const BatchedStateRequest& request, batchedStateRequests) { + responses.push_back(request.promise.future()); + } + process::await(responses).await(); + + batchedStateRequests.clear(); } diff --git a/src/master/master.hpp b/src/master/master.hpp index 209b998..85ef14c 100644 --- a/src/master/master.hpp +++ b/src/master/master.hpp @@ -44,6 +44,7 @@ #include <mesos/scheduler/scheduler.hpp> +#include <process/future.hpp> #include <process/limiter.hpp> #include <process/http.hpp> #include <process/owned.hpp> @@ -1461,10 +1462,12 @@ private: principal) const; // /master/state + // + // NOTE: Requests to this endpoint are batched. process::Future<process::http::Response> state( const process::http::Request& request, const Option<process::http::authentication::Principal>& - principal) const; + principal); // /master/state-summary process::Future<process::http::Response> stateSummary( @@ -1552,6 +1555,17 @@ private: const Option<process::http::authentication::Principal>& principal) const; + // A continuation for `state()`. Schedules request processing in a batch + // of other '/state' requests. + process::Future<process::http::Response> deferStateRequest( + const process::http::Request& request, + const process::Owned<ObjectApprovers>& approvers); + + // A helper that responds to batched, i.e., accumulated, '/state' + // requests in parallel, i.e., a continuation for `deferStateRequest()`. + // See also `BatchedStateRequest`. + void processStateRequestsBatch(); + process::Future<std::vector<const Task*>> _tasks( const size_t limit, const size_t offset, @@ -1830,6 +1844,17 @@ private: // NOTE: The weights specific pieces of the Operator API are factored // out into this separate class. 
WeightsHandler weightsHandler; + + // TODO(alexr): Consider adding a `type` or `handler` field to expand + // batching to other heavy read-only requests, e.g., '/state-summary'. + struct BatchedStateRequest + { + process::http::Request request; + process::Owned<ObjectApprovers> approvers; + process::Promise<process::http::Response> promise; + }; + + std::vector<BatchedStateRequest> batchedStateRequests; }; Master(const Master&); // No copying.