[ACCUMULO-4506] Add a timeout to a replication RPC call

This addresses an issue where a replication task can get stuck for a
substantial amount of time.  With a client-side timeout, a single RPC
attempt is abandoned after 2m (the default for replication.rpc.timeout).
Note that this is related to the replication.work.attempts property: an
attempt that times out will be retried by the same task up to that number
of times before being abandoned entirely.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/5f6882fc
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/5f6882fc
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/5f6882fc

Branch: refs/heads/1.8
Commit: 5f6882fc28bfea9cd48afef174dcf957eefde210
Parents: 771475a
Author: Adam J. Shook <adamjsh...@gmail.com>
Authored: Wed May 3 13:19:07 2017 -0400
Committer: Josh Elser <els...@apache.org>
Committed: Wed May 3 16:05:21 2017 -0400

----------------------------------------------------------------------
 .../core/client/impl/ReplicationClient.java      | 10 ++++++----
 .../org/apache/accumulo/core/conf/Property.java  |  2 ++
 .../org/apache/accumulo/core/rpc/ThriftUtil.java |  2 +-
 .../replication/AccumuloReplicaSystem.java       | 19 ++++++++++++-------
 4 files changed, 21 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/5f6882fc/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java
 
b/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java
index cc9b5c1..8265503 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java
@@ -119,14 +119,16 @@ public class ReplicationClient {
    *          The client session for the peer replicant
    * @param server
    *          Server to connect to
+   * @param timeout
+   *          RPC timeout in milliseconds
    * @return A ReplicationServicer client to the given host in the given 
instance
    */
-  public static ReplicationServicer.Client getServicerConnection(ClientContext 
context, HostAndPort server) throws TTransportException {
+  public static ReplicationServicer.Client getServicerConnection(ClientContext 
context, HostAndPort server, long timeout) throws TTransportException {
     requireNonNull(context);
     requireNonNull(server);
 
     try {
-      return ThriftUtil.getClientNoTimeout(new 
ReplicationServicer.Client.Factory(), server, context);
+      return ThriftUtil.getClient(new ReplicationServicer.Client.Factory(), 
server, context, timeout);
     } catch (TTransportException tte) {
       log.debug("Failed to connect to servicer ({}), will retry...", server, 
tte);
       throw tte;
@@ -180,12 +182,12 @@ public class ReplicationClient {
     throw new AccumuloException("Could not connect to ReplicationCoordinator 
at " + context.getInstance().getInstanceName());
   }
 
-  public static <T> T executeServicerWithReturn(ClientContext context, 
HostAndPort tserver, ClientExecReturn<T,ReplicationServicer.Client> exec)
+  public static <T> T executeServicerWithReturn(ClientContext context, 
HostAndPort tserver, ClientExecReturn<T,ReplicationServicer.Client> exec, long 
timeout)
       throws AccumuloException, AccumuloSecurityException, TTransportException 
{
     ReplicationServicer.Client client = null;
     while (true) {
       try {
-        client = getServicerConnection(context, tserver);
+        client = getServicerConnection(context, tserver, timeout);
         return exec.execute(client);
       } catch (ThriftSecurityException e) {
         throw new AccumuloSecurityException(e.user, e.code, e);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/5f6882fc/core/src/main/java/org/apache/accumulo/core/conf/Property.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java 
b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index db9d6a6..0e7026f 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -584,6 +584,8 @@ public enum Property {
   REPLICATION_WORK_PROCESSOR_PERIOD("replication.work.processor.period", "0s", 
PropertyType.TIMEDURATION,
       "Amount of time to wait before re-checking for replication work, not 
useful outside of tests"),
   REPLICATION_TRACE_PERCENT("replication.trace.percent", "0.1", 
PropertyType.FRACTION, "The sampling percentage to use for replication traces"),
+  REPLICATION_RPC_TIMEOUT("replication.rpc.timeout", "2m", 
PropertyType.TIMEDURATION,
+      "Amount of time for a single replication RPC call to last before failing 
the attempt. See replication.work.attempts."),
 
   ;
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/5f6882fc/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java 
b/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java
index 49e4349..deee3fe 100644
--- a/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java
+++ b/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java
@@ -134,7 +134,7 @@ public class ThriftUtil {
    * @param timeout
    *          Socket timeout which overrides the ClientContext timeout
    */
-  private static <T extends TServiceClient> T 
getClient(TServiceClientFactory<T> factory, HostAndPort address, ClientContext 
context, long timeout)
+  public static <T extends TServiceClient> T 
getClient(TServiceClientFactory<T> factory, HostAndPort address, ClientContext 
context, long timeout)
       throws TTransportException {
     TTransport transport = 
ThriftTransportPool.getInstance().getTransport(address, timeout, context);
     return createClient(factory, transport);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/5f6882fc/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
----------------------------------------------------------------------
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
index 411ae87..9709e50 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
@@ -163,6 +163,8 @@ public class AccumuloReplicaSystem implements ReplicaSystem 
{
     final Instance localInstance = HdfsZooInstance.getInstance();
     final AccumuloConfiguration localConf = new 
ServerConfigurationFactory(localInstance).getConfiguration();
 
+    log.debug("Replication RPC timeout is {}", 
localConf.get(Property.REPLICATION_RPC_TIMEOUT.getKey()));
+
     final String principal = getPrincipal(localConf, target);
     final File keytab;
     final String password;
@@ -275,6 +277,8 @@ public class AccumuloReplicaSystem implements ReplicaSystem 
{
 
         final HostAndPort peerTserver = HostAndPort.fromString(peerTserverStr);
 
+        final long timeout = 
localConf.getTimeInMillis(Property.REPLICATION_RPC_TIMEOUT);
+
         // We have a tserver on the remote -- send the data its way.
         Status finalStatus;
         final long sizeLimit = 
conf.getMemoryInBytes(Property.REPLICATION_MAX_UNIT_SIZE);
@@ -282,14 +286,15 @@ public class AccumuloReplicaSystem implements 
ReplicaSystem {
           if (p.getName().endsWith(RFILE_SUFFIX)) {
             span = Trace.start("RFile replication");
             try {
-              finalStatus = replicateRFiles(peerContext, peerTserver, target, 
p, status, sizeLimit, remoteTableId, peerContext.rpcCreds(), helper);
+              finalStatus = replicateRFiles(peerContext, peerTserver, target, 
p, status, sizeLimit, remoteTableId, peerContext.rpcCreds(), helper, timeout);
             } finally {
               span.stop();
             }
           } else {
             span = Trace.start("WAL replication");
             try {
-              finalStatus = replicateLogs(peerContext, peerTserver, target, p, 
status, sizeLimit, remoteTableId, peerContext.rpcCreds(), helper, accumuloUgi);
+              finalStatus = replicateLogs(peerContext, peerTserver, target, p, 
status, sizeLimit, remoteTableId, peerContext.rpcCreds(), helper, accumuloUgi,
+                  timeout);
             } finally {
               span.stop();
             }
@@ -314,7 +319,7 @@ public class AccumuloReplicaSystem implements ReplicaSystem 
{
   }
 
   protected Status replicateRFiles(ClientContext peerContext, final 
HostAndPort peerTserver, final ReplicationTarget target, final Path p, final 
Status status,
-      final long sizeLimit, final String remoteTableId, final TCredentials 
tcreds, final ReplicaSystemHelper helper) throws TTransportException,
+      final long sizeLimit, final String remoteTableId, final TCredentials 
tcreds, final ReplicaSystemHelper helper, long timeout) throws 
TTransportException,
       AccumuloException, AccumuloSecurityException {
     DataInputStream input;
     try {
@@ -328,7 +333,7 @@ public class AccumuloReplicaSystem implements ReplicaSystem 
{
     while (true) {
       // Read and send a batch of mutations
       ReplicationStats replResult = 
ReplicationClient.executeServicerWithReturn(peerContext, peerTserver, new 
RFileClientExecReturn(target, input, p,
-          currentStatus, sizeLimit, remoteTableId, tcreds));
+          currentStatus, sizeLimit, remoteTableId, tcreds), timeout);
 
       // Catch the overflow
       long newBegin = currentStatus.getBegin() + replResult.entriesConsumed;
@@ -360,8 +365,8 @@ public class AccumuloReplicaSystem implements ReplicaSystem 
{
   }
 
   protected Status replicateLogs(ClientContext peerContext, final HostAndPort 
peerTserver, final ReplicationTarget target, final Path p, final Status status,
-      final long sizeLimit, final String remoteTableId, final TCredentials 
tcreds, final ReplicaSystemHelper helper, final UserGroupInformation 
accumuloUgi)
-      throws TTransportException, AccumuloException, AccumuloSecurityException 
{
+      final long sizeLimit, final String remoteTableId, final TCredentials 
tcreds, final ReplicaSystemHelper helper, final UserGroupInformation 
accumuloUgi,
+      long timeout) throws TTransportException, AccumuloException, 
AccumuloSecurityException {
 
     log.debug("Replication WAL to peer tserver");
     final Set<Integer> tids;
@@ -428,7 +433,7 @@ public class AccumuloReplicaSystem implements ReplicaSystem 
{
       try {
         // Read and send a batch of mutations
         replResult = ReplicationClient.executeServicerWithReturn(peerContext, 
peerTserver, new WalClientExecReturn(target, input, p, currentStatus, sizeLimit,
-            remoteTableId, tcreds, tids));
+            remoteTableId, tcreds, tids), timeout);
       } catch (Exception e) {
         log.error("Caught exception replicating data to {} at {}", 
peerContext.getInstance().getInstanceName(), peerTserver, e);
         throw e;

Reply via email to