Repository: mesos
Updated Branches:
  refs/heads/master d6f51c5a2 -> 27221fd51
Allowed replicated log to do auto initialization.

Review: https://reviews.apache.org/r/18600


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/27221fd5
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/27221fd5
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/27221fd5

Branch: refs/heads/master
Commit: 27221fd512bfd4b2ad85a1e4fcb6c7954df87d9b
Parents: d6f51c5
Author: Jie Yu <yujie....@gmail.com>
Authored: Fri Apr 4 11:26:07 2014 -0700
Committer: Jie Yu <yujie....@gmail.com>
Committed: Tue Apr 22 17:26:53 2014 -0700

----------------------------------------------------------------------
 src/log/log.cpp         |  47 ++++++++--
 src/log/log.hpp         |   6 +-
 src/log/recover.cpp     | 209 +++++++++++++++++++++++++++++++++++++------
 src/log/recover.hpp     |   9 +-
 src/tests/log_tests.cpp | 138 ++++++++++++++++++++++++++++
 5 files changed, 371 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/27221fd5/src/log/log.cpp
----------------------------------------------------------------------
diff --git a/src/log/log.cpp b/src/log/log.cpp
index 9dd992f..ca97144 100644
--- a/src/log/log.cpp
+++ b/src/log/log.cpp
@@ -55,7 +55,8 @@ public:
   LogProcess(
       size_t _quorum,
       const string& path,
-      const set<UPID>& pids);
+      const set<UPID>& pids,
+      bool _autoInitialize);
 
   LogProcess(
       size_t _quorum,
@@ -63,7 +64,8 @@ public:
       const string& servers,
       const Duration& timeout,
       const string& znode,
-      const Option<zookeeper::Authentication>& auth);
+      const Option<zookeeper::Authentication>& auth,
+      bool _autoInitialize);
 
   // Recovers the log by catching up if needed. Returns a shared
   // pointer to the local replica if the recovery succeeds.
@@ -91,6 +93,7 @@ private:
   const size_t quorum;
   Shared<Replica> replica;
   Shared<Network> network;
+  const bool autoInitialize;
 
   // For replica recovery.
   Option<Future<Owned<Replica> > > recovering;
@@ -197,11 +200,13 @@
 LogProcess::LogProcess(
     size_t _quorum,
     const string& path,
-    const set<UPID>& pids)
+    const set<UPID>& pids,
+    bool _autoInitialize)
   : ProcessBase(ID::generate("log")),
     quorum(_quorum),
     replica(new Replica(path)),
     network(new Network(pids + (UPID) replica->pid())),
+    autoInitialize(_autoInitialize),
     group(NULL) {}
 
 
@@ -211,11 +216,13 @@ LogProcess::LogProcess(
     const string& servers,
     const Duration& timeout,
     const string& znode,
-    const Option<zookeeper::Authentication>& auth)
+    const Option<zookeeper::Authentication>& auth,
+    bool _autoInitialize)
   : ProcessBase(ID::generate("log")),
     quorum(_quorum),
     replica(new Replica(path)),
    network(new ZooKeeperNetwork(servers, timeout, znode, auth)),
+    autoInitialize(_autoInitialize),
    group(new zookeeper::Group(servers, timeout, znode, auth)) {}
 
 
@@ -305,7 +312,12 @@ Future<Shared<Replica> > LogProcess::recover()
     // 'release' in Shared which will provide this CHECK internally.
     CHECK(replica.unique());
 
-    recovering = log::recover(quorum, replica.own().get(), network)
+    recovering =
+      log::recover(
+          quorum,
+          replica.own().get(),
+          network,
+          autoInitialize)
       .onAny(defer(self(), &Self::_recover));
   }
 
@@ -720,11 +732,18 @@ void LogWriterProcess::failed(const string& message, const string& reason)
 Log::Log(
     int quorum,
     const string& path,
-    const set<UPID>& pids)
+    const set<UPID>& pids,
+    bool autoInitialize)
 {
   GOOGLE_PROTOBUF_VERIFY_VERSION;
 
-  process = new LogProcess(quorum, path, pids);
+  process =
+    new LogProcess(
+        quorum,
+        path,
+        pids,
+        autoInitialize);
+
   spawn(process);
 }
 
@@ -734,11 +753,21 @@ Log::Log(
     const string& servers,
     const Duration& timeout,
     const string& znode,
-    const Option<zookeeper::Authentication>& auth)
+    const Option<zookeeper::Authentication>& auth,
+    bool autoInitialize)
 {
   GOOGLE_PROTOBUF_VERIFY_VERSION;
 
-  process = new LogProcess(quorum, path, servers, timeout, znode, auth);
+  process =
+    new LogProcess(
+        quorum,
+        path,
+        servers,
+        timeout,
+        znode,
+        auth,
+        autoInitialize);
+
   spawn(process);
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/27221fd5/src/log/log.hpp
----------------------------------------------------------------------
diff --git a/src/log/log.hpp b/src/log/log.hpp
index 6787c80..7c905c7 100644
--- a/src/log/log.hpp
+++ b/src/log/log.hpp
@@ -189,7 +189,8 @@ public:
   // with other replicas via the set of process PIDs.
   Log(int quorum,
       const std::string& path,
-      const std::set<process::UPID>& pids);
+      const std::set<process::UPID>& pids,
+      bool autoInitialize = false);
 
   // Creates a new replicated log that assumes the specified quorum
   // size, is backed by a file at the specified path, and coordinates
@@ -200,7 +201,8 @@ public:
       const std::string& servers,
       const Duration& timeout,
       const std::string& znode,
-      const Option<zookeeper::Authentication>& auth = None());
+      const Option<zookeeper::Authentication>& auth = None(),
+      bool autoInitialize = false);
 
   ~Log();
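
For reference, both public Log constructors gain a trailing 'autoInitialize'
parameter that defaults to 'false', so existing callers keep the old behavior
of requiring explicit initialization. A minimal sketch of opting in follows;
the quorum size, path, and (empty) peer set here are illustrative only and
not part of this commit:

    // Sketch: opting into replicated log auto initialization.
    #include <set>
    #include <string>

    #include <process/pid.hpp>

    #include "log/log.hpp"

    int main()
    {
      // The PIDs of the other replicas would be added here.
      std::set<process::UPID> pids;

      // The trailing 'true' enables auto initialization; with the
      // default 'false' an empty replica stays EMPTY until an
      // initialization tool is run explicitly.
      mesos::internal::log::Log log(2, "/tmp/.log", pids, true);

      return 0;
    }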

http://git-wip-us.apache.org/repos/asf/mesos/blob/27221fd5/src/log/recover.cpp
----------------------------------------------------------------------
diff --git a/src/log/recover.cpp b/src/log/recover.cpp
index 688da5f..7c0566a 100644
--- a/src/log/recover.cpp
+++ b/src/log/recover.cpp
@@ -27,6 +27,7 @@
 #include <process/process.hpp>
 
 #include <stout/check.hpp>
+#include <stout/duration.hpp>
 #include <stout/foreach.hpp>
 #include <stout/hashmap.hpp>
 #include <stout/lambda.hpp>
@@ -48,7 +49,6 @@ namespace mesos {
 namespace internal {
 namespace log {
 
-
 // This class is responsible for executing the log recover protocol.
 // Any time a replica in non-VOTING status starts, we will run this
 // protocol. We first broadcast a recover request to all the replicas
@@ -58,9 +58,18 @@ namespace log {
 //
 // A) Broadcast a RecoverRequest to all replicas in the network.
 // B) Collect RecoverResponse from each replica
-//    B1) If a quorum of replicas are found in VOTING status, the local
-//        replica will be in RECOVERING status next.
-//    B2) Otherwise, goto (A).
+//    B1) If a quorum of replicas are found in VOTING status (no matter
+//        what status the local replica is in currently), the local
+//        replica will be put in RECOVERING status next.
+//    B2) If the local replica is in EMPTY status and all replicas are
+//        found in either EMPTY status or STARTING status, the local
+//        replica will be put in STARTING status next.
+//    B3) If the local replica is in STARTING status and all replicas
+//        are found in either STARTING status or VOTING status, the
+//        local replica will be put in VOTING status next.
+//    B4) Otherwise, goto (A).
+//
+// (B2 and B3 are used to do the two-phase auto initialization.)
 //
 // We re-use RecoverResponse to specify the return value. The 'status'
 // field specifies the next status of the local replica. If the next
@@ -71,10 +80,17 @@ class RecoverProtocolProcess : public Process<RecoverProtocolProcess>
 public:
   RecoverProtocolProcess(
       size_t _quorum,
-      const Shared<Network>& _network)
+      const Shared<Network>& _network,
+      const Metadata::Status& _status,
+      bool _autoInitialize,
+      const Duration& _timeout)
     : ProcessBase(ID::generate("log-recover-protocol")),
       quorum(_quorum),
-      network(_network) {}
+      network(_network),
+      status(_status),
+      autoInitialize(_autoInitialize),
+      timeout(_timeout),
+      terminating(false) {}
 
   Future<RecoverResponse> future() { return promise.future(); }
 
@@ -88,8 +104,25 @@ protected:
   }
 
 private:
+  static Future<Option<RecoverResponse> > timedout(
+      Future<Option<RecoverResponse> > future,
+      const Duration& timeout)
+  {
+    LOG(INFO) << "Unable to finish the recover protocol in "
+              << timeout << ", retrying";
+
+    future.discard();
+
+    // The 'future' will eventually become DISCARDED, at which time we
+    // will re-run the recover protocol. We use the boolean flag
+    // 'terminating' to distinguish between a user initiated discard
+    // and a timeout induced discard.
+    return future;
+  }
+
   void discard()
   {
+    terminating = true;
     chain.discard();
   }
 
@@ -100,6 +133,7 @@
     chain = network->watch(quorum, Network::GREATER_THAN_OR_EQUAL_TO)
       .then(defer(self(), &Self::broadcast))
       .then(defer(self(), &Self::receive))
+      .after(timeout, lambda::bind(&Self::timedout, lambda::_1, timeout))
       .onAny(defer(self(), &Self::finished, lambda::_1));
   }
 
@@ -187,15 +221,88 @@ private:
       return result;
     }
 
+    if (autoInitialize) {
+      // The following code handles the auto-initialization. Our idea
+      // is: we allow a replica in EMPTY status to become VOTING
+      // immediately if it finds ALL (i.e., 2 * quorum - 1) replicas
+      // are in EMPTY status. This is based on the assumption that the
+      // only time ALL replicas are in EMPTY status is during start-up.
+      // This may not be true if we have a catastrophic failure in
+      // which all replicas are gone, and that's exactly the reason we
+      // allow users to disable auto-initialization.
+      //
+      // To do auto-initialization, if we use a single phase protocol
+      // and allow a replica to directly transit from EMPTY status to
+      // VOTING status, we may run into a state where we cannot make
+      // progress even if all replicas are in EMPTY status initially.
+      // For example, say the quorum size is 2. All replicas are in
+      // EMPTY status initially. One replica broadcasts a recover
+      // request and becomes VOTING before other replicas start
+      // broadcasting recover requests. In that case, no replica can
+      // make progress. To solve this problem, we use a two-phase
+      // protocol and introduce an intermediate transient status
+      // (STARTING) between EMPTY and VOTING status. A replica in
+      // EMPTY status can transit to STARTING status if it finds all
+      // replicas are in either EMPTY or STARTING status. A replica in
+      // STARTING status can transit to VOTING status if it finds all
+      // replicas are in either STARTING or VOTING status. In that
+      // way, in our previous example, all replicas will be in
+      // STARTING status before any of them can transit to VOTING
+      // status.
+
+      // TODO(jieyu): Currently, we simply calculate the size of the
+      // cluster from the quorum size. In the future, we may want to
+      // allow users to specify the cluster size in case they want to
+      // use a non-standard quorum size (e.g., cluster size = 5,
+      // quorum size = 4).
+      size_t clusterSize = (2 * quorum) - 1;
+
+      switch (status) {
+        case Metadata::EMPTY:
+          if ((responsesReceived[Metadata::EMPTY] +
+               responsesReceived[Metadata::STARTING]) >= clusterSize) {
+            process::discard(responses);
+
+            RecoverResponse result;
+            result.set_status(Metadata::STARTING);
+
+            return result;
+          }
+          break;
+        case Metadata::STARTING:
+          if ((responsesReceived[Metadata::STARTING] +
+               responsesReceived[Metadata::VOTING]) >= clusterSize) {
+            process::discard(responses);
+
+            RecoverResponse result;
+            result.set_status(Metadata::VOTING);
+
+            return result;
+          }
+          break;
+        default:
+          // Ignore all other cases.
+          break;
+      }
+    }
+
     // Handle the next response.
     return receive();
   }
 
   void finished(const Future<Option<RecoverResponse> >& future)
   {
-    if (future.isDiscarded()) {
-      promise.discard();
-      terminate(self());
+    if (future.isDiscarded()) {
+      // We use the boolean flag 'terminating' to distinguish between
+      // a user initiated discard and a timeout induced discard. In
+      // the case of a user initiated discard, the flag 'terminating'
+      // will be set to true in 'Self::discard()'.
+      if (terminating) {
+        promise.discard();
+        terminate(self());
+      } else {
+        start(); // Re-run the recover protocol after timeout.
+      }
     } else if (future.isFailed()) {
       promise.fail(future.failure());
       terminate(self());
@@ -216,24 +323,37 @@ private:
   const size_t quorum;
   const Shared<Network> network;
+  const Metadata::Status status;
+  const bool autoInitialize;
+  const Duration timeout;
 
   set<Future<RecoverResponse> > responses;
   hashmap<Metadata::Status, size_t> responsesReceived;
   Option<uint64_t> lowestBeginPosition;
   Option<uint64_t> highestEndPosition;
 
   Future<Option<RecoverResponse> > chain;
 
+  bool terminating;
+
   process::Promise<RecoverResponse> promise;
 };
 
 
-// The wrapper for running the recover protocol.
+// The wrapper for running the recover protocol. We will re-run the
+// recover protocol if it cannot be finished within 'timeout'.
 static Future<RecoverResponse> runRecoverProtocol(
     size_t quorum,
-    const Shared<Network>& network)
+    const Shared<Network>& network,
+    const Metadata::Status& status,
+    bool autoInitialize,
+    const Duration& timeout = Seconds(10))
 {
   RecoverProtocolProcess* process =
-    new RecoverProtocolProcess(quorum, network);
+    new RecoverProtocolProcess(
+        quorum,
+        network,
+        status,
+        autoInitialize,
+        timeout);
 
   Future<RecoverResponse> future = process->future();
   spawn(process, true);
@@ -249,7 +369,9 @@ static Future<RecoverResponse> runRecoverProtocol(
 // the next status is determined to be RECOVERING, we will start doing
 // catch-up. Later, if the local replica has caught up, we will set
 // the status of the local replica to VOTING and terminate the
-// process, indicating the recovery has completed.
+// process, indicating the recovery has completed. If all replicas are
+// in EMPTY status and auto-initialization is enabled, a two-phase
+// protocol will be used to bootstrap the replicated log.
 //
 // Here, we list a few scenarios and show how the recover process will
 // respond in those scenarios. All the examples assume a quorum size
@@ -273,17 +395,27 @@ static Future<RecoverResponse> runRecoverProtocol(
 // 4) Replica A is in VOTING status and B is in EMPTY status. The
 //    operator adds replica C. In that case, C will stay in EMPTY
 //    status forever similar to case 3).
+//
+// 5) Replica A, B and C are all in EMPTY status. Depending on whether
+//    auto-initialization is enabled or not, the replicas will behave
+//    differently. If auto-initialization is enabled, all replicas
+//    will first go into STARTING status. Once *all* replicas have
+//    transitioned out of EMPTY status, the replicas will go into
+//    VOTING status. If auto-initialization is disabled, all replicas
+//    will remain in EMPTY status.
 class RecoverProcess : public Process<RecoverProcess>
 {
 public:
   RecoverProcess(
       size_t _quorum,
       const Owned<Replica>& _replica,
-      const Shared<Network>& _network)
+      const Shared<Network>& _network,
+      bool _autoInitialize)
     : ProcessBase(ID::generate("log-recover")),
       quorum(_quorum),
       replica(_replica),
-      network(_network) {}
+      network(_network),
+      autoInitialize(_autoInitialize) {}
 
   Future<Owned<Replica> > future() { return promise.future(); }
@@ -322,20 +454,39 @@ private:
       // No need to do recovery.
       return Nothing();
     } else {
-      return runRecoverProtocol(quorum, network)
+      return runRecoverProtocol(quorum, network, status, autoInitialize)
         .then(defer(self(), &Self::_recover, lambda::_1));
     }
   }
 
   Future<Nothing> _recover(const RecoverResponse& result)
   {
-    if (result.status() == Metadata::RECOVERING) {
-      CHECK(result.has_begin() && result.has_end());
+    switch (result.status()) {
+      case Metadata::STARTING:
+        // This is the auto-initialization case. As mentioned above, we
+        // use a two-phase protocol to bootstrap. When the control
+        // reaches here, the first phase just ended. We start the second
+        // phase by re-running the recover protocol.
+        CHECK(autoInitialize);
 
-      return updateReplicaStatus(Metadata::RECOVERING)
-        .then(defer(self(), &Self::catchup, result.begin(), result.end()));
-    } else {
-      return Failure("Unexpected status returned from the recover protocol");
+        return updateReplicaStatus(Metadata::STARTING)
+          .then(defer(self(), &Self::recover, Metadata::STARTING));
+
+      case Metadata::VOTING:
+        // This is also the auto-initialization case. When the
+        // control reaches here, the second phase just ended.
+        CHECK(autoInitialize);
+
+        return updateReplicaStatus(Metadata::VOTING);
+
+      case Metadata::RECOVERING:
+        CHECK(result.has_begin() && result.has_end());
+
+        return updateReplicaStatus(Metadata::RECOVERING)
+          .then(defer(self(), &Self::catchup, result.begin(), result.end()));
+
+      default:
+        return Failure("Unexpected status returned from the recover protocol");
     }
   }
@@ -410,7 +561,7 @@ private:
   Future<Nothing> getReplicaOwnership(Shared<Replica> shared)
   {
-    // Try to re-gain the ownership of the replica.
+    // Try to regain the ownership of the replica.
     return shared.own()
       .then(defer(self(), &Self::_getReplicaOwnership, lambda::_1));
   }
@@ -439,6 +590,7 @@ private:
   const size_t quorum;
   Owned<Replica> replica;
   const Shared<Network> network;
+  const bool autoInitialize;
 
   Future<Nothing> chain;
 
@@ -449,9 +601,16 @@ private:
 Future<Owned<Replica> > recover(
     size_t quorum,
     const Owned<Replica>& replica,
-    const Shared<Network>& network)
+    const Shared<Network>& network,
+    bool autoInitialize)
 {
-  RecoverProcess* process = new RecoverProcess(quorum, replica, network);
+  RecoverProcess* process =
+    new RecoverProcess(
+        quorum,
+        replica,
+        network,
+        autoInitialize);
+
   Future<Owned<Replica> > future = process->future();
   spawn(process, true);
   return future;
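
To make the two-phase rule above easier to trace, the decision in
'receive()' can be restated as a pure function. This is a sketch, not Mesos
code: the names 'Status', 'nextStatus', 'tally', and 'PENDING' are invented
for illustration, and the thresholds mirror the switch statement in the diff
(a quorum of VOTING responses always wins; otherwise ALL 2 * quorum - 1
replicas must be accounted for before either phase fires):

    // Sketch: the auto-initialization decision as a pure function.
    #include <cassert>
    #include <cstddef>
    #include <map>

    enum Status { EMPTY, STARTING, VOTING, RECOVERING, PENDING };

    // Given the local replica's status, a tally of responses by
    // status, and the quorum size, return the next status, or PENDING
    // if no transition applies yet (keep collecting responses).
    Status nextStatus(
        Status local,
        const std::map<Status, std::size_t>& tally,
        std::size_t quorum)
    {
      // The cluster size is currently derived from the quorum size.
      const std::size_t clusterSize = 2 * quorum - 1;

      const std::size_t empty = tally.count(EMPTY) ? tally.at(EMPTY) : 0;
      const std::size_t starting =
        tally.count(STARTING) ? tally.at(STARTING) : 0;
      const std::size_t voting = tally.count(VOTING) ? tally.at(VOTING) : 0;

      // B1) A quorum of VOTING replicas: catch up via RECOVERING.
      if (voting >= quorum) {
        return RECOVERING;
      }

      // B2) Phase one: EMPTY -> STARTING once ALL replicas are seen
      // in EMPTY or STARTING status.
      if (local == EMPTY && empty + starting >= clusterSize) {
        return STARTING;
      }

      // B3) Phase two: STARTING -> VOTING once ALL replicas have
      // left EMPTY status.
      if (local == STARTING && starting + voting >= clusterSize) {
        return VOTING;
      }

      return PENDING; // B4) Corresponds to "goto (A)" above.
    }

    int main()
    {
      // Scenario 5: quorum = 2, so clusterSize = 3; all start EMPTY.
      std::map<Status, std::size_t> allEmpty = {{EMPTY, 3}};
      assert(nextStatus(EMPTY, allEmpty, 2) == STARTING); // Phase one.

      std::map<Status, std::size_t> allStarting = {{STARTING, 3}};
      assert(nextStatus(STARTING, allStarting, 2) == VOTING); // Phase two.

      // With one peer still EMPTY, phase two cannot fire yet.
      std::map<Status, std::size_t> oneStillEmpty =
        {{EMPTY, 1}, {STARTING, 2}};
      assert(nextStatus(STARTING, oneStillEmpty, 2) == PENDING);

      return 0;
    }

The intermediate STARTING status is what rules out the single-phase livelock
described in the comment: no replica can reach VOTING while any peer it can
see is still EMPTY.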

http://git-wip-us.apache.org/repos/asf/mesos/blob/27221fd5/src/log/recover.hpp
----------------------------------------------------------------------
diff --git a/src/log/recover.hpp b/src/log/recover.hpp
index 634bc06..6243c18 100644
--- a/src/log/recover.hpp
+++ b/src/log/recover.hpp
@@ -45,12 +45,17 @@ namespace log {
 // positions are recovered such that if other replicas fail, the
 // remaining replicas can restore all the successfully written log
 // entries; 2) its future votes cannot contradict its lost votes.
+//
 // This function returns an owned pointer to the recovered replica if
-// the recovery is successful.
+// the recovery is successful. If the auto-initialization flag is set,
+// an empty replica will be allowed to vote if ALL replicas (i.e.,
+// quorum * 2 - 1) are empty. This allows us to bootstrap the
+// replicated log without explicitly using an initialization tool.
 extern process::Future<process::Owned<Replica> > recover(
     size_t quorum,
     const process::Owned<Replica>& replica,
-    const process::Shared<Network>& network);
+    const process::Shared<Network>& network,
+    bool autoInitialize = false);
 
 } // namespace log {
 } // namespace internal {
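
Tying the pieces together, here is a sketch of driving the recover() declared
above to bootstrap a fresh three-replica log (quorum = 2, so ALL
2 * 2 - 1 = 3 replicas must report EMPTY). It mirrors the first test below;
the paths are hypothetical, error handling is omitted, and the includes
assume the src/log/ and libprocess layouts:

    // Sketch: bootstrapping three empty replicas via auto initialization.
    #include <set>

    #include <process/future.hpp>
    #include <process/owned.hpp>
    #include <process/pid.hpp>
    #include <process/shared.hpp>

    #include "log/network.hpp"
    #include "log/recover.hpp"
    #include "log/replica.hpp"

    using namespace mesos::internal::log;

    using process::Future;
    using process::Owned;
    using process::Shared;
    using process::UPID;

    int main()
    {
      Owned<Replica> replica1(new Replica("/tmp/.log1"));
      Owned<Replica> replica2(new Replica("/tmp/.log2"));
      Owned<Replica> replica3(new Replica("/tmp/.log3"));

      std::set<UPID> pids;
      pids.insert(replica1->pid());
      pids.insert(replica2->pid());
      pids.insert(replica3->pid());

      Shared<Network> network(new Network(pids));

      // With the trailing 'true', none of these futures can become
      // ready until all three replicas have been seen and have left
      // EMPTY status (the two-phase protocol described above).
      Future<Owned<Replica> > r1 = recover(2, replica1, network, true);
      Future<Owned<Replica> > r2 = recover(2, replica2, network, true);
      Future<Owned<Replica> > r3 = recover(2, replica3, network, true);

      r1.await();
      r2.await();
      r3.await();

      return 0;
    }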

http://git-wip-us.apache.org/repos/asf/mesos/blob/27221fd5/src/tests/log_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/log_tests.cpp b/src/tests/log_tests.cpp
index 4f08927..db91ef8 100644
--- a/src/tests/log_tests.cpp
+++ b/src/tests/log_tests.cpp
@@ -1698,6 +1698,144 @@ TEST_F(RecoverTest, CatchupRetry)
 }
 
 
+TEST_F(RecoverTest, AutoInitialization)
+{
+  const string path1 = os::getcwd() + "/.log1";
+  const string path2 = os::getcwd() + "/.log2";
+  const string path3 = os::getcwd() + "/.log3";
+
+  Owned<Replica> replica1(new Replica(path1));
+  Owned<Replica> replica2(new Replica(path2));
+  Owned<Replica> replica3(new Replica(path3));
+
+  set<UPID> pids;
+  pids.insert(replica1->pid());
+  pids.insert(replica2->pid());
+  pids.insert(replica3->pid());
+
+  Shared<Network> network(new Network(pids));
+
+  Future<Owned<Replica> > recovering1 = recover(2, replica1, network, true);
+  Future<Owned<Replica> > recovering2 = recover(2, replica2, network, true);
+
+  // Verifies that replica1 and replica2 cannot transit into VOTING
+  // status because replica3 is still in EMPTY status. We flush the
+  // event queue before checking.
+  Clock::pause();
+  Clock::settle();
+  Clock::resume();
+
+  EXPECT_TRUE(recovering1.isPending());
+  EXPECT_TRUE(recovering2.isPending());
+
+  Future<Owned<Replica> > recovering3 = recover(2, replica3, network, true);
+
+  AWAIT_READY(recovering1);
+  AWAIT_READY(recovering2);
+  AWAIT_READY(recovering3);
+
+  Owned<Replica> shared_ = recovering1.get();
+  Shared<Replica> shared = shared_.share();
+
+  Coordinator coord(2, shared, network);
+
+  {
+    Future<Option<uint64_t> > electing = coord.elect();
+    AWAIT_READY(electing);
+    EXPECT_SOME_EQ(0u, electing.get());
+  }
+
+  {
+    Future<Option<uint64_t> > appending = coord.append("hello world");
+    AWAIT_READY(appending);
+    EXPECT_SOME_EQ(1u, appending.get());
+  }
+
+  {
+    Future<list<Action> > actions = shared->read(1, 1);
+    AWAIT_READY(actions);
+    ASSERT_EQ(1u, actions.get().size());
+    EXPECT_EQ(1u, actions.get().front().position());
+    ASSERT_TRUE(actions.get().front().has_type());
+    ASSERT_EQ(Action::APPEND, actions.get().front().type());
+    EXPECT_EQ("hello world", actions.get().front().append().bytes());
+  }
+}
+
+
+TEST_F(RecoverTest, AutoInitializationRetry)
+{
+  const string path1 = os::getcwd() + "/.log1";
+  const string path2 = os::getcwd() + "/.log2";
+  const string path3 = os::getcwd() + "/.log3";
+
+  Owned<Replica> replica1(new Replica(path1));
+  Owned<Replica> replica2(new Replica(path2));
+  Owned<Replica> replica3(new Replica(path3));
+
+  set<UPID> pids;
+  pids.insert(replica1->pid());
+  pids.insert(replica2->pid());
+  pids.insert(replica3->pid());
+
+  Shared<Network> network(new Network(pids));
+
+  // Simulate the case where replica3 is temporarily removed.
+  DROP_MESSAGE(Eq(RecoverRequest().GetTypeName()), _, Eq(replica3->pid()));
+  DROP_MESSAGE(Eq(RecoverRequest().GetTypeName()), _, Eq(replica3->pid()));
+
+  Clock::pause();
+
+  Future<Owned<Replica> > recovering1 = recover(2, replica1, network, true);
+  Future<Owned<Replica> > recovering2 = recover(2, replica2, network, true);
+
+  // Flush the event queue.
+  Clock::settle();
+
+  EXPECT_TRUE(recovering1.isPending());
+  EXPECT_TRUE(recovering2.isPending());
+
+  Future<Owned<Replica> > recovering3 = recover(2, replica3, network, true);
+
+  // Replica1 and replica2 will retry recovery after 10 seconds.
+  Clock::advance(Seconds(10));
+  Clock::settle();
+
+  Clock::resume();
+
+  AWAIT_READY(recovering1);
+  AWAIT_READY(recovering2);
+  AWAIT_READY(recovering3);
+
+  Owned<Replica> shared_ = recovering1.get();
+  Shared<Replica> shared = shared_.share();
+
+  Coordinator coord(2, shared, network);
+
+  {
+    Future<Option<uint64_t> > electing = coord.elect();
+    AWAIT_READY(electing);
+    EXPECT_SOME_EQ(0u, electing.get());
+  }
+
+  {
+    Future<Option<uint64_t> > appending = coord.append("hello world");
+    AWAIT_READY(appending);
+    EXPECT_SOME_EQ(1u, appending.get());
+  }
+
+  {
+    Future<list<Action> > actions = shared->read(1, 1);
+    AWAIT_READY(actions);
+    ASSERT_EQ(1u, actions.get().size());
+    EXPECT_EQ(1u, actions.get().front().position());
+    ASSERT_TRUE(actions.get().front().has_type());
+    ASSERT_EQ(Action::APPEND, actions.get().front().type());
+    EXPECT_EQ("hello world", actions.get().front().append().bytes());
+  }
+}
+
+
 class LogTest : public TemporaryDirectoryTest
 {
 protected: