HBASE-18951 Use Builder pattern to remove nullable parameters for checkAndXXX 
methods in RawAsyncTable/AsyncTable interface


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/d5b76547
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/d5b76547
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/d5b76547

Branch: refs/heads/branch-2
Commit: d5b76547f088783041212df559bcb08b6c641776
Parents: 294f6b7
Author: zhangduo <zhang...@apache.org>
Authored: Tue Oct 10 11:11:19 2017 +0800
Committer: zhangduo <zhang...@apache.org>
Committed: Tue Oct 10 14:41:27 2017 +0800

----------------------------------------------------------------------
 .../hadoop/hbase/client/AsyncTableBase.java     | 158 ++++++++-----------
 .../hadoop/hbase/client/AsyncTableImpl.java     |  57 +++++--
 .../hadoop/hbase/client/RawAsyncTableImpl.java  | 123 ++++++++++-----
 .../hadoop/hbase/client/TestAsyncTable.java     |  53 ++++---
 4 files changed, 219 insertions(+), 172 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/d5b76547/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java
index ed4c497..cc9f337 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java
@@ -191,72 +191,75 @@ public interface AsyncTableBase {
   }
 
   /**
-   * Atomically checks if a row/family/qualifier value equals to the expected 
value. If it does, it
-   * adds the put. If the passed value is null, the check is for the lack of 
column (ie:
-   * non-existence)
-   * @param row to check
-   * @param family column family to check
-   * @param qualifier column qualifier to check
-   * @param value the expected value
-   * @param put data to put if check succeeds
-   * @return true if the new put was executed, false otherwise. The return 
value will be wrapped by
-   *         a {@link CompletableFuture}.
-   */
-  default CompletableFuture<Boolean> checkAndPut(byte[] row, byte[] family, 
byte[] qualifier,
-      byte[] value, Put put) {
-    return checkAndPut(row, family, qualifier, CompareOperator.EQUAL, value, 
put);
-  }
-
-  /**
    * Atomically checks if a row/family/qualifier value matches the expected 
value. If it does, it
-   * adds the put. If the passed value is null, the check is for the lack of 
column (ie:
-   * non-existence)
-   * @param row to check
-   * @param family column family to check
-   * @param qualifier column qualifier to check
-   * @param compareOp comparison operator to use
-   * @param value the expected value
-   * @param put data to put if check succeeds
-   * @return true if the new put was executed, false otherwise. The return 
value will be wrapped by
-   *         a {@link CompletableFuture}.
-   */
-  CompletableFuture<Boolean> checkAndPut(byte[] row, byte[] family, byte[] 
qualifier,
-                                         CompareOperator compareOp, byte[] 
value, Put put);
-
-  /**
-   * Atomically checks if a row/family/qualifier value equals to the expected 
value. If it does, it
-   * adds the delete. If the passed value is null, the check is for the lack 
of column (ie:
-   * non-existence)
-   * @param row to check
-   * @param family column family to check
-   * @param qualifier column qualifier to check
-   * @param value the expected value
-   * @param delete data to delete if check succeeds
-   * @return true if the new delete was executed, false otherwise. The return 
value will be wrapped
-   *         by a {@link CompletableFuture}.
+   * adds the Put/Delete/RowMutations.
+   * <p>
+   * Use the returned {@link CheckAndMutateBuilder} to construct your request 
and then execute it.
+   * This is a fluent style API, the code is like:
+   *
+   * <pre>
+   * <code>
+   * table.checkAndMutate(row, 
family).qualifier(qualifier).ifNotExists().thenPut(put)
+   *     .thenAccept(succ -> {
+   *       if (succ) {
+   *         System.out.println("Check and put succeeded");
+   *       } else {
+   *         System.out.println("Check and put failed");
+   *       }
+   *     });
+   * </code>
+   * </pre>
    */
-  default CompletableFuture<Boolean> checkAndDelete(byte[] row, byte[] family, 
byte[] qualifier,
-      byte[] value, Delete delete) {
-    return checkAndDelete(row, family, qualifier, CompareOperator.EQUAL, 
value, delete);
+  CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family);
+
+  /**
+   * A helper class for sending checkAndMutate request.
+   */
+  interface CheckAndMutateBuilder {
+
+    /**
+     * @param qualifier column qualifier to check.
+     */
+    CheckAndMutateBuilder qualifier(byte[] qualifier);
+
+    /**
+     * Check for lack of column.
+     */
+    CheckAndMutateBuilder ifNotExists();
+
+    default CheckAndMutateBuilder ifEquals(byte[] value) {
+      return ifMatches(CompareOperator.EQUAL, value);
+    }
+
+    /**
+     * @param compareOp comparison operator to use
+     * @param value the expected value
+     */
+    CheckAndMutateBuilder ifMatches(CompareOperator compareOp, byte[] value);
+
+    /**
+     * @param put data to put if check succeeds
+     * @return {@code true} if the new put was executed, {@code false} 
otherwise. The return value
+     *         will be wrapped by a {@link CompletableFuture}.
+     */
+    CompletableFuture<Boolean> thenPut(Put put);
+
+    /**
+     * @param delete data to delete if check succeeds
+     * @return {@code true} if the new delete was executed, {@code false} 
otherwise. The return
+     *         value will be wrapped by a {@link CompletableFuture}.
+     */
+    CompletableFuture<Boolean> thenDelete(Delete delete);
+
+    /**
+     * @param mutation mutations to perform if check succeeds
+     * @return true if the new mutation was executed, false otherwise. The 
return value will be
+     *         wrapped by a {@link CompletableFuture}.
+     */
+    CompletableFuture<Boolean> thenMutate(RowMutations mutation);
   }
 
   /**
-   * Atomically checks if a row/family/qualifier value matches the expected 
value. If it does, it
-   * adds the delete. If the passed value is null, the check is for the lack 
of column (ie:
-   * non-existence)
-   * @param row to check
-   * @param family column family to check
-   * @param qualifier column qualifier to check
-   * @param compareOp comparison operator to use
-   * @param value the expected value
-   * @param delete data to delete if check succeeds
-   * @return true if the new delete was executed, false otherwise. The return 
value will be wrapped
-   *         by a {@link CompletableFuture}.
-   */
-  CompletableFuture<Boolean> checkAndDelete(byte[] row, byte[] family, byte[] 
qualifier,
-                                            CompareOperator compareOp, byte[] 
value, Delete delete);
-
-  /**
    * Performs multiple mutations atomically on a single row. Currently {@link 
Put} and
    * {@link Delete} are supported.
    * @param mutation object that specifies the set of mutations to perform 
atomically
@@ -265,39 +268,6 @@ public interface AsyncTableBase {
   CompletableFuture<Void> mutateRow(RowMutations mutation);
 
   /**
-   * Atomically checks if a row/family/qualifier value equals to the expected 
value. If it does, it
-   * performs the row mutations. If the passed value is null, the check is for 
the lack of column
-   * (ie: non-existence)
-   * @param row to check
-   * @param family column family to check
-   * @param qualifier column qualifier to check
-   * @param value the expected value
-   * @param mutation mutations to perform if check succeeds
-   * @return true if the new put was executed, false otherwise. The return 
value will be wrapped by
-   *         a {@link CompletableFuture}.
-   */
-  default CompletableFuture<Boolean> checkAndMutate(byte[] row, byte[] family, 
byte[] qualifier,
-      byte[] value, RowMutations mutation) {
-    return checkAndMutate(row, family, qualifier, CompareOperator.EQUAL, 
value, mutation);
-  }
-
-  /**
-   * Atomically checks if a row/family/qualifier value matches the expected 
value. If it does, it
-   * performs the row mutations. If the passed value is null, the check is for 
the lack of column
-   * (ie: non-existence)
-   * @param row to check
-   * @param family column family to check
-   * @param qualifier column qualifier to check
-   * @param compareOp the comparison operator
-   * @param value the expected value
-   * @param mutation mutations to perform if check succeeds
-   * @return true if the new put was executed, false otherwise. The return 
value will be wrapped by
-   *         a {@link CompletableFuture}.
-   */
-  CompletableFuture<Boolean> checkAndMutate(byte[] row, byte[] family, byte[] 
qualifier,
-                                            CompareOperator compareOp, byte[] 
value, RowMutations mutation);
-
-  /**
    * Return all the results that match the given scan object.
    * <p>
    * Notice that usually you should use this method with a {@link Scan} object 
that has limit set.

http://git-wip-us.apache.org/repos/asf/hbase/blob/d5b76547/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
index 2b7a64e..ae43f5b 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
@@ -17,18 +17,19 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import static java.util.stream.Collectors.toList;
+
 import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
-import static java.util.stream.Collectors.*;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.CompareOperator;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.util.ReflectionUtils;
+import org.apache.yetus.audience.InterfaceAudience;
 
 /**
  * The implementation of AsyncTable. Based on {@link RawAsyncTable}.
@@ -121,15 +122,44 @@ class AsyncTableImpl implements AsyncTable {
   }
 
   @Override
-  public CompletableFuture<Boolean> checkAndPut(byte[] row, byte[] family, 
byte[] qualifier,
-                                                CompareOperator op, byte[] 
value, Put put) {
-    return wrap(rawTable.checkAndPut(row, family, qualifier, op, value, put));
-  }
+  public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) {
+    return new CheckAndMutateBuilder() {
 
-  @Override
-  public CompletableFuture<Boolean> checkAndDelete(byte[] row, byte[] family, 
byte[] qualifier,
-                                                   CompareOperator op, byte[] 
value, Delete delete) {
-    return wrap(rawTable.checkAndDelete(row, family, qualifier, op, value, 
delete));
+      private final CheckAndMutateBuilder builder = 
rawTable.checkAndMutate(row, family);
+
+      @Override
+      public CompletableFuture<Boolean> thenPut(Put put) {
+        return wrap(builder.thenPut(put));
+      }
+
+      @Override
+      public CompletableFuture<Boolean> thenMutate(RowMutations mutation) {
+        return wrap(builder.thenMutate(mutation));
+      }
+
+      @Override
+      public CompletableFuture<Boolean> thenDelete(Delete delete) {
+        return wrap(builder.thenDelete(delete));
+      }
+
+      @Override
+      public CheckAndMutateBuilder qualifier(byte[] qualifier) {
+        builder.qualifier(qualifier);
+        return this;
+      }
+
+      @Override
+      public CheckAndMutateBuilder ifNotExists() {
+        builder.ifNotExists();
+        return this;
+      }
+
+      @Override
+      public CheckAndMutateBuilder ifMatches(CompareOperator compareOp, byte[] 
value) {
+        builder.ifMatches(compareOp, value);
+        return this;
+      }
+    };
   }
 
   @Override
@@ -138,12 +168,6 @@ class AsyncTableImpl implements AsyncTable {
   }
 
   @Override
-  public CompletableFuture<Boolean> checkAndMutate(byte[] row, byte[] family, 
byte[] qualifier,
-                                                   CompareOperator op, byte[] 
value, RowMutations mutation) {
-    return wrap(rawTable.checkAndMutate(row, family, qualifier, op, value, 
mutation));
-  }
-
-  @Override
   public CompletableFuture<List<Result>> scanAll(Scan scan) {
     return wrap(rawTable.scanAll(scan));
   }
@@ -198,5 +222,4 @@ class AsyncTableImpl implements AsyncTable {
   public <T> List<CompletableFuture<T>> batch(List<? extends Row> actions) {
     return rawTable.<T> 
batch(actions).stream().map(this::wrap).collect(toList());
   }
-
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/d5b76547/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
index 722ee26..6107f7f 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
@@ -23,6 +23,8 @@ import static 
org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.checkHasFamilies;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
 
+import com.google.protobuf.RpcChannel;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -45,6 +47,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ReflectionUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 
+import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcCallback;
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
@@ -59,8 +62,6 @@ import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResp
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.CompareType;
 
-import com.google.protobuf.RpcChannel;
-
 /**
  * The implementation of RawAsyncTable.
  */
@@ -134,7 +135,7 @@ class RawAsyncTableImpl implements RawAsyncTable {
       Converter<RESP, HBaseRpcController, PRESP> respConverter) {
     CompletableFuture<RESP> future = new CompletableFuture<>();
     try {
-      rpcCall.call(stub, controller, 
reqConvert.convert(loc.getRegionInfo().getRegionName(), req),
+      rpcCall.call(stub, controller, 
reqConvert.convert(loc.getRegion().getRegionName(), req),
         new RpcCallback<PRESP>() {
 
           @Override
@@ -251,28 +252,89 @@ class RawAsyncTableImpl implements RawAsyncTable {
         .call();
   }
 
-  @Override
-  public CompletableFuture<Boolean> checkAndPut(byte[] row, byte[] family, 
byte[] qualifier,
-                                                CompareOperator op, byte[] 
value, Put put) {
-    return this.<Boolean> newCaller(row, rpcTimeoutNs)
-        .action((controller, loc, stub) -> RawAsyncTableImpl.<Put, Boolean> 
mutate(controller, loc,
-          stub, put,
-          (rn, p) -> RequestConverter.buildMutateRequest(rn, row, family, 
qualifier,
-            new BinaryComparator(value), CompareType.valueOf(op.name()), p),
-          (c, r) -> r.getProcessed()))
-        .call();
+  private final class CheckAndMutateBuilderImpl implements 
CheckAndMutateBuilder {
+
+    private final byte[] row;
+
+    private final byte[] family;
+
+    private byte[] qualifier;
+
+    private CompareOperator op;
+
+    private byte[] value;
+
+    public CheckAndMutateBuilderImpl(byte[] row, byte[] family) {
+      this.row = Preconditions.checkNotNull(row, "row is null");
+      this.family = Preconditions.checkNotNull(family, "family is null");
+    }
+
+    @Override
+    public CheckAndMutateBuilder qualifier(byte[] qualifier) {
+      this.qualifier = Preconditions.checkNotNull(qualifier, "qualifier is 
null. Consider using" +
+          " an empty byte array, or just do not call this method if you want a 
null qualifier");
+      return this;
+    }
+
+    @Override
+    public CheckAndMutateBuilder ifNotExists() {
+      this.op = CompareOperator.EQUAL;
+      this.value = null;
+      return this;
+    }
+
+    @Override
+    public CheckAndMutateBuilder ifMatches(CompareOperator compareOp, byte[] 
value) {
+      this.op = Preconditions.checkNotNull(compareOp, "compareOp is null");
+      this.value = Preconditions.checkNotNull(value, "value is null");
+      return this;
+    }
+
+    private void preCheck() {
+      Preconditions.checkNotNull(op, "condition is null. You need to specify 
the condition by" +
+          " calling ifNotExists/ifEquals/ifMatches before executing the 
request");
+    }
+
+    @Override
+    public CompletableFuture<Boolean> thenPut(Put put) {
+      preCheck();
+      return RawAsyncTableImpl.this.<Boolean> newCaller(row, rpcTimeoutNs)
+          .action((controller, loc, stub) -> RawAsyncTableImpl.<Put, Boolean> 
mutate(controller,
+            loc, stub, put,
+            (rn, p) -> RequestConverter.buildMutateRequest(rn, row, family, 
qualifier,
+              new BinaryComparator(value), CompareType.valueOf(op.name()), p),
+            (c, r) -> r.getProcessed()))
+          .call();
+    }
+
+    @Override
+    public CompletableFuture<Boolean> thenDelete(Delete delete) {
+      preCheck();
+      return RawAsyncTableImpl.this.<Boolean> newCaller(row, rpcTimeoutNs)
+          .action((controller, loc, stub) -> RawAsyncTableImpl.<Delete, 
Boolean> mutate(controller,
+            loc, stub, delete,
+            (rn, d) -> RequestConverter.buildMutateRequest(rn, row, family, 
qualifier,
+              new BinaryComparator(value), CompareType.valueOf(op.name()), d),
+            (c, r) -> r.getProcessed()))
+          .call();
+    }
+
+    @Override
+    public CompletableFuture<Boolean> thenMutate(RowMutations mutation) {
+      preCheck();
+      return RawAsyncTableImpl.this.<Boolean> newCaller(mutation, rpcTimeoutNs)
+          .action((controller, loc, stub) -> RawAsyncTableImpl.<Boolean> 
mutateRow(controller, loc,
+            stub, mutation,
+            (rn, rm) -> RequestConverter.buildMutateRequest(rn, row, family, 
qualifier,
+              new BinaryComparator(value), CompareType.valueOf(op.name()), rm),
+            resp -> resp.getExists()))
+          .call();
+    }
   }
 
   @Override
-  public CompletableFuture<Boolean> checkAndDelete(byte[] row, byte[] family, 
byte[] qualifier,
-                                                   CompareOperator op, byte[] 
value, Delete delete) {
-    return this.<Boolean> newCaller(row, rpcTimeoutNs)
-        .action((controller, loc, stub) -> RawAsyncTableImpl.<Delete, Boolean> 
mutate(controller,
-          loc, stub, delete,
-          (rn, d) -> RequestConverter.buildMutateRequest(rn, row, family, 
qualifier,
-            new BinaryComparator(value), CompareType.valueOf(op.name()), d),
-          (c, r) -> r.getProcessed()))
-        .call();
+  public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) {
+    return new CheckAndMutateBuilderImpl(row, family);
   }
 
   // We need the MultiRequest when constructing the 
org.apache.hadoop.hbase.client.MultiResponse,
@@ -283,7 +345,7 @@ class RawAsyncTableImpl implements RawAsyncTable {
       Function<Result, RESP> respConverter) {
     CompletableFuture<RESP> future = new CompletableFuture<>();
     try {
-      byte[] regionName = loc.getRegionInfo().getRegionName();
+      byte[] regionName = loc.getRegion().getRegionName();
       MultiRequest req = reqConvert.convert(regionName, mutation);
       stub.multi(controller, req, new RpcCallback<MultiResponse>() {
 
@@ -328,18 +390,6 @@ class RawAsyncTableImpl implements RawAsyncTable {
         }, resp -> null)).call();
   }
 
-  @Override
-  public CompletableFuture<Boolean> checkAndMutate(byte[] row, byte[] family, 
byte[] qualifier,
-                                                   CompareOperator op, byte[] 
value, RowMutations mutation) {
-    return this.<Boolean> newCaller(mutation, rpcTimeoutNs)
-        .action((controller, loc, stub) -> RawAsyncTableImpl.<Boolean> 
mutateRow(controller, loc,
-          stub, mutation,
-          (rn, rm) -> RequestConverter.buildMutateRequest(rn, row, family, 
qualifier,
-            new BinaryComparator(value), CompareType.valueOf(op.name()), rm),
-          resp -> resp.getExists()))
-        .call();
-  }
-
   private Scan setDefaultScanConfig(Scan scan) {
     // always create a new scan object as we may reset the start row later.
     Scan newScan = ReflectionUtils.newInstance(scan.getClass(), scan);
@@ -488,7 +538,7 @@ class RawAsyncTableImpl implements RawAsyncTable {
       return;
     }
     unfinishedRequest.incrementAndGet();
-    RegionInfo region = loc.getRegionInfo();
+    RegionInfo region = loc.getRegion();
     if (locateFinished(region, endKey, endKeyInclusive)) {
       locateFinished.set(true);
     } else {
@@ -524,4 +574,5 @@ class RawAsyncTableImpl implements RawAsyncTable {
           (loc, error) -> onLocateComplete(stubMaker, callable, callback, 
locs, nonNullEndKey,
             endKeyInclusive, new AtomicBoolean(false), new AtomicInteger(0), 
loc, error));
   }
+
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/d5b76547/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java
index 2fea0eb..5d8b116 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java
@@ -221,14 +221,15 @@ public class TestAsyncTable {
     AtomicInteger successIndex = new AtomicInteger(-1);
     int count = 10;
     CountDownLatch latch = new CountDownLatch(count);
-    IntStream.range(0, count).forEach(i -> table.checkAndPut(row, FAMILY, 
QUALIFIER, null,
-      new Put(row).addColumn(FAMILY, QUALIFIER, concat(VALUE, 
i))).thenAccept(x -> {
-        if (x) {
-          successCount.incrementAndGet();
-          successIndex.set(i);
-        }
-        latch.countDown();
-      }));
+    IntStream.range(0, count)
+        .forEach(i -> table.checkAndMutate(row, 
FAMILY).qualifier(QUALIFIER).ifNotExists()
+            .thenPut(new Put(row).addColumn(FAMILY, QUALIFIER, concat(VALUE, 
i))).thenAccept(x -> {
+              if (x) {
+                successCount.incrementAndGet();
+                successIndex.set(i);
+              }
+              latch.countDown();
+            }));
     latch.await();
     assertEquals(1, successCount.get());
     String actual = Bytes.toString(table.get(new 
Get(row)).get().getValue(FAMILY, QUALIFIER));
@@ -249,16 +250,17 @@ public class TestAsyncTable {
     AtomicInteger successCount = new AtomicInteger(0);
     AtomicInteger successIndex = new AtomicInteger(-1);
     CountDownLatch deleteLatch = new CountDownLatch(count);
-    IntStream.range(0, count).forEach(i -> table
-        .checkAndDelete(row, FAMILY, QUALIFIER, VALUE,
-          new Delete(row).addColumn(FAMILY, QUALIFIER).addColumn(FAMILY, 
concat(QUALIFIER, i)))
-        .thenAccept(x -> {
-          if (x) {
-            successCount.incrementAndGet();
-            successIndex.set(i);
-          }
-          deleteLatch.countDown();
-        }));
+    IntStream.range(0, count)
+        .forEach(i -> table.checkAndMutate(row, 
FAMILY).qualifier(QUALIFIER).ifEquals(VALUE)
+            .thenDelete(
+              new Delete(row).addColumn(FAMILY, QUALIFIER).addColumn(FAMILY, 
concat(QUALIFIER, i)))
+            .thenAccept(x -> {
+              if (x) {
+                successCount.incrementAndGet();
+                successIndex.set(i);
+              }
+              deleteLatch.countDown();
+            }));
     deleteLatch.await();
     assertEquals(1, successCount.get());
     Result result = table.get(new Get(row)).get();
@@ -311,13 +313,14 @@ public class TestAsyncTable {
       } catch (IOException e) {
         throw new UncheckedIOException(e);
       }
-      table.checkAndMutate(row, FAMILY, QUALIFIER, VALUE, 
mutation).thenAccept(x -> {
-        if (x) {
-          successCount.incrementAndGet();
-          successIndex.set(i);
-        }
-        mutateLatch.countDown();
-      });
+      table.checkAndMutate(row, 
FAMILY).qualifier(QUALIFIER).ifEquals(VALUE).thenMutate(mutation)
+          .thenAccept(x -> {
+            if (x) {
+              successCount.incrementAndGet();
+              successIndex.set(i);
+            }
+            mutateLatch.countDown();
+          });
     });
     mutateLatch.await();
     assertEquals(1, successCount.get());

Reply via email to