Repository: mesos Updated Branches: refs/heads/master d840f48f9 -> d66f0b1e6
Implemented 'Subscribed' event for v1 master event stream. This change adds logic for sending a `Subscribed` event containing the present master state when a client subscribes to the event stream. Review: https://reviews.apache.org/r/49518/ Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/d66f0b1e Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/d66f0b1e Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/d66f0b1e Branch: refs/heads/master Commit: d66f0b1e67f1fea50265d5026a1d89f7dcf85a7d Parents: 3038809 Author: Zhitao Li <zhitaoli...@gmail.com> Authored: Tue Jul 5 21:26:00 2016 -0700 Committer: Anand Mazumdar <an...@apache.org> Committed: Tue Jul 5 22:32:57 2016 -0700 ---------------------------------------------------------------------- include/mesos/master/master.proto | 17 +++++-- include/mesos/v1/master/master.proto | 17 +++++-- src/master/http.cpp | 39 +++++++++----- src/tests/api_tests.cpp | 84 ++++++++++++++++++------------- 4 files changed, 102 insertions(+), 55 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/d66f0b1e/include/mesos/master/master.proto ---------------------------------------------------------------------- diff --git a/include/mesos/master/master.proto b/include/mesos/master/master.proto index f0c8a56..6aedc2f 100644 --- a/include/mesos/master/master.proto +++ b/include/mesos/master/master.proto @@ -435,12 +435,20 @@ message Response { message Event { enum Type { UNKNOWN = 0; - TASK_ADDED = 1; // See `TaskAdded` below. - TASK_UPDATED = 2; // See `TaskUpdated` below. + SUBSCRIBED = 1; // See `Subscribed` below. + TASK_ADDED = 2; // See `TaskAdded` below. + TASK_UPDATED = 3; // See `TaskUpdated` below. // TODO(vinod): Fill in more events. } + // First event received when a client subscribes. + message Subscribed { + // Snapshot of the entire cluster state. Further updates to the + // cluster state are sent as separate events on the stream. + optional Response.GetState get_state = 1; + } + // Forwarded by the master when a task becomes known to it. This can happen // when a new task is launched by the scheduler or when the task becomes // known to the master upon an agent (re-)registration after a failover. @@ -459,6 +467,7 @@ message Event { optional Type type = 1; - optional TaskAdded task_added = 2; - optional TaskUpdated task_updated = 3; + optional Subscribed subscribed = 2; + optional TaskAdded task_added = 3; + optional TaskUpdated task_updated = 4; } http://git-wip-us.apache.org/repos/asf/mesos/blob/d66f0b1e/include/mesos/v1/master/master.proto ---------------------------------------------------------------------- diff --git a/include/mesos/v1/master/master.proto b/include/mesos/v1/master/master.proto index 0c8cf15..19dbd1a 100644 --- a/include/mesos/v1/master/master.proto +++ b/include/mesos/v1/master/master.proto @@ -436,12 +436,20 @@ message Response { message Event { enum Type { UNKNOWN = 0; - TASK_ADDED = 1; // See `TaskAdded` below. - TASK_UPDATED = 2; // See `TaskUpdated` below. + SUBSCRIBED = 1; // See `Subscribed` below. + TASK_ADDED = 2; // See `TaskAdded` below. + TASK_UPDATED = 3; // See `TaskUpdated` below. // TODO(vinod): Fill in more events. } + // First event received when a client subscribes. + message Subscribed { + // Snapshot of the entire cluster state. Further updates to the + // cluster state are sent as separate events on the stream. + optional Response.GetState get_state = 1; + } + // Forwarded by the master when a task becomes known to it. This can happen // when a new task is launched by the scheduler or when the task becomes // known to the master upon an agent (re-)registration after a failover. @@ -460,6 +468,7 @@ message Event { optional Type type = 1; - optional TaskAdded task_added = 2; - optional TaskUpdated task_updated = 3; + optional Subscribed subscribed = 2; + optional TaskAdded task_added = 3; + optional TaskUpdated task_updated = 4; } http://git-wip-us.apache.org/repos/asf/mesos/blob/d66f0b1e/src/master/http.cpp ---------------------------------------------------------------------- diff --git a/src/master/http.cpp b/src/master/http.cpp index 10b0572..3640486 100644 --- a/src/master/http.cpp +++ b/src/master/http.cpp @@ -589,18 +589,33 @@ Future<Response> Master::Http::api( return quotaHandler.remove(call, principal); case mesos::master::Call::SUBSCRIBE: { - Pipe pipe; - OK ok; - - ok.headers["Content-Type"] = stringify(acceptType); - ok.type = Response::PIPE; - ok.reader = pipe.reader(); - - HttpConnection http {pipe.writer(), acceptType, UUID::random()}; - - master->subscribe(http); - - return ok; + return _getState(principal) + .then(defer(master->self(), + [this, acceptType] + (const mesos::master::Response::GetState& getState) + -> Future<Response> { + // TODO(zhitao): There is a possible race condition here: if an action + // like `taskUpdate()` is queued between `_getState()` and this + // continuation, neither the event will be sent to the subscriber + // (because the connection is not in subscribers yet), nor + // the effect of the change would be captured in the snapshot. + Pipe pipe; + OK ok; + + ok.headers["Content-Type"] = stringify(acceptType); + ok.type = Response::PIPE; + ok.reader = pipe.reader(); + + HttpConnection http {pipe.writer(), acceptType, UUID::random()}; + master->subscribe(http); + + mesos::master::Event event; + event.set_type(mesos::master::Event::SUBSCRIBED); + event.mutable_subscribed()->mutable_get_state()->CopyFrom(getState); + http.send<mesos::master::Event, v1::master::Event>(event); + + return ok; + })); } } http://git-wip-us.apache.org/repos/asf/mesos/blob/d66f0b1e/src/tests/api_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/api_tests.cpp b/src/tests/api_tests.cpp index 393afcf..7cf716d 100644 --- a/src/tests/api_tests.cpp +++ b/src/tests/api_tests.cpp @@ -1328,44 +1328,11 @@ TEST_P(MasterAPITest, StartAndStopMaintenance) // endpoint is able to receive `TASK_ADDED`/`TASK_UPDATED` events. TEST_P(MasterAPITest, Subscribe) { - Try<Owned<cluster::Master>> master = this->StartMaster(); - ASSERT_SOME(master); - - v1::master::Call v1Call; - v1Call.set_type(v1::master::Call::SUBSCRIBE); - - process::http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL); - ContentType contentType = GetParam(); - headers["Accept"] = stringify(contentType); - - Future<Response> response = process::http::streaming::post( - master.get()->pid, - "api/v1", - headers, - serialize(contentType, v1Call), - stringify(contentType)); - - AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response); - AWAIT_EXPECT_RESPONSE_HEADER_EQ("chunked", "Transfer-Encoding", response); - ASSERT_EQ(Response::PIPE, response.get().type); - ASSERT_SOME(response->reader); - - Pipe::Reader reader = response->reader.get(); - - auto deserializer = - lambda::bind(deserialize<v1::master::Event>, contentType, lambda::_1); - - Reader<v1::master::Event> decoder( - Decoder<v1::master::Event>(deserializer), reader); - Future<Result<v1::master::Event>> event = decoder.read(); - - EXPECT_TRUE(event.isPending()); + Try<Owned<cluster::Master>> master = this->StartMaster(); + ASSERT_SOME(master); - // Launch a task using the scheduler. This should result in a `TASK_ADDED` - // event when the task is launched followed by a `TASK_UPDATED` event after - // the task transitions to running state. auto scheduler = std::make_shared<MockV1HTTPScheduler>(); auto executor = std::make_shared<MockV1HTTPExecutor>(); @@ -1407,11 +1374,58 @@ TEST_P(MasterAPITest, Subscribe) AWAIT_READY(subscribed); + // Launch a task using the scheduler. This should result in a `TASK_ADDED` + // event when the task is launched followed by a `TASK_UPDATED` event after + // the task transitions to running state. v1::FrameworkID frameworkId(subscribed->framework_id()); AWAIT_READY(offers); EXPECT_NE(0, offers->offers().size()); + // Create event stream after seeing first offer but before first task is + // launched. We should see one framework, one agent and zero task/executor. + v1::master::Call v1Call; + v1Call.set_type(v1::master::Call::SUBSCRIBE); + + process::http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL); + + headers["Accept"] = stringify(contentType); + + Future<Response> response = process::http::streaming::post( + master.get()->pid, + "api/v1", + headers, + serialize(contentType, v1Call), + stringify(contentType)); + + AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response); + AWAIT_EXPECT_RESPONSE_HEADER_EQ("chunked", "Transfer-Encoding", response); + ASSERT_EQ(Response::PIPE, response.get().type); + ASSERT_SOME(response->reader); + + Pipe::Reader reader = response->reader.get(); + + auto deserializer = + lambda::bind(deserialize<v1::master::Event>, contentType, lambda::_1); + + Reader<v1::master::Event> decoder( + Decoder<v1::master::Event>(deserializer), reader); + + Future<Result<v1::master::Event>> event = decoder.read(); + AWAIT_READY(event); + + EXPECT_EQ(v1::master::Event::SUBSCRIBED, event.get().get().type()); + const v1::master::Response::GetState& getState = + event.get().get().subscribed().get_state(); + + EXPECT_EQ(1u, getState.get_frameworks().frameworks_size()); + EXPECT_EQ(1u, getState.get_agents().agents_size()); + EXPECT_EQ(0u, getState.get_tasks().tasks_size()); + EXPECT_EQ(0u, getState.get_executors().executors_size()); + + event = decoder.read(); + EXPECT_TRUE(event.isPending()); + const v1::Offer& offer = offers->offers(0); TaskInfo task = createTask(internal::devolve(offer), "", executorId);