Repository: incubator-ratis Updated Branches: refs/heads/master 812d7dbeb -> 0235de0ec
RATIS-97. Pass RaftGroup to reinitialize. Contributed by Tsz Wo Nicholas Sze. Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/0235de0e Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/0235de0e Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/0235de0e Branch: refs/heads/master Commit: 0235de0ececb0c647858ede3228d25f4928df712 Parents: 812d7db Author: Jing Zhao <ji...@apache.org> Authored: Thu Aug 10 22:42:43 2017 -0700 Committer: Jing Zhao <ji...@apache.org> Committed: Thu Aug 10 22:42:43 2017 -0700 ---------------------------------------------------------------------- .../org/apache/ratis/client/RaftClient.java | 4 ++-- .../ratis/client/impl/ClientProtoUtils.java | 6 ++--- .../ratis/client/impl/RaftClientImpl.java | 17 +++++++------ .../org/apache/ratis/protocol/RaftGroup.java | 10 +++++++- .../ratis/protocol/ReinitializeRequest.java | 10 ++++---- .../java/org/apache/ratis/util/NetUtils.java | 6 ++++- .../java/org/apache/ratis/util/ProtoUtils.java | 15 ++++++++---- ratis-proto-shaded/src/main/proto/Raft.proto | 2 +- .../org/apache/ratis/server/RaftServer.java | 2 +- .../ratis/server/impl/RaftServerProxy.java | 4 ++-- .../server/impl/ReinitializationBaseTest.java | 25 ++++++++++---------- 11 files changed, 61 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/0235de0e/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java index 1a7faf6..44fc186 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java @@ -51,8 +51,8 @@ public interface RaftClient extends Closeable { /** Send set configuration request to the raft service. */ RaftClientReply setConfiguration(RaftPeer[] serversInNewConf) throws IOException; - /** Send reinitialize request to the service. */ - RaftClientReply reinitialize(RaftPeer[] serversInNewConf, RaftPeerId server) throws IOException; + /** Send reinitialize request to the given server (not the raft service). */ + RaftClientReply reinitialize(RaftGroup newGroup, RaftPeerId server) throws IOException; /** @return a {@link Builder}. */ static Builder newBuilder() { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/0235de0e/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java index 2968884..bfb25be 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java @@ -212,19 +212,19 @@ public class ClientProtoUtils { public static ReinitializeRequest toReinitializeRequest( ReinitializeRequestProto p) { final RaftRpcRequestProto m = p.getRpcRequest(); - final RaftPeer[] peers = ProtoUtils.toRaftPeerArray(p.getPeersList()); return new ReinitializeRequest( new ClientId(m.getRequestorId()), RaftPeerId.valueOf(m.getReplyId()), ProtoUtils.toRaftGroupId(m.getRaftGroupId()), - p.getRpcRequest().getCallId(), peers); + m.getCallId(), + ProtoUtils.toRaftGroup(p.getGroup())); } public static ReinitializeRequestProto toReinitializeRequestProto( ReinitializeRequest request) { return ReinitializeRequestProto.newBuilder() .setRpcRequest(toRaftRpcRequestProtoBuilder(request)) - .addAllPeers(ProtoUtils.toRaftPeerProtos(request.getPeersInGroup().getPeers())) + .setGroup(ProtoUtils.toRaftGroupProtoBuilder(request.getGroup())) .build(); } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/0235de0e/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java index c7ad935..69483ef 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java @@ -19,6 +19,7 @@ package org.apache.ratis.client.impl; import org.apache.ratis.client.RaftClient; import org.apache.ratis.client.RaftClientRpc; +import org.apache.ratis.shaded.com.google.common.base.Predicates; import org.apache.ratis.util.IOUtils; import org.apache.ratis.util.CollectionUtils; import org.apache.ratis.util.TimeDuration; @@ -29,9 +30,11 @@ import java.io.InterruptedIOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.List; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; import java.util.stream.Collectors; +import java.util.stream.Stream; /** A client who sends requests to a raft service. */ final class RaftClientImpl implements RaftClient { @@ -89,23 +92,23 @@ final class RaftClientImpl implements RaftClient { throws IOException { final long callId = nextCallId(); // also refresh the rpc proxies for these peers - addServers(peersInNewConf); + addServers(Arrays.stream(peersInNewConf)); return sendRequestWithRetry(() -> new SetConfigurationRequest( clientId, leaderId, groupId, callId, peersInNewConf)); } @Override - public RaftClientReply reinitialize(RaftPeer[] peersInNewConf, RaftPeerId server) + public RaftClientReply reinitialize(RaftGroup newGroup, RaftPeerId server) throws IOException { final long callId = nextCallId(); - addServers(peersInNewConf); + addServers(newGroup.getPeers().stream()); return sendRequest(new ReinitializeRequest( - clientId, server, groupId, callId, peersInNewConf)); + clientId, server, groupId, callId, newGroup)); } - private void addServers(RaftPeer[] peersInNewConf) { - clientRpc.addServers(Arrays.stream(peersInNewConf).filter(peers::contains) - .collect(Collectors.toList())); + private void addServers(Stream<RaftPeer> peersInNewConf) { + clientRpc.addServers( + peersInNewConf.filter(p -> !peers.contains(p))::iterator); } private RaftClientReply sendRequestWithRetry( http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/0235de0e/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroup.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroup.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroup.java index 6e870b1..d00bed5 100644 --- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroup.java +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroup.java @@ -34,10 +34,18 @@ public class RaftGroup { /** The group of raft peers */ private final List<RaftPeer> peers; + public RaftGroup(RaftGroupId groupId) { + this(groupId, Collections.emptyList()); + } + public RaftGroup(RaftGroupId groupId, RaftPeer[] peers) { + this(groupId, Arrays.asList(peers)); + } + + public RaftGroup(RaftGroupId groupId, List<RaftPeer> peers) { Preconditions.assertTrue(peers != null); this.groupId = groupId; - this.peers = Collections.unmodifiableList(new ArrayList<>(Arrays.asList(peers))); + this.peers = Collections.unmodifiableList(new ArrayList<>(peers)); } public RaftGroupId getGroupId() { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/0235de0e/ratis-common/src/main/java/org/apache/ratis/protocol/ReinitializeRequest.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/ReinitializeRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/ReinitializeRequest.java index b69c845..3c6d468 100644 --- a/ratis-common/src/main/java/org/apache/ratis/protocol/ReinitializeRequest.java +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/ReinitializeRequest.java @@ -17,23 +17,21 @@ */ package org.apache.ratis.protocol; -import java.util.Arrays; - public class ReinitializeRequest extends RaftClientRequest { private final RaftGroup group; public ReinitializeRequest(ClientId clientId, RaftPeerId serverId, - RaftGroupId groupId, long callId, RaftPeer[] peers) { + RaftGroupId groupId, long callId, RaftGroup group) { super(clientId, serverId, groupId, callId, null); - this.group = new RaftGroup(groupId, peers); + this.group = group; } - public RaftGroup getPeersInGroup() { + public RaftGroup getGroup() { return group; } @Override public String toString() { - return super.toString() + ", peers:" + Arrays.asList(getPeersInGroup()); + return super.toString() + ", " + getGroup(); } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/0235de0e/ratis-common/src/main/java/org/apache/ratis/util/NetUtils.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/util/NetUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/NetUtils.java index 86dd865..9d97f4d 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/NetUtils.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/NetUtils.java @@ -123,6 +123,10 @@ public interface NetUtils { } static String address2String(InetSocketAddress address) { - return address.getHostName() + ":" + address.getPort(); + final StringBuilder b = new StringBuilder(address.getHostName()); + if (address.getAddress() instanceof Inet6Address) { + b.insert(0, '[').append(']'); + } + return b.append(':').append(address.getPort()).toString(); } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/0235de0e/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java index 9a2d530..7e849fd 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java @@ -17,10 +17,7 @@ */ package org.apache.ratis.util; -import org.apache.ratis.protocol.ClientId; -import org.apache.ratis.protocol.RaftGroupId; -import org.apache.ratis.protocol.RaftPeer; -import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.protocol.*; import org.apache.ratis.shaded.com.google.protobuf.ByteString; import org.apache.ratis.shaded.com.google.protobuf.ServiceException; import org.apache.ratis.shaded.proto.RaftProtos.*; @@ -111,6 +108,16 @@ public class ProtoUtils { return RaftGroupIdProto.newBuilder().setId(id.toByteString()); } + public static RaftGroup toRaftGroup(RaftGroupProto proto) { + return new RaftGroup(toRaftGroupId(proto.getGroupId()), + toRaftPeerArray(proto.getPeersList())); + } + + public static RaftGroupProto.Builder toRaftGroupProtoBuilder(RaftGroup group) { + return RaftGroupProto.newBuilder() + .setGroupId(toRaftGroupIdProtoBuilder(group.getGroupId())) + .addAllPeers(toRaftPeerProtos(group.getPeers())); + } public static boolean isConfigurationLogEntry(LogEntryProto entry) { return entry.getLogEntryBodyCase() == http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/0235de0e/ratis-proto-shaded/src/main/proto/Raft.proto ---------------------------------------------------------------------- diff --git a/ratis-proto-shaded/src/main/proto/Raft.proto b/ratis-proto-shaded/src/main/proto/Raft.proto index ed145df..4dada3b 100644 --- a/ratis-proto-shaded/src/main/proto/Raft.proto +++ b/ratis-proto-shaded/src/main/proto/Raft.proto @@ -195,5 +195,5 @@ message SetConfigurationRequestProto { // reinitialize request message ReinitializeRequestProto { RaftRpcRequestProto rpcRequest = 1; - repeated RaftPeerProto peers = 2; + RaftGroupProto group = 2; // the target group. } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/0235de0e/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java index d5a46f9..622e75a 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java @@ -88,7 +88,7 @@ public interface RaftServer extends Closeable, RpcType.Get, RaftServerProtocol, } /** Set all the peers (including the server being built) in the Raft cluster. */ - public Builder setPeers(RaftGroup group) { + public Builder setGroup(RaftGroup group) { this.group = group; return this; } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/0235de0e/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java index 4b3b9f5..9be1e9a 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java @@ -169,7 +169,7 @@ public class RaftServerProxy implements RaftServer { final RaftServerImpl newImpl; try { - newImpl = initImpl(request.getPeersInGroup()); + newImpl = initImpl(request.getGroup()); } catch (IOException ioe) { final RaftException re = new RaftException( "Failed to reinitialize, request=" + request, ioe); @@ -178,7 +178,7 @@ public class RaftServerProxy implements RaftServer { return new RaftClientReply(request, re); } - getServerRpc().addPeers(request.getPeersInGroup().getPeers()); + getServerRpc().addPeers(request.getGroup().getPeers()); newImpl.start(); impl.complete(newImpl); return new RaftClientReply(request, (Message) null); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/0235de0e/ratis-server/src/test/java/org/apache/ratis/server/impl/ReinitializationBaseTest.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/ReinitializationBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/ReinitializationBaseTest.java index 20b8680..a4ac287 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/ReinitializationBaseTest.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/ReinitializationBaseTest.java @@ -66,7 +66,7 @@ public abstract class ReinitializationBaseTest { // Start server with an empty conf final RaftGroupId groupId = RaftGroupId.createId(); - final RaftGroup group = new RaftGroup(groupId, RaftPeer.EMPTY_PEERS); + final RaftGroup group = new RaftGroup(groupId); final List<RaftPeerId> ids = Arrays.stream(MiniRaftCluster.generateIds(3, 0)) .map(RaftPeerId::valueOf).collect(Collectors.toList()); @@ -81,11 +81,10 @@ public abstract class ReinitializationBaseTest { Assert.assertNull(cluster.getLeader()); // Reinitialize servers - final RaftPeer[] peers = cluster.getPeers().toArray(RaftPeer.EMPTY_PEERS); - for(RaftPeer p : peers) { - final RaftClient client = cluster.createClient(p.getId(), - new RaftGroup(groupId, new RaftPeer[]{p})); - client.reinitialize(peers, p.getId()); + final RaftGroup newGroup = new RaftGroup(groupId, cluster.getPeers()); + final RaftClient client = cluster.createClient(null, newGroup); + for(RaftPeer p : newGroup.getPeers()) { + client.reinitialize(newGroup, p.getId()); } Assert.assertNotNull(RaftTestUtil.waitForLeader(cluster, true)); cluster.shutdown(); @@ -106,7 +105,7 @@ public abstract class ReinitializationBaseTest { @Test public void testReinitialize9Nodes() throws Exception { final int[] idIndex = {5, 8, 9}; - runTestReinitializeMultiGroups(idIndex, 0); + runTestReinitializeMultiGroups(idIndex, 2); } private void runTestReinitializeMultiGroups(int[] idIndex, int chosen) throws Exception { @@ -121,7 +120,7 @@ public abstract class ReinitializationBaseTest { LOG.info("\n\nrunTestReinitializeMultiGroups with " + type + ": " + cluster.printServers()); // Start server with an empty conf - final RaftGroup emptyGroup = new RaftGroup(RaftGroupId.createId(), RaftPeer.EMPTY_PEERS); + final RaftGroup emptyGroup = new RaftGroup(RaftGroupId.createId()); final List<RaftPeerId> ids = Arrays.stream(MiniRaftCluster.generateIds(idIndex[idIndex.length - 1], 0)) .map(RaftPeerId::valueOf).collect(Collectors.toList()); @@ -147,8 +146,8 @@ public abstract class ReinitializationBaseTest { LOG.info(i + ") starting " + groups[i]); for(RaftPeer p : peers) { - try(final RaftClient client = cluster.createClient(p.getId(), groups[i])) { - client.reinitialize(peers, p.getId()); + try(final RaftClient client = cluster.createClient(p.getId(), emptyGroup)) { + client.reinitialize(groups[i], p.getId()); } } Assert.assertNotNull(RaftTestUtil.waitForLeader(cluster, true, gid)); @@ -162,10 +161,11 @@ public abstract class ReinitializationBaseTest { for (int i = 0; i < groups.length; i++) { if (i != chosen) { final RaftGroup g = groups[i]; + final RaftGroup newGroup = new RaftGroup(g.getGroupId()); LOG.info(i + ") close " + cluster.printServers(g.getGroupId())); for(RaftPeer p : g.getPeers()) { try (final RaftClient client = cluster.createClient(p.getId(), g)) { - client.reinitialize(RaftPeer.EMPTY_PEERS, p.getId()); + client.reinitialize(newGroup, p.getId()); } } } @@ -174,6 +174,7 @@ public abstract class ReinitializationBaseTest { LOG.info("close groups: " + cluster.printServers()); // update chosen group to use all the peers + final RaftGroup newGroup = new RaftGroup(groups[chosen].getGroupId()); final RaftPeer[] array = allPeers.toArray(RaftPeer.EMPTY_PEERS); for(int i = 0; i < groups.length; i++) { LOG.info(i + ") update " + cluster.printServers(groups[i].getGroupId())); @@ -184,7 +185,7 @@ public abstract class ReinitializationBaseTest { } else { for(RaftPeer p : groups[i].getPeers()) { try (final RaftClient client = cluster.createClient(p.getId(), groups[i])) { - client.reinitialize(array, p.getId()); + client.reinitialize(newGroup, p.getId()); } } }