This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new b06387b  HBASE-19695 Handle disabled table for async client
b06387b is described below

commit b06387b2b0321f042cc74e4d66a2e8404086574c
Author: zhangduo <zhang...@apache.org>
AuthorDate: Sun Jan 13 14:34:07 2019 +0800

    HBASE-19695 Handle disabled table for async client
    
    Signed-off-by: tianjingyun <tianjy1...@gmail.com>
---
 .../hadoop/hbase/client/AsyncRegionLocator.java    | 13 ++--
 .../hbase/client/AsyncRpcRetryingCaller.java       | 75 ++++++++++++++++------
 .../AsyncSingleRequestRpcRetryingCaller.java       |  6 ++
 .../apache/hadoop/hbase/client/TestAsyncTable.java | 34 ++++++++--
 4 files changed, 97 insertions(+), 31 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
index d624974..8218761 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
@@ -23,9 +23,7 @@ import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
-import org.apache.hadoop.hbase.HBaseIOException;
 import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.RegionException;
 import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
@@ -107,13 +105,14 @@ class AsyncRegionLocator {
       }
       HRegionLocation loc = locs.getRegionLocation(replicaId);
       if (loc == null) {
-        future
-          .completeExceptionally(new RegionException("No location for " + tableName + ", row='" +
+        future.completeExceptionally(
+          new RegionOfflineException("No location for " + tableName + ", row='" +
             Bytes.toStringBinary(row) + "', locateType=" + type + ", replicaId=" + replicaId));
       } else if (loc.getServerName() == null) {
-        future.completeExceptionally(new HBaseIOException("No server address listed for region '" +
-          loc.getRegion().getRegionNameAsString() + ", row='" + Bytes.toStringBinary(row) +
-          "', locateType=" + type + ", replicaId=" + replicaId));
+        future.completeExceptionally(
+          new RegionOfflineException("No server address listed for region '" +
+            loc.getRegion().getRegionNameAsString() + ", row='" + Bytes.toStringBinary(row) +
+            "', locateType=" + type + ", replicaId=" + replicaId));
       } else {
         future.complete(loc);
       }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java
index 5383ff8..a886b49 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java
@@ -26,13 +26,19 @@ import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotEnabledException;
+import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.ipc.HBaseRpcController;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FutureUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -110,7 +116,27 @@ public abstract class AsyncRpcRetryingCaller<T> {
     resetController(controller, callTimeoutNs);
   }
 
-  protected final void onError(Throwable error, Supplier<String> errMsg,
+  private void tryScheduleRetry(Throwable error, Consumer<Throwable> updateCachedLocation) {
+    long delayNs;
+    if (operationTimeoutNs > 0) {
+      long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;
+      if (maxDelayNs <= 0) {
+        completeExceptionally();
+        return;
+      }
+      delayNs = Math.min(maxDelayNs, getPauseTime(pauseNs, tries - 1));
+    } else {
+      delayNs = getPauseTime(pauseNs, tries - 1);
+    }
+    tries++;
+    retryTimer.newTimeout(t -> doCall(), delayNs, TimeUnit.NANOSECONDS);
+  }
+
+  protected Optional<TableName> getTableName() {
+    return Optional.empty();
+  }
+
+  protected final void onError(Throwable t, Supplier<String> errMsg,
       Consumer<Throwable> updateCachedLocation) {
     if (future.isDone()) {
       // Give up if the future is already done, this is possible if user has already canceled the
@@ -119,37 +145,50 @@ public abstract class AsyncRpcRetryingCaller<T> {
       LOG.debug("The future is already done, canceled={}, give up retrying", future.isCancelled());
       return;
     }
-    error = translateException(error);
+    Throwable error = translateException(t);
     if (error instanceof DoNotRetryIOException) {
       future.completeExceptionally(error);
       return;
     }
     if (tries > startLogErrorsCnt) {
-      LOG.warn(errMsg.get() + ", tries = " + tries + ", maxAttempts = " + maxAttempts
-          + ", timeout = " + TimeUnit.NANOSECONDS.toMillis(operationTimeoutNs)
-          + " ms, time elapsed = " + elapsedMs() + " ms", error);
+      LOG.warn(errMsg.get() + ", tries = " + tries + ", maxAttempts = " + maxAttempts +
+        ", timeout = " + TimeUnit.NANOSECONDS.toMillis(operationTimeoutNs) +
+        " ms, time elapsed = " + elapsedMs() + " ms", error);
     }
-    RetriesExhaustedException.ThrowableWithExtraContext qt = new RetriesExhaustedException.ThrowableWithExtraContext(
-        error, EnvironmentEdgeManager.currentTime(), "");
+    updateCachedLocation.accept(error);
+    RetriesExhaustedException.ThrowableWithExtraContext qt =
+      new RetriesExhaustedException.ThrowableWithExtraContext(error,
+        EnvironmentEdgeManager.currentTime(), "");
     exceptions.add(qt);
     if (tries >= maxAttempts) {
       completeExceptionally();
       return;
     }
-    long delayNs;
-    if (operationTimeoutNs > 0) {
-      long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;
-      if (maxDelayNs <= 0) {
-        completeExceptionally();
-        return;
+    // check whether the table has been disabled, notice that the check will introduce a request to
+    // meta, so here we only check for disabled for some specific exception types.
+    if (error instanceof NotServingRegionException || error instanceof RegionOfflineException) {
+      Optional<TableName> tableName = getTableName();
+      if (tableName.isPresent()) {
+        FutureUtils.addListener(conn.getAdmin().isTableDisabled(tableName.get()), (disabled, e) -> {
+          if (e != null) {
+            if (e instanceof TableNotFoundException) {
+              future.completeExceptionally(e);
+            } else {
+              // failed to test whether the table is disabled, not a big deal, continue retrying
+              tryScheduleRetry(error, updateCachedLocation);
+            }
+            return;
+          }
+          if (disabled) {
+            future.completeExceptionally(new TableNotEnabledException(tableName.get()));
+          } else {
+            tryScheduleRetry(error, updateCachedLocation);
+          }
+        });
       }
-      delayNs = Math.min(maxDelayNs, getPauseTime(pauseNs, tries - 1));
     } else {
-      delayNs = getPauseTime(pauseNs, tries - 1);
+      tryScheduleRetry(error, updateCachedLocation);
     }
-    updateCachedLocation.accept(error);
-    tries++;
-    retryTimer.newTimeout(t -> doCall(), delayNs, TimeUnit.NANOSECONDS);
   }
 
   protected abstract void doCall();
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
index 4b60b18..a552e40 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.client;
 import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
 
 import java.io.IOException;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.TableName;
@@ -114,4 +115,9 @@ class AsyncSingleRequestRpcRetryingCaller<T> extends AsyncRpcRetryingCaller<T> {
         call(loc);
       });
   }
+
+  @Override
+  protected Optional<TableName> getTableName() {
+    return Optional.of(tableName);
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java
index d119f1c..85a6d9d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java
@@ -17,11 +17,15 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.io.UncheckedIOException;
@@ -40,6 +44,7 @@ import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotEnabledException;
 import org.apache.hadoop.hbase.io.TimeRange;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -115,8 +120,11 @@ public class TestAsyncTable {
   }
 
   @Before
-  public void setUp() throws IOException, InterruptedException {
+  public void setUp() throws IOException, InterruptedException, ExecutionException {
     row = Bytes.toBytes(testName.getMethodName().replaceAll("[^0-9A-Za-z]", "_"));
+    if (ASYNC_CONN.getAdmin().isTableDisabled(TABLE_NAME).get()) {
+      ASYNC_CONN.getAdmin().enableTable(TABLE_NAME).get();
+    }
   }
 
   @Test
@@ -283,14 +291,14 @@ public class TestAsyncTable {
   public void testMutateRow() throws InterruptedException, ExecutionException, IOException {
     AsyncTable<?> table = getTable.get();
     RowMutations mutation = new RowMutations(row);
-    mutation.add(new Put(row).addColumn(FAMILY, concat(QUALIFIER, 1), VALUE));
+    mutation.add((Mutation) new Put(row).addColumn(FAMILY, concat(QUALIFIER, 1), VALUE));
     table.mutateRow(mutation).get();
     Result result = table.get(new Get(row)).get();
     assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, 1)));
 
     mutation = new RowMutations(row);
-    mutation.add(new Delete(row).addColumn(FAMILY, concat(QUALIFIER, 1)));
-    mutation.add(new Put(row).addColumn(FAMILY, concat(QUALIFIER, 2), VALUE));
+    mutation.add((Mutation) new Delete(row).addColumn(FAMILY, concat(QUALIFIER, 1)));
+    mutation.add((Mutation) new Put(row).addColumn(FAMILY, concat(QUALIFIER, 2), VALUE));
     table.mutateRow(mutation).get();
     result = table.get(new Get(row)).get();
     assertNull(result.getValue(FAMILY, concat(QUALIFIER, 1)));
@@ -314,8 +322,9 @@ public class TestAsyncTable {
     IntStream.range(0, count).forEach(i -> {
       RowMutations mutation = new RowMutations(row);
       try {
-        mutation.add(new Delete(row).addColumn(FAMILY, QUALIFIER));
-        mutation.add(new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), concat(VALUE, i)));
+        mutation.add((Mutation) new Delete(row).addColumn(FAMILY, QUALIFIER));
+        mutation
+          .add((Mutation) new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), concat(VALUE, i)));
       } catch (IOException e) {
         throw new UncheckedIOException(e);
       }
@@ -400,4 +409,17 @@ public class TestAsyncTable {
       .get();
     assertTrue(ok);
   }
+
+  @Test
+  public void testDisabled() throws InterruptedException, ExecutionException {
+    ASYNC_CONN.getAdmin().disableTable(TABLE_NAME).get();
+    try {
+      getTable.get().get(new Get(row)).get();
+      fail("Should fail since table has been disabled");
+    } catch (ExecutionException e) {
+      Throwable cause = e.getCause();
+      assertThat(cause, instanceOf(TableNotEnabledException.class));
+      assertThat(cause.getMessage(), containsString(TABLE_NAME.getNameAsString()));
+    }
+  }
 }

Reply via email to