Repository: incubator-ratis Updated Branches: refs/heads/master e37ab2ee1 -> 6c97d0603
RATIS-219. Add configuration for timeout duration. Contributed by Lokesh Jain Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/6c97d060 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/6c97d060 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/6c97d060 Branch: refs/heads/master Commit: 6c97d0603ae8c12776b87b419ac5e66adafcccc5 Parents: e37ab2e Author: Tsz-Wo Nicholas Sze <szets...@hortonworks.com> Authored: Fri Apr 13 09:45:14 2018 +0800 Committer: Tsz-Wo Nicholas Sze <szets...@hortonworks.com> Committed: Fri Apr 13 09:45:14 2018 +0800 ---------------------------------------------------------------------- .../ratis/client/RaftClientConfigKeys.java | 23 ++++++++------ .../ratis/client/impl/RaftClientImpl.java | 2 +- .../org/apache/ratis/grpc/GrpcConfigKeys.java | 2 +- .../org/apache/ratis/grpc/RaftGRpcService.java | 10 ++++-- .../ratis/grpc/client/AppendStreamer.java | 2 +- .../apache/ratis/grpc/client/GrpcClientRpc.java | 3 +- .../grpc/client/RaftClientProtocolClient.java | 33 +++++++++++++------- .../grpc/client/RaftClientProtocolProxy.java | 9 +++--- .../ratis/grpc/server/GRpcLogAppender.java | 7 +++-- .../grpc/server/RaftServerProtocolClient.java | 17 +++++----- .../ratis/server/RaftServerConfigKeys.java | 10 ++++++ 11 files changed, 73 insertions(+), 45 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/6c97d060/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java index bb76910..cc4e5ec 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java @@ -30,12 +30,21 @@ public interface RaftClientConfigKeys { interface Rpc { String PREFIX = RaftClientConfigKeys.PREFIX + ".rpc"; - String TIMEOUT_KEY = PREFIX + ".timeout"; - TimeDuration TIMEOUT_DEFAULT = TimeDuration.valueOf(300, TimeUnit.MILLISECONDS); + String RETRY_INTERVAL_KEY = PREFIX + ".retryInterval"; + TimeDuration RETRY_INTERVAL_DEFAULT = TimeDuration.valueOf(300, TimeUnit.MILLISECONDS); + static TimeDuration retryInterval(RaftProperties properties) { + return getTimeDuration(properties.getTimeDuration(RETRY_INTERVAL_DEFAULT.getUnit()), + RETRY_INTERVAL_KEY, RETRY_INTERVAL_DEFAULT); + } - static TimeDuration timeout(RaftProperties properties) { - return getTimeDuration(properties.getTimeDuration(TIMEOUT_DEFAULT.getUnit()), - TIMEOUT_KEY, TIMEOUT_DEFAULT); + String REQUEST_TIMEOUT_KEY = PREFIX + ".request.timeout"; + TimeDuration REQUEST_TIMEOUT_DEFAULT = TimeDuration.valueOf(3000, TimeUnit.MILLISECONDS); + static TimeDuration requestTimeout(RaftProperties properties) { + return getTimeDuration(properties.getTimeDuration(REQUEST_TIMEOUT_DEFAULT.getUnit()), + REQUEST_TIMEOUT_KEY, REQUEST_TIMEOUT_DEFAULT); + } + static void setRequestTimeout(RaftProperties properties, TimeDuration timeoutDuration) { + setTimeDuration(properties::setTimeDuration, REQUEST_TIMEOUT_KEY, timeoutDuration); } } @@ -44,24 +53,20 @@ public interface RaftClientConfigKeys { String MAX_OUTSTANDING_REQUESTS_KEY = PREFIX + ".outstanding-requests.max"; int MAX_OUTSTANDING_REQUESTS_DEFAULT = 100; - static int maxOutstandingRequests(RaftProperties properties) { return getInt(properties::getInt, MAX_OUTSTANDING_REQUESTS_KEY, MAX_OUTSTANDING_REQUESTS_DEFAULT, requireMin(2)); } - static void setMaxOutstandingRequests(RaftProperties properties, int outstandingRequests) { setInt(properties::setInt, MAX_OUTSTANDING_REQUESTS_KEY, outstandingRequests); } String SCHEDULER_THREADS_KEY = PREFIX + ".scheduler-threads"; int SCHEDULER_THREADS_DEFAULT = 3; - static int schedulerThreads(RaftProperties properties) { return getInt(properties::getInt, SCHEDULER_THREADS_KEY, SCHEDULER_THREADS_DEFAULT, requireMin(1)); } - static void setSchedulerThreads(RaftProperties properties, int schedulerThreads) { setInt(properties::setInt, SCHEDULER_THREADS_KEY, schedulerThreads); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/6c97d060/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java index e8a897b..44df5c3 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java @@ -108,7 +108,7 @@ final class RaftClientImpl implements RaftClient { this.groupId = group.getGroupId(); this.leaderId = leaderId != null? leaderId : !peers.isEmpty()? peers.iterator().next().getId(): null; - this.retryInterval = RaftClientConfigKeys.Rpc.timeout(properties); + this.retryInterval = RaftClientConfigKeys.Rpc.retryInterval(properties); asyncRequestSemaphore = new Semaphore(RaftClientConfigKeys.Async.maxOutstandingRequests(properties)); scheduler = Executors.newScheduledThreadPool(RaftClientConfigKeys.Async.schedulerThreads(properties)); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/6c97d060/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java index c1cc33a..9d18f3e 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java @@ -75,7 +75,7 @@ public interface GrpcConfigKeys { } String RETRY_INTERVAL_KEY = PREFIX + ".retry.interval"; - TimeDuration RETRY_INTERVAL_DEFAULT = RaftClientConfigKeys.Rpc.TIMEOUT_DEFAULT; + TimeDuration RETRY_INTERVAL_DEFAULT = RaftClientConfigKeys.Rpc.RETRY_INTERVAL_DEFAULT; static TimeDuration retryInterval(RaftProperties properties) { return getTimeDuration(properties.getTimeDuration(RETRY_INTERVAL_DEFAULT.getUnit()), RETRY_INTERVAL_KEY, RETRY_INTERVAL_DEFAULT); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/6c97d060/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java index ae7a977..d3827ef 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java @@ -34,6 +34,7 @@ import org.apache.ratis.shaded.proto.RaftProtos.*; import org.apache.ratis.util.CodeInjectionForTesting; import org.apache.ratis.util.ExitUtils; import org.apache.ratis.util.SizeInBytes; +import org.apache.ratis.util.TimeDuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -75,6 +76,7 @@ public class RaftGRpcService implements RaftServerRpc { Collections.synchronizedMap(new HashMap<>()); private final Supplier<RaftPeerId> idSupplier; private final int flowControlWindow; + private final TimeDuration requestTimeoutDuration; private RaftGRpcService(RaftServer server) { this(server, @@ -82,17 +84,19 @@ public class RaftGRpcService implements RaftServerRpc { GrpcConfigKeys.messageSizeMax(server.getProperties()).getSizeInt(), GrpcConfigKeys.messageSizeMax(server.getProperties()), RaftServerConfigKeys.Log.Appender.bufferCapacity(server.getProperties()), - GrpcConfigKeys.flowControlWindow(server.getProperties())); + GrpcConfigKeys.flowControlWindow(server.getProperties()), + RaftServerConfigKeys.Rpc.requestTimeout(server.getProperties())); } private RaftGRpcService(RaftServer raftServer, int port, int maxMessageSize, SizeInBytes grpcMessageSizeMax, SizeInBytes appenderBufferSize, - SizeInBytes flowControlWindowSize) { + SizeInBytes flowControlWindowSize, TimeDuration requestTimeoutDuration) { if (appenderBufferSize.getSize() > grpcMessageSizeMax.getSize()) { throw new IllegalArgumentException("Illegal configuration: " + RaftServerConfigKeys.Log.Appender.BUFFER_CAPACITY_KEY + " = " + appenderBufferSize + " > " + GrpcConfigKeys.MESSAGE_SIZE_MAX_KEY + " = " + grpcMessageSizeMax); } this.flowControlWindow = flowControlWindowSize.getSizeInt(); + this.requestTimeoutDuration = requestTimeoutDuration; ServerBuilder serverBuilder = ServerBuilder.forPort(port); idSupplier = raftServer::getId; @@ -178,7 +182,7 @@ public class RaftGRpcService implements RaftServerRpc { public void addPeers(Iterable<RaftPeer> newPeers) { for (RaftPeer p : newPeers) { if (!peers.containsKey(p.getId())) { - peers.put(p.getId(), new RaftServerProtocolClient(p, flowControlWindow)); + peers.put(p.getId(), new RaftServerProtocolClient(p, flowControlWindow, requestTimeoutDuration)); } } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/6c97d060/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java index ff3ed28..c1228d0 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java @@ -98,7 +98,7 @@ public class AppendStreamer implements Closeable { Collectors.toMap(RaftPeer::getId, Function.identity())); proxyMap = new PeerProxyMap<>(clientId.toString(), raftPeer -> new RaftClientProtocolProxy(clientId, raftPeer, ResponseHandler::new, - GrpcConfigKeys.flowControlWindow(prop), maxMessageSize)); + prop)); proxyMap.addPeers(group.getPeers()); refreshLeaderProxy(leaderId, null); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/6c97d060/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java index 6f1142e..8c26c7f 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java @@ -48,8 +48,7 @@ public class GrpcClientRpc extends RaftClientRpcWithProxy<RaftClientProtocolClie private final int maxMessageSize; public GrpcClientRpc(ClientId clientId, RaftProperties properties) { - super(new PeerProxyMap<>(clientId.toString(), p -> new RaftClientProtocolClient(clientId, p, - GrpcConfigKeys.flowControlWindow(properties), GrpcConfigKeys.messageSizeMax(properties)))); + super(new PeerProxyMap<>(clientId.toString(), p -> new RaftClientProtocolClient(clientId, p, properties))); this.clientId = clientId; maxMessageSize = GrpcConfigKeys.messageSizeMax(properties).getSizeInt(); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/6c97d060/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java index e90dad4..d01bbe8 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java @@ -17,7 +17,10 @@ */ package org.apache.ratis.grpc.client; +import org.apache.ratis.client.RaftClientConfigKeys; import org.apache.ratis.client.impl.ClientProtoUtils; +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.grpc.GrpcConfigKeys; import org.apache.ratis.grpc.RaftGrpcUtil; import org.apache.ratis.protocol.*; import org.apache.ratis.util.TimeoutScheduler; @@ -56,7 +59,7 @@ public class RaftClientProtocolClient implements Closeable { private final Supplier<String> name; private final RaftPeer target; private final ManagedChannel channel; - private TimeDuration timeout = TimeDuration.valueOf(3, TimeUnit.SECONDS); + private final TimeDuration requestTimeoutDuration; private final RaftClientProtocolServiceBlockingStub blockingStub; private final RaftClientProtocolServiceStub asyncStub; private final AdminProtocolServiceBlockingStub adminBlockingStub; @@ -64,9 +67,11 @@ public class RaftClientProtocolClient implements Closeable { private final AtomicReference<AsyncStreamObservers> appendStreamObservers = new AtomicReference<>(); public RaftClientProtocolClient(ClientId id, RaftPeer target, - SizeInBytes flowControlWindow, SizeInBytes maxMessageSize) { + RaftProperties properties) { this.name = JavaUtils.memoize(() -> id + "->" + target.getId()); this.target = target; + SizeInBytes flowControlWindow = GrpcConfigKeys.flowControlWindow(properties); + SizeInBytes maxMessageSize = GrpcConfigKeys.messageSizeMax(properties); channel = NettyChannelBuilder.forTarget(target.getAddress()) .usePlaintext(true).flowControlWindow(flowControlWindow.getSizeInt()) .maxMessageSize(maxMessageSize.getSizeInt()) @@ -74,6 +79,7 @@ public class RaftClientProtocolClient implements Closeable { blockingStub = RaftClientProtocolServiceGrpc.newBlockingStub(channel); asyncStub = RaftClientProtocolServiceGrpc.newStub(channel); adminBlockingStub = AdminProtocolServiceGrpc.newBlockingStub(channel); + this.requestTimeoutDuration = RaftClientConfigKeys.Rpc.requestTimeout(properties); } String getName() { @@ -91,20 +97,23 @@ public class RaftClientProtocolClient implements Closeable { RaftClientReplyProto reinitialize( ReinitializeRequestProto request) throws IOException { - TimeUnit unit = timeout.getUnit(); - return blockingCall(() -> adminBlockingStub.withDeadlineAfter(timeout.toInt(unit), unit).reinitialize(request)); + return blockingCall(() -> adminBlockingStub + .withDeadlineAfter(requestTimeoutDuration.getDuration(), requestTimeoutDuration.getUnit()) + .reinitialize(request)); } ServerInformationReplyProto serverInformation( ServerInformationRequestProto request) throws IOException { - TimeUnit unit = timeout.getUnit(); - return adminBlockingStub.withDeadlineAfter(timeout.toInt(unit), unit).serverInformation(request); + return adminBlockingStub + .withDeadlineAfter(requestTimeoutDuration.getDuration(), requestTimeoutDuration.getUnit()) + .serverInformation(request); } RaftClientReplyProto setConfiguration( SetConfigurationRequestProto request) throws IOException { - TimeUnit unit = timeout.getUnit(); - return blockingCall(() -> blockingStub.withDeadlineAfter(timeout.toInt(unit), unit).setConfiguration(request)); + return blockingCall(() -> blockingStub + .withDeadlineAfter(requestTimeoutDuration.getDuration(), requestTimeoutDuration.getUnit()) + .setConfiguration(request)); } private static RaftClientReplyProto blockingCall( @@ -124,8 +133,8 @@ public class RaftClientProtocolClient implements Closeable { StreamObserver<RaftClientRequestProto> appendWithTimeout( StreamObserver<RaftClientReplyProto> responseHandler) { - TimeUnit unit = timeout.getUnit(); - return asyncStub.withDeadlineAfter(timeout.toInt(unit), unit).append(responseHandler); + return asyncStub.withDeadlineAfter(requestTimeoutDuration.getDuration(), requestTimeoutDuration.getUnit()) + .append(responseHandler); } AsyncStreamObservers getAppendStreamObservers() { @@ -179,7 +188,7 @@ public class RaftClientProtocolClient implements Closeable { () -> getName() + ":" + getClass().getSimpleName()); try { requestStreamObserver.onNext(ClientProtoUtils.toRaftClientRequestProto(request)); - TimeoutScheduler.onTimeout(timeout, () -> timeoutCheck(request), LOG, + TimeoutScheduler.onTimeout(requestTimeoutDuration, () -> timeoutCheck(request), LOG, () -> "Timeout check failed for client request: " + request); } catch(Throwable t) { handleReplyFuture(request.getCallId(), future -> future.completeExceptionally(t)); @@ -189,7 +198,7 @@ public class RaftClientProtocolClient implements Closeable { private void timeoutCheck(RaftClientRequest request) { handleReplyFuture(request.getCallId(), f -> f.completeExceptionally( - new IOException("Request timeout " + timeout + ": " + request))); + new IOException("Request timeout " + requestTimeoutDuration + ": " + request))); } private void handleReplyFuture(long callId, Consumer<CompletableFuture<RaftClientReply>> handler) { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/6c97d060/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolProxy.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolProxy.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolProxy.java index 2ed70c2..ee9ce4e 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolProxy.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolProxy.java @@ -17,12 +17,12 @@ */ package org.apache.ratis.grpc.client; +import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.protocol.ClientId; import org.apache.ratis.shaded.io.grpc.stub.StreamObserver; import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto; import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto; import org.apache.ratis.protocol.RaftPeer; -import org.apache.ratis.util.SizeInBytes; import java.io.Closeable; import java.io.IOException; @@ -33,11 +33,10 @@ public class RaftClientProtocolProxy implements Closeable { private final Function<RaftPeer, CloseableStreamObserver> responseHandlerCreation; private RpcSession currentSession; - public RaftClientProtocolProxy( - ClientId clientId, RaftPeer target, + public RaftClientProtocolProxy(ClientId clientId, RaftPeer target, Function<RaftPeer, CloseableStreamObserver> responseHandlerCreation, - SizeInBytes flowControlWindow, SizeInBytes maxMessageSize) { - proxy = new RaftClientProtocolClient(clientId, target, flowControlWindow, maxMessageSize); + RaftProperties properties) { + proxy = new RaftClientProtocolClient(clientId, target, properties); this.responseHandlerCreation = responseHandlerCreation; } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/6c97d060/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java index 64d0b23..360d020 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java @@ -20,6 +20,7 @@ package org.apache.ratis.grpc.server; import org.apache.ratis.grpc.GrpcConfigKeys; import org.apache.ratis.grpc.RaftGRpcService; import org.apache.ratis.grpc.RaftGrpcUtil; +import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.util.TimeoutScheduler; import org.apache.ratis.server.impl.FollowerInfo; import org.apache.ratis.server.impl.LeaderState; @@ -43,7 +44,6 @@ import java.util.Objects; import java.util.Queue; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; /** @@ -58,7 +58,7 @@ public class GRpcLogAppender extends LogAppender { private final AppendLogResponseHandler appendResponseHandler; private final InstallSnapshotResponseHandler snapshotResponseHandler; - private static TimeDuration rpcTimeout = TimeDuration.valueOf(3, TimeUnit.SECONDS); + private static TimeDuration requestTimeoutDuration; private volatile StreamObserver<AppendEntriesRequestProto> appendLogRequestObserver; private StreamObserver<InstallSnapshotRequestProto> snapshotRequestObserver; @@ -71,6 +71,7 @@ public class GRpcLogAppender extends LogAppender { client = rpcService.getRpcClient(f.getPeer()); maxPendingRequestsNum = GrpcConfigKeys.Server.leaderOutstandingAppendsMax( server.getProxy().getProperties()); + requestTimeoutDuration = RaftServerConfigKeys.Rpc.requestTimeout(server.getProxy().getProperties()); pendingRequests = new ConcurrentHashMap<>(); appendResponseHandler = new AppendLogResponseHandler(); @@ -160,7 +161,7 @@ public class GRpcLogAppender extends LogAppender { server.getId(), null, request); s.onNext(request); - TimeoutScheduler.onTimeout(rpcTimeout, () -> timeoutAppendRequest(request), LOG, + TimeoutScheduler.onTimeout(requestTimeoutDuration, () -> timeoutAppendRequest(request), LOG, () -> "Timeout check failed for append entry request: " + request); follower.updateLastRpcSendTime(); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/6c97d060/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolClient.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolClient.java index 42a2b85..034f06c 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolClient.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolClient.java @@ -27,24 +27,24 @@ import org.apache.ratis.shaded.proto.grpc.RaftServerProtocolServiceGrpc.RaftServ import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.util.TimeDuration; -import java.util.concurrent.TimeUnit; - /** * This is a RaftClient implementation that supports streaming data to the raft * ring. The stream implementation utilizes gRPC. */ public class RaftServerProtocolClient { private final ManagedChannel channel; - private TimeDuration timeout = TimeDuration.valueOf(3, TimeUnit.SECONDS); + private final TimeDuration requestTimeoutDuration; private final RaftServerProtocolServiceBlockingStub blockingStub; private final RaftServerProtocolServiceStub asyncStub; - public RaftServerProtocolClient(RaftPeer target, int flowControlWindow) { + public RaftServerProtocolClient(RaftPeer target, int flowControlWindow, + TimeDuration requestTimeoutDuration) { channel = NettyChannelBuilder.forTarget(target.getAddress()) .usePlaintext(true).flowControlWindow(flowControlWindow) .build(); blockingStub = RaftServerProtocolServiceGrpc.newBlockingStub(channel); asyncStub = RaftServerProtocolServiceGrpc.newStub(channel); + this.requestTimeoutDuration = requestTimeoutDuration; } public void shutdown() { @@ -53,8 +53,9 @@ public class RaftServerProtocolClient { public RequestVoteReplyProto requestVote(RequestVoteRequestProto request) { // the StatusRuntimeException will be handled by the caller - TimeUnit unit = timeout.getUnit(); - RequestVoteReplyProto r= blockingStub.withDeadlineAfter(timeout.toInt(unit), unit).requestVote(request); + RequestVoteReplyProto r = + blockingStub.withDeadlineAfter(requestTimeoutDuration.getDuration(), requestTimeoutDuration.getUnit()) + .requestVote(request); return r; } @@ -65,7 +66,7 @@ public class RaftServerProtocolClient { StreamObserver<InstallSnapshotRequestProto> installSnapshot( StreamObserver<InstallSnapshotReplyProto> responseHandler) { - TimeUnit unit = timeout.getUnit(); - return asyncStub.withDeadlineAfter(timeout.toInt(unit), unit).installSnapshot(responseHandler); + return asyncStub.withDeadlineAfter(requestTimeoutDuration.getDuration(), requestTimeoutDuration.getUnit()) + .installSnapshot(responseHandler); } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/6c97d060/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java index cf40580..a18f9f1 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java @@ -204,6 +204,16 @@ public interface RaftServerConfigKeys { setTimeDuration(properties::setTimeDuration, TIMEOUT_MAX_KEY, maxDuration); } + String REQUEST_TIMEOUT_KEY = PREFIX + ".request.timeout"; + TimeDuration REQUEST_TIMEOUT_DEFAULT = TimeDuration.valueOf(3000, TimeUnit.MILLISECONDS); + static TimeDuration requestTimeout(RaftProperties properties) { + return getTimeDuration(properties.getTimeDuration(REQUEST_TIMEOUT_DEFAULT.getUnit()), + REQUEST_TIMEOUT_KEY, REQUEST_TIMEOUT_DEFAULT); + } + static void setRequestTimeout(RaftProperties properties, TimeDuration timeoutDuration) { + setTimeDuration(properties::setTimeDuration, REQUEST_TIMEOUT_KEY, timeoutDuration); + } + String SLEEP_TIME_KEY = PREFIX + ".sleep.time"; TimeDuration SLEEP_TIME_DEFAULT = TimeDuration.valueOf(25, TimeUnit.MILLISECONDS); static TimeDuration sleepTime(RaftProperties properties) {