This is an automated email from the ASF dual-hosted git repository.

bbeaudreault pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new 82c354eea5c HBASE-28085 Configurably use scanner timeout as rpc 
timeout for scanner next calls (#5402)
82c354eea5c is described below

commit 82c354eea5c336ed4ec2e86532089ae48c1f517a
Author: Bryan Beaudreault <bbeaudrea...@apache.org>
AuthorDate: Wed Sep 20 17:17:51 2023 -0400

    HBASE-28085 Configurably use scanner timeout as rpc timeout for scanner 
next calls (#5402)
    
    Signed-off-by: Nick Dimiduk <ndimi...@apache.org>
    Signed-off-by: Duo Zhang <zhang...@apache.org>
---
 .../hbase/client/ClientAsyncPrefetchScanner.java   |   6 +-
 .../apache/hadoop/hbase/client/ClientScanner.java  |  17 +-
 .../hadoop/hbase/client/ClientSimpleScanner.java   |   6 +-
 .../hbase/client/ConnectionConfiguration.java      |  16 ++
 .../hbase/client/ConnectionImplementation.java     |   2 +-
 .../org/apache/hadoop/hbase/client/HTable.java     |   6 +-
 .../hadoop/hbase/client/ReversedClientScanner.java |   6 +-
 .../hbase/client/ScannerCallableWithReplicas.java  |  49 +++++-
 .../hadoop/hbase/client/TestClientScanner.java     |  18 +-
 .../hbase/client/TestClientScannerTimeouts.java    | 183 ++++++++++++++++++---
 10 files changed, 247 insertions(+), 62 deletions(-)

diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java
index 769931b7083..abd1267ffc4 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java
@@ -66,9 +66,11 @@ public class ClientAsyncPrefetchScanner extends 
ClientSimpleScanner {
     ClusterConnection connection, RpcRetryingCallerFactory rpcCallerFactory,
     RpcControllerFactory rpcControllerFactory, ExecutorService pool, int 
scanReadRpcTimeout,
     int scannerTimeout, int replicaCallTimeoutMicroSecondScan,
-    Map<String, byte[]> requestAttributes) throws IOException {
+    ConnectionConfiguration connectionConfiguration, Map<String, byte[]> 
requestAttributes)
+    throws IOException {
     super(configuration, scan, name, connection, rpcCallerFactory, 
rpcControllerFactory, pool,
-      scanReadRpcTimeout, scannerTimeout, replicaCallTimeoutMicroSecondScan, 
requestAttributes);
+      scanReadRpcTimeout, scannerTimeout, replicaCallTimeoutMicroSecondScan,
+      connectionConfiguration, requestAttributes);
     exceptionsQueue = new ConcurrentLinkedQueue<>();
     final Context context = Context.current();
     final Runnable runnable = context.wrap(new PrefetchRunnable());
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
index 33cfedc362a..ef8e4b0404f 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
@@ -33,7 +33,6 @@ import java.util.concurrent.ExecutorService;
 import org.apache.commons.lang3.mutable.MutableBoolean;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.TableName;
@@ -78,6 +77,7 @@ public abstract class ClientScanner extends 
AbstractClientScanner {
   protected final TableName tableName;
   protected final int readRpcTimeout;
   protected final int scannerTimeout;
+  private final boolean useScannerTimeoutForNextCalls;
   protected boolean scanMetricsPublished = false;
   protected RpcRetryingCaller<Result[]> caller;
   protected RpcControllerFactory rpcControllerFactory;
@@ -104,7 +104,8 @@ public abstract class ClientScanner extends 
AbstractClientScanner {
   public ClientScanner(final Configuration conf, final Scan scan, final 
TableName tableName,
     ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
     RpcControllerFactory controllerFactory, ExecutorService pool, int 
scanReadRpcTimeout,
-    int scannerTimeout, int primaryOperationTimeout, Map<String, byte[]> 
requestAttributes)
+    int scannerTimeout, int primaryOperationTimeout,
+    ConnectionConfiguration connectionConfiguration, Map<String, byte[]> 
requestAttributes)
     throws IOException {
     if (LOG.isTraceEnabled()) {
       LOG.trace(
@@ -116,16 +117,15 @@ public abstract class ClientScanner extends 
AbstractClientScanner {
     this.connection = connection;
     this.pool = pool;
     this.primaryOperationTimeout = primaryOperationTimeout;
-    this.retries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
-      HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
+    this.retries = connectionConfiguration.getRetriesNumber();
     if (scan.getMaxResultSize() > 0) {
       this.maxScannerResultSize = scan.getMaxResultSize();
     } else {
-      this.maxScannerResultSize = 
conf.getLong(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
-        HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
+      this.maxScannerResultSize = 
connectionConfiguration.getScannerMaxResultSize();
     }
     this.readRpcTimeout = scanReadRpcTimeout;
     this.scannerTimeout = scannerTimeout;
+    this.useScannerTimeoutForNextCalls = 
connectionConfiguration.isUseScannerTimeoutForNextCalls();
     this.requestAttributes = requestAttributes;
 
     // check if application wants to collect scan metrics
@@ -135,8 +135,7 @@ public abstract class ClientScanner extends 
AbstractClientScanner {
     if (this.scan.getCaching() > 0) {
       this.caching = this.scan.getCaching();
     } else {
-      this.caching = conf.getInt(HConstants.HBASE_CLIENT_SCANNER_CACHING,
-        HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
+      this.caching = connectionConfiguration.getScannerCaching();
     }
 
     this.caller = rpcFactory.<Result[]> newCaller();
@@ -255,7 +254,7 @@ public abstract class ClientScanner extends 
AbstractClientScanner {
     this.currentRegion = null;
     this.callable = new ScannerCallableWithReplicas(getTable(), 
getConnection(),
       createScannerCallable(), pool, primaryOperationTimeout, scan, 
getRetries(), readRpcTimeout,
-      scannerTimeout, caching, conf, caller);
+      scannerTimeout, useScannerTimeoutForNextCalls, caching, conf, caller);
     this.callable.setCaching(this.caching);
     incRegionCountMetrics(scanMetrics);
     return true;
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSimpleScanner.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSimpleScanner.java
index 81091ad3010..bde036f8880 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSimpleScanner.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSimpleScanner.java
@@ -39,9 +39,11 @@ public class ClientSimpleScanner extends ClientScanner {
     ClusterConnection connection, RpcRetryingCallerFactory rpcCallerFactory,
     RpcControllerFactory rpcControllerFactory, ExecutorService pool, int 
scanReadRpcTimeout,
     int scannerTimeout, int replicaCallTimeoutMicroSecondScan,
-    Map<String, byte[]> requestAttributes) throws IOException {
+    ConnectionConfiguration connectionConfiguration, Map<String, byte[]> 
requestAttributes)
+    throws IOException {
     super(configuration, scan, name, connection, rpcCallerFactory, 
rpcControllerFactory, pool,
-      scanReadRpcTimeout, scannerTimeout, replicaCallTimeoutMicroSecondScan, 
requestAttributes);
+      scanReadRpcTimeout, scannerTimeout, replicaCallTimeoutMicroSecondScan,
+      connectionConfiguration, requestAttributes);
   }
 
   @Override
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java
index 93fa2600d89..2a6651b5dde 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java
@@ -76,6 +76,12 @@ public class ConnectionConfiguration {
   public static final String HBASE_CLIENT_META_SCANNER_TIMEOUT =
     "hbase.client.meta.scanner.timeout.period";
 
+  public static final String 
HBASE_CLIENT_USE_SCANNER_TIMEOUT_PERIOD_FOR_NEXT_CALLS =
+    "hbase.client.use.scanner.timeout.period.for.next.calls";
+
+  public static final boolean 
HBASE_CLIENT_USE_SCANNER_TIMEOUT_PERIOD_FOR_NEXT_CALLS_DEFAULT =
+    false;
+
   private final long writeBufferSize;
   private final long writeBufferPeriodicFlushTimeoutMs;
   private final long writeBufferPeriodicFlushTimerTickMs;
@@ -99,6 +105,7 @@ public class ConnectionConfiguration {
   private final boolean clientScannerAsyncPrefetch;
   private final long pauseMs;
   private final long pauseMsForServerOverloaded;
+  private final boolean useScannerTimeoutForNextCalls;
 
   /**
    * Constructor
@@ -158,6 +165,9 @@ public class ConnectionConfiguration {
       HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
 
     this.metaScanTimeout = conf.getInt(HBASE_CLIENT_META_SCANNER_TIMEOUT, 
scanTimeout);
+    this.useScannerTimeoutForNextCalls =
+      conf.getBoolean(HBASE_CLIENT_USE_SCANNER_TIMEOUT_PERIOD_FOR_NEXT_CALLS,
+        HBASE_CLIENT_USE_SCANNER_TIMEOUT_PERIOD_FOR_NEXT_CALLS_DEFAULT);
 
     long pauseMs = conf.getLong(HBASE_CLIENT_PAUSE, 
DEFAULT_HBASE_CLIENT_PAUSE);
     long pauseMsForServerOverloaded = 
conf.getLong(HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED,
@@ -201,6 +211,8 @@ public class ConnectionConfiguration {
     this.metaScanTimeout = scanTimeout;
     this.pauseMs = DEFAULT_HBASE_CLIENT_PAUSE;
     this.pauseMsForServerOverloaded = DEFAULT_HBASE_CLIENT_PAUSE;
+    this.useScannerTimeoutForNextCalls =
+      HBASE_CLIENT_USE_SCANNER_TIMEOUT_PERIOD_FOR_NEXT_CALLS_DEFAULT;
   }
 
   public int getReadRpcTimeout() {
@@ -275,6 +287,10 @@ public class ConnectionConfiguration {
     return scanTimeout;
   }
 
+  public boolean isUseScannerTimeoutForNextCalls() {
+    return useScannerTimeoutForNextCalls;
+  }
+
   public int getMetaScanTimeout() {
     return metaScanTimeout;
   }
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
index ff7418e39cd..70d5760df48 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
@@ -1052,7 +1052,7 @@ public class ConnectionImplementation implements 
ClusterConnection, Closeable {
           ReversedClientScanner rcs = new ReversedClientScanner(conf, s, 
TableName.META_TABLE_NAME,
             this, rpcCallerFactory, rpcControllerFactory, getMetaLookupPool(),
             connectionConfig.getMetaReadRpcTimeout(), 
connectionConfig.getMetaScanTimeout(),
-            metaReplicaCallTimeoutScanInMicroSecond, Collections.emptyMap())) {
+            metaReplicaCallTimeoutScanInMicroSecond, connectionConfig, 
Collections.emptyMap())) {
           boolean tableNotFound = true;
           for (;;) {
             Result regionInfoRow = rcs.next();
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
index cc24d80f5ed..386a7db3526 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -321,16 +321,16 @@ public class HTable implements Table {
       if (scan.isReversed()) {
         return new ReversedClientScanner(getConfiguration(), scan, getName(), 
connection,
           rpcCallerFactory, rpcControllerFactory, pool, scanReadRpcTimeout, 
scanTimeout,
-          replicaTimeout, requestAttributes);
+          replicaTimeout, connConfiguration, requestAttributes);
       } else {
         if (async) {
           return new ClientAsyncPrefetchScanner(getConfiguration(), scan, 
getName(), connection,
             rpcCallerFactory, rpcControllerFactory, pool, scanReadRpcTimeout, 
scanTimeout,
-            replicaTimeout, requestAttributes);
+            replicaTimeout, connConfiguration, requestAttributes);
         } else {
           return new ClientSimpleScanner(getConfiguration(), scan, getName(), 
connection,
             rpcCallerFactory, rpcControllerFactory, pool, scanReadRpcTimeout, 
scanTimeout,
-            replicaTimeout, requestAttributes);
+            replicaTimeout, connConfiguration, requestAttributes);
         }
       }
     }
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java
index 68a8e7b7406..36bbdb5b60e 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java
@@ -40,10 +40,12 @@ public class ReversedClientScanner extends ClientScanner {
   public ReversedClientScanner(Configuration conf, Scan scan, TableName 
tableName,
     ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
     RpcControllerFactory controllerFactory, ExecutorService pool, int 
scanReadRpcTimeout,
-    int scannerTimeout, int primaryOperationTimeout, Map<String, byte[]> 
requestAttributes)
+    int scannerTimeout, int primaryOperationTimeout,
+    ConnectionConfiguration connectionConfiguration, Map<String, byte[]> 
requestAttributes)
     throws IOException {
     super(conf, scan, tableName, connection, rpcFactory, controllerFactory, 
pool,
-      scanReadRpcTimeout, scannerTimeout, primaryOperationTimeout, 
requestAttributes);
+      scanReadRpcTimeout, scannerTimeout, primaryOperationTimeout, 
connectionConfiguration,
+      requestAttributes);
   }
 
   @Override
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
index 227ad849c84..5261ff4af5c 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
@@ -57,6 +57,7 @@ class ScannerCallableWithReplicas implements 
RetryingCallable<Result[]> {
   AtomicBoolean replicaSwitched = new AtomicBoolean(false);
   private final ClusterConnection cConnection;
   protected final ExecutorService pool;
+  private final boolean useScannerTimeoutForNextCalls;
   protected final int timeBeforeReplicas;
   private final Scan scan;
   private final int retries;
@@ -72,11 +73,12 @@ class ScannerCallableWithReplicas implements 
RetryingCallable<Result[]> {
 
   public ScannerCallableWithReplicas(TableName tableName, ClusterConnection 
cConnection,
     ScannerCallable baseCallable, ExecutorService pool, int 
timeBeforeReplicas, Scan scan,
-    int retries, int readRpcTimeout, int scannerTimeout, int caching, 
Configuration conf,
-    RpcRetryingCaller<Result[]> caller) {
+    int retries, int readRpcTimeout, int scannerTimeout, boolean 
useScannerTimeoutForNextCalls,
+    int caching, Configuration conf, RpcRetryingCaller<Result[]> caller) {
     this.currentScannerCallable = baseCallable;
     this.cConnection = cConnection;
     this.pool = pool;
+    this.useScannerTimeoutForNextCalls = useScannerTimeoutForNextCalls;
     if (timeBeforeReplicas < 0) {
       throw new IllegalArgumentException("Invalid value of operation timeout 
on the primary");
     }
@@ -187,9 +189,12 @@ class ScannerCallableWithReplicas implements 
RetryingCallable<Result[]> {
         pool, regionReplication * 5);
 
     AtomicBoolean done = new AtomicBoolean(false);
+    // make sure we use the same rpcTimeout for current and other replicas
+    int rpcTimeoutForCall = getRpcTimeout();
+
     replicaSwitched.set(false);
     // submit call for the primary replica or user specified replica
-    addCallsForCurrentReplica(cs);
+    addCallsForCurrentReplica(cs, rpcTimeoutForCall);
     int startIndex = 0;
 
     try {
@@ -234,7 +239,7 @@ class ScannerCallableWithReplicas implements 
RetryingCallable<Result[]> {
       endIndex = 1;
     } else {
       // TODO: this may be an overkill for large region replication
-      addCallsForOtherReplicas(cs, 0, regionReplication - 1);
+      addCallsForOtherReplicas(cs, 0, regionReplication - 1, 
rpcTimeoutForCall);
     }
 
     try {
@@ -326,15 +331,41 @@ class ScannerCallableWithReplicas implements 
RetryingCallable<Result[]> {
     return currentScannerCallable != null ? currentScannerCallable.getCursor() 
: null;
   }
 
-  private void
-    addCallsForCurrentReplica(ResultBoundedCompletionService<Pair<Result[], 
ScannerCallable>> cs) {
+  private void addCallsForCurrentReplica(
+    ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs, int 
rpcTimeout) {
     RetryingRPC retryingOnReplica = new RetryingRPC(currentScannerCallable);
     outstandingCallables.add(currentScannerCallable);
-    cs.submit(retryingOnReplica, readRpcTimeout, scannerTimeout, 
currentScannerCallable.id);
+    cs.submit(retryingOnReplica, rpcTimeout, scannerTimeout, 
currentScannerCallable.id);
+  }
+
+  /**
+   * As we have a call sequence for scan, it is useless to have a different 
rpc timeout which is
+   * less than the scan timeout. If the server does not respond in 
time (usually this will not happen
+   * as we have heartbeat now), we will get an OutOfOrderScannerNextException 
when resending the
+   * next request and the only way to fix this is to close the scanner and 
open a new one.
+   * <p>
+   * The legacy behavior of ScannerCallable has been to use readRpcTimeout 
despite the above. If
+   * using legacy behavior, we always use that.
+   * <p>
+   * If new behavior is enabled, we determine the rpc timeout to use based on 
whether this is a
+   * next call (scanner open, and not a renew or close call). If so, use scannerTimeout,
+   * otherwise use readRpcTimeout.
+   */
+  private int getRpcTimeout() {
+    if (useScannerTimeoutForNextCalls) {
+      return isNextCall() ? scannerTimeout : readRpcTimeout;
+    } else {
+      return readRpcTimeout;
+    }
+  }
+
+  private boolean isNextCall() {
+    return currentScannerCallable != null && currentScannerCallable.scannerId 
!= -1
+      && !currentScannerCallable.renew && !currentScannerCallable.closed;
   }
 
   private void addCallsForOtherReplicas(
-    ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs, int 
min, int max) {
+    ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs, int 
min, int max,
+    int rpcTimeout) {
 
     for (int id = min; id <= max; id++) {
       if (currentScannerCallable.id == id) {
@@ -344,7 +375,7 @@ class ScannerCallableWithReplicas implements 
RetryingCallable<Result[]> {
       setStartRowForReplicaCallable(s);
       outstandingCallables.add(s);
       RetryingRPC retryingOnReplica = new RetryingRPC(s);
-      cs.submit(retryingOnReplica, readRpcTimeout, scannerTimeout, id);
+      cs.submit(retryingOnReplica, rpcTimeout, scannerTimeout, id);
     }
   }
 
diff --git 
a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java
 
b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java
index 0025a4fdbdb..9b5eb91bbd5 100644
--- 
a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java
+++ 
b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java
@@ -108,12 +108,12 @@ public class TestClientScanner {
 
     public MockClientScanner(final Configuration conf, final Scan scan, final 
TableName tableName,
       ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
-      RpcControllerFactory controllerFactory, ExecutorService pool, int 
primaryOperationTimeout)
-      throws IOException {
+      RpcControllerFactory controllerFactory, ExecutorService pool, int 
primaryOperationTimeout,
+      ConnectionConfiguration connectionConfig) throws IOException {
       super(conf, scan, tableName, connection, rpcFactory, controllerFactory, 
pool,
         HConstants.DEFAULT_HBASE_RPC_TIMEOUT,
         HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 
primaryOperationTimeout,
-        Collections.emptyMap());
+        connectionConfig, Collections.emptyMap());
     }
 
     @Override
@@ -178,7 +178,7 @@ public class TestClientScanner {
 
     try (MockClientScanner scanner =
       new MockClientScanner(conf, scan, 
TableName.valueOf(name.getMethodName()), clusterConn,
-        rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) {
+        rpcFactory, controllerFactory, pool, Integer.MAX_VALUE, 
connectionConfig)) {
 
       scanner.setRpcFinished(true);
 
@@ -242,7 +242,7 @@ public class TestClientScanner {
 
     try (MockClientScanner scanner =
       new MockClientScanner(conf, scan, 
TableName.valueOf(name.getMethodName()), clusterConn,
-        rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) {
+        rpcFactory, controllerFactory, pool, Integer.MAX_VALUE, 
connectionConfig)) {
       InOrder inOrder = Mockito.inOrder(caller);
 
       scanner.loadCache();
@@ -305,7 +305,7 @@ public class TestClientScanner {
 
     try (MockClientScanner scanner =
       new MockClientScanner(conf, scan, 
TableName.valueOf(name.getMethodName()), clusterConn,
-        rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) {
+        rpcFactory, controllerFactory, pool, Integer.MAX_VALUE, 
connectionConfig)) {
       InOrder inOrder = Mockito.inOrder(caller);
 
       scanner.loadCache();
@@ -376,7 +376,7 @@ public class TestClientScanner {
 
     try (MockClientScanner scanner =
       new MockClientScanner(conf, scan, 
TableName.valueOf(name.getMethodName()), clusterConn,
-        rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) {
+        rpcFactory, controllerFactory, pool, Integer.MAX_VALUE, 
connectionConfig)) {
       scanner.setRpcFinished(true);
 
       InOrder inOrder = Mockito.inOrder(caller);
@@ -443,7 +443,7 @@ public class TestClientScanner {
 
     try (MockClientScanner scanner =
       new MockClientScanner(conf, scan, 
TableName.valueOf(name.getMethodName()), clusterConn,
-        rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) {
+        rpcFactory, controllerFactory, pool, Integer.MAX_VALUE, 
connectionConfig)) {
       InOrder inOrder = Mockito.inOrder(caller);
       scanner.setRpcFinished(true);
 
@@ -488,7 +488,7 @@ public class TestClientScanner {
 
     try (MockClientScanner scanner =
       new MockClientScanner(conf, scan, 
TableName.valueOf(name.getMethodName()), clusterConn,
-        rpcFactory, new RpcControllerFactory(conf), pool, Integer.MAX_VALUE)) {
+        rpcFactory, new RpcControllerFactory(conf), pool, Integer.MAX_VALUE, 
connectionConfig)) {
       Iterator<Result> iter = scanner.iterator();
       while (iter.hasNext()) {
         iter.next();
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientScannerTimeouts.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientScannerTimeouts.java
index 2bff2297ff5..259bbc4ad9b 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientScannerTimeouts.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientScannerTimeouts.java
@@ -19,16 +19,20 @@ package org.apache.hadoop.hbase.client;
 
 import static 
org.apache.hadoop.hbase.client.ConnectionConfiguration.HBASE_CLIENT_META_READ_RPC_TIMEOUT_KEY;
 import static 
org.apache.hadoop.hbase.client.ConnectionConfiguration.HBASE_CLIENT_META_SCANNER_TIMEOUT;
+import static 
org.apache.hadoop.hbase.client.ConnectionConfiguration.HBASE_CLIENT_USE_SCANNER_TIMEOUT_PERIOD_FOR_NEXT_CALLS;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.net.SocketTimeoutException;
+import java.util.Collection;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.MiniHBaseCluster;
@@ -41,13 +45,17 @@ import org.apache.hadoop.hbase.regionserver.RSRpcServices;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
 import org.junit.AfterClass;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -57,6 +65,7 @@ import 
org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
 
+@RunWith(Parameterized.class)
 @Category({ MediumTests.class, ClientTests.class })
 public class TestClientScannerTimeouts {
 
@@ -67,8 +76,8 @@ public class TestClientScannerTimeouts {
   private static final Logger LOG = 
LoggerFactory.getLogger(TestClientScannerTimeouts.class);
   private final static HBaseTestingUtility TEST_UTIL = new 
HBaseTestingUtility();
 
-  private static AsyncConnection ASYNC_CONN;
-  private static Connection CONN;
+  private AsyncConnection ASYNC_CONN;
+  private Connection CONN;
   private static final byte[] FAMILY = Bytes.toBytes("testFamily");
   private static final byte[] QUALIFIER = Bytes.toBytes("testQualifier");
   private static final byte[] VALUE = Bytes.toBytes("testValue");
@@ -79,7 +88,8 @@ public class TestClientScannerTimeouts {
   private static final byte[] ROW3 = Bytes.toBytes("row-3");
   private static final int rpcTimeout = 1000;
   private static final int scanTimeout = 3 * rpcTimeout;
-  private static final int metaScanTimeout = 6 * rpcTimeout;
+  private static final int metaReadRpcTimeout = 6 * rpcTimeout;
+  private static final int metaScanTimeout = 9 * rpcTimeout;
   private static final int CLIENT_RETRIES_NUMBER = 3;
 
   private static TableName tableName;
@@ -87,6 +97,14 @@ public class TestClientScannerTimeouts {
   @Rule
   public TestName name = new TestName();
 
+  @Parameterized.Parameter
+  public boolean useScannerTimeoutPeriodForNextCalls;
+
+  @Parameterized.Parameters
+  public static Collection<Object[]> parameters() {
+    return HBaseCommonTestingUtility.BOOLEAN_PARAMETERIZED;
+  }
+
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
     Configuration conf = TEST_UTIL.getConfiguration();
@@ -97,25 +115,38 @@ public class TestClientScannerTimeouts {
     conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, CLIENT_RETRIES_NUMBER);
     conf.setInt(HConstants.HBASE_CLIENT_PAUSE, 1000);
     TEST_UTIL.startMiniCluster(1);
+  }
 
+  @Before
+  public void setUp() throws Exception {
+    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
     conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, scanTimeout);
-    conf.setInt(HBASE_CLIENT_META_READ_RPC_TIMEOUT_KEY, metaScanTimeout);
+    conf.setInt(HBASE_CLIENT_META_READ_RPC_TIMEOUT_KEY, metaReadRpcTimeout);
     conf.setInt(HBASE_CLIENT_META_SCANNER_TIMEOUT, metaScanTimeout);
+    conf.setBoolean(HBASE_CLIENT_USE_SCANNER_TIMEOUT_PERIOD_FOR_NEXT_CALLS,
+      useScannerTimeoutPeriodForNextCalls);
     ASYNC_CONN = ConnectionFactory.createAsyncConnection(conf).get();
     CONN = ConnectionFactory.createConnection(conf);
   }
 
-  @AfterClass
-  public static void tearDownAfterClass() throws Exception {
+  @After
+  public void after() throws Exception {
     CONN.close();
     ASYNC_CONN.close();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
     TEST_UTIL.shutdownMiniCluster();
   }
 
   public void setup(boolean isSystemTable) throws IOException {
     RSRpcServicesWithScanTimeout.reset();
 
-    String nameAsString = name.getMethodName();
+    // Parameterization adds non-alphanumeric chars to the method name. Replace 
them with underscores so
+    // it parses as a table name.
+    String nameAsString = name.getMethodName().replaceAll("[^a-zA-Z0-9]", "_") 
+ "-"
+      + useScannerTimeoutPeriodForNextCalls;
     if (isSystemTable) {
       nameAsString = NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR + ":" + 
nameAsString;
     }
@@ -168,22 +199,10 @@ public class TestClientScannerTimeouts {
     expectRetryOutOfOrderScannerNext(this::getAsyncScanner);
   }
 
-  /**
-   * verify that we honor the {@link HConstants#HBASE_RPC_READ_TIMEOUT_KEY} 
for normal scans. Use a
-   * special connection which has retries disabled, because otherwise the 
scanner will retry the
-   * timed out next() call and mess up the test.
-   */
   @Test
   public void testNormalScanTimeoutOnNext() throws IOException {
     setup(false);
-    // Unlike AsyncTable, Table's ResultScanner.next() call uses rpcTimeout and
-    // will retry until scannerTimeout. This makes it more difficult to test 
the timeouts
-    // of normal next() calls. So we use a separate connection here which has 
retries disabled.
-    Configuration confNoRetries = new Configuration(CONN.getConfiguration());
-    confNoRetries.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);
-    try (Connection conn = ConnectionFactory.createConnection(confNoRetries)) {
-      expectTimeoutOnNext(rpcTimeout, () -> getScanner(conn));
-    }
+    testScanTimeoutOnNext(rpcTimeout, scanTimeout);
   }
 
   /**
@@ -221,7 +240,30 @@ public class TestClientScannerTimeouts {
   @Test
   public void testMetaScanTimeoutOnNext() throws IOException {
     setup(true);
-    expectTimeoutOnNext(metaScanTimeout, this::getScanner);
+    testScanTimeoutOnNext(metaReadRpcTimeout, metaScanTimeout);
+  }
+
+  private void testScanTimeoutOnNext(int rpcTimeout, int scannerTimeout) 
throws IOException {
+    if (useScannerTimeoutPeriodForNextCalls) {
+      // Since this has HBASE_CLIENT_USE_SCANNER_TIMEOUT_PERIOD_FOR_NEXT_CALLS 
enabled, we pass
+      // scannerTimeout as the expected timeout duration.
+      expectTimeoutOnNext(scannerTimeout, this::getScanner);
+    } else {
+      // Otherwise we pass rpcTimeout as the expected timeout duration.
+      // In this case we need a special connection which disables retries, 
otherwise the scanner
+      // will retry the timed out next() call, which will cause out of order 
exception and mess up
+      // the test
+      try (Connection conn = getNoRetriesConnection()) {
+        // Now since we disabled 
HBASE_CLIENT_USE_SCANNER_TIMEOUT_PERIOD_FOR_NEXT_CALLS, verify rpcTimeout
+        expectTimeoutOnNext(rpcTimeout, () -> getScanner(conn));
+      }
+    }
+  }
+
+  private Connection getNoRetriesConnection() throws IOException {
+    Configuration confNoRetries = new Configuration(CONN.getConfiguration());
+    confNoRetries.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);
+    return ConnectionFactory.createConnection(confNoRetries);
   }
 
   /**
@@ -240,7 +282,7 @@ public class TestClientScannerTimeouts {
   @Test
   public void testMetaScanTimeoutOnOpenScanner() throws IOException {
     setup(true);
-    expectTimeoutOnOpenScanner(metaScanTimeout, this::getScanner);
+    expectTimeoutOnOpenScanner(metaReadRpcTimeout, this::getScanner);
   }
 
   /**
@@ -249,7 +291,51 @@ public class TestClientScannerTimeouts {
   @Test
   public void testMetaScanTimeoutOnOpenScannerAsync() throws IOException {
     setup(true);
-    expectTimeoutOnOpenScanner(metaScanTimeout, this::getAsyncScanner);
+    expectTimeoutOnOpenScanner(metaReadRpcTimeout, this::getAsyncScanner);
+  }
+
+  /**
+   * Test renewLease timeout for non-async scanner, which should use 
rpcTimeout. Async scanner does
+   * lease renewal automatically in the background, so renewLease() always 
returns false. So this
+   * test doesn't have an Async counterpart like the others.
+   */
+  @Test
+  public void testNormalScanTimeoutOnRenewLease() throws IOException {
+    setup(false);
+    expectTimeoutOnRenewScanner(rpcTimeout, this::getScanner);
+  }
+
+  /**
+   * Test renewLease timeout for non-async scanner, which should use 
the meta read rpcTimeout. Async scanner does
+   * lease renewal automatically in the background, so renewLease() always 
returns false. So this
+   * test doesn't have an Async counterpart like the others.
+   */
+  @Test
+  public void testMetaScanTimeoutOnRenewLease() throws IOException {
+    setup(true);
+    expectTimeoutOnRenewScanner(metaReadRpcTimeout, this::getScanner);
+  }
+
+  /**
+   * Test close timeout for non-async scanner, which should use rpcTimeout. 
Async scanner
+   * closes async and always returns immediately. So this test doesn't have an 
Async counterpart
+   * like the others.
+   */
+  @Test
+  public void testNormalScanTimeoutOnClose() throws IOException {
+    setup(false);
+    expectTimeoutOnCloseScanner(rpcTimeout, this::getScanner);
+  }
+
+  /**
+   * Test close timeout for non-async scanner, which should use the meta read 
rpcTimeout. Async scanner
+   * closes async and always returns immediately. So this test doesn't have an 
Async counterpart
+   * like the others.
+   */
+  @Test
+  public void testMetaScanTimeoutOnClose() throws IOException {
+    setup(true);
+    expectTimeoutOnCloseScanner(metaReadRpcTimeout, this::getScanner);
   }
 
   private void expectRetryOutOfOrderScannerNext(Supplier<ResultScanner> 
scannerSupplier)
@@ -358,6 +444,34 @@ public class TestClientScannerTimeouts {
     expectTimeout(start, timeout);
   }
 
+  private void expectTimeoutOnRenewScanner(int timeout, 
Supplier<ResultScanner> scannerSupplier)
+    throws IOException {
+    RSRpcServicesWithScanTimeout.setSleepForTimeout(timeout);
+    RSRpcServicesWithScanTimeout.sleepOnRenew = true;
+    LOG.info(
+      "Opening scanner, expecting no timeouts from first next() call from 
openScanner response");
+    long start = System.nanoTime();
+    ResultScanner scanner = scannerSupplier.get();
+    scanner.next();
+    assertFalse("Expected renewLease to fail due to timeout", 
scanner.renewLease());
+    expectTimeout(start, timeout);
+  }
+
+  private void expectTimeoutOnCloseScanner(int timeout, 
Supplier<ResultScanner> scannerSupplier)
+    throws IOException {
+    RSRpcServicesWithScanTimeout.setSleepForTimeout(timeout);
+    RSRpcServicesWithScanTimeout.sleepOnClose = true;
+    LOG.info(
+      "Opening scanner, expecting no timeouts from first next() call from 
openScanner response");
+    long start = System.nanoTime();
+    ResultScanner scanner = scannerSupplier.get();
+    scanner.next();
+    // close doesn't throw or return anything, so we can't verify it directly.
+    // but we can verify that it took as long as we expect below
+    scanner.close();
+    expectTimeout(start, timeout);
+  }
+
   private void expectTimeout(long start, int timeout) {
     long duration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
     LOG.info("Expected duration >= {}, and got {}", timeout, duration);
@@ -412,6 +526,8 @@ public class TestClientScannerTimeouts {
 
     private static long seqNoToSleepOn = -1;
     private static boolean sleepOnOpen = false;
+    private static boolean sleepOnRenew = false;
+    private static boolean sleepOnClose = false;
     private static volatile boolean slept;
     private static int tryNumber = 0;
 
@@ -429,6 +545,8 @@ public class TestClientScannerTimeouts {
       throwAlways = false;
       threw = false;
       sleepOnOpen = false;
+      sleepOnRenew = false;
+      sleepOnClose = false;
       slept = false;
       tryNumber = 0;
     }
@@ -443,7 +561,19 @@ public class TestClientScannerTimeouts {
       if (request.hasScannerId()) {
         LOG.info("Got request {}", request);
         ScanResponse scanResponse = super.scan(controller, request);
-        if (tableScannerId != request.getScannerId() || 
request.getCloseScanner()) {
+        if (tableScannerId != request.getScannerId()) {
+          return scanResponse;
+        }
+        if (request.getCloseScanner()) {
+          if (!slept && sleepOnClose) {
+            try {
+              LOG.info("SLEEPING " + sleepTime);
+              Thread.sleep(sleepTime);
+            } catch (InterruptedException e) {
+            }
+            slept = true;
+            tryNumber++;
+          }
           return scanResponse;
         }
 
@@ -458,7 +588,10 @@ public class TestClientScannerTimeouts {
           throw new ServiceException(new OutOfOrderScannerNextException());
         }
 
-        if (!slept && request.hasNextCallSeq() && seqNoToSleepOn == 
request.getNextCallSeq()) {
+        if (
+          !slept && (request.hasNextCallSeq() && seqNoToSleepOn == 
request.getNextCallSeq()
+            || sleepOnRenew && request.getRenew())
+        ) {
           try {
             LOG.info("SLEEPING " + sleepTime);
             Thread.sleep(sleepTime);


Reply via email to