This is an automated email from the ASF dual-hosted git repository.

bmahler pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 4aff0d7b583172ae890fbe944b268e222779956d
Author: Benjamin Mahler <bmah...@apache.org>
AuthorDate: Thu Jan 30 15:40:59 2020 -0500

    Improved performance of v1 agent operator API GET_TASKS call.
    
    This follow the same approach used for the master's v1 calls:
    
    https://github.com/apache/mesos/commit/d7dd4d0e8493331d7b7a21b504eb
    https://github.com/apache/mesos/commit/3dda3622f5ed01e8c132dc5ca594
    
    That is, serializing directly to protobuf or json from the in-memory
    v0 state.
    
    Review: https://reviews.apache.org/r/72066
---
 src/slave/http.cpp | 341 +++++++++++++++++++++++++++++++++++++++++++++++++++--
 src/slave/http.hpp |   4 +
 2 files changed, 338 insertions(+), 7 deletions(-)

diff --git a/src/slave/http.cpp b/src/slave/http.cpp
index b120bf8..ab470cf 100644
--- a/src/slave/http.cpp
+++ b/src/slave/http.cpp
@@ -2143,7 +2143,7 @@ Future<Response> Http::getOperations(
 
 Future<Response> Http::getTasks(
     const mesos::agent::Call& call,
-    ContentType acceptType,
+    ContentType contentType,
     const Option<Principal>& principal) const
 {
   CHECK_EQ(mesos::agent::Call::GET_TASKS, call.type());
@@ -2156,19 +2156,346 @@ Future<Response> Http::getTasks(
       {VIEW_FRAMEWORK, VIEW_TASK, VIEW_EXECUTOR})
     .then(defer(
         slave->self(),
-        [this, acceptType](
+        [this, contentType](
             const Owned<ObjectApprovers>& approvers) -> Response {
-          mesos::agent::Response response;
-          response.set_type(mesos::agent::Response::GET_TASKS);
+          // Serialize the following message:
+          //
+          //   v1::agent::Response response;
+          //   response.set_type(mesos::agent::Response::GET_TASKS);
+          //   *response.mutable_get_tasks() = _...;
 
-          *response.mutable_get_tasks() = _getTasks(approvers);
+          switch (contentType) {
+            case ContentType::PROTOBUF: {
+              string output;
+              google::protobuf::io::StringOutputStream stream(&output);
+              google::protobuf::io::CodedOutputStream writer(&stream);
 
-          return OK(serialize(acceptType, evolve(response)),
-                    stringify(acceptType));
+              WireFormatLite::WriteEnum(
+                  v1::agent::Response::kTypeFieldNumber,
+                  v1::agent::Response::GET_TASKS,
+                  &writer);
+
+              WireFormatLite::WriteBytes(
+                  v1::agent::Response::kGetTasksFieldNumber,
+                  serializeGetTasks(approvers),
+                  &writer);
+
+              // We must manually trim the unused buffer space since
+              // we use the string before the coded output stream is
+              // destructed.
+              writer.Trim();
+
+              return OK(std::move(output), stringify(contentType));
+            }
+
+            case ContentType::JSON: {
+              string body = jsonify([&](JSON::ObjectWriter* writer) {
+                const google::protobuf::Descriptor* descriptor =
+                  v1::agent::Response::descriptor();
+
+                int field;
+
+                field = v1::agent::Response::kTypeFieldNumber;
+                writer->field(
+                    descriptor->FindFieldByNumber(field)->name(),
+                    v1::agent::Response::Type_Name(
+                        v1::agent::Response::GET_TASKS));
+
+                field = v1::agent::Response::kGetTasksFieldNumber;
+                writer->field(
+                    descriptor->FindFieldByNumber(field)->name(),
+                    jsonifyGetTasks(approvers));
+              });
+
+              // TODO(bmahler): Pass jsonp query parameter through here.
+              return OK(std::move(body), stringify(contentType));
+            }
+
+            default:
+              return NotAcceptable("Request must accept json or protobuf");
+          }
         }));
 }
 
 
+function<void(JSON::ObjectWriter*)> Http::jsonifyGetTasks(
+    const Owned<ObjectApprovers>& approvers) const
+{
+  return [=](JSON::ObjectWriter* writer) {
+    // Construct framework list with both active and completed frameworks.
+    vector<const Framework*> frameworks;
+    foreachvalue (Framework* f, slave->frameworks) {
+      if (approvers->approved<VIEW_FRAMEWORK>(f->info)) {
+        frameworks.push_back(f);
+      }
+    }
+    foreachvalue (const Owned<Framework>& f, slave->completedFrameworks) {
+      if (approvers->approved<VIEW_FRAMEWORK>(f->info)) {
+        frameworks.push_back(f.get());
+      }
+    }
+
+    // Construct executor list with both active and completed executors.
+    hashmap<const Executor*, const Framework*> executors;
+    foreach (const Framework* f, frameworks) {
+      foreachvalue (const Executor* e, f->executors) {
+        if (approvers->approved<VIEW_EXECUTOR>(e->info, f->info)) {
+          executors.put(e, f);
+        }
+      }
+      foreach (const Owned<Executor>& e, f->completedExecutors) {
+        if (approvers->approved<VIEW_EXECUTOR>(e->info, f->info)) {
+          executors.put(e.get(), f);
+        }
+      }
+    }
+
+    // Jsonify the following message:
+    //
+    //   v1::agent::Response::GetTasks getTasks;
+    //
+    //   for each pending task:
+    //     *getTasks.add_pending_tasks() = task
+    //   for each queued task:
+    //     *getTasks.add_queued_tasks() = *task;
+    //   for each launched task:
+    //     *getTasks.add_launched_tasks() = *task;
+    //   for each terminated task:
+    //     *getTasks.add_terminated_tasks() = *task;
+    //   for each completed task:
+    //     *getTasks.add_completed_tasks() = *task;
+
+    const google::protobuf::Descriptor* descriptor =
+      v1::agent::Response::GetTasks::descriptor();
+
+    int field;
+
+    // Pending tasks.
+    field = v1::agent::Response::GetTasks::kPendingTasksFieldNumber;
+    writer->field(
+        descriptor->FindFieldByNumber(field)->name(),
+        [&](JSON::ArrayWriter* writer) {
+          foreach (const Framework* framework, frameworks) {
+            typedef hashmap<TaskID, TaskInfo> TaskMap;
+            foreachvalue (const TaskMap& taskInfos, framework->pendingTasks) {
+              foreachvalue (const TaskInfo& t, taskInfos) {
+                if (approvers->approved<VIEW_TASK>(t, framework->info)) {
+                  // TODO(bmahler): Consider not constructing the temporary 
task
+                  // object and instead jsonify directly. Since we don't
+                  // expect a large number of pending tasks, we currently don't
+                  // bother with the more efficient approach.
+                  Task task =
+                    protobuf::createTask(t, TASK_STAGING, framework->id());
+
+                  writer->element(asV1Protobuf(task));
+                }
+              }
+            }
+          }
+        });
+
+    // Queued tasks.
+    field = v1::agent::Response::GetTasks::kQueuedTasksFieldNumber;
+    writer->field(
+        descriptor->FindFieldByNumber(field)->name(),
+        [&](JSON::ArrayWriter* writer) {
+          foreachpair (const Executor* executor,
+                       const Framework* framework,
+                       executors) {
+            foreachvalue (const TaskInfo& taskInfo, executor->queuedTasks) {
+              if (approvers->approved<VIEW_TASK>(taskInfo, framework->info)) {
+                // TODO(bmahler): Consider not constructing the temporary task
+                // object and instead serialize directly. Since we don't expect
+                // a large number of pending tasks, we currently don't bother
+                // with the more efficient approach.
+                Task t =
+                  protobuf::createTask(taskInfo, TASK_STAGING, 
framework->id());
+
+                writer->element(asV1Protobuf(t));
+              }
+            }
+          }
+        });
+
+    // Launched tasks.
+    field = v1::agent::Response::GetTasks::kLaunchedTasksFieldNumber;
+    writer->field(
+        descriptor->FindFieldByNumber(field)->name(),
+        [&](JSON::ArrayWriter* writer) {
+          foreachpair (const Executor* executor,
+                       const Framework* framework,
+                       executors) {
+            foreachvalue (Task* task, executor->launchedTasks) {
+              if (approvers->approved<VIEW_TASK>(*task, framework->info)) {
+                writer->element(asV1Protobuf(*task));
+              }
+            }
+          }
+        });
+
+    // Terminated tasks.
+    field = v1::agent::Response::GetTasks::kTerminatedTasksFieldNumber;
+    writer->field(
+        descriptor->FindFieldByNumber(field)->name(),
+        [&](JSON::ArrayWriter* writer) {
+          foreachpair (const Executor* executor,
+                       const Framework* framework,
+                       executors) {
+            foreachvalue (Task* task, executor->terminatedTasks) {
+              if (approvers->approved<VIEW_TASK>(*task, framework->info)) {
+                writer->element(asV1Protobuf(*task));
+              }
+            }
+          }
+        });
+
+    // Completed tasks.
+    field = v1::agent::Response::GetTasks::kCompletedTasksFieldNumber;
+    writer->field(
+        descriptor->FindFieldByNumber(field)->name(),
+        [&](JSON::ArrayWriter* writer) {
+          foreachpair (const Executor* executor,
+                       const Framework* framework,
+                       executors) {
+            foreach (const std::shared_ptr<Task>& t, executor->completedTasks) 
{
+              if (approvers->approved<VIEW_TASK>(*t.get(), framework->info)) {
+                writer->element(asV1Protobuf(*t));
+              }
+            }
+          }
+        });
+  };
+}
+
+
+string Http::serializeGetTasks(
+    const Owned<ObjectApprovers>& approvers) const
+{
+  // Construct framework list with both active and completed frameworks.
+  vector<const Framework*> frameworks;
+  foreachvalue (Framework* f, slave->frameworks) {
+    if (approvers->approved<VIEW_FRAMEWORK>(f->info)) {
+      frameworks.push_back(f);
+    }
+  }
+  foreachvalue (const Owned<Framework>& f, slave->completedFrameworks) {
+    if (approvers->approved<VIEW_FRAMEWORK>(f->info)) {
+      frameworks.push_back(f.get());
+    }
+  }
+
+  // Construct executor list with both active and completed executors.
+  hashmap<const Executor*, const Framework*> executors;
+  foreach (const Framework* f, frameworks) {
+    foreachvalue (Executor* e, f->executors) {
+      if (approvers->approved<VIEW_EXECUTOR>(e->info, f->info)) {
+        executors.put(e, f);
+      }
+    }
+    foreach (const Owned<Executor>& e, f->completedExecutors) {
+      if (approvers->approved<VIEW_EXECUTOR>(e->info, f->info)) {
+        executors.put(e.get(), f);
+      }
+    }
+  }
+
+  // Serialize the following message:
+  //
+  //   v1::agent::Response::GetTasks getTasks;
+  //
+  //   for each pending task:
+  //     *getTasks.add_pending_tasks() = task
+  //   for each queued task:
+  //     *getTasks.add_queued_tasks() = *task;
+  //   for each launched task:
+  //     *getTasks.add_launched_tasks() = *task;
+  //   for each terminated task:
+  //     *getTasks.add_terminated_tasks() = *task;
+  //   for each completed task:
+  //     *getTasks.add_completed_tasks() = *task;
+
+  string output;
+  google::protobuf::io::StringOutputStream stream(&output);
+  google::protobuf::io::CodedOutputStream writer(&stream);
+
+  foreach (const Framework* framework, frameworks) {
+    // Pending tasks.
+    typedef hashmap<TaskID, TaskInfo> TaskMap;
+    foreachvalue (const TaskMap& taskInfos, framework->pendingTasks) {
+      foreachvalue (const TaskInfo& taskInfo, taskInfos) {
+        if (approvers->approved<VIEW_TASK>(taskInfo, framework->info)) {
+          // TODO(bmahler): Consider not constructing the temporary task
+          // object and instead serialize directly. Since we don't expect
+          // a large number of pending tasks, we currently don't bother
+          // with the more efficient approach.
+          WireFormatLite2::WriteMessageWithoutCachedSizes(
+              v1::agent::Response::GetTasks::kPendingTasksFieldNumber,
+              protobuf::createTask(taskInfo, TASK_STAGING, framework->id()),
+              &writer);
+        }
+      }
+    }
+  }
+
+  foreachpair (const Executor* executor,
+               const Framework* framework,
+               executors) {
+    // Queued tasks.
+    foreachvalue (const TaskInfo& taskInfo, executor->queuedTasks) {
+      if (approvers->approved<VIEW_TASK>(taskInfo, framework->info)) {
+        // TODO(bmahler): Consider not constructing the temporary task
+        // object and instead serialize directly. Since we don't expect
+        // a large number of pending tasks, we currently don't bother
+        // with the more efficient approach.
+        WireFormatLite2::WriteMessageWithoutCachedSizes(
+            v1::agent::Response::GetTasks::kQueuedTasksFieldNumber,
+            protobuf::createTask(taskInfo, TASK_STAGING, framework->id()),
+            &writer);
+      }
+    }
+
+    // Launched tasks.
+    foreachvalue (Task* task, executor->launchedTasks) {
+      if (approvers->approved<VIEW_TASK>(*task, framework->info)) {
+        WireFormatLite2::WriteMessageWithoutCachedSizes(
+            v1::agent::Response::GetTasks::kLaunchedTasksFieldNumber,
+            *task,
+            &writer);
+      }
+    }
+
+    // Terminated tasks.
+    foreachvalue (Task* task, executor->terminatedTasks) {
+      if (approvers->approved<VIEW_TASK>(*task, framework->info)) {
+        WireFormatLite2::WriteMessageWithoutCachedSizes(
+            v1::agent::Response::GetTasks::kTerminatedTasksFieldNumber,
+            *task,
+            &writer);
+      }
+    }
+
+    // Completed tasks.
+    foreach (const std::shared_ptr<Task>& task, executor->completedTasks) {
+      if (approvers->approved<VIEW_TASK>(*task.get(), framework->info)) {
+        WireFormatLite2::WriteMessageWithoutCachedSizes(
+            v1::agent::Response::GetTasks::kCompletedTasksFieldNumber,
+            *task,
+            &writer);
+      }
+    }
+  }
+
+  // While an explicit Trim() isn't necessary (since the coded
+  // output stream is destructed before the string is returned),
+  // it's a quite tricky bug to diagnose if Trim() is missed, so
+  // we always do it explicitly to signal the reader about this
+  // subtlety.
+  writer.Trim();
+
+  return output;
+}
+
+
 mesos::agent::Response::GetTasks Http::_getTasks(
     const Owned<ObjectApprovers>& approvers) const
 {
diff --git a/src/slave/http.hpp b/src/slave/http.hpp
index c2df2e4..58137cb 100644
--- a/src/slave/http.hpp
+++ b/src/slave/http.hpp
@@ -212,6 +212,10 @@ private:
       ContentType acceptType,
       const Option<process::http::authentication::Principal>& principal) const;
 
+  std::function<void(JSON::ObjectWriter*)> jsonifyGetTasks(
+      const process::Owned<ObjectApprovers>& approvers) const;
+  std::string serializeGetTasks(
+      const process::Owned<ObjectApprovers>& approvers) const;
   mesos::agent::Response::GetTasks _getTasks(
       const process::Owned<ObjectApprovers>& approvers) const;
 

Reply via email to