Repository: mesos Updated Branches: refs/heads/master 0760b007a -> 14c605e8c
MESOS-1392: MasterDetector now returns a None when it cannot read the content of the ZNode it has detected. Review: https://reviews.apache.org/r/25663 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/14c605e8 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/14c605e8 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/14c605e8 Branch: refs/heads/master Commit: 14c605e8ce425ec8c517d8e4f899eb3ddeede56a Parents: 0760b00 Author: Jiang Yan Xu <y...@jxu.me> Authored: Mon Sep 15 14:20:52 2014 -0700 Committer: Jiang Yan Xu <y...@jxu.me> Committed: Wed Sep 17 17:30:05 2014 -0700 ---------------------------------------------------------------------- src/log/network.hpp | 23 +++++++++++++-------- src/master/detector.cpp | 15 ++++++++++---- src/tests/group_tests.cpp | 47 ++++++++++++++++++++++++++++++++++-------- src/zookeeper/group.cpp | 19 ++++++++++------- src/zookeeper/group.hpp | 11 ++++++---- 5 files changed, 81 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/14c605e8/src/log/network.hpp ---------------------------------------------------------------------- diff --git a/src/log/network.hpp b/src/log/network.hpp index fc85a57..7d6b592 100644 --- a/src/log/network.hpp +++ b/src/log/network.hpp @@ -121,8 +121,8 @@ private: // Helper for handling time outs when collecting membership // data. For now, a timeout is treated as a failure. - static process::Future<std::list<std::string> > timedout( - process::Future<std::list<std::string> > datas) + static process::Future<std::list<Option<std::string> > > timedout( + process::Future<std::list<Option<std::string> > > datas) { datas.discard(); return process::Failure("Timed out"); @@ -139,7 +139,8 @@ private: void watched(const process::Future<std::set<zookeeper::Group::Membership> >&); // Invoked when group members data has been collected. 
- void collected(const process::Future<std::list<std::string> >& datas); + void collected( + const process::Future<std::list<Option<std::string> > >& datas); zookeeper::Group group; process::Future<std::set<zookeeper::Group::Membership> > memberships; @@ -423,7 +424,7 @@ inline void ZooKeeperNetwork::watched( LOG(INFO) << "ZooKeeper group memberships changed"; // Get data for each membership in order to convert them to PIDs. - std::list<process::Future<std::string> > futures; + std::list<process::Future<Option<std::string> > > futures; foreach (const zookeeper::Group::Membership& membership, memberships.get()) { futures.push_back(group.data(membership)); @@ -436,7 +437,7 @@ inline void ZooKeeperNetwork::watched( inline void ZooKeeperNetwork::collected( - const process::Future<std::list<std::string> >& datas) + const process::Future<std::list<Option<std::string> > >& datas) { if (datas.isFailed()) { LOG(WARNING) << "Failed to get data for ZooKeeper group members: " @@ -452,10 +453,14 @@ inline void ZooKeeperNetwork::collected( std::set<process::UPID> pids; - foreach (const std::string& data, datas.get()) { - process::UPID pid(data); - CHECK(pid) << "Failed to parse '" << data << "'"; - pids.insert(pid); + foreach (const Option<std::string>& data, datas.get()) { + // Data could be None if the membership is gone before its + // content can be read. 
+ if (data.isSome()) { + process::UPID pid(data.get()); + CHECK(pid) << "Failed to parse '" << data.get() << "'"; + pids.insert(pid); + } } LOG(INFO) << "ZooKeeper group PIDs: " << stringify(pids); http://git-wip-us.apache.org/repos/asf/mesos/blob/14c605e8/src/master/detector.cpp ---------------------------------------------------------------------- diff --git a/src/master/detector.cpp b/src/master/detector.cpp index 6436b8e..700eb9d 100644 --- a/src/master/detector.cpp +++ b/src/master/detector.cpp @@ -180,7 +180,9 @@ private: void detected(const Future<Option<Group::Membership> >& leader); // Invoked when we have fetched the data associated with the leader. - void fetched(const Group::Membership& membership, const Future<string>& data); + void fetched( + const Group::Membership& membership, + const Future<Option<string> >& data); Owned<Group> group; LeaderDetector detector; @@ -388,7 +390,7 @@ void ZooKeeperMasterDetectorProcess::detected( void ZooKeeperMasterDetectorProcess::fetched( const Group::Membership& membership, - const Future<string>& data) + const Future<Option<string> >& data) { CHECK(!data.isDiscarded()); @@ -396,6 +398,11 @@ void ZooKeeperMasterDetectorProcess::fetched( leader = None(); promises::fail(&promises, data.failure()); return; + } else if (data.get().isNone()) { + // Membership is gone before we can read its data. + leader = None(); + promises::set(&promises, leader); + return; } // Parse the data based on the membership label and cache the @@ -404,12 +411,12 @@ void ZooKeeperMasterDetectorProcess::fetched( if (label.isNone()) { // If we are here it means some masters are still creating znodes // with the old format. 
- UPID pid = UPID(data.get()); + UPID pid = UPID(data.get().get()); LOG(WARNING) << "Leading master " << pid << " has data in old format"; leader = protobuf::createMasterInfo(pid); } else if (label.isSome() && label.get() == master::MASTER_INFO_LABEL) { MasterInfo info; - if (!info.ParseFromString(data.get())) { + if (!info.ParseFromString(data.get().get())) { leader = None(); promises::fail(&promises, "Failed to parse data into MasterInfo"); return; http://git-wip-us.apache.org/repos/asf/mesos/blob/14c605e8/src/tests/group_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/group_tests.cpp b/src/tests/group_tests.cpp index 7ed9895..144ba82 100644 --- a/src/tests/group_tests.cpp +++ b/src/tests/group_tests.cpp @@ -41,6 +41,8 @@ using zookeeper::GroupProcess; using process::Future; +using std::string; + using testing::_; class GroupTest : public ZooKeeperTest {}; @@ -60,9 +62,10 @@ TEST_F(GroupTest, Group) EXPECT_EQ(1u, memberships.get().size()); EXPECT_EQ(1u, memberships.get().count(membership.get())); - Future<std::string> data = group.data(membership.get()); + Future<Option<string> > data = group.data(membership.get()); - AWAIT_EXPECT_EQ("hello world", data); + AWAIT_READY(data); + EXPECT_SOME_EQ("hello world", data.get()); Future<bool> cancellation = group.cancel(membership.get()); @@ -116,13 +119,37 @@ TEST_F(GroupTest, GroupDataWithDisconnect) server->shutdownNetwork(); - Future<std::string> data = group.data(membership.get()); + Future<Option<string> > data = group.data(membership.get()); EXPECT_TRUE(data.isPending()); server->startNetwork(); - AWAIT_EXPECT_EQ("hello world", data); + AWAIT_READY(data); + EXPECT_SOME_EQ("hello world", data.get()); +} + + +TEST_F(GroupTest, GroupDataWithRemovedMembership) +{ + Group group(server->connectString(), NO_TIMEOUT, "/test/"); + + Future<Group::Membership> membership = group.join("hello world"); + + AWAIT_READY(membership); + + Future<std::set<Group::Membership> > 
memberships = group.watch(); + + AWAIT_READY(memberships); + EXPECT_EQ(1u, memberships.get().size()); + EXPECT_EQ(1u, memberships.get().count(membership.get())); + + AWAIT_EXPECT_EQ(true, group.cancel(membership.get())); + + Future<Option<string> > data = group.data(membership.get()); + + AWAIT_READY(data); + EXPECT_NONE(data.get()); } @@ -140,9 +167,10 @@ TEST_F(GroupTest, GroupCancelWithDisconnect) EXPECT_EQ(1u, memberships.get().size()); EXPECT_EQ(1u, memberships.get().count(membership.get())); - Future<std::string> data = group.data(membership.get()); + Future<Option<string> > data = group.data(membership.get()); - AWAIT_EXPECT_EQ("hello world", data); + AWAIT_READY(data); + EXPECT_SOME_EQ("hello world", data.get()); server->shutdownNetwork(); @@ -378,7 +406,7 @@ TEST_F(GroupTest, LabelledGroup) // Join a group with label. Future<Group::Membership> membership = group.join( - "hello world", std::string("testlabel")); + "hello world", string("testlabel")); AWAIT_READY(membership); @@ -388,9 +416,10 @@ TEST_F(GroupTest, LabelledGroup) EXPECT_EQ(1u, memberships.get().size()); EXPECT_EQ(1u, memberships.get().count(membership.get())); - Future<std::string> data = group.data(membership.get()); + Future<Option<string> > data = group.data(membership.get()); - AWAIT_EXPECT_EQ("hello world", data); + AWAIT_READY(data); + EXPECT_SOME_EQ("hello world", data.get()); Future<bool> cancellation = group.cancel(membership.get()); http://git-wip-us.apache.org/repos/asf/mesos/blob/14c605e8/src/zookeeper/group.cpp ---------------------------------------------------------------------- diff --git a/src/zookeeper/group.cpp b/src/zookeeper/group.cpp index 58491c0..7dee0a1 100644 --- a/src/zookeeper/group.cpp +++ b/src/zookeeper/group.cpp @@ -210,7 +210,7 @@ Future<bool> GroupProcess::cancel(const Group::Membership& membership) } -Future<string> GroupProcess::data(const Group::Membership& membership) +Future<Option<string> > GroupProcess::data(const Group::Membership& membership) { if 
(error.isSome()) { return Failure(error.get()); @@ -224,7 +224,7 @@ Future<string> GroupProcess::data(const Group::Membership& membership) // client can assume a happens-before ordering of operations (i.e., // the first request will happen before the second, etc). - Result<string> result = doData(membership); + Result<Option<string> > result = doData(membership); if (result.isNone()) { // Try again later. Data* data = new Data(membership); @@ -649,7 +649,8 @@ Result<bool> GroupProcess::doCancel(const Group::Membership& membership) } -Result<string> GroupProcess::doData(const Group::Membership& membership) +Result<Option<string> > GroupProcess::doData( + const Group::Membership& membership) { CHECK_EQ(state, READY); @@ -662,16 +663,18 @@ Result<string> GroupProcess::doData(const Group::Membership& membership) int code = zk->get(path, false, &result, NULL); - if (code == ZINVALIDSTATE || (code != ZOK && zk->retryable(code))) { + if (code == ZNONODE) { + return Option<string>::none(); + } else if (code == ZINVALIDSTATE || (code != ZOK && zk->retryable(code))) { CHECK_NE(zk->getState(), ZOO_AUTH_FAILED_STATE); - return None(); + return None(); // Try again later. } else if (code != ZOK) { return Error( "Failed to get data for ephemeral node '" + path + "' in ZooKeeper: " + zk->message(code)); } - return result; + return Some(result); } @@ -844,7 +847,7 @@ Try<bool> GroupProcess::sync() while (!pending.datas.empty()) { Data* data = pending.datas.front(); // TODO(benh): Ignore if future has been discarded? - Result<string> result = doData(data->membership); + Result<Option<string> > result = doData(data->membership); if (result.isNone()) { return false; // Try again later. 
} else if (result.isError()) { @@ -992,7 +995,7 @@ Future<bool> Group::cancel(const Group::Membership& membership) } -Future<string> Group::data(const Group::Membership& membership) +Future<Option<string> > Group::data(const Group::Membership& membership) { return dispatch(process, &GroupProcess::data, membership); } http://git-wip-us.apache.org/repos/asf/mesos/blob/14c605e8/src/zookeeper/group.hpp ---------------------------------------------------------------------- diff --git a/src/zookeeper/group.hpp b/src/zookeeper/group.hpp index 16f9b7b..dc600b1 100644 --- a/src/zookeeper/group.hpp +++ b/src/zookeeper/group.hpp @@ -131,7 +131,9 @@ public: // Returns the result of trying to fetch the data associated with a // group membership. - process::Future<std::string> data(const Membership& membership); + // A None is returned if the specified membership doesn't exist, + // e.g., it was removed before this call could read its content. + process::Future<Option<std::string> > data(const Membership& membership); // Returns a future that gets set when the group memberships differ // from the "expected" memberships specified. @@ -173,7 +175,8 @@ public: const std::string& data, const Option<std::string>& label); process::Future<bool> cancel(const Group::Membership& membership); - process::Future<std::string> data(const Group::Membership& membership); + process::Future<Option<std::string> > data( + const Group::Membership& membership); process::Future<std::set<Group::Membership> > watch( const std::set<Group::Membership>& expected); process::Future<Option<int64_t> > session(); @@ -192,7 +195,7 @@ private: const std::string& data, const Option<std::string>& label); Result<bool> doCancel(const Group::Membership& membership); - Result<std::string> doData(const Group::Membership& membership); + Result<Option<std::string> > doData(const Group::Membership& membership); // Returns true if authentication is successful, false if the // failure is retryable and Error otherwise.
@@ -287,7 +290,7 @@ private: explicit Data(const Group::Membership& _membership) : membership(_membership) {} Group::Membership membership; - process::Promise<std::string> promise; + process::Promise<Option<std::string> > promise; }; struct Watch