This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new b06387b HBASE-19695 Handle disabled table for async client b06387b is described below commit b06387b2b0321f042cc74e4d66a2e8404086574c Author: zhangduo <zhang...@apache.org> AuthorDate: Sun Jan 13 14:34:07 2019 +0800 HBASE-19695 Handle disabled table for async client Signed-off-by: tianjingyun <tianjy1...@gmail.com> --- .../hadoop/hbase/client/AsyncRegionLocator.java | 13 ++-- .../hbase/client/AsyncRpcRetryingCaller.java | 75 ++++++++++++++++------ .../AsyncSingleRequestRpcRetryingCaller.java | 6 ++ .../apache/hadoop/hbase/client/TestAsyncTable.java | 34 ++++++++-- 4 files changed, 97 insertions(+), 31 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java index d624974..8218761 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java @@ -23,9 +23,7 @@ import static org.apache.hadoop.hbase.util.FutureUtils.addListener; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; -import org.apache.hadoop.hbase.HBaseIOException; import org.apache.hadoop.hbase.HRegionLocation; -import org.apache.hadoop.hbase.RegionException; import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.exceptions.TimeoutIOException; @@ -107,13 +105,14 @@ class AsyncRegionLocator { } HRegionLocation loc = locs.getRegionLocation(replicaId); if (loc == null) { - future - .completeExceptionally(new RegionException("No location for " + tableName + ", row='" + + future.completeExceptionally( + new RegionOfflineException("No location for " + tableName + ", row='" + Bytes.toStringBinary(row) + "', locateType=" + type + ", replicaId=" + 
replicaId)); } else if (loc.getServerName() == null) { - future.completeExceptionally(new HBaseIOException("No server address listed for region '" + - loc.getRegion().getRegionNameAsString() + ", row='" + Bytes.toStringBinary(row) + - "', locateType=" + type + ", replicaId=" + replicaId)); + future.completeExceptionally( + new RegionOfflineException("No server address listed for region '" + + loc.getRegion().getRegionNameAsString() + ", row='" + Bytes.toStringBinary(row) + + "', locateType=" + type + ", replicaId=" + replicaId)); } else { future.complete(loc); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java index 5383ff8..a886b49 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java @@ -26,13 +26,19 @@ import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException; import java.util.ArrayList; import java.util.List; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.function.Supplier; import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.NotServingRegionException; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.TableNotEnabledException; +import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.FutureUtils; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -110,7 +116,27 @@ public abstract class AsyncRpcRetryingCaller<T> { resetController(controller, callTimeoutNs); } - protected final void 
onError(Throwable error, Supplier<String> errMsg, + private void tryScheduleRetry(Throwable error, Consumer<Throwable> updateCachedLocation) { + long delayNs; + if (operationTimeoutNs > 0) { + long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS; + if (maxDelayNs <= 0) { + completeExceptionally(); + return; + } + delayNs = Math.min(maxDelayNs, getPauseTime(pauseNs, tries - 1)); + } else { + delayNs = getPauseTime(pauseNs, tries - 1); + } + tries++; + retryTimer.newTimeout(t -> doCall(), delayNs, TimeUnit.NANOSECONDS); + } + + protected Optional<TableName> getTableName() { + return Optional.empty(); + } + + protected final void onError(Throwable t, Supplier<String> errMsg, Consumer<Throwable> updateCachedLocation) { if (future.isDone()) { // Give up if the future is already done, this is possible if user has already canceled the @@ -119,37 +145,50 @@ public abstract class AsyncRpcRetryingCaller<T> { LOG.debug("The future is already done, canceled={}, give up retrying", future.isCancelled()); return; } - error = translateException(error); + Throwable error = translateException(t); if (error instanceof DoNotRetryIOException) { future.completeExceptionally(error); return; } if (tries > startLogErrorsCnt) { - LOG.warn(errMsg.get() + ", tries = " + tries + ", maxAttempts = " + maxAttempts - + ", timeout = " + TimeUnit.NANOSECONDS.toMillis(operationTimeoutNs) - + " ms, time elapsed = " + elapsedMs() + " ms", error); + LOG.warn(errMsg.get() + ", tries = " + tries + ", maxAttempts = " + maxAttempts + + ", timeout = " + TimeUnit.NANOSECONDS.toMillis(operationTimeoutNs) + + " ms, time elapsed = " + elapsedMs() + " ms", error); } - RetriesExhaustedException.ThrowableWithExtraContext qt = new RetriesExhaustedException.ThrowableWithExtraContext( - error, EnvironmentEdgeManager.currentTime(), ""); + updateCachedLocation.accept(error); + RetriesExhaustedException.ThrowableWithExtraContext qt = + new RetriesExhaustedException.ThrowableWithExtraContext(error, + 
EnvironmentEdgeManager.currentTime(), ""); exceptions.add(qt); if (tries >= maxAttempts) { completeExceptionally(); return; } - long delayNs; - if (operationTimeoutNs > 0) { - long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS; - if (maxDelayNs <= 0) { - completeExceptionally(); - return; + // check whether the table has been disabled, notice that the check will introduce a request to + // meta, so here we only check for disabled for some specific exception types. + if (error instanceof NotServingRegionException || error instanceof RegionOfflineException) { + Optional<TableName> tableName = getTableName(); + if (tableName.isPresent()) { + FutureUtils.addListener(conn.getAdmin().isTableDisabled(tableName.get()), (disabled, e) -> { + if (e != null) { + if (e instanceof TableNotFoundException) { + future.completeExceptionally(e); + } else { + // failed to test whether the table is disabled, not a big deal, continue retrying + tryScheduleRetry(error, updateCachedLocation); + } + return; + } + if (disabled) { + future.completeExceptionally(new TableNotEnabledException(tableName.get())); + } else { + tryScheduleRetry(error, updateCachedLocation); + } + }); } - delayNs = Math.min(maxDelayNs, getPauseTime(pauseNs, tries - 1)); } else { - delayNs = getPauseTime(pauseNs, tries - 1); + tryScheduleRetry(error, updateCachedLocation); } - updateCachedLocation.accept(error); - tries++; - retryTimer.newTimeout(t -> doCall(), delayNs, TimeUnit.NANOSECONDS); } protected abstract void doCall(); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java index 4b60b18..a552e40 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java @@ -20,6 +20,7 @@ package 
org.apache.hadoop.hbase.client; import static org.apache.hadoop.hbase.util.FutureUtils.addListener; import java.io.IOException; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.TableName; @@ -114,4 +115,9 @@ class AsyncSingleRequestRpcRetryingCaller<T> extends AsyncRpcRetryingCaller<T> { call(loc); }); } + + @Override + protected Optional<TableName> getTableName() { + return Optional.of(tableName); + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java index d119f1c..85a6d9d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java @@ -17,11 +17,15 @@ */ package org.apache.hadoop.hbase.client; +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.CoreMatchers.instanceOf; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.io.IOException; import java.io.UncheckedIOException; @@ -40,6 +44,7 @@ import org.apache.commons.io.IOUtils; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.TableNotEnabledException; import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.MediumTests; @@ -115,8 +120,11 @@ public class TestAsyncTable { } @Before - public void setUp() throws IOException, InterruptedException { + public void 
setUp() throws IOException, InterruptedException, ExecutionException { row = Bytes.toBytes(testName.getMethodName().replaceAll("[^0-9A-Za-z]", "_")); + if (ASYNC_CONN.getAdmin().isTableDisabled(TABLE_NAME).get()) { + ASYNC_CONN.getAdmin().enableTable(TABLE_NAME).get(); + } } @Test @@ -283,14 +291,14 @@ public class TestAsyncTable { public void testMutateRow() throws InterruptedException, ExecutionException, IOException { AsyncTable<?> table = getTable.get(); RowMutations mutation = new RowMutations(row); - mutation.add(new Put(row).addColumn(FAMILY, concat(QUALIFIER, 1), VALUE)); + mutation.add((Mutation) new Put(row).addColumn(FAMILY, concat(QUALIFIER, 1), VALUE)); table.mutateRow(mutation).get(); Result result = table.get(new Get(row)).get(); assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, 1))); mutation = new RowMutations(row); - mutation.add(new Delete(row).addColumn(FAMILY, concat(QUALIFIER, 1))); - mutation.add(new Put(row).addColumn(FAMILY, concat(QUALIFIER, 2), VALUE)); + mutation.add((Mutation) new Delete(row).addColumn(FAMILY, concat(QUALIFIER, 1))); + mutation.add((Mutation) new Put(row).addColumn(FAMILY, concat(QUALIFIER, 2), VALUE)); table.mutateRow(mutation).get(); result = table.get(new Get(row)).get(); assertNull(result.getValue(FAMILY, concat(QUALIFIER, 1))); @@ -314,8 +322,9 @@ public class TestAsyncTable { IntStream.range(0, count).forEach(i -> { RowMutations mutation = new RowMutations(row); try { - mutation.add(new Delete(row).addColumn(FAMILY, QUALIFIER)); - mutation.add(new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), concat(VALUE, i))); + mutation.add((Mutation) new Delete(row).addColumn(FAMILY, QUALIFIER)); + mutation + .add((Mutation) new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), concat(VALUE, i))); } catch (IOException e) { throw new UncheckedIOException(e); } @@ -400,4 +409,17 @@ public class TestAsyncTable { .get(); assertTrue(ok); } + + @Test + public void testDisabled() throws InterruptedException, 
ExecutionException { + ASYNC_CONN.getAdmin().disableTable(TABLE_NAME).get(); + try { + getTable.get().get(new Get(row)).get(); + fail("Should fail since table has been disabled"); + } catch (ExecutionException e) { + Throwable cause = e.getCause(); + assertThat(cause, instanceOf(TableNotEnabledException.class)); + assertThat(cause.getMessage(), containsString(TABLE_NAME.getNameAsString())); + } + } }