Repository: incubator-ratis
Updated Branches:
  refs/heads/master 6c97d0603 -> 430932441


RATIS-225. Grpc LogAppender should update matchIndex on heartbeat reply.  
Contribtued Lokesh Jain


Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/43093244
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/43093244
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/43093244

Branch: refs/heads/master
Commit: 4309324418d90f8a86160456019f04e9e66bb2cb
Parents: 6c97d06
Author: Tsz-Wo Nicholas Sze <szets...@hortonworks.com>
Authored: Sat Apr 14 00:33:40 2018 +0800
Committer: Tsz-Wo Nicholas Sze <szets...@hortonworks.com>
Committed: Sat Apr 14 00:33:40 2018 +0800

----------------------------------------------------------------------
 .../ratis/grpc/server/GRpcLogAppender.java      | 15 +++--
 .../org/apache/ratis/grpc/TestRaftWithGrpc.java | 68 ++++++++++++++++++++
 .../apache/ratis/server/impl/FollowerInfo.java  |  3 +-
 .../apache/ratis/server/impl/LeaderState.java   |  4 ++
 .../ratis/server/impl/RaftServerImpl.java       |  4 ++
 .../java/org/apache/ratis/RaftAsyncTests.java   |  1 -
 .../ratis/server/impl/RaftServerTestUtil.java   |  5 ++
 7 files changed, 93 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/43093244/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java
----------------------------------------------------------------------
diff --git 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java
index 360d020..2c52aa0 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java
@@ -278,7 +278,7 @@ public class GRpcLogAppender extends LogAppender {
     follower.decreaseNextIndex(newNextIndex);
   }
 
-  private void onSuccess(AppendEntriesReplyProto reply) {
+  protected synchronized void onSuccess(AppendEntriesReplyProto reply) {
     AppendEntriesRequestProto request = 
pendingRequests.remove(reply.getServerReply().getCallId());
     if (request == null) {
       // If reply comes after timeout, the reply is ignored.
@@ -292,19 +292,26 @@ public class GRpcLogAppender extends LogAppender {
         () -> "Got reply with next index " + replyNextIndex
             + " but the pending queue is empty");
 
+    final long lastIndex = replyNextIndex - 1;
+    final boolean updateMatchIndex;
+
     if (request.getEntriesCount() == 0) {
       Preconditions.assertTrue(!request.hasPreviousLog() ||
-              replyNextIndex - 1 == request.getPreviousLog().getIndex(),
+              lastIndex == request.getPreviousLog().getIndex(),
           "reply's next index is %s, request's previous is %s",
           replyNextIndex, request.getPreviousLog());
+      updateMatchIndex = request.hasPreviousLog() && follower.getMatchIndex() 
< lastIndex;
     } else {
       // check if the reply and the pending request is consistent
       final long lastEntryIndex = request
           .getEntries(request.getEntriesCount() - 1).getIndex();
-      Preconditions.assertTrue(replyNextIndex == lastEntryIndex + 1,
+      Preconditions.assertTrue(lastIndex == lastEntryIndex,
           "reply's next index is %s, request's last entry index is %s",
           replyNextIndex, lastEntryIndex);
-      follower.updateMatchIndex(lastEntryIndex);
+      updateMatchIndex = true;
+    }
+    if (updateMatchIndex) {
+      follower.updateMatchIndex(lastIndex);
       submitEventOnSuccessAppend();
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/43093244/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
----------------------------------------------------------------------
diff --git 
a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java 
b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
index 604e51a..e6a3b57 100644
--- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
+++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
@@ -17,18 +17,32 @@
  */
 package org.apache.ratis.grpc;
 
+import org.apache.ratis.MiniRaftCluster;
 import org.apache.ratis.RaftBasicTests;
+import org.apache.ratis.RaftTestUtil;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.server.impl.BlockRequestHandlingInjection;
+import org.apache.ratis.server.impl.RaftServerTestUtil;
+import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
+import org.apache.ratis.statemachine.StateMachine;
 import org.junit.Assert;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 
+import static org.apache.ratis.RaftTestUtil.waitForLeader;
+
 public class TestRaftWithGrpc extends RaftBasicTests {
   private final MiniRaftClusterWithGRpc cluster;
 
   public TestRaftWithGrpc() throws IOException {
+    properties.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
+        SimpleStateMachine4Testing.class, StateMachine.class);
     cluster = MiniRaftClusterWithGRpc.FACTORY.newCluster(
         NUM_SERVERS, properties);
     Assert.assertNull(cluster.getLeader());
@@ -51,4 +65,58 @@ public class TestRaftWithGrpc extends RaftBasicTests {
       throws IOException, InterruptedException, ExecutionException {
     testRequestTimeout(false, getCluster(), LOG, getProperties());
   }
+
+  @Test
+  public void testUpdateViaHeartbeat()
+      throws IOException, InterruptedException, ExecutionException {
+    LOG.info("Running testUpdateViaHeartbeat");
+    final MiniRaftClusterWithGRpc cluster = getCluster();
+    waitForLeader(cluster);
+    long waitTime = 5000;
+    try (final RaftClient client = cluster.createClient()) {
+      // block append requests
+      cluster.getServerAliveStream().forEach(raftServer -> {
+        try {
+          if (!raftServer.isLeader()) {
+            ((SimpleStateMachine4Testing) 
raftServer.getStateMachine()).setBlockAppend(true);
+          }
+        } catch (InterruptedException e) {
+          LOG.error("Interrupted while blocking append", e);
+        }
+      });
+      CompletableFuture<RaftClientReply>
+          replyFuture = client.sendAsync(new 
RaftTestUtil.SimpleMessage("abc"));
+      Thread.sleep(waitTime);
+      // replyFuture should not be completed until append request is unblocked.
+      Assert.assertTrue(!replyFuture.isDone());
+      // unblock append request.
+      cluster.getServerAliveStream().forEach(raftServer -> {
+        try {
+          ((SimpleStateMachine4Testing) 
raftServer.getStateMachine()).setBlockAppend(false);
+        } catch (InterruptedException e) {
+          LOG.error("Interrupted while unblocking append", e);
+        }
+      });
+      long index = cluster.getLeader().getState().getLog().getNextIndex();
+      TermIndex[] leaderEntries = 
cluster.getLeader().getState().getLog().getEntries(0, Integer.MAX_VALUE);
+      // The entries have been appended in the followers
+      // although the append entry timed out at the leader
+      cluster.getServerAliveStream().forEach(raftServer -> {
+        Assert.assertEquals(raftServer.getState().getLog().getNextIndex(), 
index);
+        if (!raftServer.isLeader()) {
+          TermIndex[] serverEntries = 
raftServer.getState().getLog().getEntries(0, Integer.MAX_VALUE);
+          Arrays.equals(serverEntries, leaderEntries);
+        }
+      });
+
+      // Wait for heartbeats from leader to be received by followers
+      Thread.sleep(1000);
+      
RaftServerTestUtil.getLogAppenders(cluster.getLeader()).forEach(logAppender -> {
+        // FollowerInfo in the leader state should have updated next and match 
index.
+        Assert.assertEquals(logAppender.getFollower().getMatchIndex(), index - 
1);
+        Assert.assertEquals(logAppender.getFollower().getNextIndex(), index);
+      });
+    }
+    cluster.shutdown();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/43093244/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfo.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfo.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfo.java
index 246b9df..2a40a55 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfo.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfo.java
@@ -18,7 +18,6 @@
 package org.apache.ratis.server.impl;
 
 import org.apache.ratis.protocol.RaftPeer;
-import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.util.Preconditions;
 import org.apache.ratis.util.Timestamp;
 
@@ -48,7 +47,7 @@ public class FollowerInfo {
     this.matchIndex.set(matchIndex);
   }
 
-  long getMatchIndex() {
+  public long getMatchIndex() {
     return matchIndex.get();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/43093244/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
index 309ebf5..242cd7c 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
@@ -641,4 +641,8 @@ public class LeaderState {
         .map(sender -> sender.getFollower().getPeer())
         .collect(Collectors.toList()));
   }
+
+  Stream<LogAppender> getLogAppenders() {
+    return senders.stream();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/43093244/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 4e4a811..28ce59f 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -208,6 +208,10 @@ public class RaftServerImpl implements RaftServerProtocol, 
RaftServerAsynchronou
     return state;
   }
 
+  LeaderState getLeaderState() {
+    return leaderState;
+  }
+
   public RaftPeerId getId() {
     return getState().getSelfId();
   }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/43093244/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java 
b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
index 438d56a..db466df 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
@@ -305,7 +305,6 @@ public abstract class RaftAsyncTests<CLUSTER extends 
MiniRaftCluster> extends Ba
           LOG.error("Interrupted while unblocking append", e);
         }
       });
-      client.send(new RaftTestUtil.SimpleMessage("abc"));
       replyFuture.get();
       Assert.assertTrue(System.currentTimeMillis() - time > waitTime);
     }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/43093244/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
 
b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
index 686a008..47d8c9d 100644
--- 
a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
+++ 
b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
@@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Collection;
+import java.util.stream.Stream;
 
 public class RaftServerTestUtil {
   static final Logger LOG = LoggerFactory.getLogger(RaftServerTestUtil.class);
@@ -89,4 +90,8 @@ public class RaftServerTestUtil {
       RaftGroup group, RaftProperties properties, Parameters parameters) 
throws IOException {
     return new RaftServerProxy(id, stateMachine, group, properties, 
parameters);
   }
+
+  public static Stream<LogAppender> getLogAppenders(RaftServerImpl server) {
+    return server.getLeaderState().getLogAppenders();
+  }
 }

Reply via email to