[ACCUMULO-4506] Add a timeout to a replication RPC call. This addresses an issue where a replication task can get stuck for a substantial amount of time. With a client-side timeout, a single RPC attempt is abandoned after 2m (the default, configurable via the replication.rpc.timeout property). Note that this interacts with the replication.work.attempts property: an attempt that times out is retried by the same task up to replication.work.attempts times before the task is abandoned entirely.
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/5f6882fc Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/5f6882fc Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/5f6882fc Branch: refs/heads/1.8 Commit: 5f6882fc28bfea9cd48afef174dcf957eefde210 Parents: 771475a Author: Adam J. Shook <adamjsh...@gmail.com> Authored: Wed May 3 13:19:07 2017 -0400 Committer: Josh Elser <els...@apache.org> Committed: Wed May 3 16:05:21 2017 -0400 ---------------------------------------------------------------------- .../core/client/impl/ReplicationClient.java | 10 ++++++---- .../org/apache/accumulo/core/conf/Property.java | 2 ++ .../org/apache/accumulo/core/rpc/ThriftUtil.java | 2 +- .../replication/AccumuloReplicaSystem.java | 19 ++++++++++++------- 4 files changed, 21 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/5f6882fc/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java index cc9b5c1..8265503 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java @@ -119,14 +119,16 @@ public class ReplicationClient { * The client session for the peer replicant * @param server * Server to connect to + * @param timeout + * RPC timeout in milliseconds * @return A ReplicationServicer client to the given host in the given instance */ - public static ReplicationServicer.Client getServicerConnection(ClientContext context, HostAndPort server) throws TTransportException { + public static ReplicationServicer.Client 
getServicerConnection(ClientContext context, HostAndPort server, long timeout) throws TTransportException { requireNonNull(context); requireNonNull(server); try { - return ThriftUtil.getClientNoTimeout(new ReplicationServicer.Client.Factory(), server, context); + return ThriftUtil.getClient(new ReplicationServicer.Client.Factory(), server, context, timeout); } catch (TTransportException tte) { log.debug("Failed to connect to servicer ({}), will retry...", server, tte); throw tte; @@ -180,12 +182,12 @@ public class ReplicationClient { throw new AccumuloException("Could not connect to ReplicationCoordinator at " + context.getInstance().getInstanceName()); } - public static <T> T executeServicerWithReturn(ClientContext context, HostAndPort tserver, ClientExecReturn<T,ReplicationServicer.Client> exec) + public static <T> T executeServicerWithReturn(ClientContext context, HostAndPort tserver, ClientExecReturn<T,ReplicationServicer.Client> exec, long timeout) throws AccumuloException, AccumuloSecurityException, TTransportException { ReplicationServicer.Client client = null; while (true) { try { - client = getServicerConnection(context, tserver); + client = getServicerConnection(context, tserver, timeout); return exec.execute(client); } catch (ThriftSecurityException e) { throw new AccumuloSecurityException(e.user, e.code, e); http://git-wip-us.apache.org/repos/asf/accumulo/blob/5f6882fc/core/src/main/java/org/apache/accumulo/core/conf/Property.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index db9d6a6..0e7026f 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -584,6 +584,8 @@ public enum Property { REPLICATION_WORK_PROCESSOR_PERIOD("replication.work.processor.period", "0s", PropertyType.TIMEDURATION, 
"Amount of time to wait before re-checking for replication work, not useful outside of tests"), REPLICATION_TRACE_PERCENT("replication.trace.percent", "0.1", PropertyType.FRACTION, "The sampling percentage to use for replication traces"), + REPLICATION_RPC_TIMEOUT("replication.rpc.timeout", "2m", PropertyType.TIMEDURATION, + "Amount of time for a single replication RPC call to last before failing the attempt. See replication.work.attempts."), ; http://git-wip-us.apache.org/repos/asf/accumulo/blob/5f6882fc/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java b/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java index 49e4349..deee3fe 100644 --- a/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java +++ b/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java @@ -134,7 +134,7 @@ public class ThriftUtil { * @param timeout * Socket timeout which overrides the ClientContext timeout */ - private static <T extends TServiceClient> T getClient(TServiceClientFactory<T> factory, HostAndPort address, ClientContext context, long timeout) + public static <T extends TServiceClient> T getClient(TServiceClientFactory<T> factory, HostAndPort address, ClientContext context, long timeout) throws TTransportException { TTransport transport = ThriftTransportPool.getInstance().getTransport(address, timeout, context); return createClient(factory, transport); http://git-wip-us.apache.org/repos/asf/accumulo/blob/5f6882fc/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java index 
411ae87..9709e50 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java @@ -163,6 +163,8 @@ public class AccumuloReplicaSystem implements ReplicaSystem { final Instance localInstance = HdfsZooInstance.getInstance(); final AccumuloConfiguration localConf = new ServerConfigurationFactory(localInstance).getConfiguration(); + log.debug("Replication RPC timeout is {}", localConf.get(Property.REPLICATION_RPC_TIMEOUT.getKey())); + final String principal = getPrincipal(localConf, target); final File keytab; final String password; @@ -275,6 +277,8 @@ public class AccumuloReplicaSystem implements ReplicaSystem { final HostAndPort peerTserver = HostAndPort.fromString(peerTserverStr); + final long timeout = localConf.getTimeInMillis(Property.REPLICATION_RPC_TIMEOUT); + // We have a tserver on the remote -- send the data its way. Status finalStatus; final long sizeLimit = conf.getMemoryInBytes(Property.REPLICATION_MAX_UNIT_SIZE); @@ -282,14 +286,15 @@ public class AccumuloReplicaSystem implements ReplicaSystem { if (p.getName().endsWith(RFILE_SUFFIX)) { span = Trace.start("RFile replication"); try { - finalStatus = replicateRFiles(peerContext, peerTserver, target, p, status, sizeLimit, remoteTableId, peerContext.rpcCreds(), helper); + finalStatus = replicateRFiles(peerContext, peerTserver, target, p, status, sizeLimit, remoteTableId, peerContext.rpcCreds(), helper, timeout); } finally { span.stop(); } } else { span = Trace.start("WAL replication"); try { - finalStatus = replicateLogs(peerContext, peerTserver, target, p, status, sizeLimit, remoteTableId, peerContext.rpcCreds(), helper, accumuloUgi); + finalStatus = replicateLogs(peerContext, peerTserver, target, p, status, sizeLimit, remoteTableId, peerContext.rpcCreds(), helper, accumuloUgi, + timeout); } finally { span.stop(); } @@ -314,7 +319,7 @@ public class 
AccumuloReplicaSystem implements ReplicaSystem { } protected Status replicateRFiles(ClientContext peerContext, final HostAndPort peerTserver, final ReplicationTarget target, final Path p, final Status status, - final long sizeLimit, final String remoteTableId, final TCredentials tcreds, final ReplicaSystemHelper helper) throws TTransportException, + final long sizeLimit, final String remoteTableId, final TCredentials tcreds, final ReplicaSystemHelper helper, long timeout) throws TTransportException, AccumuloException, AccumuloSecurityException { DataInputStream input; try { @@ -328,7 +333,7 @@ public class AccumuloReplicaSystem implements ReplicaSystem { while (true) { // Read and send a batch of mutations ReplicationStats replResult = ReplicationClient.executeServicerWithReturn(peerContext, peerTserver, new RFileClientExecReturn(target, input, p, - currentStatus, sizeLimit, remoteTableId, tcreds)); + currentStatus, sizeLimit, remoteTableId, tcreds), timeout); // Catch the overflow long newBegin = currentStatus.getBegin() + replResult.entriesConsumed; @@ -360,8 +365,8 @@ public class AccumuloReplicaSystem implements ReplicaSystem { } protected Status replicateLogs(ClientContext peerContext, final HostAndPort peerTserver, final ReplicationTarget target, final Path p, final Status status, - final long sizeLimit, final String remoteTableId, final TCredentials tcreds, final ReplicaSystemHelper helper, final UserGroupInformation accumuloUgi) - throws TTransportException, AccumuloException, AccumuloSecurityException { + final long sizeLimit, final String remoteTableId, final TCredentials tcreds, final ReplicaSystemHelper helper, final UserGroupInformation accumuloUgi, + long timeout) throws TTransportException, AccumuloException, AccumuloSecurityException { log.debug("Replication WAL to peer tserver"); final Set<Integer> tids; @@ -428,7 +433,7 @@ public class AccumuloReplicaSystem implements ReplicaSystem { try { // Read and send a batch of mutations replResult = 
ReplicationClient.executeServicerWithReturn(peerContext, peerTserver, new WalClientExecReturn(target, input, p, currentStatus, sizeLimit, - remoteTableId, tcreds, tids)); + remoteTableId, tcreds, tids), timeout); } catch (Exception e) { log.error("Caught exception replicating data to {} at {}", peerContext.getInstance().getInstanceName(), peerTserver, e); throw e;