[GitHub] [incubator-hudi] satishkotha commented on a change in pull request #1484: [HUDI-316] : Hbase qps repartition writestatus

2020-05-01 Thread GitBox


satishkotha commented on a change in pull request #1484:
URL: https://github.com/apache/incubator-hudi/pull/1484#discussion_r418696889



##
File path: hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java
##
@@ -322,66 +347,94 @@ private boolean checkIfValidCommit(HoodieTableMetaClient 
metaClient, String comm
   /**
* Helper method to facilitate performing mutations (including puts and 
deletes) in Hbase.
*/
-  private void doMutations(BufferedMutator mutator, List mutations) 
throws IOException {
+  private void doMutations(BufferedMutator mutator, List mutations, 
RateLimiter limiter) throws IOException {
 if (mutations.isEmpty()) {
   return;
 }
+// report number of operations to account per second with rate limiter.
+// If #limiter.getRate() operations are acquired within 1 second, 
ratelimiter will limit the rest of calls
+// for within that second
+limiter.acquire(mutations.size());
 mutator.mutate(mutations);
 mutator.flush();
 mutations.clear();

Review comment:
   An example to help clarify what I meant:
   
   Let's say mutator.mutate() + flush + clear takes at least 2 seconds. 
limiter.acquire would never wait, because the limiter generates mutations.size() 
tokens every second. It looks like this is expected and we don't see it as a 
problem, so I'm fine with it. (If possible, having metrics on per-operation 
wait time would help us debug any potential issues.)
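
The scenario in this comment can be checked with a small deterministic model. The sketch below is not Hudi's or Guava's RateLimiter: `BatchThrottleModel` and its methods are made-up names, it ignores stored permits and threads, and it uses a fake clock. It only tracks when the next permit becomes free and reports how long each `acquire` call would have to wait.

```java
/** Simplified, single-threaded model of permit reservation; not the Hudi or Guava class. */
final class BatchThrottleModel {
  private final double microsPerPermit;
  private long nextFreeMicros = 0L;

  BatchThrottleModel(double permitsPerSecond) {
    this.microsPerPermit = 1_000_000.0 / permitsPerSecond;
  }

  /** Returns how long a caller arriving at nowMicros would have to wait. */
  long acquire(int permits, long nowMicros) {
    long waitMicros = Math.max(0L, nextFreeMicros - nowMicros);
    long grantedAt = nowMicros + waitMicros;
    // Reserve the permits: the next caller cannot proceed before this point.
    nextFreeMicros = grantedAt + (long) (permits * microsPerPermit);
    return waitMicros;
  }

  /** Total wait over several batches when the HBase work per batch takes workMicros. */
  static long totalWait(double rate, int batchSize, long workMicros, int batches) {
    BatchThrottleModel limiter = new BatchThrottleModel(rate);
    long now = 0L;
    long total = 0L;
    for (int i = 0; i < batches; i++) {
      long wait = limiter.acquire(batchSize, now);
      total += wait;
      now += wait + workMicros; // wait, then mutate + flush + clear
    }
    return total;
  }

  public static void main(String[] args) {
    // 100 permits per second and batches of 100: one batch costs one second of permits.
    System.out.println(totalWait(100.0, 100, 2_000_000L, 5)); // work takes 2 s per batch
    System.out.println(totalWait(100.0, 100, 200_000L, 5));   // work takes 0.2 s per batch
  }
}
```

Under this model the slow case never waits (total 0), while the fast case waits 0.8 s before each of the last four batches (total 3200000 microseconds), which matches the reasoning in the comment.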





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [incubator-hudi] satishkotha commented on a change in pull request #1484: [HUDI-316] : Hbase qps repartition writestatus

2020-05-01 Thread GitBox


satishkotha commented on a change in pull request #1484:
URL: https://github.com/apache/incubator-hudi/pull/1484#discussion_r418692391



##
File path: hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java
##
@@ -83,13 +88,14 @@
   private static final byte[] COMMIT_TS_COLUMN = Bytes.toBytes("commit_ts");
   private static final byte[] FILE_NAME_COLUMN = Bytes.toBytes("file_name");
   private static final byte[] PARTITION_PATH_COLUMN = 
Bytes.toBytes("partition_path");
-  private static final int SLEEP_TIME_MILLISECONDS = 100;
 
   private static final Logger LOG = LogManager.getLogger(HBaseIndex.class);
   private static Connection hbaseConnection = null;
   private HBaseIndexQPSResourceAllocator hBaseIndexQPSResourceAllocator = null;
-  private float qpsFraction;
   private int maxQpsPerRegionServer;
+  private long totalNumInserts;
+  private int numWriteStatusWithInserts;

Review comment:
   These two also seem to be tied to the operation being performed and do 
not really need to be instance variables. If we can find a way to turn them into 
local variables, that would make the code cleaner.
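
One way to follow this suggestion, sketched under assumptions: compute the two values in a helper and hand them back in a small immutable result instead of writing them to fields. `InsertStatsSketch`, `InsertStats` and `summarize` are hypothetical names, and the list of longs stands in for the per-WriteStatus insert counts; none of this is taken from the PR.

```java
import java.util.Arrays;
import java.util.List;

final class InsertStatsSketch {
  /** Immutable per-operation result; takes the place of mutable fields on the index object. */
  static final class InsertStats {
    final long totalNumInserts;
    final int numWriteStatusWithInserts;

    InsertStats(long totalNumInserts, int numWriteStatusWithInserts) {
      this.totalNumInserts = totalNumInserts;
      this.numWriteStatusWithInserts = numWriteStatusWithInserts;
    }
  }

  /** Each element stands in for the insert count of one WriteStatus. */
  static InsertStats summarize(List<Long> insertsPerWriteStatus) {
    long total = 0L;
    int withInserts = 0;
    for (long inserts : insertsPerWriteStatus) {
      if (inserts > 0) {
        total += inserts;
        withInserts++;
      }
    }
    return new InsertStats(total, withInserts);
  }

  public static void main(String[] args) {
    InsertStats stats = summarize(Arrays.asList(0L, 2L, 4L, 6L, 8L));
    System.out.println(stats.totalNumInserts + " inserts across "
        + stats.numWriteStatusWithInserts + " write statuses");
  }
}
```

The caller keeps the result in a local variable, so two operations on the same index object cannot overwrite each other's counts.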









[GitHub] [incubator-hudi] satishkotha commented on a change in pull request #1484: [HUDI-316] : Hbase qps repartition writestatus

2020-05-01 Thread GitBox


satishkotha commented on a change in pull request #1484:
URL: https://github.com/apache/incubator-hudi/pull/1484#discussion_r418690538



##
File path: 
hudi-common/src/main/java/org/apache/hudi/common/util/RateLimiter.java
##
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.ThreadSafe;
+
+/*
+ * Note: Based on RateLimiter implementation in Google/Guava.
+ * - adopted from com.google.common.util.concurrent
+ *   Copyright (C) 2012 The Guava Authors
+ *   Home page: https://github.com/google/guava
+ *   License: http://www.apache.org/licenses/LICENSE-2.0
+ */
+
+@ThreadSafe
+public abstract class RateLimiter {
+  private final RateLimiter.SleepingTicker ticker;
+  private final long offsetNanos;
+  double storedPermits;
+  double maxPermits;
+  volatile double stableIntervalMicros;
+  private final Object mutex;
+  private long nextFreeTicketMicros;
+
+  public static RateLimiter create(double permitsPerSecond) {
+return create(RateLimiter.SleepingTicker.SYSTEM_TICKER, permitsPerSecond);
+  }
+
+  static RateLimiter create(RateLimiter.SleepingTicker ticker, double 
permitsPerSecond) {
+RateLimiter rateLimiter = new RateLimiter.Bursty(ticker, 1.0D);
+rateLimiter.setRate(permitsPerSecond);
+return rateLimiter;
+  }
+
+  public static RateLimiter create(double permitsPerSecond, long warmupPeriod, 
TimeUnit unit) {
+return create(RateLimiter.SleepingTicker.SYSTEM_TICKER, permitsPerSecond, 
warmupPeriod, unit);
+  }
+
+  static RateLimiter create(RateLimiter.SleepingTicker ticker, double 
permitsPerSecond, long warmupPeriod, TimeUnit unit) {
+RateLimiter rateLimiter = new RateLimiter.WarmingUp(ticker, warmupPeriod, 
unit);
+rateLimiter.setRate(permitsPerSecond);
+return rateLimiter;
+  }
+
+  private RateLimiter(RateLimiter.SleepingTicker ticker) {
+this.mutex = new Object();
+this.nextFreeTicketMicros = 0L;
+this.ticker = ticker;
+this.offsetNanos = ticker.read();
+  }
+
+  public final void setRate(double permitsPerSecond) {
+checkArgument(permitsPerSecond > 0.0D && !Double.isNaN(permitsPerSecond), 
"rate must be positive");
+Object var3 = this.mutex;
+synchronized (this.mutex) {
+  this.resync(this.readSafeMicros());
+  double stableIntervalMicros = (double)TimeUnit.SECONDS.toMicros(1L) / 
permitsPerSecond;
+  this.stableIntervalMicros = stableIntervalMicros;
+  this.doSetRate(permitsPerSecond, stableIntervalMicros);
+}
+  }
+
+  abstract void doSetRate(double var1, double var3);
+
+  public final double getRate() {
+return (double)TimeUnit.SECONDS.toMicros(1L) / this.stableIntervalMicros;
+  }
+
+  public void acquire() {
+this.acquire(1);
+  }
+
+  public void acquire(int permits) {

Review comment:
   Any chance we can add these as metrics/logs? 
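
The request does not say which values should be reported. Read together with the other comment in this thread that asks for per-operation wait time, one option is to time each blocking `acquire` call and keep simple counters. The sketch below is an assumption about what that could look like: `AcquireWaitMetrics` is a hypothetical helper, not part of Hudi or Guava, and the acquire call is passed in as a `Runnable` because `acquire` in the quoted code returns void.

```java
import java.util.concurrent.atomic.AtomicLong;

final class AcquireWaitMetrics {
  private final AtomicLong calls = new AtomicLong();
  private final AtomicLong totalWaitNanos = new AtomicLong();
  private final AtomicLong maxWaitNanos = new AtomicLong();

  /** Runs the blocking acquire call and records how long it blocked. */
  void timeAcquire(Runnable acquireCall) {
    long start = System.nanoTime();
    acquireCall.run();
    record(System.nanoTime() - start);
  }

  /** Records one observed wait; safe to call from several threads. */
  void record(long waitNanos) {
    calls.incrementAndGet();
    totalWaitNanos.addAndGet(waitNanos);
    maxWaitNanos.accumulateAndGet(waitNanos, Math::max);
  }

  long calls() {
    return calls.get();
  }

  long totalWaitNanos() {
    return totalWaitNanos.get();
  }

  long maxWaitNanos() {
    return maxWaitNanos.get();
  }

  public static void main(String[] args) {
    AcquireWaitMetrics metrics = new AcquireWaitMetrics();
    // In real code the Runnable would be something like () -> limiter.acquire(n).
    metrics.timeAcquire(() -> { });
    System.out.println("acquire calls=" + metrics.calls()
        + " totalWaitNanos=" + metrics.totalWaitNanos()
        + " maxWaitNanos=" + metrics.maxWaitNanos());
  }
}
```

The counters could then be logged once per partition or pushed to whatever metrics sink the project already uses.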









[GitHub] [incubator-hudi] satishkotha commented on a change in pull request #1484: [HUDI-316] : Hbase qps repartition writestatus

2020-04-20 Thread GitBox


satishkotha commented on a change in pull request #1484:
URL: https://github.com/apache/incubator-hudi/pull/1484#discussion_r411749174



##
File path: hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java
##
@@ -322,66 +347,94 @@ private boolean checkIfValidCommit(HoodieTableMetaClient 
metaClient, String comm
   /**
* Helper method to facilitate performing mutations (including puts and 
deletes) in Hbase.
*/
-  private void doMutations(BufferedMutator mutator, List mutations) 
throws IOException {
+  private void doMutations(BufferedMutator mutator, List mutations, 
RateLimiter limiter) throws IOException {
 if (mutations.isEmpty()) {
   return;
 }
+// report number of operations to account per second with rate limiter.
+// If #limiter.getRate() operations are acquired within 1 second, 
ratelimiter will limit the rest of calls
+// for within that second
+limiter.acquire(mutations.size());
 mutator.mutate(mutations);
 mutator.flush();
 mutations.clear();

Review comment:
   Another question: what is the typical latency of these mutate 
operations? If the time taken here, combined with the time taken to collect 
'multiPutBatchSize' mutations, is > 1 second, then it seems the limiter would 
generate enough tokens for the next run and would not wait at all. 









[GitHub] [incubator-hudi] satishkotha commented on a change in pull request #1484: [HUDI-316] : Hbase qps repartition writestatus

2020-04-14 Thread GitBox
satishkotha commented on a change in pull request #1484: [HUDI-316] : Hbase qps 
repartition writestatus
URL: https://github.com/apache/incubator-hudi/pull/1484#discussion_r407905954
 
 

 ##
 File path: 
hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java
 ##
 @@ -322,66 +347,94 @@ private boolean checkIfValidCommit(HoodieTableMetaClient 
metaClient, String comm
   /**
* Helper method to facilitate performing mutations (including puts and 
deletes) in Hbase.
*/
-  private void doMutations(BufferedMutator mutator, List mutations) 
throws IOException {
+  private void doMutations(BufferedMutator mutator, List mutations, 
RateLimiter limiter) throws IOException {
 if (mutations.isEmpty()) {
   return;
 }
+// report number of operations to account per second with rate limiter.
+// If #limiter.getRate() operations are acquired within 1 second, 
ratelimiter will limit the rest of calls
+// for within that second
+limiter.acquire(mutations.size());
 mutator.mutate(mutations);
 mutator.flush();
 mutations.clear();
-sleepForTime(SLEEP_TIME_MILLISECONDS);
-  }
-
-  private static void sleepForTime(int sleepTimeMs) {
-try {
-  Thread.sleep(sleepTimeMs);
-} catch (InterruptedException e) {
-  LOG.error("Sleep interrupted during throttling", e);
-  throw new RuntimeException(e);
-}
   }
 
   @Override
   public JavaRDD updateLocation(JavaRDD 
writeStatusRDD, JavaSparkContext jsc,
   HoodieTable hoodieTable) {
-final HBaseIndexQPSResourceAllocator hBaseIndexQPSResourceAllocator = 
createQPSResourceAllocator(this.config);
-setPutBatchSize(writeStatusRDD, hBaseIndexQPSResourceAllocator, jsc);
-LOG.info("multiPutBatchSize: before hbase puts" + multiPutBatchSize);
-JavaRDD writeStatusJavaRDD = 
writeStatusRDD.mapPartitionsWithIndex(updateLocationFunction(), true);
+final Option desiredQPSFraction =  
calculateQPSFraction(writeStatusRDD, hBaseIndexQPSResourceAllocator);
+// Map each fileId that has inserts to a unique partition Id. This will be 
used while
+// repartitioning RDD
+int partitionIndex = 0;
+final List fileIds = writeStatusRDD.filter(w -> 
w.getStat().getNumInserts() > 0)
+   .map(w -> w.getFileId()).collect();
+for (final String fileId : fileIds) {
+  this.fileIdPartitionMap.put(fileId, partitionIndex++);
+}
+JavaRDD partitionedRDD = this.numWriteStatusWithInserts == 0 
? writeStatusRDD :
+  writeStatusRDD.mapToPair(w -> new 
Tuple2<>(w.getFileId(), w))
+.partitionBy(new 
WriteStatusPartitioner(this.fileIdPartitionMap,
+  this.numWriteStatusWithInserts))
+.map(w -> w._2());
 
 Review comment:
   Consider redoing this logic: if this.numWriteStatusWithInserts == 0, we 
still go through the process of generating fileIdPartitionMap, which is not 
ideal.
   
   Also, I'm curious whether you did any performance measurements before and 
after this change. It is worth highlighting in the release notes if the 
improvement is significant.
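
A minimal sketch of the restructuring suggested in the first paragraph, with plain collections standing in for the RDD. `PartitionMapSketch` and `buildFileIdPartitionMap` are hypothetical names, and the `Supplier` models the filter/map/collect call so that the example can show it is skipped when there are no inserts.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

final class PartitionMapSketch {
  /**
   * Builds the fileId -> partition index map only when there is something to partition.
   * collectFileIdsWithInserts stands in for the (potentially expensive) collect() on the RDD.
   */
  static Map<String, Integer> buildFileIdPartitionMap(
      int numWriteStatusWithInserts, Supplier<List<String>> collectFileIdsWithInserts) {
    if (numWriteStatusWithInserts == 0) {
      return Collections.emptyMap(); // nothing to repartition, so skip the collect entirely
    }
    Map<String, Integer> fileIdPartitionMap = new HashMap<>();
    int partitionIndex = 0;
    for (String fileId : collectFileIdsWithInserts.get()) {
      fileIdPartitionMap.put(fileId, partitionIndex++);
    }
    return fileIdPartitionMap;
  }

  public static void main(String[] args) {
    System.out.println(buildFileIdPartitionMap(0, () -> Arrays.asList("unused")));
    System.out.println(buildFileIdPartitionMap(2, () -> Arrays.asList("f1", "f2")));
  }
}
```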




[GitHub] [incubator-hudi] satishkotha commented on a change in pull request #1484: [HUDI-316] : Hbase qps repartition writestatus

2020-04-14 Thread GitBox
satishkotha commented on a change in pull request #1484: [HUDI-316] : Hbase qps 
repartition writestatus
URL: https://github.com/apache/incubator-hudi/pull/1484#discussion_r407869565
 
 

 ##
 File path: 
hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java
 ##
 @@ -322,66 +347,94 @@ private boolean checkIfValidCommit(HoodieTableMetaClient 
metaClient, String comm
   /**
* Helper method to facilitate performing mutations (including puts and 
deletes) in Hbase.
*/
-  private void doMutations(BufferedMutator mutator, List mutations) 
throws IOException {
+  private void doMutations(BufferedMutator mutator, List mutations, 
RateLimiter limiter) throws IOException {
 if (mutations.isEmpty()) {
   return;
 }
+// report number of operations to account per second with rate limiter.
+// If #limiter.getRate() operations are acquired within 1 second, 
ratelimiter will limit the rest of calls
+// for within that second
+limiter.acquire(mutations.size());
 mutator.mutate(mutations);
 mutator.flush();
 mutations.clear();
-sleepForTime(SLEEP_TIME_MILLISECONDS);
-  }
-
-  private static void sleepForTime(int sleepTimeMs) {
-try {
-  Thread.sleep(sleepTimeMs);
-} catch (InterruptedException e) {
-  LOG.error("Sleep interrupted during throttling", e);
-  throw new RuntimeException(e);
-}
   }
 
   @Override
   public JavaRDD updateLocation(JavaRDD 
writeStatusRDD, JavaSparkContext jsc,
   HoodieTable hoodieTable) {
-final HBaseIndexQPSResourceAllocator hBaseIndexQPSResourceAllocator = 
createQPSResourceAllocator(this.config);
-setPutBatchSize(writeStatusRDD, hBaseIndexQPSResourceAllocator, jsc);
-LOG.info("multiPutBatchSize: before hbase puts" + multiPutBatchSize);
-JavaRDD writeStatusJavaRDD = 
writeStatusRDD.mapPartitionsWithIndex(updateLocationFunction(), true);
+final Option desiredQPSFraction =  
calculateQPSFraction(writeStatusRDD, hBaseIndexQPSResourceAllocator);
+// Map each fileId that has inserts to a unique partition Id. This will be 
used while
+// repartitioning RDD
+int partitionIndex = 0;
+final List fileIds = writeStatusRDD.filter(w -> 
w.getStat().getNumInserts() > 0)
+   .map(w -> w.getFileId()).collect();
+for (final String fileId : fileIds) {
+  this.fileIdPartitionMap.put(fileId, partitionIndex++);
+}
+JavaRDD partitionedRDD = this.numWriteStatusWithInserts == 0 
? writeStatusRDD :
+  writeStatusRDD.mapToPair(w -> new 
Tuple2<>(w.getFileId(), w))
+.partitionBy(new 
WriteStatusPartitioner(this.fileIdPartitionMap,
+  this.numWriteStatusWithInserts))
+.map(w -> w._2());
+acquireQPSResourcesAndSetBatchSize(desiredQPSFraction, jsc);
+LOG.info("multiPutBatchSize before hbase puts: " + this.multiPutBatchSize);
 
 Review comment:
   nit: it looks like this is already logged in the method call above, so I 
think this line can be removed.




[GitHub] [incubator-hudi] satishkotha commented on a change in pull request #1484: [HUDI-316] : Hbase qps repartition writestatus

2020-04-14 Thread GitBox
satishkotha commented on a change in pull request #1484: [HUDI-316] : Hbase qps 
repartition writestatus
URL: https://github.com/apache/incubator-hudi/pull/1484#discussion_r407805506
 
 

 ##
 File path: 
hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java
 ##
 @@ -322,66 +347,94 @@ private boolean checkIfValidCommit(HoodieTableMetaClient 
metaClient, String comm
   /**
* Helper method to facilitate performing mutations (including puts and 
deletes) in Hbase.
*/
-  private void doMutations(BufferedMutator mutator, List mutations) 
throws IOException {
+  private void doMutations(BufferedMutator mutator, List mutations, 
RateLimiter limiter) throws IOException {
 if (mutations.isEmpty()) {
   return;
 }
+// report number of operations to account per second with rate limiter.
+// If #limiter.getRate() operations are acquired within 1 second, 
ratelimiter will limit the rest of calls
+// for within that second
+limiter.acquire(mutations.size());
 mutator.mutate(mutations);
 mutator.flush();
 mutations.clear();
-sleepForTime(SLEEP_TIME_MILLISECONDS);
-  }
-
-  private static void sleepForTime(int sleepTimeMs) {
-try {
-  Thread.sleep(sleepTimeMs);
-} catch (InterruptedException e) {
-  LOG.error("Sleep interrupted during throttling", e);
-  throw new RuntimeException(e);
-}
   }
 
   @Override
   public JavaRDD updateLocation(JavaRDD 
writeStatusRDD, JavaSparkContext jsc,
   HoodieTable hoodieTable) {
-final HBaseIndexQPSResourceAllocator hBaseIndexQPSResourceAllocator = 
createQPSResourceAllocator(this.config);
-setPutBatchSize(writeStatusRDD, hBaseIndexQPSResourceAllocator, jsc);
-LOG.info("multiPutBatchSize: before hbase puts" + multiPutBatchSize);
-JavaRDD writeStatusJavaRDD = 
writeStatusRDD.mapPartitionsWithIndex(updateLocationFunction(), true);
+final Option desiredQPSFraction =  
calculateQPSFraction(writeStatusRDD, hBaseIndexQPSResourceAllocator);
+// Map each fileId that has inserts to a unique partition Id. This will be 
used while
+// repartitioning RDD
+int partitionIndex = 0;
+final List fileIds = writeStatusRDD.filter(w -> 
w.getStat().getNumInserts() > 0)
+   .map(w -> w.getFileId()).collect();
+for (final String fileId : fileIds) {
+  this.fileIdPartitionMap.put(fileId, partitionIndex++);
+}
+JavaRDD partitionedRDD = this.numWriteStatusWithInserts == 0 
? writeStatusRDD :
+  writeStatusRDD.mapToPair(w -> new 
Tuple2<>(w.getFileId(), w))
+.partitionBy(new 
WriteStatusPartitioner(this.fileIdPartitionMap,
+  this.numWriteStatusWithInserts))
+.map(w -> w._2());
+acquireQPSResourcesAndSetBatchSize(desiredQPSFraction, jsc);
+LOG.info("multiPutBatchSize before hbase puts: " + this.multiPutBatchSize);
+JavaRDD writeStatusJavaRDD = 
partitionedRDD.mapPartitionsWithIndex(updateLocationFunction(),
+true);
 // caching the index updated status RDD
 writeStatusJavaRDD = 
writeStatusJavaRDD.persist(SparkConfigUtils.getWriteStatusStorageLevel(config.getProps()));
+// force trigger update location(hbase puts)
+writeStatusJavaRDD.count();
+this.hBaseIndexQPSResourceAllocator.releaseQPSResources();
 
 Review comment:
   Can you help me understand this code? Why do we need to force a trigger 
here? Is this just so that we can call releaseQPSResources? releaseQPSResources 
seems to do nothing (at least in the default implementation; are there other 
implementations outside hoodie?). Is it really important to release here as 
opposed to doing it in 'close()' (the earlier behavior)?
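
The diff's own comment says the count() is there to "force trigger update location(hbase puts)". That is consistent with Spark transformations being lazy: mapPartitionsWithIndex only describes the work, and nothing runs until an action is called. Whether that fully answers the question is not settled in the part of the thread shown here. The sketch below uses java.util.stream as a stand-in for an RDD (an analogy only, not Spark code; `LazyReleaseSketch` is a made-up name) to show how the order of "release" relative to the puts depends on when the pipeline is forced.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class LazyReleaseSketch {
  /** Returns the order of events when resources are released before or after forcing the work. */
  static List<String> run(boolean forceBeforeRelease) {
    List<String> events = new ArrayList<>();
    // Like an RDD transformation, map() only records the work; nothing runs yet.
    Stream<Integer> puts = Stream.of(1, 2, 3).map(i -> {
      events.add("put " + i);
      return i;
    });
    if (forceBeforeRelease) {
      puts.collect(Collectors.toList()); // terminal operation: the puts happen here
      events.add("release");
    } else {
      events.add("release");             // released while the puts are still pending
      puts.collect(Collectors.toList());
    }
    return events;
  }

  public static void main(String[] args) {
    System.out.println(run(true));
    System.out.println(run(false));
  }
}
```

If the resources were released without forcing the pipeline first, the analogy suggests the puts would run after the release, whenever a later action happens to evaluate them.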




[GitHub] [incubator-hudi] satishkotha commented on a change in pull request #1484: [HUDI-316] : Hbase qps repartition writestatus

2020-04-14 Thread GitBox
satishkotha commented on a change in pull request #1484: [HUDI-316] : Hbase qps 
repartition writestatus
URL: https://github.com/apache/incubator-hudi/pull/1484#discussion_r408345007
 
 

 ##
 File path: hudi-client/src/test/java/org/apache/hudi/index/TestHbaseIndex.java
 ##
 @@ -329,47 +332,140 @@ public void testPutBatchSizeCalculation() {
 // All asserts cases below are derived out of the first
 // example below, with change in one parameter at a time.
 
-int putBatchSize = batchSizeCalculator.getBatchSize(10, 16667, 1200, 200, 
100, 0.1f);
-// Expected batchSize is 8 because in that case, total request sent in one 
second is below
-// 8 (batchSize) * 200 (parallelism) * 10 (maxReqsInOneSecond) * 10 
(numRegionServers) * 0.1 (qpsFraction)) => 16000
-// We assume requests get distributed to Region Servers uniformly, so each 
RS gets 1600 request
-// 1600 happens to be 10% of 16667 (maxQPSPerRegionServer) as expected.
-Assert.assertEquals(putBatchSize, 8);
+int putBatchSize = batchSizeCalculator.getBatchSize(10, 16667, 1200, 200, 
0.1f);
+// Total puts that can be sent  in 1 second = (10 * 16667 * 0.1) = 16,667
+// Total puts per batch will be (16,667 / parallelism) = 83.335, where 200 
is the maxExecutors
+Assert.assertEquals(putBatchSize, 83);
 
 // Number of Region Servers are halved, total requests sent in a second 
are also halved, so batchSize is also halved
-int putBatchSize2 = batchSizeCalculator.getBatchSize(5, 16667, 1200, 200, 
100, 0.1f);
-Assert.assertEquals(putBatchSize2, 4);
+int putBatchSize2 = batchSizeCalculator.getBatchSize(5, 16667, 1200, 200, 
0.1f);
+Assert.assertEquals(putBatchSize2, 41);
 
 // If the parallelism is halved, batchSize has to double
-int putBatchSize3 = batchSizeCalculator.getBatchSize(10, 16667, 1200, 100, 
100, 0.1f);
-Assert.assertEquals(putBatchSize3, 16);
+int putBatchSize3 = batchSizeCalculator.getBatchSize(10, 16667, 1200, 100, 
0.1f);
+Assert.assertEquals(putBatchSize3, 166);
 
 // If the parallelism is halved, batchSize has to double.
 // This time parallelism is driven by numTasks rather than numExecutors
-int putBatchSize4 = batchSizeCalculator.getBatchSize(10, 16667, 100, 200, 
100, 0.1f);
-Assert.assertEquals(putBatchSize4, 16);
+int putBatchSize4 = batchSizeCalculator.getBatchSize(10, 16667, 100, 200, 
0.1f);
+Assert.assertEquals(putBatchSize4, 166);
 
 // If sleepTimeMs is halved, batchSize has to halve
-int putBatchSize5 = batchSizeCalculator.getBatchSize(10, 16667, 1200, 200, 
100, 0.05f);
-Assert.assertEquals(putBatchSize5, 4);
+int putBatchSize5 = batchSizeCalculator.getBatchSize(10, 16667, 1200, 200, 
0.05f);
+Assert.assertEquals(putBatchSize5, 41);
 
 // If maxQPSPerRegionServer is doubled, batchSize also doubles
-int putBatchSize6 = batchSizeCalculator.getBatchSize(10, 33334, 1200, 200, 
100, 0.1f);
-Assert.assertEquals(putBatchSize6, 16);
+int putBatchSize6 = batchSizeCalculator.getBatchSize(10, 33334, 1200, 200, 
0.1f);
+Assert.assertEquals(putBatchSize6, 166);
   }
 
   @Test
   public void testsHBasePutAccessParallelism() {
 HoodieWriteConfig config = getConfig();
 HBaseIndex index = new HBaseIndex(config);
 final JavaRDD writeStatusRDD = jsc.parallelize(
-Arrays.asList(getSampleWriteStatus(1, 2), getSampleWriteStatus(0, 3), 
getSampleWriteStatus(10, 0)), 10);
+Arrays.asList(
+  getSampleWriteStatus(0, 2),
+  getSampleWriteStatus(2, 3),
+  getSampleWriteStatus(4, 3),
+  getSampleWriteStatus(6, 3),
+  getSampleWriteStatus(8, 0)),
+10);
 final Tuple2 tuple = 
index.getHBasePutAccessParallelism(writeStatusRDD);
 final int hbasePutAccessParallelism = 
Integer.parseInt(tuple._2.toString());
 final int hbaseNumPuts = Integer.parseInt(tuple._1.toString());
 Assert.assertEquals(10, writeStatusRDD.getNumPartitions());
-Assert.assertEquals(2, hbasePutAccessParallelism);
-Assert.assertEquals(11, hbaseNumPuts);
+Assert.assertEquals(4, hbasePutAccessParallelism);
+Assert.assertEquals(20, hbaseNumPuts);
+  }
+
+  @Test
+  public void testsWriteStatusPartitioner() {
+HoodieWriteConfig config = getConfig();
+HBaseIndex index = new HBaseIndex(config);
+int parallelism = 4;
+final JavaRDD writeStatusRDD = jsc.parallelize(
+Arrays.asList(
+  getSampleWriteStatusWithFileId(0, 2),
+  getSampleWriteStatusWithFileId(2, 3),
+  getSampleWriteStatusWithFileId(4, 3),
+  getSampleWriteStatusWithFileId(0, 3),
+  getSampleWriteStatusWithFileId(11, 0)), parallelism);
+int partitionIndex = 0;
+final Map fileIdPartitionMap = new HashMap<>();
+
+final List fileIds = writeStatusRDD.filter(w -> 
w.getStat().getNumInserts() > 0)
 
 Review comment:
   A lot of the code in this test seems to repeat the source code. Consider 
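
The expected values in the quoted test can be reproduced from the comments in the diff: total puts per second is numRegionServers * maxQPSPerRegionServer * qpsFraction, and that is divided by the parallelism and rounded down. The sketch below is inferred from those comments and assertions only. `PutBatchSizeSketch.batchSize` is a hypothetical method, not the PR's calculator, and taking the parallelism as the smaller of numTasks and maxExecutors is an assumption that happens to match the asserted numbers.

```java
final class PutBatchSizeSketch {
  /** Batch size per task so that all tasks together stay within the allowed puts per second. */
  static int batchSize(int numRegionServers, int maxQpsPerRegionServer,
                       int numTasks, int maxExecutors, double qpsFraction) {
    double putsPerSecond = numRegionServers * (double) maxQpsPerRegionServer * qpsFraction;
    int parallelism = Math.min(numTasks, maxExecutors); // assumption, see lead-in
    return (int) Math.floor(putsPerSecond / parallelism);
  }

  public static void main(String[] args) {
    // 10 * 16667 * 0.1 = 16,667 puts per second; 16,667 / 200 = 83.335, rounded down.
    System.out.println(batchSize(10, 16667, 1200, 200, 0.1));
    // Halving the region servers halves the budget.
    System.out.println(batchSize(5, 16667, 1200, 200, 0.1));
    // Halving the parallelism doubles the batch, whether via executors or tasks.
    System.out.println(batchSize(10, 16667, 1200, 100, 0.1));
    System.out.println(batchSize(10, 16667, 100, 200, 0.1));
  }
}
```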

[GitHub] [incubator-hudi] satishkotha commented on a change in pull request #1484: [HUDI-316] : Hbase qps repartition writestatus

2020-04-14 Thread GitBox
satishkotha commented on a change in pull request #1484: [HUDI-316] : Hbase qps 
repartition writestatus
URL: https://github.com/apache/incubator-hudi/pull/1484#discussion_r407807929
 
 

 ##
 File path: 
hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java
 ##
 @@ -322,66 +347,94 @@ private boolean checkIfValidCommit(HoodieTableMetaClient 
metaClient, String comm
   /**
* Helper method to facilitate performing mutations (including puts and 
deletes) in Hbase.
*/
-  private void doMutations(BufferedMutator mutator, List mutations) 
throws IOException {
+  private void doMutations(BufferedMutator mutator, List mutations, 
RateLimiter limiter) throws IOException {
 if (mutations.isEmpty()) {
   return;
 }
+// report number of operations to account per second with rate limiter.
+// If #limiter.getRate() operations are acquired within 1 second, 
ratelimiter will limit the rest of calls
+// for within that second
+limiter.acquire(mutations.size());
 mutator.mutate(mutations);
 mutator.flush();
 mutations.clear();
-sleepForTime(SLEEP_TIME_MILLISECONDS);
-  }
-
-  private static void sleepForTime(int sleepTimeMs) {
-try {
-  Thread.sleep(sleepTimeMs);
-} catch (InterruptedException e) {
-  LOG.error("Sleep interrupted during throttling", e);
-  throw new RuntimeException(e);
-}
   }
 
   @Override
   public JavaRDD updateLocation(JavaRDD 
writeStatusRDD, JavaSparkContext jsc,
   HoodieTable hoodieTable) {
-final HBaseIndexQPSResourceAllocator hBaseIndexQPSResourceAllocator = 
createQPSResourceAllocator(this.config);
-setPutBatchSize(writeStatusRDD, hBaseIndexQPSResourceAllocator, jsc);
-LOG.info("multiPutBatchSize: before hbase puts" + multiPutBatchSize);
-JavaRDD writeStatusJavaRDD = 
writeStatusRDD.mapPartitionsWithIndex(updateLocationFunction(), true);
+final Option desiredQPSFraction =  
calculateQPSFraction(writeStatusRDD, hBaseIndexQPSResourceAllocator);
+// Map each fileId that has inserts to a unique partition Id. This will be 
used while
+// repartitioning RDD
+int partitionIndex = 0;
+final List fileIds = writeStatusRDD.filter(w -> 
w.getStat().getNumInserts() > 0)
+   .map(w -> w.getFileId()).collect();
+for (final String fileId : fileIds) {
+  this.fileIdPartitionMap.put(fileId, partitionIndex++);
+}
+JavaRDD partitionedRDD = this.numWriteStatusWithInserts == 0 
? writeStatusRDD :
+  writeStatusRDD.mapToPair(w -> new 
Tuple2<>(w.getFileId(), w))
+.partitionBy(new 
WriteStatusPartitioner(this.fileIdPartitionMap,
+  this.numWriteStatusWithInserts))
+.map(w -> w._2());
+acquireQPSResourcesAndSetBatchSize(desiredQPSFraction, jsc);
+LOG.info("multiPutBatchSize before hbase puts: " + this.multiPutBatchSize);
+JavaRDD writeStatusJavaRDD = 
partitionedRDD.mapPartitionsWithIndex(updateLocationFunction(),
+true);
 // caching the index updated status RDD
 writeStatusJavaRDD = 
writeStatusJavaRDD.persist(SparkConfigUtils.getWriteStatusStorageLevel(config.getProps()));
+// force trigger update location(hbase puts)
+writeStatusJavaRDD.count();
+this.hBaseIndexQPSResourceAllocator.releaseQPSResources();
 return writeStatusJavaRDD;
   }
 
-  private void setPutBatchSize(JavaRDD writeStatusRDD,
-  HBaseIndexQPSResourceAllocator hBaseIndexQPSResourceAllocator, final 
JavaSparkContext jsc) {
+  private Option calculateQPSFraction(JavaRDD 
writeStatusRDD,
+   HBaseIndexQPSResourceAllocator 
hBaseIndexQPSResourceAllocator) {
 
 Review comment:
   Not directly related to your change, so feel free to ignore this comment. 
But hBaseIndexQPSResourceAllocator is an instance variable, so why is it passed 
again as an argument? This seems to be a consistent pattern in this class. 
Because we also use the exact same name for the local variable, it shadows the 
instance variable, which can easily become error-prone if the two variables 
evolve to mean different things.
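
The hazard described here can be shown in a few lines. This is an illustrative sketch with made-up names (`ShadowingSketch`, `allocator`), not code from HBaseIndex: inside a method whose parameter has the same name as a field, the bare name refers to the parameter, and the field is reachable only through `this`.

```java
final class ShadowingSketch {
  private final String allocator = "instance-allocator";

  /** The parameter hides the field of the same name inside this method. */
  String describeShadowed(String allocator) {
    return allocator; // the parameter, not this.allocator
  }

  /** Without the parameter there is only one thing the name can mean. */
  String describeField() {
    return allocator;
  }

  public static void main(String[] args) {
    ShadowingSketch sketch = new ShadowingSketch();
    System.out.println(sketch.describeShadowed("some-other-allocator"));
    System.out.println(sketch.describeField());
  }
}
```

If a caller ever passes something other than the field's value, the two methods silently disagree, which is the drift the comment warns about.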




[GitHub] [incubator-hudi] satishkotha commented on a change in pull request #1484: [HUDI-316] : Hbase qps repartition writestatus

2020-04-14 Thread GitBox
satishkotha commented on a change in pull request #1484: [HUDI-316] : Hbase qps 
repartition writestatus
URL: https://github.com/apache/incubator-hudi/pull/1484#discussion_r407868223
 
 

 ##
 File path: 
hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java
 ##
 @@ -498,4 +554,37 @@ public boolean isImplicitWithStorage() {
   public void setHbaseConnection(Connection hbaseConnection) {
 HBaseIndex.hbaseConnection = hbaseConnection;
   }
+
+  /**
+   * Partitions each WriteStatus with inserts into a unique single partition. 
WriteStatus without inserts will be
+   * assigned to random partitions. This partitioner will be useful to utilize 
max parallelism with spark operations
+   * that are based on inserts in each WriteStatus.
+   */
+  public class WriteStatusPartitioner extends Partitioner {
 
 Review comment:
   Could you please make this a static class if it's not using any instance 
variables of the outer class?
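
A sketch of what the static variant could look like, without the Spark dependency. A non-static inner class keeps a hidden reference to its enclosing instance, whereas a static nested class carries only its own fields. The names `OuterIndexSketch` and `FileIdPartitioner` are hypothetical, and the hash-based fallback for file ids without a fixed slot is a stand-in for the "random partitions" mentioned in the PR's Javadoc, not the PR's actual logic.

```java
import java.util.HashMap;
import java.util.Map;

final class OuterIndexSketch {
  /** Static nested class: no hidden reference to an OuterIndexSketch instance. */
  static final class FileIdPartitioner {
    private final Map<String, Integer> fileIdPartitionMap;
    private final int numPartitions;

    FileIdPartitioner(Map<String, Integer> fileIdPartitionMap, int numPartitions) {
      this.fileIdPartitionMap = new HashMap<>(fileIdPartitionMap);
      this.numPartitions = numPartitions;
    }

    int getPartition(String fileId) {
      Integer assigned = fileIdPartitionMap.get(fileId);
      if (assigned != null) {
        return assigned; // file ids with inserts get their dedicated partition
      }
      // Stand-in for the random assignment described in the Javadoc.
      return Math.floorMod(fileId.hashCode(), numPartitions);
    }
  }

  public static void main(String[] args) {
    Map<String, Integer> slots = new HashMap<>();
    slots.put("file-a", 0);
    slots.put("file-b", 1);
    // No OuterIndexSketch instance is needed to create the partitioner.
    FileIdPartitioner partitioner = new FileIdPartitioner(slots, 2);
    System.out.println(partitioner.getPartition("file-a"));
    System.out.println(partitioner.getPartition("file-b"));
  }
}
```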




[GitHub] [incubator-hudi] satishkotha commented on a change in pull request #1484: [HUDI-316] : Hbase qps repartition writestatus

2020-04-14 Thread GitBox
satishkotha commented on a change in pull request #1484: [HUDI-316] : Hbase qps 
repartition writestatus
URL: https://github.com/apache/incubator-hudi/pull/1484#discussion_r407766436
 
 

 ##
 File path: 
hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java
 ##
 @@ -83,13 +88,17 @@
   private static final byte[] COMMIT_TS_COLUMN = Bytes.toBytes("commit_ts");
   private static final byte[] FILE_NAME_COLUMN = Bytes.toBytes("file_name");
   private static final byte[] PARTITION_PATH_COLUMN = 
Bytes.toBytes("partition_path");
-  private static final int SLEEP_TIME_MILLISECONDS = 100;
 
   private static final Logger LOG = LogManager.getLogger(HBaseIndex.class);
   private static Connection hbaseConnection = null;
   private HBaseIndexQPSResourceAllocator hBaseIndexQPSResourceAllocator = null;
   private float qpsFraction;
   private int maxQpsPerRegionServer;
+  private int maxPutsPerSec;
 
 Review comment:
   I don't see how this is used. Could you please add a comment for each of 
these instance variables? It seems like they could be local variables specific 
to the operation being performed.




[GitHub] [incubator-hudi] satishkotha commented on a change in pull request #1484: [HUDI-316] : Hbase qps repartition writestatus

2020-04-14 Thread GitBox
satishkotha commented on a change in pull request #1484: [HUDI-316] : Hbase qps 
repartition writestatus
URL: https://github.com/apache/incubator-hudi/pull/1484#discussion_r407812837
 
 

 ##
 File path: 
hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java
 ##
 @@ -83,13 +88,17 @@
   private static final byte[] COMMIT_TS_COLUMN = Bytes.toBytes("commit_ts");
   private static final byte[] FILE_NAME_COLUMN = Bytes.toBytes("file_name");
   private static final byte[] PARTITION_PATH_COLUMN = Bytes.toBytes("partition_path");
-  private static final int SLEEP_TIME_MILLISECONDS = 100;
 
   private static final Logger LOG = LogManager.getLogger(HBaseIndex.class);
   private static Connection hbaseConnection = null;
   private HBaseIndexQPSResourceAllocator hBaseIndexQPSResourceAllocator = null;
 
 Review comment:
   Can you share the context on why we created HBaseIndexQPSResourceAllocator? 
Do you think the calls to RateLimiter#acquire could be made inside 
HBaseIndexQPSResourceAllocator#acquireQPSResources to simplify this?




[GitHub] [incubator-hudi] satishkotha commented on a change in pull request #1484: [HUDI-316] : Hbase qps repartition writestatus

2020-04-14 Thread GitBox
satishkotha commented on a change in pull request #1484: [HUDI-316] : Hbase qps 
repartition writestatus
URL: https://github.com/apache/incubator-hudi/pull/1484#discussion_r408328994
 
 

 ##
 File path: hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java
 ##
 @@ -252,8 +263,10 @@ private boolean checkIfValidCommit(HoodieTableMetaClient metaClient, String comm
 };
   }
 
-  private Result[] doGet(HTable hTable, List<Get> keys) throws IOException {
-    sleepForTime(SLEEP_TIME_MILLISECONDS);
+  private Result[] doGet(HTable hTable, List<Get> keys, RateLimiter limiter) throws IOException {
+    if (keys.size() > 0) {
 
 Review comment:
   nit: can we also move hTable.get(keys) inside this if? Do we need to invoke 
hTable.get when keys is empty?
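
A minimal sketch of the guard being asked about, assuming the lookup and the rate limiter can be modelled as plain functional interfaces. `GuardedGetSketch`, `acquirePermits` and `batchGet` are hypothetical stand-ins for illustration, not names from HBaseIndex or the HBase client. The only point is that an empty key list returns before either the limiter or the table is touched.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;
import java.util.function.IntConsumer;

final class GuardedGetSketch {

  // Hypothetical stand-in for doGet: acquirePermits models limiter.acquire(n),
  // batchGet models hTable.get(keys). Neither is the real HBase or Hudi API.
  static <K, R> List<R> doGet(List<K> keys, IntConsumer acquirePermits,
                              Function<List<K>, List<R>> batchGet) {
    if (keys.isEmpty()) {
      // Nothing to look up: skip both the rate limiter and the remote call.
      return Collections.emptyList();
    }
    acquirePermits.accept(keys.size());
    return batchGet.apply(keys);
  }

  public static void main(String[] args) {
    int[] remoteCalls = {0};
    // Fake lookup that counts how often it is invoked and echoes the keys back.
    Function<List<String>, List<String>> fakeGet = ks -> {
      remoteCalls[0]++;
      return ks;
    };
    doGet(Collections.<String>emptyList(), n -> { }, fakeGet);
    doGet(Arrays.asList("k1", "k2"), n -> { }, fakeGet);
    System.out.println("remote calls: " + remoteCalls[0]);
  }
}
```

Whether the real method should return an empty array or null in the empty case is a separate decision for the PR author; the sketch returns an empty list only to keep callers free of null checks.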




[GitHub] [incubator-hudi] satishkotha commented on a change in pull request #1484: [HUDI-316] : Hbase qps repartition writestatus

2020-04-14 Thread GitBox
satishkotha commented on a change in pull request #1484: [HUDI-316] : Hbase qps 
repartition writestatus
URL: https://github.com/apache/incubator-hudi/pull/1484#discussion_r407815204
 
 

 ##
 File path: hudi-common/src/main/java/org/apache/hudi/common/util/RateLimiter.java
 ##
 @@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.ThreadSafe;
+
+/*
+ * Note: Based on RateLimiter implementation in Google/Guava.
+ * - adopted from com.google.common.util.concurrent
+ *   Copyright (C) 2012 The Guava Authors
+ *   Home page: https://github.com/google/guava
+ *   License: http://www.apache.org/licenses/LICENSE-2.0
+ */
+
+@ThreadSafe
+public abstract class RateLimiter {
+  private final RateLimiter.SleepingTicker ticker;
+  private final long offsetNanos;
+  double storedPermits;
+  double maxPermits;
+  volatile double stableIntervalMicros;
+  private final Object mutex;
+  private long nextFreeTicketMicros;
+
+  public static RateLimiter create(double permitsPerSecond) {
+    return create(RateLimiter.SleepingTicker.SYSTEM_TICKER, permitsPerSecond);
+  }
+
+  static RateLimiter create(RateLimiter.SleepingTicker ticker, double permitsPerSecond) {
+    RateLimiter rateLimiter = new RateLimiter.Bursty(ticker, 1.0D);
+    rateLimiter.setRate(permitsPerSecond);
+    return rateLimiter;
+  }
+
+  public static RateLimiter create(double permitsPerSecond, long warmupPeriod, TimeUnit unit) {
+    return create(RateLimiter.SleepingTicker.SYSTEM_TICKER, permitsPerSecond, warmupPeriod, unit);
+  }
+
+  static RateLimiter create(RateLimiter.SleepingTicker ticker, double permitsPerSecond, long warmupPeriod, TimeUnit unit) {
+    RateLimiter rateLimiter = new RateLimiter.WarmingUp(ticker, warmupPeriod, unit);
+    rateLimiter.setRate(permitsPerSecond);
+    return rateLimiter;
+  }
+
+  private RateLimiter(RateLimiter.SleepingTicker ticker) {
+    this.mutex = new Object();
+    this.nextFreeTicketMicros = 0L;
+    this.ticker = ticker;
+    this.offsetNanos = ticker.read();
+  }
+
+  public final void setRate(double permitsPerSecond) {
+    checkArgument(permitsPerSecond > 0.0D && !Double.isNaN(permitsPerSecond), "rate must be positive");
+    Object var3 = this.mutex;
+    synchronized (this.mutex) {
+      this.resync(this.readSafeMicros());
+      double stableIntervalMicros = (double)TimeUnit.SECONDS.toMicros(1L) / permitsPerSecond;
+      this.stableIntervalMicros = stableIntervalMicros;
+      this.doSetRate(permitsPerSecond, stableIntervalMicros);
+    }
+  }
+
+  abstract void doSetRate(double var1, double var3);
+
+  public final double getRate() {
+    return (double)TimeUnit.SECONDS.toMicros(1L) / this.stableIntervalMicros;
+  }
+
+  public void acquire() {
+    this.acquire(1);
+  }
+
+  public void acquire(int permits) {
 
 Review comment:
   Could you return the time spent waiting here? I think metrics on the time 
taken are very important for debugging potential performance issues. It would 
also be useful to log when the time taken exceeds some threshold (say, 
300 ms).
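
A minimal sketch of this suggestion, assuming the blocking call can be wrapped from the outside. `TimedAcquireSketch`, `timedAcquire` and `SLOW_WAIT_THRESHOLD_MS` are hypothetical names, and the `Runnable` stands in for `limiter.acquire(permits)`. A real change would more likely return the wait time from `acquire` itself (recent Guava versions of `RateLimiter#acquire` return the seconds spent sleeping) and report it through the project's logger and metrics rather than `System.err`.

```java
import java.util.concurrent.TimeUnit;

final class TimedAcquireSketch {

  // Threshold taken from the review comment ("say, 300ms?"); purely illustrative.
  static final long SLOW_WAIT_THRESHOLD_MS = 300L;

  // Hypothetical helper: blockingAcquire stands in for limiter.acquire(permits).
  // Returns the wall-clock time spent inside the call, in milliseconds.
  static long timedAcquire(Runnable blockingAcquire) {
    long startNanos = System.nanoTime();
    blockingAcquire.run();
    long waitedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
    if (waitedMs > SLOW_WAIT_THRESHOLD_MS) {
      // Placeholder for a proper log statement and metric update.
      System.err.println("Rate limiter wait took " + waitedMs + " ms");
    }
    return waitedMs;
  }

  public static void main(String[] args) {
    long waitedMs = timedAcquire(() -> {
      try {
        Thread.sleep(50L); // simulate the limiter blocking for a while
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    System.out.println("waited " + waitedMs + " ms");
  }
}
```

Returning the value lets the caller aggregate per-operation wait time, which is the number needed to tell whether the limiter or HBase itself is the bottleneck.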

