Repository: mesos Updated Branches: refs/heads/master 2159b55f9 -> 8b1b7d4fb
Enforced log coordinator demotion after a write is discarded. Review: https://reviews.apache.org/r/18488 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/8b1b7d4f Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/8b1b7d4f Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/8b1b7d4f Branch: refs/heads/master Commit: 8b1b7d4fb84b8b4bfeae7d2f1bcb31a9fa568452 Parents: 2159b55 Author: Jie Yu <yujie....@gmail.com> Authored: Tue Feb 25 15:19:41 2014 -0800 Committer: Jie Yu <yujie....@gmail.com> Committed: Thu Feb 27 17:06:04 2014 -0800 ---------------------------------------------------------------------- src/log/coordinator.cpp | 7 ++++++- src/tests/log_tests.cpp | 49 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 55 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/8b1b7d4f/src/log/coordinator.cpp ---------------------------------------------------------------------- diff --git a/src/log/coordinator.cpp b/src/log/coordinator.cpp index 52e8305..6bfff1e 100644 --- a/src/log/coordinator.cpp +++ b/src/log/coordinator.cpp @@ -414,7 +414,12 @@ void CoordinatorProcess::writingFailed() void CoordinatorProcess::writingAborted() { CHECK_EQ(state, WRITING); - state = ELECTED; + + // Demote the coordinator if a write operation is discarded since we + // don't actually know whether the write was successful and we really + // need to "catch-up" that position before we try to do another + // write (see MESOS-1038 for more details). 
+ state = INITIAL; } http://git-wip-us.apache.org/repos/asf/mesos/blob/8b1b7d4f/src/tests/log_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/log_tests.cpp b/src/tests/log_tests.cpp index 1e59b24..0e2485f 100644 --- a/src/tests/log_tests.cpp +++ b/src/tests/log_tests.cpp @@ -707,6 +707,55 @@ TEST_F(CoordinatorTest, AppendReadError) } +TEST_F(CoordinatorTest, AppendDiscarded) +{ + const string path1 = os::getcwd() + "/.log1"; + initializer.flags.path = path1; + initializer.execute(); + + const string path2 = os::getcwd() + "/.log2"; + initializer.flags.path = path2; + initializer.execute(); + + Shared<Replica> replica1(new Replica(path1)); + Shared<Replica> replica2(new Replica(path2)); + + set<UPID> pids; + pids.insert(replica1->pid()); + pids.insert(replica2->pid()); + + Shared<Network> network(new Network(pids)); + + Coordinator coord(2, replica1, network); + + { + Future<Option<uint64_t> > electing = coord.elect(); + AWAIT_READY_FOR(electing, Seconds(10)); + ASSERT_SOME(electing.get()); + EXPECT_EQ(0u, electing.get().get()); + } + + process::terminate(replica2->pid()); + process::wait(replica2->pid()); + replica2.reset(); + + { + Future<uint64_t> appending = coord.append("hello world"); + ASSERT_TRUE(appending.isPending()); + + appending.discard(); + AWAIT_DISCARDED(appending); + } + + { + Future<uint64_t> appending = coord.append("hello moto"); + AWAIT_FAILED(appending); + + EXPECT_EQ("Coordinator is not elected", appending.failure()); + } +} + + TEST_F(CoordinatorTest, ElectNoQuorum) { const string path = os::getcwd() + "/.log";