This is an automated email from the ASF dual-hosted git repository. bmahler pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mesos.git
commit 4aff0d7b583172ae890fbe944b268e222779956d Author: Benjamin Mahler <bmah...@apache.org> AuthorDate: Thu Jan 30 15:40:59 2020 -0500 Improved performance of v1 agent operator API GET_TASKS call. This follow the same approach used for the master's v1 calls: https://github.com/apache/mesos/commit/d7dd4d0e8493331d7b7a21b504eb https://github.com/apache/mesos/commit/3dda3622f5ed01e8c132dc5ca594 That is, serializing directly to protobuf or json from the in-memory v0 state. Review: https://reviews.apache.org/r/72066 --- src/slave/http.cpp | 341 +++++++++++++++++++++++++++++++++++++++++++++++++++-- src/slave/http.hpp | 4 + 2 files changed, 338 insertions(+), 7 deletions(-) diff --git a/src/slave/http.cpp b/src/slave/http.cpp index b120bf8..ab470cf 100644 --- a/src/slave/http.cpp +++ b/src/slave/http.cpp @@ -2143,7 +2143,7 @@ Future<Response> Http::getOperations( Future<Response> Http::getTasks( const mesos::agent::Call& call, - ContentType acceptType, + ContentType contentType, const Option<Principal>& principal) const { CHECK_EQ(mesos::agent::Call::GET_TASKS, call.type()); @@ -2156,19 +2156,346 @@ Future<Response> Http::getTasks( {VIEW_FRAMEWORK, VIEW_TASK, VIEW_EXECUTOR}) .then(defer( slave->self(), - [this, acceptType]( + [this, contentType]( const Owned<ObjectApprovers>& approvers) -> Response { - mesos::agent::Response response; - response.set_type(mesos::agent::Response::GET_TASKS); + // Serialize the following message: + // + // v1::agent::Response response; + // response.set_type(mesos::agent::Response::GET_TASKS); + // *response.mutable_get_tasks() = _...; - *response.mutable_get_tasks() = _getTasks(approvers); + switch (contentType) { + case ContentType::PROTOBUF: { + string output; + google::protobuf::io::StringOutputStream stream(&output); + google::protobuf::io::CodedOutputStream writer(&stream); - return OK(serialize(acceptType, evolve(response)), - stringify(acceptType)); + WireFormatLite::WriteEnum( + v1::agent::Response::kTypeFieldNumber, + v1::agent::Response::GET_TASKS, + &writer); + + WireFormatLite::WriteBytes( + v1::agent::Response::kGetTasksFieldNumber, + serializeGetTasks(approvers), + &writer); + + // We must manually trim the unused buffer space since + // we use the string before the coded output stream is + // destructed. + writer.Trim(); + + return OK(std::move(output), stringify(contentType)); + } + + case ContentType::JSON: { + string body = jsonify([&](JSON::ObjectWriter* writer) { + const google::protobuf::Descriptor* descriptor = + v1::agent::Response::descriptor(); + + int field; + + field = v1::agent::Response::kTypeFieldNumber; + writer->field( + descriptor->FindFieldByNumber(field)->name(), + v1::agent::Response::Type_Name( + v1::agent::Response::GET_TASKS)); + + field = v1::agent::Response::kGetTasksFieldNumber; + writer->field( + descriptor->FindFieldByNumber(field)->name(), + jsonifyGetTasks(approvers)); + }); + + // TODO(bmahler): Pass jsonp query parameter through here. + return OK(std::move(body), stringify(contentType)); + } + + default: + return NotAcceptable("Request must accept json or protobuf"); + } })); } +function<void(JSON::ObjectWriter*)> Http::jsonifyGetTasks( + const Owned<ObjectApprovers>& approvers) const +{ + return [=](JSON::ObjectWriter* writer) { + // Construct framework list with both active and completed frameworks. + vector<const Framework*> frameworks; + foreachvalue (Framework* f, slave->frameworks) { + if (approvers->approved<VIEW_FRAMEWORK>(f->info)) { + frameworks.push_back(f); + } + } + foreachvalue (const Owned<Framework>& f, slave->completedFrameworks) { + if (approvers->approved<VIEW_FRAMEWORK>(f->info)) { + frameworks.push_back(f.get()); + } + } + + // Construct executor list with both active and completed executors. + hashmap<const Executor*, const Framework*> executors; + foreach (const Framework* f, frameworks) { + foreachvalue (const Executor* e, f->executors) { + if (approvers->approved<VIEW_EXECUTOR>(e->info, f->info)) { + executors.put(e, f); + } + } + foreach (const Owned<Executor>& e, f->completedExecutors) { + if (approvers->approved<VIEW_EXECUTOR>(e->info, f->info)) { + executors.put(e.get(), f); + } + } + } + + // Jsonify the following message: + // + // v1::agent::Response::GetTasks getTasks; + // + // for each pending task: + // *getTasks.add_pending_tasks() = task + // for each queued task: + // *getTasks.add_queued_tasks() = *task; + // for each launched task: + // *getTasks.add_launched_tasks() = *task; + // for each terminated task: + // *getTasks.add_terminated_tasks() = *task; + // for each completed task: + // *getTasks.add_completed_tasks() = *task; + + const google::protobuf::Descriptor* descriptor = + v1::agent::Response::GetTasks::descriptor(); + + int field; + + // Pending tasks. + field = v1::agent::Response::GetTasks::kPendingTasksFieldNumber; + writer->field( + descriptor->FindFieldByNumber(field)->name(), + [&](JSON::ArrayWriter* writer) { + foreach (const Framework* framework, frameworks) { + typedef hashmap<TaskID, TaskInfo> TaskMap; + foreachvalue (const TaskMap& taskInfos, framework->pendingTasks) { + foreachvalue (const TaskInfo& t, taskInfos) { + if (approvers->approved<VIEW_TASK>(t, framework->info)) { + // TODO(bmahler): Consider not constructing the temporary task + // object and instead jsonify directly. Since we don't + // expect a large number of pending tasks, we currently don't + // bother with the more efficient approach. + Task task = + protobuf::createTask(t, TASK_STAGING, framework->id()); + + writer->element(asV1Protobuf(task)); + } + } + } + } + }); + + // Queued tasks. + field = v1::agent::Response::GetTasks::kQueuedTasksFieldNumber; + writer->field( + descriptor->FindFieldByNumber(field)->name(), + [&](JSON::ArrayWriter* writer) { + foreachpair (const Executor* executor, + const Framework* framework, + executors) { + foreachvalue (const TaskInfo& taskInfo, executor->queuedTasks) { + if (approvers->approved<VIEW_TASK>(taskInfo, framework->info)) { + // TODO(bmahler): Consider not constructing the temporary task + // object and instead serialize directly. Since we don't expect + // a large number of pending tasks, we currently don't bother + // with the more efficient approach. + Task t = + protobuf::createTask(taskInfo, TASK_STAGING, framework->id()); + + writer->element(asV1Protobuf(t)); + } + } + } + }); + + // Launched tasks. + field = v1::agent::Response::GetTasks::kLaunchedTasksFieldNumber; + writer->field( + descriptor->FindFieldByNumber(field)->name(), + [&](JSON::ArrayWriter* writer) { + foreachpair (const Executor* executor, + const Framework* framework, + executors) { + foreachvalue (Task* task, executor->launchedTasks) { + if (approvers->approved<VIEW_TASK>(*task, framework->info)) { + writer->element(asV1Protobuf(*task)); + } + } + } + }); + + // Terminated tasks. + field = v1::agent::Response::GetTasks::kTerminatedTasksFieldNumber; + writer->field( + descriptor->FindFieldByNumber(field)->name(), + [&](JSON::ArrayWriter* writer) { + foreachpair (const Executor* executor, + const Framework* framework, + executors) { + foreachvalue (Task* task, executor->terminatedTasks) { + if (approvers->approved<VIEW_TASK>(*task, framework->info)) { + writer->element(asV1Protobuf(*task)); + } + } + } + }); + + // Completed tasks. + field = v1::agent::Response::GetTasks::kCompletedTasksFieldNumber; + writer->field( + descriptor->FindFieldByNumber(field)->name(), + [&](JSON::ArrayWriter* writer) { + foreachpair (const Executor* executor, + const Framework* framework, + executors) { + foreach (const std::shared_ptr<Task>& t, executor->completedTasks) { + if (approvers->approved<VIEW_TASK>(*t.get(), framework->info)) { + writer->element(asV1Protobuf(*t)); + } + } + } + }); + }; +} + + +string Http::serializeGetTasks( + const Owned<ObjectApprovers>& approvers) const +{ + // Construct framework list with both active and completed frameworks. + vector<const Framework*> frameworks; + foreachvalue (Framework* f, slave->frameworks) { + if (approvers->approved<VIEW_FRAMEWORK>(f->info)) { + frameworks.push_back(f); + } + } + foreachvalue (const Owned<Framework>& f, slave->completedFrameworks) { + if (approvers->approved<VIEW_FRAMEWORK>(f->info)) { + frameworks.push_back(f.get()); + } + } + + // Construct executor list with both active and completed executors. + hashmap<const Executor*, const Framework*> executors; + foreach (const Framework* f, frameworks) { + foreachvalue (Executor* e, f->executors) { + if (approvers->approved<VIEW_EXECUTOR>(e->info, f->info)) { + executors.put(e, f); + } + } + foreach (const Owned<Executor>& e, f->completedExecutors) { + if (approvers->approved<VIEW_EXECUTOR>(e->info, f->info)) { + executors.put(e.get(), f); + } + } + } + + // Serialize the following message: + // + // v1::agent::Response::GetTasks getTasks; + // + // for each pending task: + // *getTasks.add_pending_tasks() = task + // for each queued task: + // *getTasks.add_queued_tasks() = *task; + // for each launched task: + // *getTasks.add_launched_tasks() = *task; + // for each terminated task: + // *getTasks.add_terminated_tasks() = *task; + // for each completed task: + // *getTasks.add_completed_tasks() = *task; + + string output; + google::protobuf::io::StringOutputStream stream(&output); + google::protobuf::io::CodedOutputStream writer(&stream); + + foreach (const Framework* framework, frameworks) { + // Pending tasks. + typedef hashmap<TaskID, TaskInfo> TaskMap; + foreachvalue (const TaskMap& taskInfos, framework->pendingTasks) { + foreachvalue (const TaskInfo& taskInfo, taskInfos) { + if (approvers->approved<VIEW_TASK>(taskInfo, framework->info)) { + // TODO(bmahler): Consider not constructing the temporary task + // object and instead serialize directly. Since we don't expect + // a large number of pending tasks, we currently don't bother + // with the more efficient approach. + WireFormatLite2::WriteMessageWithoutCachedSizes( + v1::agent::Response::GetTasks::kPendingTasksFieldNumber, + protobuf::createTask(taskInfo, TASK_STAGING, framework->id()), + &writer); + } + } + } + } + + foreachpair (const Executor* executor, + const Framework* framework, + executors) { + // Queued tasks. + foreachvalue (const TaskInfo& taskInfo, executor->queuedTasks) { + if (approvers->approved<VIEW_TASK>(taskInfo, framework->info)) { + // TODO(bmahler): Consider not constructing the temporary task + // object and instead serialize directly. Since we don't expect + // a large number of pending tasks, we currently don't bother + // with the more efficient approach. + WireFormatLite2::WriteMessageWithoutCachedSizes( + v1::agent::Response::GetTasks::kQueuedTasksFieldNumber, + protobuf::createTask(taskInfo, TASK_STAGING, framework->id()), + &writer); + } + } + + // Launched tasks. + foreachvalue (Task* task, executor->launchedTasks) { + if (approvers->approved<VIEW_TASK>(*task, framework->info)) { + WireFormatLite2::WriteMessageWithoutCachedSizes( + v1::agent::Response::GetTasks::kLaunchedTasksFieldNumber, + *task, + &writer); + } + } + + // Terminated tasks. + foreachvalue (Task* task, executor->terminatedTasks) { + if (approvers->approved<VIEW_TASK>(*task, framework->info)) { + WireFormatLite2::WriteMessageWithoutCachedSizes( + v1::agent::Response::GetTasks::kTerminatedTasksFieldNumber, + *task, + &writer); + } + } + + // Completed tasks. + foreach (const std::shared_ptr<Task>& task, executor->completedTasks) { + if (approvers->approved<VIEW_TASK>(*task.get(), framework->info)) { + WireFormatLite2::WriteMessageWithoutCachedSizes( + v1::agent::Response::GetTasks::kCompletedTasksFieldNumber, + *task, + &writer); + } + } + } + + // While an explicit Trim() isn't necessary (since the coded + // output stream is destructed before the string is returned), + // it's a quite tricky bug to diagnose if Trim() is missed, so + // we always do it explicitly to signal the reader about this + // subtlety. + writer.Trim(); + + return output; +} + + mesos::agent::Response::GetTasks Http::_getTasks( const Owned<ObjectApprovers>& approvers) const { diff --git a/src/slave/http.hpp b/src/slave/http.hpp index c2df2e4..58137cb 100644 --- a/src/slave/http.hpp +++ b/src/slave/http.hpp @@ -212,6 +212,10 @@ private: ContentType acceptType, const Option<process::http::authentication::Principal>& principal) const; + std::function<void(JSON::ObjectWriter*)> jsonifyGetTasks( + const process::Owned<ObjectApprovers>& approvers) const; + std::string serializeGetTasks( + const process::Owned<ObjectApprovers>& approvers) const; mesos::agent::Response::GetTasks _getTasks( const process::Owned<ObjectApprovers>& approvers) const;