Repository: incubator-ratis Updated Branches: refs/heads/master 6c97d0603 -> 430932441
RATIS-225. Grpc LogAppender should update matchIndex on heartbeat reply. Contributed by Lokesh Jain Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/43093244 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/43093244 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/43093244 Branch: refs/heads/master Commit: 4309324418d90f8a86160456019f04e9e66bb2cb Parents: 6c97d06 Author: Tsz-Wo Nicholas Sze <szets...@hortonworks.com> Authored: Sat Apr 14 00:33:40 2018 +0800 Committer: Tsz-Wo Nicholas Sze <szets...@hortonworks.com> Committed: Sat Apr 14 00:33:40 2018 +0800 ---------------------------------------------------------------------- .../ratis/grpc/server/GRpcLogAppender.java | 15 +++-- .../org/apache/ratis/grpc/TestRaftWithGrpc.java | 68 ++++++++++++++++++++ .../apache/ratis/server/impl/FollowerInfo.java | 3 +- .../apache/ratis/server/impl/LeaderState.java | 4 ++ .../ratis/server/impl/RaftServerImpl.java | 4 ++ .../java/org/apache/ratis/RaftAsyncTests.java | 1 - .../ratis/server/impl/RaftServerTestUtil.java | 5 ++ 7 files changed, 93 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/43093244/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java index 360d020..2c52aa0 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java @@ -278,7 +278,7 @@ public class GRpcLogAppender extends LogAppender { follower.decreaseNextIndex(newNextIndex); } - private void onSuccess(AppendEntriesReplyProto 
reply) { + protected synchronized void onSuccess(AppendEntriesReplyProto reply) { AppendEntriesRequestProto request = pendingRequests.remove(reply.getServerReply().getCallId()); if (request == null) { // If reply comes after timeout, the reply is ignored. @@ -292,19 +292,26 @@ public class GRpcLogAppender extends LogAppender { () -> "Got reply with next index " + replyNextIndex + " but the pending queue is empty"); + final long lastIndex = replyNextIndex - 1; + final boolean updateMatchIndex; + if (request.getEntriesCount() == 0) { Preconditions.assertTrue(!request.hasPreviousLog() || - replyNextIndex - 1 == request.getPreviousLog().getIndex(), + lastIndex == request.getPreviousLog().getIndex(), "reply's next index is %s, request's previous is %s", replyNextIndex, request.getPreviousLog()); + updateMatchIndex = request.hasPreviousLog() && follower.getMatchIndex() < lastIndex; } else { // check if the reply and the pending request is consistent final long lastEntryIndex = request .getEntries(request.getEntriesCount() - 1).getIndex(); - Preconditions.assertTrue(replyNextIndex == lastEntryIndex + 1, + Preconditions.assertTrue(lastIndex == lastEntryIndex, "reply's next index is %s, request's last entry index is %s", replyNextIndex, lastEntryIndex); - follower.updateMatchIndex(lastEntryIndex); + updateMatchIndex = true; + } + if (updateMatchIndex) { + follower.updateMatchIndex(lastIndex); submitEventOnSuccessAppend(); } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/43093244/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java index 604e51a..e6a3b57 100644 --- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java +++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java @@ -17,18 +17,32 
@@ */ package org.apache.ratis.grpc; +import org.apache.ratis.MiniRaftCluster; import org.apache.ratis.RaftBasicTests; +import org.apache.ratis.RaftTestUtil; +import org.apache.ratis.client.RaftClient; +import org.apache.ratis.protocol.RaftClientReply; import org.apache.ratis.server.impl.BlockRequestHandlingInjection; +import org.apache.ratis.server.impl.RaftServerTestUtil; +import org.apache.ratis.server.protocol.TermIndex; +import org.apache.ratis.statemachine.SimpleStateMachine4Testing; +import org.apache.ratis.statemachine.StateMachine; import org.junit.Assert; import org.junit.Test; import java.io.IOException; +import java.util.Arrays; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import static org.apache.ratis.RaftTestUtil.waitForLeader; + public class TestRaftWithGrpc extends RaftBasicTests { private final MiniRaftClusterWithGRpc cluster; public TestRaftWithGrpc() throws IOException { + properties.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY, + SimpleStateMachine4Testing.class, StateMachine.class); cluster = MiniRaftClusterWithGRpc.FACTORY.newCluster( NUM_SERVERS, properties); Assert.assertNull(cluster.getLeader()); @@ -51,4 +65,58 @@ public class TestRaftWithGrpc extends RaftBasicTests { throws IOException, InterruptedException, ExecutionException { testRequestTimeout(false, getCluster(), LOG, getProperties()); } + + @Test + public void testUpdateViaHeartbeat() + throws IOException, InterruptedException, ExecutionException { + LOG.info("Running testUpdateViaHeartbeat"); + final MiniRaftClusterWithGRpc cluster = getCluster(); + waitForLeader(cluster); + long waitTime = 5000; + try (final RaftClient client = cluster.createClient()) { + // block append requests + cluster.getServerAliveStream().forEach(raftServer -> { + try { + if (!raftServer.isLeader()) { + ((SimpleStateMachine4Testing) raftServer.getStateMachine()).setBlockAppend(true); + } + } catch (InterruptedException e) { + LOG.error("Interrupted 
while blocking append", e); + } + }); + CompletableFuture<RaftClientReply> + replyFuture = client.sendAsync(new RaftTestUtil.SimpleMessage("abc")); + Thread.sleep(waitTime); + // replyFuture should not be completed until append request is unblocked. + Assert.assertTrue(!replyFuture.isDone()); + // unblock append request. + cluster.getServerAliveStream().forEach(raftServer -> { + try { + ((SimpleStateMachine4Testing) raftServer.getStateMachine()).setBlockAppend(false); + } catch (InterruptedException e) { + LOG.error("Interrupted while unblocking append", e); + } + }); + long index = cluster.getLeader().getState().getLog().getNextIndex(); + TermIndex[] leaderEntries = cluster.getLeader().getState().getLog().getEntries(0, Integer.MAX_VALUE); + // The entries have been appended in the followers + // although the append entry timed out at the leader + cluster.getServerAliveStream().forEach(raftServer -> { + Assert.assertEquals(raftServer.getState().getLog().getNextIndex(), index); + if (!raftServer.isLeader()) { + TermIndex[] serverEntries = raftServer.getState().getLog().getEntries(0, Integer.MAX_VALUE); + Arrays.equals(serverEntries, leaderEntries); + } + }); + + // Wait for heartbeats from leader to be received by followers + Thread.sleep(1000); + RaftServerTestUtil.getLogAppenders(cluster.getLeader()).forEach(logAppender -> { + // FollowerInfo in the leader state should have updated next and match index. 
+ Assert.assertEquals(logAppender.getFollower().getMatchIndex(), index - 1); + Assert.assertEquals(logAppender.getFollower().getNextIndex(), index); + }); + } + cluster.shutdown(); + } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/43093244/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfo.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfo.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfo.java index 246b9df..2a40a55 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfo.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfo.java @@ -18,7 +18,6 @@ package org.apache.ratis.server.impl; import org.apache.ratis.protocol.RaftPeer; -import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.util.Preconditions; import org.apache.ratis.util.Timestamp; @@ -48,7 +47,7 @@ public class FollowerInfo { this.matchIndex.set(matchIndex); } - long getMatchIndex() { + public long getMatchIndex() { return matchIndex.get(); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/43093244/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java index 309ebf5..242cd7c 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java @@ -641,4 +641,8 @@ public class LeaderState { .map(sender -> sender.getFollower().getPeer()) .collect(Collectors.toList())); } + + Stream<LogAppender> getLogAppenders() { + return senders.stream(); + } } 
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/43093244/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index 4e4a811..28ce59f 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -208,6 +208,10 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou return state; } + LeaderState getLeaderState() { + return leaderState; + } + public RaftPeerId getId() { return getState().getSelfId(); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/43093244/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java index 438d56a..db466df 100644 --- a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java @@ -305,7 +305,6 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba LOG.error("Interrupted while unblocking append", e); } }); - client.send(new RaftTestUtil.SimpleMessage("abc")); replyFuture.get(); Assert.assertTrue(System.currentTimeMillis() - time > waitTime); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/43093244/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java 
b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java index 686a008..47d8c9d 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java @@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.Collection; +import java.util.stream.Stream; public class RaftServerTestUtil { static final Logger LOG = LoggerFactory.getLogger(RaftServerTestUtil.class); @@ -89,4 +90,8 @@ public class RaftServerTestUtil { RaftGroup group, RaftProperties properties, Parameters parameters) throws IOException { return new RaftServerProxy(id, stateMachine, group, properties, parameters); } + + public static Stream<LogAppender> getLogAppenders(RaftServerImpl server) { + return server.getLeaderState().getLogAppenders(); + } }