Repository: mesos
Updated Branches:
  refs/heads/master 0760b007a -> 14c605e8c


MESOS-1392: MasterDetector now returns a None when it cannot read the content 
of the ZNode it has detected.

Review: https://reviews.apache.org/r/25663


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/14c605e8
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/14c605e8
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/14c605e8

Branch: refs/heads/master
Commit: 14c605e8ce425ec8c517d8e4f899eb3ddeede56a
Parents: 0760b00
Author: Jiang Yan Xu <y...@jxu.me>
Authored: Mon Sep 15 14:20:52 2014 -0700
Committer: Jiang Yan Xu <y...@jxu.me>
Committed: Wed Sep 17 17:30:05 2014 -0700

----------------------------------------------------------------------
 src/log/network.hpp       | 23 +++++++++++++--------
 src/master/detector.cpp   | 15 ++++++++++----
 src/tests/group_tests.cpp | 47 ++++++++++++++++++++++++++++++++++--------
 src/zookeeper/group.cpp   | 19 ++++++++++-------
 src/zookeeper/group.hpp   | 11 ++++++----
 5 files changed, 81 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/14c605e8/src/log/network.hpp
----------------------------------------------------------------------
diff --git a/src/log/network.hpp b/src/log/network.hpp
index fc85a57..7d6b592 100644
--- a/src/log/network.hpp
+++ b/src/log/network.hpp
@@ -121,8 +121,8 @@ private:
 
   // Helper for handling time outs when collecting membership
   // data. For now, a timeout is treated as a failure.
-  static process::Future<std::list<std::string> > timedout(
-      process::Future<std::list<std::string> > datas)
+  static process::Future<std::list<Option<std::string> > > timedout(
+      process::Future<std::list<Option<std::string> > > datas)
   {
     datas.discard();
     return process::Failure("Timed out");
@@ -139,7 +139,8 @@ private:
   void watched(const process::Future<std::set<zookeeper::Group::Membership> >&);
 
   // Invoked when group members data has been collected.
-  void collected(const process::Future<std::list<std::string> >& datas);
+  void collected(
+      const process::Future<std::list<Option<std::string> > >& datas);
 
   zookeeper::Group group;
   process::Future<std::set<zookeeper::Group::Membership> > memberships;
@@ -423,7 +424,7 @@ inline void ZooKeeperNetwork::watched(
   LOG(INFO) << "ZooKeeper group memberships changed";
 
   // Get data for each membership in order to convert them to PIDs.
-  std::list<process::Future<std::string> > futures;
+  std::list<process::Future<Option<std::string> > > futures;
 
   foreach (const zookeeper::Group::Membership& membership, memberships.get()) {
     futures.push_back(group.data(membership));
@@ -436,7 +437,7 @@ inline void ZooKeeperNetwork::watched(
 
 
 inline void ZooKeeperNetwork::collected(
-    const process::Future<std::list<std::string> >& datas)
+    const process::Future<std::list<Option<std::string> > >& datas)
 {
   if (datas.isFailed()) {
     LOG(WARNING) << "Failed to get data for ZooKeeper group members: "
@@ -452,10 +453,14 @@ inline void ZooKeeperNetwork::collected(
 
   std::set<process::UPID> pids;
 
-  foreach (const std::string& data, datas.get()) {
-    process::UPID pid(data);
-    CHECK(pid) << "Failed to parse '" << data << "'";
-    pids.insert(pid);
+  foreach (const Option<std::string>& data, datas.get()) {
+    // Data could be None if the membership is gone before its
+    // content can be read.
+    if (data.isSome()) {
+      process::UPID pid(data.get());
+      CHECK(pid) << "Failed to parse '" << data.get() << "'";
+      pids.insert(pid);
+    }
   }
 
   LOG(INFO) << "ZooKeeper group PIDs: " << stringify(pids);

http://git-wip-us.apache.org/repos/asf/mesos/blob/14c605e8/src/master/detector.cpp
----------------------------------------------------------------------
diff --git a/src/master/detector.cpp b/src/master/detector.cpp
index 6436b8e..700eb9d 100644
--- a/src/master/detector.cpp
+++ b/src/master/detector.cpp
@@ -180,7 +180,9 @@ private:
   void detected(const Future<Option<Group::Membership> >& leader);
 
   // Invoked when we have fetched the data associated with the leader.
-  void fetched(const Group::Membership& membership, const Future<string>& data);
+  void fetched(
+      const Group::Membership& membership,
+      const Future<Option<string> >& data);
 
   Owned<Group> group;
   LeaderDetector detector;
@@ -388,7 +390,7 @@ void ZooKeeperMasterDetectorProcess::detected(
 
 void ZooKeeperMasterDetectorProcess::fetched(
     const Group::Membership& membership,
-    const Future<string>& data)
+    const Future<Option<string> >& data)
 {
   CHECK(!data.isDiscarded());
 
@@ -396,6 +398,11 @@ void ZooKeeperMasterDetectorProcess::fetched(
     leader = None();
     promises::fail(&promises, data.failure());
     return;
+  } else if (data.get().isNone()) {
+    // Membership is gone before we can read its data.
+    leader = None();
+    promises::set(&promises, leader);
+    return;
   }
 
   // Parse the data based on the membership label and cache the
@@ -404,12 +411,12 @@ void ZooKeeperMasterDetectorProcess::fetched(
   if (label.isNone()) {
     // If we are here it means some masters are still creating znodes
     // with the old format.
-    UPID pid = UPID(data.get());
+    UPID pid = UPID(data.get().get());
     LOG(WARNING) << "Leading master " << pid << " has data in old format";
     leader = protobuf::createMasterInfo(pid);
   } else if (label.isSome() && label.get() == master::MASTER_INFO_LABEL) {
     MasterInfo info;
-    if (!info.ParseFromString(data.get())) {
+    if (!info.ParseFromString(data.get().get())) {
       leader = None();
       promises::fail(&promises, "Failed to parse data into MasterInfo");
       return;

http://git-wip-us.apache.org/repos/asf/mesos/blob/14c605e8/src/tests/group_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/group_tests.cpp b/src/tests/group_tests.cpp
index 7ed9895..144ba82 100644
--- a/src/tests/group_tests.cpp
+++ b/src/tests/group_tests.cpp
@@ -41,6 +41,8 @@ using zookeeper::GroupProcess;
 
 using process::Future;
 
+using std::string;
+
 using testing::_;
 
 class GroupTest : public ZooKeeperTest {};
@@ -60,9 +62,10 @@ TEST_F(GroupTest, Group)
   EXPECT_EQ(1u, memberships.get().size());
   EXPECT_EQ(1u, memberships.get().count(membership.get()));
 
-  Future<std::string> data = group.data(membership.get());
+  Future<Option<string> > data = group.data(membership.get());
 
-  AWAIT_EXPECT_EQ("hello world", data);
+  AWAIT_READY(data);
+  EXPECT_SOME_EQ("hello world", data.get());
 
   Future<bool> cancellation = group.cancel(membership.get());
 
@@ -116,13 +119,37 @@ TEST_F(GroupTest, GroupDataWithDisconnect)
 
   server->shutdownNetwork();
 
-  Future<std::string> data = group.data(membership.get());
+  Future<Option<string> > data = group.data(membership.get());
 
   EXPECT_TRUE(data.isPending());
 
   server->startNetwork();
 
-  AWAIT_EXPECT_EQ("hello world", data);
+  AWAIT_READY(data);
+  EXPECT_SOME_EQ("hello world", data.get());
+}
+
+
+TEST_F(GroupTest, GroupDataWithRemovedMembership)
+{
+  Group group(server->connectString(), NO_TIMEOUT, "/test/");
+
+  Future<Group::Membership> membership = group.join("hello world");
+
+  AWAIT_READY(membership);
+
+  Future<std::set<Group::Membership> > memberships = group.watch();
+
+  AWAIT_READY(memberships);
+  EXPECT_EQ(1u, memberships.get().size());
+  EXPECT_EQ(1u, memberships.get().count(membership.get()));
+
+  AWAIT_EXPECT_EQ(true, group.cancel(membership.get()));
+
+  Future<Option<string> > data = group.data(membership.get());
+
+  AWAIT_READY(data);
+  EXPECT_NONE(data.get());
 }
 
 
@@ -140,9 +167,10 @@ TEST_F(GroupTest, GroupCancelWithDisconnect)
   EXPECT_EQ(1u, memberships.get().size());
   EXPECT_EQ(1u, memberships.get().count(membership.get()));
 
-  Future<std::string> data = group.data(membership.get());
+  Future<Option<string> > data = group.data(membership.get());
 
-  AWAIT_EXPECT_EQ("hello world", data);
+  AWAIT_READY(data);
+  EXPECT_SOME_EQ("hello world", data.get());
 
   server->shutdownNetwork();
 
@@ -378,7 +406,7 @@ TEST_F(GroupTest, LabelledGroup)
 
   // Join a group with label.
   Future<Group::Membership> membership = group.join(
-      "hello world", std::string("testlabel"));
+      "hello world", string("testlabel"));
 
   AWAIT_READY(membership);
 
@@ -388,9 +416,10 @@ TEST_F(GroupTest, LabelledGroup)
   EXPECT_EQ(1u, memberships.get().size());
   EXPECT_EQ(1u, memberships.get().count(membership.get()));
 
-  Future<std::string> data = group.data(membership.get());
+  Future<Option<string> > data = group.data(membership.get());
 
-  AWAIT_EXPECT_EQ("hello world", data);
+  AWAIT_READY(data);
+  EXPECT_SOME_EQ("hello world", data.get());
 
   Future<bool> cancellation = group.cancel(membership.get());
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/14c605e8/src/zookeeper/group.cpp
----------------------------------------------------------------------
diff --git a/src/zookeeper/group.cpp b/src/zookeeper/group.cpp
index 58491c0..7dee0a1 100644
--- a/src/zookeeper/group.cpp
+++ b/src/zookeeper/group.cpp
@@ -210,7 +210,7 @@ Future<bool> GroupProcess::cancel(const Group::Membership& membership)
 }
 
 
-Future<string> GroupProcess::data(const Group::Membership& membership)
+Future<Option<string> > GroupProcess::data(const Group::Membership& membership)
 {
   if (error.isSome()) {
     return Failure(error.get());
@@ -224,7 +224,7 @@ Future<string> GroupProcess::data(const Group::Membership& membership)
   // client can assume a happens-before ordering of operations (i.e.,
   // the first request will happen before the second, etc).
 
-  Result<string> result = doData(membership);
+  Result<Option<string> > result = doData(membership);
 
   if (result.isNone()) { // Try again later.
     Data* data = new Data(membership);
@@ -649,7 +649,8 @@ Result<bool> GroupProcess::doCancel(const Group::Membership& membership)
 }
 
 
-Result<string> GroupProcess::doData(const Group::Membership& membership)
+Result<Option<string> > GroupProcess::doData(
+    const Group::Membership& membership)
 {
   CHECK_EQ(state, READY);
 
@@ -662,16 +663,18 @@ Result<string> GroupProcess::doData(const Group::Membership& membership)
 
   int code = zk->get(path, false, &result, NULL);
 
-  if (code == ZINVALIDSTATE || (code != ZOK && zk->retryable(code))) {
+  if (code == ZNONODE) {
+    return Option<string>::none();
+  } else if (code == ZINVALIDSTATE || (code != ZOK && zk->retryable(code))) {
     CHECK_NE(zk->getState(), ZOO_AUTH_FAILED_STATE);
-    return None();
+    return None(); // Try again later.
   } else if (code != ZOK) {
     return Error(
         "Failed to get data for ephemeral node '" + path +
         "' in ZooKeeper: " + zk->message(code));
   }
 
-  return result;
+  return Some(result);
 }
 
 
@@ -844,7 +847,7 @@ Try<bool> GroupProcess::sync()
   while (!pending.datas.empty()) {
     Data* data = pending.datas.front();
     // TODO(benh): Ignore if future has been discarded?
-    Result<string> result = doData(data->membership);
+    Result<Option<string> > result = doData(data->membership);
     if (result.isNone()) {
       return false; // Try again later.
     } else if (result.isError()) {
@@ -992,7 +995,7 @@ Future<bool> Group::cancel(const Group::Membership& membership)
 }
 
 
-Future<string> Group::data(const Group::Membership& membership)
+Future<Option<string> > Group::data(const Group::Membership& membership)
 {
   return dispatch(process, &GroupProcess::data, membership);
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/14c605e8/src/zookeeper/group.hpp
----------------------------------------------------------------------
diff --git a/src/zookeeper/group.hpp b/src/zookeeper/group.hpp
index 16f9b7b..dc600b1 100644
--- a/src/zookeeper/group.hpp
+++ b/src/zookeeper/group.hpp
@@ -131,7 +131,9 @@ public:
 
   // Returns the result of trying to fetch the data associated with a
   // group membership.
-  process::Future<std::string> data(const Membership& membership);
+  // A None is returned if the specified membership doesn't exist,
+  // e.g., it can be removed before this call can read its content.
+  process::Future<Option<std::string> > data(const Membership& membership);
 
   // Returns a future that gets set when the group memberships differ
   // from the "expected" memberships specified.
@@ -173,7 +175,8 @@ public:
       const std::string& data,
       const Option<std::string>& label);
   process::Future<bool> cancel(const Group::Membership& membership);
-  process::Future<std::string> data(const Group::Membership& membership);
+  process::Future<Option<std::string> > data(
+      const Group::Membership& membership);
   process::Future<std::set<Group::Membership> > watch(
       const std::set<Group::Membership>& expected);
   process::Future<Option<int64_t> > session();
@@ -192,7 +195,7 @@ private:
       const std::string& data,
       const Option<std::string>& label);
   Result<bool> doCancel(const Group::Membership& membership);
-  Result<std::string> doData(const Group::Membership& membership);
+  Result<Option<std::string> > doData(const Group::Membership& membership);
 
   // Returns true if authentication is successful, false if the
   // failure is retryable and Error otherwise.
@@ -287,7 +290,7 @@ private:
     explicit Data(const Group::Membership& _membership)
       : membership(_membership) {}
     Group::Membership membership;
-    process::Promise<std::string> promise;
+    process::Promise<Option<std::string> > promise;
   };
 
   struct Watch

Reply via email to