Repository: hbase Updated Branches: refs/heads/master ef8c65e54 -> acc606571
HBASE-16837 Implement checkAndPut and checkAndDelete Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/acc60657 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/acc60657 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/acc60657 Branch: refs/heads/master Commit: acc606571bca4e82aec7136cff64bf6187dd71f7 Parents: ef8c65e Author: zhangduo <zhang...@apache.org> Authored: Tue Oct 18 21:46:27 2016 +0800 Committer: zhangduo <zhang...@apache.org> Committed: Wed Oct 19 13:32:24 2016 +0800 ---------------------------------------------------------------------- .../apache/hadoop/hbase/client/AsyncTable.java | 67 ++++++++++++++++++++ .../hadoop/hbase/client/AsyncTableImpl.java | 36 ++++++++++- .../hadoop/hbase/client/TestAsyncTable.java | 58 +++++++++++++++++ 3 files changed, 159 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/acc60657/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java index 2ed3c26..6019bdc 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java @@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ReflectionUtils; @@ -204,4 +205,70 @@ public interface AsyncTable { new Increment(row).addColumn(family, qualifier, amount).setDurability(durability)) .thenApply(r -> Bytes.toLong(r.getValue(family, qualifier))); } + + /** + * Atomically checks if a row/family/qualifier value equals to the expected value. If it does, it + * adds the put. If the passed value is null, the check is for the lack of column (ie: + * non-existence) + * @param row to check + * @param family column family to check + * @param qualifier column qualifier to check + * @param value the expected value + * @param put data to put if check succeeds + * @return true if the new put was executed, false otherwise. The return value will be wrapped by + * a {@link CompletableFuture}. + */ + default CompletableFuture<Boolean> checkAndPut(byte[] row, byte[] family, byte[] qualifier, + byte[] value, Put put) { + return checkAndPut(row, family, qualifier, CompareOp.EQUAL, value, put); + } + + /** + * Atomically checks if a row/family/qualifier value matches the expected value. If it does, it + * adds the put. If the passed value is null, the check is for the lack of column (ie: + * non-existence) + * @param row to check + * @param family column family to check + * @param qualifier column qualifier to check + * @param compareOp comparison operator to use + * @param value the expected value + * @param put data to put if check succeeds + * @return true if the new put was executed, false otherwise. The return value will be wrapped by + * a {@link CompletableFuture}. + */ + CompletableFuture<Boolean> checkAndPut(byte[] row, byte[] family, byte[] qualifier, + CompareOp compareOp, byte[] value, Put put); + + /** + * Atomically checks if a row/family/qualifier value equals to the expected value. If it does, it + * adds the delete. If the passed value is null, the check is for the lack of column (ie: + * non-existence) + * @param row to check + * @param family column family to check + * @param qualifier column qualifier to check + * @param value the expected value + * @param delete data to delete if check succeeds + * @return true if the new delete was executed, false otherwise. The return value will be wrapped + * by a {@link CompletableFuture}. + */ + default CompletableFuture<Boolean> checkAndDelete(byte[] row, byte[] family, byte[] qualifier, + byte[] value, Delete delete) { + return checkAndDelete(row, family, qualifier, CompareOp.EQUAL, value, delete); + } + + /** + * Atomically checks if a row/family/qualifier value matches the expected value. If it does, it + * adds the delete. If the passed value is null, the check is for the lack of column (ie: + * non-existence) + * @param row to check + * @param family column family to check + * @param qualifier column qualifier to check + * @param compareOp comparison operator to use + * @param value the expected value + * @param delete data to delete if check succeeds + * @return true if the new delete was executed, false otherwise. The return value will be wrapped + * by a {@link CompletableFuture}. + */ + CompletableFuture<Boolean> checkAndDelete(byte[] row, byte[] family, byte[] qualifier, + CompareOp compareOp, byte[] value, Delete delete); } http://git-wip-us.apache.org/repos/asf/hbase/blob/acc60657/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java index 89f798c..b7dc388 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java @@ -28,6 +28,8 @@ import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.SingleRequestCallerBuilder; +import org.apache.hadoop.hbase.filter.BinaryComparator; +import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcCallback; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; @@ -37,6 +39,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.CompareType; /** * The implementation of AsyncTable. @@ -151,12 +154,16 @@ class AsyncTableImpl implements AsyncTable { (info, src) -> reqConvert.convert(info, src, nonceGroup, nonce), respConverter); } - private <T> SingleRequestCallerBuilder<T> newCaller(Row row, long rpcTimeoutNs) { - return conn.callerFactory.<T> single().table(tableName).row(row.getRow()) + private <T> SingleRequestCallerBuilder<T> newCaller(byte[] row, long rpcTimeoutNs) { + return conn.callerFactory.<T> single().table(tableName).row(row) .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS); } + private <T> SingleRequestCallerBuilder<T> newCaller(Row row, long rpcTimeoutNs) { + return newCaller(row.getRow(), rpcTimeoutNs); + } + @Override public CompletableFuture<Result> get(Get get) { return this.<Result> newCaller(get, readRpcTimeoutNs) @@ -202,6 +209,30 @@ class AsyncTableImpl implements AsyncTable { } @Override + public CompletableFuture<Boolean> checkAndPut(byte[] row, byte[] family, byte[] qualifier, + CompareOp compareOp, byte[] value, Put put) { + return this.<Boolean> newCaller(row, writeRpcTimeoutNs) + .action((controller, loc, stub) -> AsyncTableImpl.<Put, Boolean> mutate(controller, loc, + stub, put, + (rn, p) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, + new BinaryComparator(value), CompareType.valueOf(compareOp.name()), p), + (c, r) -> r.getProcessed())) + .call(); + } + + @Override + public CompletableFuture<Boolean> checkAndDelete(byte[] row, byte[] family, byte[] qualifier, + CompareOp compareOp, byte[] value, Delete delete) { + return this.<Boolean> newCaller(row, writeRpcTimeoutNs) + .action((controller, loc, stub) -> AsyncTableImpl.<Delete, Boolean> mutate(controller, loc, + stub, delete, + (rn, d) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, + new BinaryComparator(value), CompareType.valueOf(compareOp.name()), d), + (c, r) -> r.getProcessed())) + .call(); + } + + @Override public void setReadRpcTimeout(long timeout, TimeUnit unit) { this.readRpcTimeoutNs = unit.toNanos(timeout); } @@ -230,4 +261,5 @@ class AsyncTableImpl implements AsyncTable { public long getOperationTimeout(TimeUnit unit) { return unit.convert(operationTimeoutNs, TimeUnit.NANOSECONDS); } + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/acc60657/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java index 41002cb..8b3ab62 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java @@ -28,6 +28,7 @@ import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.IntStream; @@ -184,4 +185,61 @@ public class TestAsyncTable { .sorted().toArray(); assertArrayEquals(IntStream.range(0, count).toArray(), actual); } + + @Test + public void testCheckAndPut() throws InterruptedException, ExecutionException { + AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME); + AtomicInteger successCount = new AtomicInteger(0); + AtomicInteger successIndex = new AtomicInteger(-1); + int count = 10; + CountDownLatch latch = new CountDownLatch(count); + IntStream.range(0, count).forEach(i -> table.checkAndPut(row, FAMILY, QUALIFIER, null, + new Put(row).addColumn(FAMILY, QUALIFIER, concat(VALUE, i))).thenAccept(x -> { + if (x) { + successCount.incrementAndGet(); + successIndex.set(i); + } + latch.countDown(); + })); + latch.await(); + assertEquals(1, successCount.get()); + String actual = Bytes.toString(table.get(new Get(row)).get().getValue(FAMILY, QUALIFIER)); + assertTrue(actual.endsWith(Integer.toString(successIndex.get()))); + } + + @Test + public void testCheckAndDelete() throws InterruptedException, ExecutionException { + AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME); + int count = 10; + CountDownLatch putLatch = new CountDownLatch(count + 1); + table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).thenRun(() -> putLatch.countDown()); + IntStream.range(0, count) + .forEach(i -> table.put(new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), VALUE)) + .thenRun(() -> putLatch.countDown())); + putLatch.await(); + + AtomicInteger successCount = new AtomicInteger(0); + AtomicInteger successIndex = new AtomicInteger(-1); + CountDownLatch deleteLatch = new CountDownLatch(count); + IntStream.range(0, count).forEach(i -> table + .checkAndDelete(row, FAMILY, QUALIFIER, VALUE, + new Delete(row).addColumn(FAMILY, QUALIFIER).addColumn(FAMILY, concat(QUALIFIER, i))) + .thenAccept(x -> { + if (x) { + successCount.incrementAndGet(); + successIndex.set(i); + } + deleteLatch.countDown(); + })); + deleteLatch.await(); + assertEquals(1, successCount.get()); + Result result = table.get(new Get(row)).get(); + IntStream.range(0, count).forEach(i -> { + if (i == successIndex.get()) { + assertFalse(result.containsColumn(FAMILY, concat(QUALIFIER, i))); + } else { + assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, i))); + } + }); + } }