Repository: mesos
Updated Branches:
  refs/heads/master d840f48f9 -> d66f0b1e6


Implemented 'Subscribed' event for v1 master event stream.

This change adds logic for sending a `Subscribed` event containing
the present master state when a client subscribes to the event stream.

Review: https://reviews.apache.org/r/49518/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/d66f0b1e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/d66f0b1e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/d66f0b1e

Branch: refs/heads/master
Commit: d66f0b1e67f1fea50265d5026a1d89f7dcf85a7d
Parents: 3038809
Author: Zhitao Li <zhitaoli...@gmail.com>
Authored: Tue Jul 5 21:26:00 2016 -0700
Committer: Anand Mazumdar <an...@apache.org>
Committed: Tue Jul 5 22:32:57 2016 -0700

----------------------------------------------------------------------
 include/mesos/master/master.proto    | 17 +++++--
 include/mesos/v1/master/master.proto | 17 +++++--
 src/master/http.cpp                  | 39 +++++++++-----
 src/tests/api_tests.cpp              | 84 ++++++++++++++++++-------------
 4 files changed, 102 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/d66f0b1e/include/mesos/master/master.proto
----------------------------------------------------------------------
diff --git a/include/mesos/master/master.proto 
b/include/mesos/master/master.proto
index f0c8a56..6aedc2f 100644
--- a/include/mesos/master/master.proto
+++ b/include/mesos/master/master.proto
@@ -435,12 +435,20 @@ message Response {
 message Event {
   enum Type {
     UNKNOWN = 0;
-    TASK_ADDED = 1; // See `TaskAdded` below.
-    TASK_UPDATED = 2; // See `TaskUpdated` below.
+    SUBSCRIBED = 1; // See `Subscribed` below.
+    TASK_ADDED = 2; // See `TaskAdded` below.
+    TASK_UPDATED = 3; // See `TaskUpdated` below.
 
     // TODO(vinod): Fill in more events.
   }
 
+  // First event received when a client subscribes.
+  message Subscribed {
+    // Snapshot of the entire cluster state. Further updates to the
+    // cluster state are sent as separate events on the stream.
+    optional Response.GetState get_state = 1;
+  }
+
   // Forwarded by the master when a task becomes known to it. This can happen
   // when a new task is launched by the scheduler or when the task becomes
   // known to the master upon an agent (re-)registration after a failover.
@@ -459,6 +467,7 @@ message Event {
 
   optional Type type = 1;
 
-  optional TaskAdded task_added = 2;
-  optional TaskUpdated task_updated = 3;
+  optional Subscribed subscribed = 2;
+  optional TaskAdded task_added = 3;
+  optional TaskUpdated task_updated = 4;
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/d66f0b1e/include/mesos/v1/master/master.proto
----------------------------------------------------------------------
diff --git a/include/mesos/v1/master/master.proto 
b/include/mesos/v1/master/master.proto
index 0c8cf15..19dbd1a 100644
--- a/include/mesos/v1/master/master.proto
+++ b/include/mesos/v1/master/master.proto
@@ -436,12 +436,20 @@ message Response {
 message Event {
   enum Type {
     UNKNOWN = 0;
-    TASK_ADDED = 1; // See `TaskAdded` below.
-    TASK_UPDATED = 2; // See `TaskUpdated` below.
+    SUBSCRIBED = 1; // See `Subscribed` below.
+    TASK_ADDED = 2; // See `TaskAdded` below.
+    TASK_UPDATED = 3; // See `TaskUpdated` below.
 
     // TODO(vinod): Fill in more events.
   }
 
+  // First event received when a client subscribes.
+  message Subscribed {
+    // Snapshot of the entire cluster state. Further updates to the
+    // cluster state are sent as separate events on the stream.
+    optional Response.GetState get_state = 1;
+  }
+
   // Forwarded by the master when a task becomes known to it. This can happen
   // when a new task is launched by the scheduler or when the task becomes
   // known to the master upon an agent (re-)registration after a failover.
@@ -460,6 +468,7 @@ message Event {
 
   optional Type type = 1;
 
-  optional TaskAdded task_added = 2;
-  optional TaskUpdated task_updated = 3;
+  optional Subscribed subscribed = 2;
+  optional TaskAdded task_added = 3;
+  optional TaskUpdated task_updated = 4;
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/d66f0b1e/src/master/http.cpp
----------------------------------------------------------------------
diff --git a/src/master/http.cpp b/src/master/http.cpp
index 10b0572..3640486 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -589,18 +589,33 @@ Future<Response> Master::Http::api(
       return quotaHandler.remove(call, principal);
 
     case mesos::master::Call::SUBSCRIBE: {
-      Pipe pipe;
-      OK ok;
-
-      ok.headers["Content-Type"] = stringify(acceptType);
-      ok.type = Response::PIPE;
-      ok.reader = pipe.reader();
-
-      HttpConnection http {pipe.writer(), acceptType, UUID::random()};
-
-      master->subscribe(http);
-
-      return ok;
+      return _getState(principal)
+        .then(defer(master->self(),
+            [this, acceptType]
+            (const mesos::master::Response::GetState& getState)
+              -> Future<Response> {
+          // TODO(zhitao): There is a possible race condition here: if an 
action
+          // like `taskUpdate()` is queued between `_getState()` and this
+          // continuation, neither the event will be sent to the subscriber
+          // (because the connection is not in subscribers yet), nor
+          // the effect of the change would be captured in the snapshot.
+          Pipe pipe;
+          OK ok;
+
+          ok.headers["Content-Type"] = stringify(acceptType);
+          ok.type = Response::PIPE;
+          ok.reader = pipe.reader();
+
+          HttpConnection http {pipe.writer(), acceptType, UUID::random()};
+          master->subscribe(http);
+
+          mesos::master::Event event;
+          event.set_type(mesos::master::Event::SUBSCRIBED);
+          event.mutable_subscribed()->mutable_get_state()->CopyFrom(getState);
+          http.send<mesos::master::Event, v1::master::Event>(event);
+
+          return ok;
+        }));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/d66f0b1e/src/tests/api_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/api_tests.cpp b/src/tests/api_tests.cpp
index 393afcf..7cf716d 100644
--- a/src/tests/api_tests.cpp
+++ b/src/tests/api_tests.cpp
@@ -1328,44 +1328,11 @@ TEST_P(MasterAPITest, StartAndStopMaintenance)
 // endpoint is able to receive `TASK_ADDED`/`TASK_UPDATED` events.
 TEST_P(MasterAPITest, Subscribe)
 {
-  Try<Owned<cluster::Master>> master = this->StartMaster();
-  ASSERT_SOME(master);
-
-  v1::master::Call v1Call;
-  v1Call.set_type(v1::master::Call::SUBSCRIBE);
-
-  process::http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
-
   ContentType contentType = GetParam();
-  headers["Accept"] = stringify(contentType);
-
-  Future<Response> response = process::http::streaming::post(
-      master.get()->pid,
-      "api/v1",
-      headers,
-      serialize(contentType, v1Call),
-      stringify(contentType));
-
-  AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
-  AWAIT_EXPECT_RESPONSE_HEADER_EQ("chunked", "Transfer-Encoding", response);
-  ASSERT_EQ(Response::PIPE, response.get().type);
-  ASSERT_SOME(response->reader);
-
-  Pipe::Reader reader = response->reader.get();
-
-  auto deserializer =
-    lambda::bind(deserialize<v1::master::Event>, contentType, lambda::_1);
-
-  Reader<v1::master::Event> decoder(
-      Decoder<v1::master::Event>(deserializer), reader);
 
-  Future<Result<v1::master::Event>> event = decoder.read();
-
-  EXPECT_TRUE(event.isPending());
+  Try<Owned<cluster::Master>> master = this->StartMaster();
+  ASSERT_SOME(master);
 
-  // Launch a task using the scheduler. This should result in a `TASK_ADDED`
-  // event when the task is launched followed by a `TASK_UPDATED` event after
-  // the task transitions to running state.
   auto scheduler = std::make_shared<MockV1HTTPScheduler>();
   auto executor = std::make_shared<MockV1HTTPExecutor>();
 
@@ -1407,11 +1374,58 @@ TEST_P(MasterAPITest, Subscribe)
 
   AWAIT_READY(subscribed);
 
+  // Launch a task using the scheduler. This should result in a `TASK_ADDED`
+  // event when the task is launched followed by a `TASK_UPDATED` event after
+  // the task transitions to running state.
   v1::FrameworkID frameworkId(subscribed->framework_id());
 
   AWAIT_READY(offers);
   EXPECT_NE(0, offers->offers().size());
 
+  // Create event stream after seeing first offer but before first task is
+  // launched. We should see one framework, one agent and zero task/executor.
+  v1::master::Call v1Call;
+  v1Call.set_type(v1::master::Call::SUBSCRIBE);
+
+  process::http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+
+  headers["Accept"] = stringify(contentType);
+
+  Future<Response> response = process::http::streaming::post(
+      master.get()->pid,
+      "api/v1",
+      headers,
+      serialize(contentType, v1Call),
+      stringify(contentType));
+
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+  AWAIT_EXPECT_RESPONSE_HEADER_EQ("chunked", "Transfer-Encoding", response);
+  ASSERT_EQ(Response::PIPE, response.get().type);
+  ASSERT_SOME(response->reader);
+
+  Pipe::Reader reader = response->reader.get();
+
+  auto deserializer =
+    lambda::bind(deserialize<v1::master::Event>, contentType, lambda::_1);
+
+  Reader<v1::master::Event> decoder(
+      Decoder<v1::master::Event>(deserializer), reader);
+
+  Future<Result<v1::master::Event>> event = decoder.read();
+  AWAIT_READY(event);
+
+  EXPECT_EQ(v1::master::Event::SUBSCRIBED, event.get().get().type());
+  const v1::master::Response::GetState& getState =
+      event.get().get().subscribed().get_state();
+
+  EXPECT_EQ(1u, getState.get_frameworks().frameworks_size());
+  EXPECT_EQ(1u, getState.get_agents().agents_size());
+  EXPECT_EQ(0u, getState.get_tasks().tasks_size());
+  EXPECT_EQ(0u, getState.get_executors().executors_size());
+
+  event = decoder.read();
+  EXPECT_TRUE(event.isPending());
+
   const v1::Offer& offer = offers->offers(0);
 
   TaskInfo task = createTask(internal::devolve(offer), "", executorId);

Reply via email to