This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch branch-2.2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.2 by this push:
     new b715e0c  HBASE-21909 Validate the put instance before executing in 
AsyncTable.put method
b715e0c is described below

commit b715e0c3735afd3e39242ce741cabcccc3c15fcf
Author: zhangduo <zhang...@apache.org>
AuthorDate: Fri Feb 15 20:57:56 2019 +0800

    HBASE-21909 Validate the put instance before executing in AsyncTable.put 
method
    
    Signed-off-by: Michael Stack <st...@apache.org>
---
 .../hbase/client/AsyncBufferedMutatorBuilder.java  |  7 +++++++
 .../client/AsyncBufferedMutatorBuilderImpl.java    | 15 +++++++++++++--
 .../hbase/client/AsyncBufferedMutatorImpl.java     | 14 ++++++++++++--
 .../hbase/client/AsyncConnectionConfiguration.java |  9 +++++++++
 .../hadoop/hbase/client/BufferedMutatorImpl.java   |  2 +-
 .../hadoop/hbase/client/ConnectionUtils.java       | 16 ++++++++++++++++
 .../org/apache/hadoop/hbase/client/HTable.java     | 22 +++-------------------
 .../hadoop/hbase/client/HTableMultiplexer.java     |  4 ++--
 .../hadoop/hbase/client/RawAsyncTableImpl.java     |  6 ++++++
 .../hbase/client/TestAsyncBufferMutator.java       |  6 +++---
 10 files changed, 72 insertions(+), 29 deletions(-)

diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilder.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilder.java
index c617c8e..ea2528d 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilder.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilder.java
@@ -98,6 +98,13 @@ public interface AsyncBufferedMutatorBuilder {
   AsyncBufferedMutatorBuilder setWriteBufferSize(long writeBufferSize);
 
   /**
+   * Override the maximum key-value size specified by the provided {@link 
AsyncConnection}'s
+   * {@link org.apache.hadoop.conf.Configuration} instance, via the 
configuration key
+   * {@code hbase.client.keyvalue.maxsize}.
+   */
+  AsyncBufferedMutatorBuilder setMaxKeyValueSize(int maxKeyValueSize);
+
+  /**
    * Create the {@link AsyncBufferedMutator} instance.
    */
   AsyncBufferedMutator build();
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilderImpl.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilderImpl.java
index eb8af17..cd04963 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilderImpl.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilderImpl.java
@@ -37,11 +37,14 @@ class AsyncBufferedMutatorBuilderImpl implements 
AsyncBufferedMutatorBuilder {
 
   private long periodicFlushTimeoutNs;
 
+  private int maxKeyValueSize;
+
   public AsyncBufferedMutatorBuilderImpl(AsyncConnectionConfiguration connConf,
       AsyncTableBuilder<?> tableBuilder, HashedWheelTimer 
periodicalFlushTimer) {
     this.tableBuilder = tableBuilder;
     this.writeBufferSize = connConf.getWriteBufferSize();
     this.periodicFlushTimeoutNs = 
connConf.getWriteBufferPeriodicFlushTimeoutNs();
+    this.maxKeyValueSize = connConf.getMaxKeyValueSize();
     this.periodicalFlushTimer = periodicalFlushTimer;
   }
 
@@ -77,7 +80,7 @@ class AsyncBufferedMutatorBuilderImpl implements 
AsyncBufferedMutatorBuilder {
 
   @Override
   public AsyncBufferedMutatorBuilder setWriteBufferSize(long writeBufferSize) {
-    Preconditions.checkArgument(writeBufferSize > 0, "writeBufferSize %d must 
be >= 0",
+    Preconditions.checkArgument(writeBufferSize > 0, "writeBufferSize %d must 
be > 0",
       writeBufferSize);
     this.writeBufferSize = writeBufferSize;
     return this;
@@ -90,8 +93,16 @@ class AsyncBufferedMutatorBuilderImpl implements 
AsyncBufferedMutatorBuilder {
   }
 
   @Override
+  public AsyncBufferedMutatorBuilder setMaxKeyValueSize(int maxKeyValueSize) {
+    Preconditions.checkArgument(maxKeyValueSize > 0, "maxKeyValueSize %d must 
be > 0",
+      maxKeyValueSize);
+    this.maxKeyValueSize = maxKeyValueSize;
+    return this;
+  }
+
+  @Override
   public AsyncBufferedMutator build() {
     return new AsyncBufferedMutatorImpl(periodicalFlushTimer, 
tableBuilder.build(), writeBufferSize,
-      periodicFlushTimeoutNs);
+      periodicFlushTimeoutNs, maxKeyValueSize);
   }
 }
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java
index 61d49af..7aa9597 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import static org.apache.hadoop.hbase.client.ConnectionUtils.validatePut;
 import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
 
 import java.io.IOException;
@@ -49,6 +50,8 @@ class AsyncBufferedMutatorImpl implements 
AsyncBufferedMutator {
 
   private final long periodicFlushTimeoutNs;
 
+  private final int maxKeyValueSize;
+
   private List<Mutation> mutations = new ArrayList<>();
 
   private List<CompletableFuture<Void>> futures = new ArrayList<>();
@@ -61,11 +64,12 @@ class AsyncBufferedMutatorImpl implements 
AsyncBufferedMutator {
   Timeout periodicFlushTask;
 
   AsyncBufferedMutatorImpl(HashedWheelTimer periodicalFlushTimer, 
AsyncTable<?> table,
-      long writeBufferSize, long periodicFlushTimeoutNs) {
+      long writeBufferSize, long periodicFlushTimeoutNs, int maxKeyValueSize) {
     this.periodicalFlushTimer = periodicalFlushTimer;
     this.table = table;
     this.writeBufferSize = writeBufferSize;
     this.periodicFlushTimeoutNs = periodicFlushTimeoutNs;
+    this.maxKeyValueSize = maxKeyValueSize;
   }
 
   @Override
@@ -112,7 +116,13 @@ class AsyncBufferedMutatorImpl implements 
AsyncBufferedMutator {
     List<CompletableFuture<Void>> futures =
       Stream.<CompletableFuture<Void>> 
generate(CompletableFuture::new).limit(mutations.size())
         .collect(Collectors.toList());
-    long heapSize = mutations.stream().mapToLong(m -> m.heapSize()).sum();
+    long heapSize = 0;
+    for (Mutation mutation : mutations) {
+      heapSize += mutation.heapSize();
+      if (mutation instanceof Put) {
+        validatePut((Put)mutation, maxKeyValueSize);
+      }
+    }
     synchronized (this) {
       if (closed) {
         IOException ioe = new IOException("Already closed");
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
index 65542e4..22042c9 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
@@ -40,6 +40,8 @@ import static 
org.apache.hadoop.hbase.HConstants.HBASE_RPC_TIMEOUT_KEY;
 import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY;
 import static 
org.apache.hadoop.hbase.client.AsyncProcess.DEFAULT_START_LOG_ERRORS_AFTER_COUNT;
 import static 
org.apache.hadoop.hbase.client.AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY;
+import static 
org.apache.hadoop.hbase.client.ConnectionConfiguration.MAX_KEYVALUE_SIZE_DEFAULT;
+import static 
org.apache.hadoop.hbase.client.ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY;
 import static 
org.apache.hadoop.hbase.client.ConnectionConfiguration.PRIMARY_CALL_TIMEOUT_MICROSECOND;
 import static 
org.apache.hadoop.hbase.client.ConnectionConfiguration.PRIMARY_CALL_TIMEOUT_MICROSECOND_DEFAULT;
 import static 
org.apache.hadoop.hbase.client.ConnectionConfiguration.PRIMARY_SCAN_TIMEOUT_MICROSECOND;
@@ -106,6 +108,8 @@ class AsyncConnectionConfiguration {
 
   private final long primaryMetaScanTimeoutNs;
 
+  private final int maxKeyValueSize;
+
   AsyncConnectionConfiguration(Configuration conf) {
     this.metaOperationTimeoutNs = TimeUnit.MILLISECONDS.toNanos(
       conf.getLong(HBASE_CLIENT_META_OPERATION_TIMEOUT, 
DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT));
@@ -142,6 +146,7 @@ class AsyncConnectionConfiguration {
     this.primaryMetaScanTimeoutNs =
       
TimeUnit.MICROSECONDS.toNanos(conf.getLong(HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT,
         HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT_DEFAULT));
+    this.maxKeyValueSize = conf.getInt(MAX_KEYVALUE_SIZE_KEY, 
MAX_KEYVALUE_SIZE_DEFAULT);
   }
 
   long getMetaOperationTimeoutNs() {
@@ -211,4 +216,8 @@ class AsyncConnectionConfiguration {
   long getPrimaryMetaScanTimeoutNs() {
     return primaryMetaScanTimeoutNs;
   }
+
+  int getMaxKeyValueSize() {
+    return maxKeyValueSize;
+  }
 }
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
index d4bc811..f0c8da4 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
@@ -188,7 +188,7 @@ public class BufferedMutatorImpl implements BufferedMutator 
{
     int toAddCount = 0;
     for (Mutation m : ms) {
       if (m instanceof Put) {
-        HTable.validatePut((Put) m, maxKeyValueSize);
+        ConnectionUtils.validatePut((Put) m, maxKeyValueSize);
       }
       toAddSize += m.heapSize();
       ++toAddCount;
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
index 8e050df..3b6560f 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
@@ -572,4 +572,20 @@ public final class ConnectionUtils {
       });
     return future;
   }
+
+  // validate for well-formedness
+  static void validatePut(Put put, int maxKeyValueSize) throws 
IllegalArgumentException {
+    if (put.isEmpty()) {
+      throw new IllegalArgumentException("No columns to insert");
+    }
+    if (maxKeyValueSize > 0) {
+      for (List<Cell> list : put.getFamilyCellMap().values()) {
+        for (Cell cell : list) {
+          if (cell.getSerializedSize() > maxKeyValueSize) {
+            throw new IllegalArgumentException("KeyValue size too large");
+          }
+        }
+      }
+    }
+  }
 }
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
index 15a189c..9b3afd9 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -27,7 +27,6 @@ import com.google.protobuf.Service;
 import com.google.protobuf.ServiceException;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CompareOperator;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionLocation;
@@ -920,24 +919,8 @@ public class HTable implements Table {
   }
 
   // validate for well-formedness
-  public void validatePut(final Put put) throws IllegalArgumentException {
-    validatePut(put, connConfiguration.getMaxKeyValueSize());
-  }
-
-  // validate for well-formedness
-  public static void validatePut(Put put, int maxKeyValueSize) throws 
IllegalArgumentException {
-    if (put.isEmpty()) {
-      throw new IllegalArgumentException("No columns to insert");
-    }
-    if (maxKeyValueSize > 0) {
-      for (List<Cell> list : put.getFamilyCellMap().values()) {
-        for (Cell cell : list) {
-          if (cell.getSerializedSize() > maxKeyValueSize) {
-            throw new IllegalArgumentException("KeyValue size too large");
-          }
-        }
-      }
-    }
+  private void validatePut(final Put put) throws IllegalArgumentException {
+    ConnectionUtils.validatePut(put, connConfiguration.getMaxKeyValueSize());
   }
 
   /**
@@ -1261,6 +1244,7 @@ public class HTable implements Table {
 
     @Override
     public boolean thenPut(Put put) throws IOException {
+      validatePut(put);
       preCheck();
       return doCheckAndPut(row, family, qualifier, op.name(), value, 
timeRange, put);
     }
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
index c2edb89..56c551f 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
@@ -34,7 +34,6 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
@@ -46,6 +45,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import 
org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 import 
org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 
@@ -195,7 +195,7 @@ public class HTableMultiplexer {
     }
 
     try {
-      HTable.validatePut(put, maxKeyValueSize);
+      ConnectionUtils.validatePut(put, maxKeyValueSize);
       // Allow mocking to get at the connection, but don't expose the 
connection to users.
       ClusterConnection conn = (ClusterConnection) getConnection();
       // AsyncProcess in the FlushWorker should take care of refreshing the 
location cache
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
index 7562e6f..96fa85d 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
@@ -21,6 +21,7 @@ import static java.util.stream.Collectors.toList;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.checkHasFamilies;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
 import static 
org.apache.hadoop.hbase.client.ConnectionUtils.timelineConsistentRead;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.validatePut;
 import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
 
 import com.google.protobuf.RpcChannel;
@@ -235,6 +236,7 @@ class RawAsyncTableImpl implements 
AsyncTable<AdvancedScanResultConsumer> {
 
   @Override
   public CompletableFuture<Void> put(Put put) {
+    validatePut(put, conn.connConf.getMaxKeyValueSize());
     return this.<Void> newCaller(put, writeRpcTimeoutNs)
       .action((controller, loc, stub) -> RawAsyncTableImpl.<Put> 
voidMutate(controller, loc, stub,
         put, RequestConverter::buildMutateRequest))
@@ -326,6 +328,7 @@ class RawAsyncTableImpl implements 
AsyncTable<AdvancedScanResultConsumer> {
 
     @Override
     public CompletableFuture<Boolean> thenPut(Put put) {
+      validatePut(put, conn.connConf.getMaxKeyValueSize());
       preCheck();
       return RawAsyncTableImpl.this.<Boolean> newCaller(row, rpcTimeoutNs)
         .action((controller, loc, stub) -> RawAsyncTableImpl.<Put, Boolean> 
mutate(controller, loc,
@@ -478,6 +481,9 @@ class RawAsyncTableImpl implements 
AsyncTable<AdvancedScanResultConsumer> {
 
   @Override
   public List<CompletableFuture<Void>> put(List<Put> puts) {
+    for (Put put : puts) {
+      validatePut(put, conn.connConf.getMaxKeyValueSize());
+    }
     return voidMutate(puts);
   }
 
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBufferMutator.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBufferMutator.java
index 6eed326..5e7f6cc 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBufferMutator.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBufferMutator.java
@@ -245,8 +245,8 @@ public class TestAsyncBufferMutator {
     private int flushCount;
 
     AsyncBufferMutatorForTest(HashedWheelTimer periodicalFlushTimer, 
AsyncTable<?> table,
-        long writeBufferSize, long periodicFlushTimeoutNs) {
-      super(periodicalFlushTimer, table, writeBufferSize, 
periodicFlushTimeoutNs);
+        long writeBufferSize, long periodicFlushTimeoutNs, int 
maxKeyValueSize) {
+      super(periodicalFlushTimer, table, writeBufferSize, 
periodicFlushTimeoutNs, maxKeyValueSize);
     }
 
     @Override
@@ -262,7 +262,7 @@ public class TestAsyncBufferMutator {
     Put put = new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE);
     try (AsyncBufferMutatorForTest mutator =
       new AsyncBufferMutatorForTest(AsyncConnectionImpl.RETRY_TIMER, 
CONN.getTable(TABLE_NAME),
-        10 * put.heapSize(), TimeUnit.MILLISECONDS.toNanos(200))) {
+        10 * put.heapSize(), TimeUnit.MILLISECONDS.toNanos(200), 1024 * 1024)) 
{
       CompletableFuture<?> future = mutator.mutate(put);
       Timeout task = mutator.periodicFlushTask;
       // we should have scheduled a periodic flush task

Reply via email to