Repository: hbase
Updated Branches:
  refs/heads/master ef8c65e54 -> acc606571


HBASE-16837 Implement checkAndPut and checkAndDelete


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/acc60657
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/acc60657
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/acc60657

Branch: refs/heads/master
Commit: acc606571bca4e82aec7136cff64bf6187dd71f7
Parents: ef8c65e
Author: zhangduo <zhang...@apache.org>
Authored: Tue Oct 18 21:46:27 2016 +0800
Committer: zhangduo <zhang...@apache.org>
Committed: Wed Oct 19 13:32:24 2016 +0800

----------------------------------------------------------------------
 .../apache/hadoop/hbase/client/AsyncTable.java  | 67 ++++++++++++++++++++
 .../hadoop/hbase/client/AsyncTableImpl.java     | 36 ++++++++++-
 .../hadoop/hbase/client/TestAsyncTable.java     | 58 +++++++++++++++++
 3 files changed, 159 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/acc60657/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java
index 2ed3c26..6019bdc 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ReflectionUtils;
 
@@ -204,4 +205,70 @@ public interface AsyncTable {
       new Increment(row).addColumn(family, qualifier, 
amount).setDurability(durability))
           .thenApply(r -> Bytes.toLong(r.getValue(family, qualifier)));
   }
+
+  /**
+   * Atomically checks if a row/family/qualifier value equals to the expected 
value. If it does, it
+   * adds the put. If the passed value is null, the check is for the lack of 
column (ie:
+   * non-existence)
+   * @param row to check
+   * @param family column family to check
+   * @param qualifier column qualifier to check
+   * @param value the expected value
+   * @param put data to put if check succeeds
+   * @return true if the new put was executed, false otherwise. The return 
value will be wrapped by
+   *         a {@link CompletableFuture}.
+   */
+  default CompletableFuture<Boolean> checkAndPut(byte[] row, byte[] family, 
byte[] qualifier,
+      byte[] value, Put put) {
+    return checkAndPut(row, family, qualifier, CompareOp.EQUAL, value, put);
+  }
+
+  /**
+   * Atomically checks if a row/family/qualifier value matches the expected 
value. If it does, it
+   * adds the put. If the passed value is null, the check is for the lack of 
column (ie:
+   * non-existence)
+   * @param row to check
+   * @param family column family to check
+   * @param qualifier column qualifier to check
+   * @param compareOp comparison operator to use
+   * @param value the expected value
+   * @param put data to put if check succeeds
+   * @return true if the new put was executed, false otherwise. The return 
value will be wrapped by
+   *         a {@link CompletableFuture}.
+   */
+  CompletableFuture<Boolean> checkAndPut(byte[] row, byte[] family, byte[] 
qualifier,
+      CompareOp compareOp, byte[] value, Put put);
+
+  /**
+   * Atomically checks if a row/family/qualifier value equals to the expected 
value. If it does, it
+   * adds the delete. If the passed value is null, the check is for the lack 
of column (ie:
+   * non-existence)
+   * @param row to check
+   * @param family column family to check
+   * @param qualifier column qualifier to check
+   * @param value the expected value
+   * @param delete data to delete if check succeeds
+   * @return true if the new delete was executed, false otherwise. The return 
value will be wrapped
+   *         by a {@link CompletableFuture}.
+   */
+  default CompletableFuture<Boolean> checkAndDelete(byte[] row, byte[] family, 
byte[] qualifier,
+      byte[] value, Delete delete) {
+    return checkAndDelete(row, family, qualifier, CompareOp.EQUAL, value, 
delete);
+  }
+
+  /**
+   * Atomically checks if a row/family/qualifier value matches the expected 
value. If it does, it
+   * adds the delete. If the passed value is null, the check is for the lack 
of column (ie:
+   * non-existence)
+   * @param row to check
+   * @param family column family to check
+   * @param qualifier column qualifier to check
+   * @param compareOp comparison operator to use
+   * @param value the expected value
+   * @param delete data to delete if check succeeds
+   * @return true if the new delete was executed, false otherwise. The return 
value will be wrapped
+   *         by a {@link CompletableFuture}.
+   */
+  CompletableFuture<Boolean> checkAndDelete(byte[] row, byte[] family, byte[] 
qualifier,
+      CompareOp compareOp, byte[] value, Delete delete);
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/acc60657/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
index 89f798c..b7dc388 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
@@ -28,6 +28,8 @@ import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import 
org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.SingleRequestCallerBuilder;
+import org.apache.hadoop.hbase.filter.BinaryComparator;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.ipc.HBaseRpcController;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcCallback;
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
@@ -37,6 +39,7 @@ import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.CompareType;
 
 /**
  * The implementation of AsyncTable.
@@ -151,12 +154,16 @@ class AsyncTableImpl implements AsyncTable {
       (info, src) -> reqConvert.convert(info, src, nonceGroup, nonce), 
respConverter);
   }
 
-  private <T> SingleRequestCallerBuilder<T> newCaller(Row row, long 
rpcTimeoutNs) {
-    return conn.callerFactory.<T> single().table(tableName).row(row.getRow())
+  private <T> SingleRequestCallerBuilder<T> newCaller(byte[] row, long 
rpcTimeoutNs) {
+    return conn.callerFactory.<T> single().table(tableName).row(row)
         .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
         .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS);
   }
 
+  private <T> SingleRequestCallerBuilder<T> newCaller(Row row, long 
rpcTimeoutNs) {
+    return newCaller(row.getRow(), rpcTimeoutNs);
+  }
+
   @Override
   public CompletableFuture<Result> get(Get get) {
     return this.<Result> newCaller(get, readRpcTimeoutNs)
@@ -202,6 +209,30 @@ class AsyncTableImpl implements AsyncTable {
   }
 
   @Override
+  public CompletableFuture<Boolean> checkAndPut(byte[] row, byte[] family, 
byte[] qualifier,
+      CompareOp compareOp, byte[] value, Put put) {
+    return this.<Boolean> newCaller(row, writeRpcTimeoutNs)
+        .action((controller, loc, stub) -> AsyncTableImpl.<Put, Boolean> 
mutate(controller, loc,
+          stub, put,
+          (rn, p) -> RequestConverter.buildMutateRequest(rn, row, family, 
qualifier,
+            new BinaryComparator(value), 
CompareType.valueOf(compareOp.name()), p),
+          (c, r) -> r.getProcessed()))
+        .call();
+  }
+
+  @Override
+  public CompletableFuture<Boolean> checkAndDelete(byte[] row, byte[] family, 
byte[] qualifier,
+      CompareOp compareOp, byte[] value, Delete delete) {
+    return this.<Boolean> newCaller(row, writeRpcTimeoutNs)
+        .action((controller, loc, stub) -> AsyncTableImpl.<Delete, Boolean> 
mutate(controller, loc,
+          stub, delete,
+          (rn, d) -> RequestConverter.buildMutateRequest(rn, row, family, 
qualifier,
+            new BinaryComparator(value), 
CompareType.valueOf(compareOp.name()), d),
+          (c, r) -> r.getProcessed()))
+        .call();
+  }
+
+  @Override
   public void setReadRpcTimeout(long timeout, TimeUnit unit) {
     this.readRpcTimeoutNs = unit.toNanos(timeout);
   }
@@ -230,4 +261,5 @@ class AsyncTableImpl implements AsyncTable {
   public long getOperationTimeout(TimeUnit unit) {
     return unit.convert(operationTimeoutNs, TimeUnit.NANOSECONDS);
   }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/acc60657/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java
index 41002cb..8b3ab62 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java
@@ -28,6 +28,7 @@ import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.IntStream;
 
@@ -184,4 +185,61 @@ public class TestAsyncTable {
         .sorted().toArray();
     assertArrayEquals(IntStream.range(0, count).toArray(), actual);
   }
+
+  @Test
+  public void testCheckAndPut() throws InterruptedException, 
ExecutionException {
+    AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
+    AtomicInteger successCount = new AtomicInteger(0);
+    AtomicInteger successIndex = new AtomicInteger(-1);
+    int count = 10;
+    CountDownLatch latch = new CountDownLatch(count);
+    IntStream.range(0, count).forEach(i -> table.checkAndPut(row, FAMILY, 
QUALIFIER, null,
+      new Put(row).addColumn(FAMILY, QUALIFIER, concat(VALUE, 
i))).thenAccept(x -> {
+        if (x) {
+          successCount.incrementAndGet();
+          successIndex.set(i);
+        }
+        latch.countDown();
+      }));
+    latch.await();
+    assertEquals(1, successCount.get());
+    String actual = Bytes.toString(table.get(new 
Get(row)).get().getValue(FAMILY, QUALIFIER));
+    assertTrue(actual.endsWith(Integer.toString(successIndex.get())));
+  }
+
+  @Test
+  public void testCheckAndDelete() throws InterruptedException, 
ExecutionException {
+    AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
+    int count = 10;
+    CountDownLatch putLatch = new CountDownLatch(count + 1);
+    table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).thenRun(() -> 
putLatch.countDown());
+    IntStream.range(0, count)
+        .forEach(i -> table.put(new Put(row).addColumn(FAMILY, 
concat(QUALIFIER, i), VALUE))
+            .thenRun(() -> putLatch.countDown()));
+    putLatch.await();
+
+    AtomicInteger successCount = new AtomicInteger(0);
+    AtomicInteger successIndex = new AtomicInteger(-1);
+    CountDownLatch deleteLatch = new CountDownLatch(count);
+    IntStream.range(0, count).forEach(i -> table
+        .checkAndDelete(row, FAMILY, QUALIFIER, VALUE,
+          new Delete(row).addColumn(FAMILY, QUALIFIER).addColumn(FAMILY, 
concat(QUALIFIER, i)))
+        .thenAccept(x -> {
+          if (x) {
+            successCount.incrementAndGet();
+            successIndex.set(i);
+          }
+          deleteLatch.countDown();
+        }));
+    deleteLatch.await();
+    assertEquals(1, successCount.get());
+    Result result = table.get(new Get(row)).get();
+    IntStream.range(0, count).forEach(i -> {
+      if (i == successIndex.get()) {
+        assertFalse(result.containsColumn(FAMILY, concat(QUALIFIER, i)));
+      } else {
+        assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, 
i)));
+      }
+    });
+  }
 }

Reply via email to