[GitHub] [incubator-hudi] v3nkatesh commented on a change in pull request #1484: [HUDI-316] : Hbase qps repartition writestatus
v3nkatesh commented on a change in pull request #1484: URL: https://github.com/apache/incubator-hudi/pull/1484#discussion_r418402390 ## File path: hudi-client/src/test/java/org/apache/hudi/index/TestHbaseIndex.java ## @@ -329,47 +332,140 @@ public void testPutBatchSizeCalculation() { // All asserts cases below are derived out of the first // example below, with change in one parameter at a time. -int putBatchSize = batchSizeCalculator.getBatchSize(10, 16667, 1200, 200, 100, 0.1f); -// Expected batchSize is 8 because in that case, total request sent in one second is below -// 8 (batchSize) * 200 (parallelism) * 10 (maxReqsInOneSecond) * 10 (numRegionServers) * 0.1 (qpsFraction)) => 16000 -// We assume requests get distributed to Region Servers uniformly, so each RS gets 1600 request -// 1600 happens to be 10% of 16667 (maxQPSPerRegionServer) as expected. -Assert.assertEquals(putBatchSize, 8); +int putBatchSize = batchSizeCalculator.getBatchSize(10, 16667, 1200, 200, 0.1f); +// Total puts that can be sent in 1 second = (10 * 16667 * 0.1) = 16,667 +// Total puts per batch will be (16,667 / parallelism) = 83.335, where 200 is the maxExecutors +Assert.assertEquals(putBatchSize, 83); // Number of Region Servers are halved, total requests sent in a second are also halved, so batchSize is also halved -int putBatchSize2 = batchSizeCalculator.getBatchSize(5, 16667, 1200, 200, 100, 0.1f); -Assert.assertEquals(putBatchSize2, 4); +int putBatchSize2 = batchSizeCalculator.getBatchSize(5, 16667, 1200, 200, 0.1f); +Assert.assertEquals(putBatchSize2, 41); // If the parallelism is halved, batchSize has to double -int putBatchSize3 = batchSizeCalculator.getBatchSize(10, 16667, 1200, 100, 100, 0.1f); -Assert.assertEquals(putBatchSize3, 16); +int putBatchSize3 = batchSizeCalculator.getBatchSize(10, 16667, 1200, 100, 0.1f); +Assert.assertEquals(putBatchSize3, 166); // If the parallelism is halved, batchSize has to double. 
// This time parallelism is driven by numTasks rather than numExecutors -int putBatchSize4 = batchSizeCalculator.getBatchSize(10, 16667, 100, 200, 100, 0.1f); -Assert.assertEquals(putBatchSize4, 16); +int putBatchSize4 = batchSizeCalculator.getBatchSize(10, 16667, 100, 200, 0.1f); +Assert.assertEquals(putBatchSize4, 166); // If sleepTimeMs is halved, batchSize has to halve -int putBatchSize5 = batchSizeCalculator.getBatchSize(10, 16667, 1200, 200, 100, 0.05f); -Assert.assertEquals(putBatchSize5, 4); +int putBatchSize5 = batchSizeCalculator.getBatchSize(10, 16667, 1200, 200, 0.05f); +Assert.assertEquals(putBatchSize5, 41); // If maxQPSPerRegionServer is doubled, batchSize also doubles -int putBatchSize6 = batchSizeCalculator.getBatchSize(10, 4, 1200, 200, 100, 0.1f); -Assert.assertEquals(putBatchSize6, 16); +int putBatchSize6 = batchSizeCalculator.getBatchSize(10, 4, 1200, 200, 0.1f); +Assert.assertEquals(putBatchSize6, 166); } @Test public void testsHBasePutAccessParallelism() { HoodieWriteConfig config = getConfig(); HBaseIndex index = new HBaseIndex(config); final JavaRDD writeStatusRDD = jsc.parallelize( -Arrays.asList(getSampleWriteStatus(1, 2), getSampleWriteStatus(0, 3), getSampleWriteStatus(10, 0)), 10); +Arrays.asList( + getSampleWriteStatus(0, 2), + getSampleWriteStatus(2, 3), + getSampleWriteStatus(4, 3), + getSampleWriteStatus(6, 3), + getSampleWriteStatus(8, 0)), +10); final Tuple2 tuple = index.getHBasePutAccessParallelism(writeStatusRDD); final int hbasePutAccessParallelism = Integer.parseInt(tuple._2.toString()); final int hbaseNumPuts = Integer.parseInt(tuple._1.toString()); Assert.assertEquals(10, writeStatusRDD.getNumPartitions()); -Assert.assertEquals(2, hbasePutAccessParallelism); -Assert.assertEquals(11, hbaseNumPuts); +Assert.assertEquals(4, hbasePutAccessParallelism); +Assert.assertEquals(20, hbaseNumPuts); + } + + @Test + public void testsWriteStatusPartitioner() { +HoodieWriteConfig config = getConfig(); +HBaseIndex index = new 
HBaseIndex(config); +int parallelism = 4; +final JavaRDD writeStatusRDD = jsc.parallelize( +Arrays.asList( + getSampleWriteStatusWithFileId(0, 2), + getSampleWriteStatusWithFileId(2, 3), + getSampleWriteStatusWithFileId(4, 3), + getSampleWriteStatusWithFileId(0, 3), + getSampleWriteStatusWithFileId(11, 0)), parallelism); +int partitionIndex = 0; +final Map fileIdPartitionMap = new HashMap<>(); + +final List fileIds = writeStatusRDD.filter(w -> w.getStat().getNumInserts() > 0) Review comment: Sure, I have moved this logic into a separate method inside HBaseIndex instead of a different class; let me know what you think. Using this util method
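The expected values in the updated testPutBatchSizeCalculation follow from one formula: puts per second = numRegionServers * maxQpsPerRegionServer * qpsFraction, divided by the put parallelism min(numTasksDuringPut, maxExecutors) and truncated to an integer. The minimal Java sketch below reproduces that arithmetic, together with the (numPuts, parallelism) pair checked in testsHBasePutAccessParallelism, assuming the first argument of getSampleWriteStatus is the number of inserts. HBasePutSizingSketch and its methods are hypothetical names inferred from the test comments, not the actual HBaseIndex code.

```java
import java.util.List;

// Hypothetical helper reconstructed from the arithmetic in the test comments;
// not the actual HBaseIndex implementation.
final class HBasePutSizingSketch {

  // Puts per batch so that maxParallelPuts tasks, each sending one batch per second,
  // together send about qpsFraction of the cluster's total put capacity.
  static int getBatchSize(int numRegionServers, int maxQpsPerRegionServer,
                          int numTasksDuringPut, int maxExecutors, float qpsFraction) {
    // Total puts allowed per second, e.g. 10 * 16667 * 0.1 = 16,667.
    int maxReqPerSec = (int) (numRegionServers * maxQpsPerRegionServer * qpsFraction);
    // Effective parallelism is capped by both the number of tasks and of executors.
    int maxParallelPuts = Math.max(1, Math.min(numTasksDuringPut, maxExecutors));
    // Integer division truncates, e.g. 16,667 / 200 = 83; never go below 1.
    return Math.max(1, maxReqPerSec / maxParallelPuts);
  }

  // Returns {numPuts, putParallelism}: total inserts across write statuses, and how
  // many write statuses have at least one insert (only those issue HBase puts).
  static long[] putAccessParallelism(List<Long> insertsPerWriteStatus) {
    long numPuts = 0;
    long withInserts = 0;
    for (long inserts : insertsPerWriteStatus) {
      numPuts += inserts;
      if (inserts > 0) {
        withInserts++;
      }
    }
    return new long[] {numPuts, withInserts};
  }
}
```

With the sample inserts 0, 2, 4, 6 and 8, this yields 20 puts spread over 4 write statuses, matching the assertions in the test.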
[GitHub] [incubator-hudi] v3nkatesh commented on a change in pull request #1484: [HUDI-316] : Hbase qps repartition writestatus
v3nkatesh commented on a change in pull request #1484: URL: https://github.com/apache/incubator-hudi/pull/1484#discussion_r418397300 ## File path: hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java ## @@ -252,8 +263,10 @@ private boolean checkIfValidCommit(HoodieTableMetaClient metaClient, String comm }; } - private Result[] doGet(HTable hTable, List keys) throws IOException { -sleepForTime(SLEEP_TIME_MILLISECONDS); + private Result[] doGet(HTable hTable, List keys, RateLimiter limiter) throws IOException { +if (keys.size() > 0) { Review comment: Invoking hTable.get on an empty list of keys already returns an empty Result array, but I changed it to explicitly return an empty array now. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [incubator-hudi] v3nkatesh commented on a change in pull request #1484: [HUDI-316] : Hbase qps repartition writestatus
v3nkatesh commented on a change in pull request #1484: URL: https://github.com/apache/incubator-hudi/pull/1484#discussion_r418397300 ## File path: hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java ## @@ -252,8 +263,10 @@ private boolean checkIfValidCommit(HoodieTableMetaClient metaClient, String comm }; } - private Result[] doGet(HTable hTable, List keys) throws IOException { -sleepForTime(SLEEP_TIME_MILLISECONDS); + private Result[] doGet(HTable hTable, List keys, RateLimiter limiter) throws IOException { +if (keys.size() > 0) { Review comment: Invoking hTable.get on an empty list of keys already returns an empty Result array, but I changed it to explicitly return an empty array now. I don't want to change the return type of doGet(), though.
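The early return discussed here can be sketched without HBase on the classpath. In this hypothetical Java sketch, String stands in for Result, and two function parameters stand in for hTable.get(keys) and the rate limiter; charging one permit per key is an assumption that mirrors limiter.acquire(mutations.size()) in doMutations. The point is that an empty key list returns an empty array without consuming permits, so the array return type of doGet() can stay unchanged.

```java
import java.util.List;
import java.util.function.Function;
import java.util.function.IntConsumer;

// Hypothetical sketch: String stands in for HBase's Result, batchGet for hTable.get(keys),
// and acquirePermits for limiter.acquire(permits).
final class EmptyGetGuardSketch {

  static String[] doGet(List<String> keys,
                        Function<List<String>, String[]> batchGet,
                        IntConsumer acquirePermits) {
    if (keys.isEmpty()) {
      // Nothing to look up: skip both the limiter and the table call, and keep
      // returning an array (empty) instead of changing the return type.
      return new String[0];
    }
    // Assumed accounting: one permit per key, charged before the batched get.
    acquirePermits.accept(keys.size());
    return batchGet.apply(keys);
  }
}
```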
[GitHub] [incubator-hudi] v3nkatesh commented on a change in pull request #1484: [HUDI-316] : Hbase qps repartition writestatus
v3nkatesh commented on a change in pull request #1484: URL: https://github.com/apache/incubator-hudi/pull/1484#discussion_r418391855 ## File path: hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java ## @@ -322,66 +347,94 @@ private boolean checkIfValidCommit(HoodieTableMetaClient metaClient, String comm /** * Helper method to facilitate performing mutations (including puts and deletes) in Hbase. */ - private void doMutations(BufferedMutator mutator, List mutations) throws IOException { + private void doMutations(BufferedMutator mutator, List mutations, RateLimiter limiter) throws IOException { if (mutations.isEmpty()) { return; } +// report number of operations to account per second with rate limiter. +// If #limiter.getRate() operations are acquired within 1 second, ratelimiter will limit the rest of calls +// for within that second +limiter.acquire(mutations.size()); mutator.mutate(mutations); mutator.flush(); mutations.clear(); -sleepForTime(SLEEP_TIME_MILLISECONDS); - } - - private static void sleepForTime(int sleepTimeMs) { -try { - Thread.sleep(sleepTimeMs); -} catch (InterruptedException e) { - LOG.error("Sleep interrupted during throttling", e); - throw new RuntimeException(e); -} } @Override public JavaRDD updateLocation(JavaRDD writeStatusRDD, JavaSparkContext jsc, HoodieTable hoodieTable) { -final HBaseIndexQPSResourceAllocator hBaseIndexQPSResourceAllocator = createQPSResourceAllocator(this.config); -setPutBatchSize(writeStatusRDD, hBaseIndexQPSResourceAllocator, jsc); -LOG.info("multiPutBatchSize: before hbase puts" + multiPutBatchSize); -JavaRDD writeStatusJavaRDD = writeStatusRDD.mapPartitionsWithIndex(updateLocationFunction(), true); +final Option desiredQPSFraction = calculateQPSFraction(writeStatusRDD, hBaseIndexQPSResourceAllocator); +// Map each fileId that has inserts to a unique partition Id. 
This will be used while +// repartitioning RDD +int partitionIndex = 0; +final List fileIds = writeStatusRDD.filter(w -> w.getStat().getNumInserts() > 0) + .map(w -> w.getFileId()).collect(); +for (final String fileId : fileIds) { + this.fileIdPartitionMap.put(fileId, partitionIndex++); +} +JavaRDD partitionedRDD = this.numWriteStatusWithInserts == 0 ? writeStatusRDD : + writeStatusRDD.mapToPair(w -> new Tuple2<>(w.getFileId(), w)) +.partitionBy(new WriteStatusPartitioner(this.fileIdPartitionMap, + this.numWriteStatusWithInserts)) +.map(w -> w._2()); Review comment: When this.numWriteStatusWithInserts == 0, fileIds will be empty. So fileIdPartitionMap will also be empty in this case.
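The mapping step in updateLocation() can be illustrated without Spark. This hypothetical sketch uses a minimal Status class in place of WriteStatus and a plain list in place of the collected RDD; it shows why, as noted in the reply, an input with no inserts yields an empty fileIdPartitionMap.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, Spark-free sketch of the mapping step: each fileId whose write
// status has inserts gets its own consecutive partition index.
final class FileIdPartitionMapSketch {

  // Minimal stand-in for WriteStatus: only the fields this step reads.
  static final class Status {
    final String fileId;
    final long numInserts;

    Status(String fileId, long numInserts) {
      this.fileId = fileId;
      this.numInserts = numInserts;
    }
  }

  static Map<String, Integer> buildFileIdPartitionMap(List<Status> statuses) {
    // Equivalent of writeStatusRDD.filter(numInserts > 0).map(getFileId).collect()
    List<String> fileIds = new ArrayList<>();
    for (Status s : statuses) {
      if (s.numInserts > 0) {
        fileIds.add(s.fileId);
      }
    }
    Map<String, Integer> fileIdPartitionMap = new HashMap<>();
    int partitionIndex = 0;
    for (String fileId : fileIds) {
      fileIdPartitionMap.put(fileId, partitionIndex++);
    }
    // With no inserts at all, fileIds and therefore the map are empty.
    return fileIdPartitionMap;
  }
}
```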
[GitHub] [incubator-hudi] v3nkatesh commented on a change in pull request #1484: [HUDI-316] : Hbase qps repartition writestatus
v3nkatesh commented on a change in pull request #1484: URL: https://github.com/apache/incubator-hudi/pull/1484#discussion_r418391562 ## File path: hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java ## @@ -322,66 +347,94 @@ private boolean checkIfValidCommit(HoodieTableMetaClient metaClient, String comm /** * Helper method to facilitate performing mutations (including puts and deletes) in Hbase. */ - private void doMutations(BufferedMutator mutator, List mutations) throws IOException { + private void doMutations(BufferedMutator mutator, List mutations, RateLimiter limiter) throws IOException { if (mutations.isEmpty()) { return; } +// report number of operations to account per second with rate limiter. +// If #limiter.getRate() operations are acquired within 1 second, ratelimiter will limit the rest of calls +// for within that second +limiter.acquire(mutations.size()); mutator.mutate(mutations); mutator.flush(); mutations.clear(); -sleepForTime(SLEEP_TIME_MILLISECONDS); - } - - private static void sleepForTime(int sleepTimeMs) { -try { - Thread.sleep(sleepTimeMs); -} catch (InterruptedException e) { - LOG.error("Sleep interrupted during throttling", e); - throw new RuntimeException(e); -} } @Override public JavaRDD updateLocation(JavaRDD writeStatusRDD, JavaSparkContext jsc, HoodieTable hoodieTable) { -final HBaseIndexQPSResourceAllocator hBaseIndexQPSResourceAllocator = createQPSResourceAllocator(this.config); -setPutBatchSize(writeStatusRDD, hBaseIndexQPSResourceAllocator, jsc); -LOG.info("multiPutBatchSize: before hbase puts" + multiPutBatchSize); -JavaRDD writeStatusJavaRDD = writeStatusRDD.mapPartitionsWithIndex(updateLocationFunction(), true); +final Option desiredQPSFraction = calculateQPSFraction(writeStatusRDD, hBaseIndexQPSResourceAllocator); +// Map each fileId that has inserts to a unique partition Id. 
This will be used while +// repartitioning RDD +int partitionIndex = 0; +final List fileIds = writeStatusRDD.filter(w -> w.getStat().getNumInserts() > 0) + .map(w -> w.getFileId()).collect(); +for (final String fileId : fileIds) { + this.fileIdPartitionMap.put(fileId, partitionIndex++); +} +JavaRDD partitionedRDD = this.numWriteStatusWithInserts == 0 ? writeStatusRDD : + writeStatusRDD.mapToPair(w -> new Tuple2<>(w.getFileId(), w)) +.partitionBy(new WriteStatusPartitioner(this.fileIdPartitionMap, + this.numWriteStatusWithInserts)) +.map(w -> w._2()); +acquireQPSResourcesAndSetBatchSize(desiredQPSFraction, jsc); +LOG.info("multiPutBatchSize before hbase puts: " + this.multiPutBatchSize); Review comment: done.
[GitHub] [incubator-hudi] v3nkatesh commented on a change in pull request #1484: [HUDI-316] : Hbase qps repartition writestatus
v3nkatesh commented on a change in pull request #1484: URL: https://github.com/apache/incubator-hudi/pull/1484#discussion_r418391204 ## File path: hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java ## @@ -498,4 +554,37 @@ public boolean isImplicitWithStorage() { public void setHbaseConnection(Connection hbaseConnection) { HBaseIndex.hbaseConnection = hbaseConnection; } + + /** + * Partitions each WriteStatus with inserts into a unique single partition. WriteStatus without inserts will be + * assigned to random partitions. This partitioner will be useful to utilize max parallelism with spark operations + * that are based on inserts in each WriteStatus. + */ + public class WriteStatusPartitioner extends Partitioner { Review comment: done.
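The Javadoc in this hunk describes the partitioning rule; a Spark-free sketch of the corresponding getPartition logic might look like the following. Mapping write statuses without inserts by hashing the fileId is an assumption: it is a deterministic substitute for the "random partitions" in the Javadoc, chosen because a partitioner should return the same partition for the same key. A real implementation would extend org.apache.spark.Partitioner and override numPartitions() and getPartition(Object).

```java
import java.util.Map;

// Hypothetical, Spark-free sketch of the WriteStatusPartitioner rule.
final class WriteStatusPartitionerSketch {
  private final Map<String, Integer> fileIdPartitionMap;
  private final int totalPartitions;

  WriteStatusPartitionerSketch(Map<String, Integer> fileIdPartitionMap, int totalPartitions) {
    this.fileIdPartitionMap = fileIdPartitionMap;
    this.totalPartitions = totalPartitions;
  }

  int numPartitions() {
    return totalPartitions;
  }

  int getPartition(Object key) {
    String fileId = (String) key;
    Integer mapped = fileIdPartitionMap.get(fileId);
    if (mapped != null) {
      // WriteStatus with inserts: its own dedicated partition.
      return mapped;
    }
    // WriteStatus without inserts: spread by hash (deterministic stand-in for "random").
    // floorMod keeps the result in [0, totalPartitions) even for negative hash codes.
    return Math.floorMod(fileId.hashCode(), totalPartitions);
  }
}
```

Because totalPartitions is used as a modulus, the partitioner should only be built when at least one write status has inserts, which is what the numWriteStatusWithInserts == 0 check in updateLocation() guards against.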
[GitHub] [incubator-hudi] v3nkatesh commented on a change in pull request #1484: [HUDI-316] : Hbase qps repartition writestatus
v3nkatesh commented on a change in pull request #1484: URL: https://github.com/apache/incubator-hudi/pull/1484#discussion_r418391072 ## File path: hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java ## @@ -83,13 +88,17 @@ private static final byte[] COMMIT_TS_COLUMN = Bytes.toBytes("commit_ts"); private static final byte[] FILE_NAME_COLUMN = Bytes.toBytes("file_name"); private static final byte[] PARTITION_PATH_COLUMN = Bytes.toBytes("partition_path"); - private static final int SLEEP_TIME_MILLISECONDS = 100; private static final Logger LOG = LogManager.getLogger(HBaseIndex.class); private static Connection hbaseConnection = null; private HBaseIndexQPSResourceAllocator hBaseIndexQPSResourceAllocator = null; Review comment: We synced offline to go over the use case outside of hoodie and why we need to rate limit here. To summarize: we need to rate limit here because this is where the actual HBase operations are performed, whereas HBaseIndexQPSResourceAllocator#acquireQPSResources is mostly meant to manage metadata, such as checking for available resources before an operation and releasing meta resources.
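This PR replaces the fixed sleepForTime(SLEEP_TIME_MILLISECONDS) with limiter.acquire(mutations.size()) before each mutate/flush in doMutations. The following simplified limiter is a hypothetical illustration of those acquire(permits) semantics, loosely modeled on Guava's RateLimiter but without its burst handling; the diff does not show which RateLimiter class HBaseIndex imports. reserve() takes an explicit clock value so the behavior can be checked without sleeping.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified limiter: a request may proceed once earlier bookings have
// "paid off" at the configured rate. No stored permits or bursting.
final class SimpleRateLimiterSketch {
  private final double permitsPerSecond;
  private long nextFreeNanos; // earliest time at which the next request may proceed

  SimpleRateLimiterSketch(double permitsPerSecond, long startNanos) {
    if (permitsPerSecond <= 0) {
      throw new IllegalArgumentException("permitsPerSecond must be positive");
    }
    this.permitsPerSecond = permitsPerSecond;
    this.nextFreeNanos = startNanos;
  }

  // Books `permits` at time `nowNanos` and returns how long the caller must wait first.
  synchronized long reserve(int permits, long nowNanos) {
    long start = Math.max(nextFreeNanos, nowNanos);
    long waitNanos = start - nowNanos;
    nextFreeNanos = start + (long) (permits * 1_000_000_000.0 / permitsPerSecond);
    return waitNanos;
  }

  // Blocking variant, e.g. called as acquire(mutations.size()) before mutator.mutate().
  void acquire(int permits) throws InterruptedException {
    long waitNanos = reserve(permits, System.nanoTime());
    TimeUnit.NANOSECONDS.sleep(waitNanos); // returns immediately when waitNanos <= 0
  }
}
```

At 100 permits per second, a first batch of 100 puts proceeds immediately, while a second batch requested at the same instant must wait one second, which is the throttling behavior the comment in doMutations describes.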
[GitHub] [incubator-hudi] v3nkatesh commented on a change in pull request #1484: [HUDI-316] : Hbase qps repartition writestatus
v3nkatesh commented on a change in pull request #1484: URL: https://github.com/apache/incubator-hudi/pull/1484#discussion_r418390619 ## File path: hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java ## @@ -322,66 +347,94 @@ private boolean checkIfValidCommit(HoodieTableMetaClient metaClient, String comm /** * Helper method to facilitate performing mutations (including puts and deletes) in Hbase. */ - private void doMutations(BufferedMutator mutator, List mutations) throws IOException { + private void doMutations(BufferedMutator mutator, List mutations, RateLimiter limiter) throws IOException { if (mutations.isEmpty()) { return; } +// report number of operations to account per second with rate limiter. +// If #limiter.getRate() operations are acquired within 1 second, ratelimiter will limit the rest of calls +// for within that second +limiter.acquire(mutations.size()); mutator.mutate(mutations); mutator.flush(); mutations.clear(); -sleepForTime(SLEEP_TIME_MILLISECONDS); - } - - private static void sleepForTime(int sleepTimeMs) { -try { - Thread.sleep(sleepTimeMs); -} catch (InterruptedException e) { - LOG.error("Sleep interrupted during throttling", e); - throw new RuntimeException(e); -} } @Override public JavaRDD updateLocation(JavaRDD writeStatusRDD, JavaSparkContext jsc, HoodieTable hoodieTable) { -final HBaseIndexQPSResourceAllocator hBaseIndexQPSResourceAllocator = createQPSResourceAllocator(this.config); -setPutBatchSize(writeStatusRDD, hBaseIndexQPSResourceAllocator, jsc); -LOG.info("multiPutBatchSize: before hbase puts" + multiPutBatchSize); -JavaRDD writeStatusJavaRDD = writeStatusRDD.mapPartitionsWithIndex(updateLocationFunction(), true); +final Option desiredQPSFraction = calculateQPSFraction(writeStatusRDD, hBaseIndexQPSResourceAllocator); +// Map each fileId that has inserts to a unique partition Id. 
This will be used while +// repartitioning RDD +int partitionIndex = 0; +final List fileIds = writeStatusRDD.filter(w -> w.getStat().getNumInserts() > 0) + .map(w -> w.getFileId()).collect(); +for (final String fileId : fileIds) { + this.fileIdPartitionMap.put(fileId, partitionIndex++); +} +JavaRDD partitionedRDD = this.numWriteStatusWithInserts == 0 ? writeStatusRDD : + writeStatusRDD.mapToPair(w -> new Tuple2<>(w.getFileId(), w)) +.partitionBy(new WriteStatusPartitioner(this.fileIdPartitionMap, + this.numWriteStatusWithInserts)) +.map(w -> w._2()); +acquireQPSResourcesAndSetBatchSize(desiredQPSFraction, jsc); +LOG.info("multiPutBatchSize before hbase puts: " + this.multiPutBatchSize); +JavaRDD writeStatusJavaRDD = partitionedRDD.mapPartitionsWithIndex(updateLocationFunction(), +true); // caching the index updated status RDD writeStatusJavaRDD = writeStatusJavaRDD.persist(SparkConfigUtils.getWriteStatusStorageLevel(config.getProps())); +// force trigger update location(hbase puts) +writeStatusJavaRDD.count(); +this.hBaseIndexQPSResourceAllocator.releaseQPSResources(); return writeStatusJavaRDD; } - private void setPutBatchSize(JavaRDD writeStatusRDD, - HBaseIndexQPSResourceAllocator hBaseIndexQPSResourceAllocator, final JavaSparkContext jsc) { + private Option calculateQPSFraction(JavaRDD writeStatusRDD, + HBaseIndexQPSResourceAllocator hBaseIndexQPSResourceAllocator) { Review comment: Yeah, I think it makes sense to refactor it here. I'll update the other parts of this class where hBaseIndexQPSResourceAllocator is passed around.
[GitHub] [incubator-hudi] v3nkatesh commented on a change in pull request #1484: [HUDI-316] : Hbase qps repartition writestatus
v3nkatesh commented on a change in pull request #1484: URL: https://github.com/apache/incubator-hudi/pull/1484#discussion_r418389802 ## File path: hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java ## @@ -322,66 +347,94 @@ private boolean checkIfValidCommit(HoodieTableMetaClient metaClient, String comm /** * Helper method to facilitate performing mutations (including puts and deletes) in Hbase. */ - private void doMutations(BufferedMutator mutator, List mutations) throws IOException { + private void doMutations(BufferedMutator mutator, List mutations, RateLimiter limiter) throws IOException { if (mutations.isEmpty()) { return; } +// report number of operations to account per second with rate limiter. +// If #limiter.getRate() operations are acquired within 1 second, ratelimiter will limit the rest of calls +// for within that second +limiter.acquire(mutations.size()); mutator.mutate(mutations); mutator.flush(); mutations.clear(); -sleepForTime(SLEEP_TIME_MILLISECONDS); - } - - private static void sleepForTime(int sleepTimeMs) { -try { - Thread.sleep(sleepTimeMs); -} catch (InterruptedException e) { - LOG.error("Sleep interrupted during throttling", e); - throw new RuntimeException(e); -} } @Override public JavaRDD updateLocation(JavaRDD writeStatusRDD, JavaSparkContext jsc, HoodieTable hoodieTable) { -final HBaseIndexQPSResourceAllocator hBaseIndexQPSResourceAllocator = createQPSResourceAllocator(this.config); -setPutBatchSize(writeStatusRDD, hBaseIndexQPSResourceAllocator, jsc); -LOG.info("multiPutBatchSize: before hbase puts" + multiPutBatchSize); -JavaRDD writeStatusJavaRDD = writeStatusRDD.mapPartitionsWithIndex(updateLocationFunction(), true); +final Option desiredQPSFraction = calculateQPSFraction(writeStatusRDD, hBaseIndexQPSResourceAllocator); +// Map each fileId that has inserts to a unique partition Id. 
This will be used while +// repartitioning RDD +int partitionIndex = 0; +final List fileIds = writeStatusRDD.filter(w -> w.getStat().getNumInserts() > 0) + .map(w -> w.getFileId()).collect(); +for (final String fileId : fileIds) { + this.fileIdPartitionMap.put(fileId, partitionIndex++); +} +JavaRDD partitionedRDD = this.numWriteStatusWithInserts == 0 ? writeStatusRDD : + writeStatusRDD.mapToPair(w -> new Tuple2<>(w.getFileId(), w)) +.partitionBy(new WriteStatusPartitioner(this.fileIdPartitionMap, + this.numWriteStatusWithInserts)) +.map(w -> w._2()); +acquireQPSResourcesAndSetBatchSize(desiredQPSFraction, jsc); +LOG.info("multiPutBatchSize before hbase puts: " + this.multiPutBatchSize); +JavaRDD writeStatusJavaRDD = partitionedRDD.mapPartitionsWithIndex(updateLocationFunction(), +true); // caching the index updated status RDD writeStatusJavaRDD = writeStatusJavaRDD.persist(SparkConfigUtils.getWriteStatusStorageLevel(config.getProps())); +// force trigger update location(hbase puts) +writeStatusJavaRDD.count(); +this.hBaseIndexQPSResourceAllocator.releaseQPSResources(); Review comment: Correct, releaseQPSResources() has an implementation outside of hoodie. This ensures that any resources acquired for HBase operations are released right after the Spark stage is done (i.e. after the forced Spark action).
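The ordering described in this reply (acquire, force the lazy Spark job with count(), then release) can be sketched generically. QpsAllocator is a hypothetical stand-in for HBaseIndexQPSResourceAllocator, and the LongSupplier stands in for writeStatusJavaRDD.count(). Wrapping the release in finally is a variation on the diff, which calls releaseQPSResources() directly after count().

```java
import java.util.function.LongSupplier;

// Hypothetical sketch of the "acquire, force the lazy job, then release" ordering.
final class ReleaseAfterActionSketch {

  // Stand-in for HBaseIndexQPSResourceAllocator; only the two calls this pattern needs.
  interface QpsAllocator {
    void acquire();

    void release();
  }

  static long runWithQps(QpsAllocator allocator, LongSupplier forcedAction) {
    allocator.acquire();
    try {
      // In updateLocation() this is writeStatusJavaRDD.count(): Spark is lazy, so the
      // HBase puts only run when an action such as count() is triggered.
      return forcedAction.getAsLong();
    } finally {
      // Release as soon as the stage finishes (or fails), rather than whenever a later
      // caller happens to consume the persisted RDD.
      allocator.release();
    }
  }
}
```

The finally block additionally returns the resources if the HBase puts fail partway through the stage.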
[GitHub] [incubator-hudi] v3nkatesh commented on a change in pull request #1484: [HUDI-316] : Hbase qps repartition writestatus
v3nkatesh commented on a change in pull request #1484: URL: https://github.com/apache/incubator-hudi/pull/1484#discussion_r418387652 ## File path: hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java ## @@ -83,13 +88,17 @@ private static final byte[] COMMIT_TS_COLUMN = Bytes.toBytes("commit_ts"); private static final byte[] FILE_NAME_COLUMN = Bytes.toBytes("file_name"); private static final byte[] PARTITION_PATH_COLUMN = Bytes.toBytes("partition_path"); - private static final int SLEEP_TIME_MILLISECONDS = 100; private static final Logger LOG = LogManager.getLogger(HBaseIndex.class); private static Connection hbaseConnection = null; private HBaseIndexQPSResourceAllocator hBaseIndexQPSResourceAllocator = null; private float qpsFraction; private int maxQpsPerRegionServer; + private int maxPutsPerSec; Review comment: Yeah, this can actually be removed; I am recalculating it inside getBatchSize() anyway. I also cleaned up other unused variables.
[GitHub] [incubator-hudi] v3nkatesh commented on a change in pull request #1484: [HUDI-316] : Hbase qps repartition writestatus
v3nkatesh commented on a change in pull request #1484: URL: https://github.com/apache/incubator-hudi/pull/1484#discussion_r418387192 ## File path: hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java ## @@ -83,13 +88,17 @@ private static final byte[] COMMIT_TS_COLUMN = Bytes.toBytes("commit_ts"); private static final byte[] FILE_NAME_COLUMN = Bytes.toBytes("file_name"); private static final byte[] PARTITION_PATH_COLUMN = Bytes.toBytes("partition_path"); - private static final int SLEEP_TIME_MILLISECONDS = 100; private static final Logger LOG = LogManager.getLogger(HBaseIndex.class); private static Connection hbaseConnection = null; private HBaseIndexQPSResourceAllocator hBaseIndexQPSResourceAllocator = null; private float qpsFraction; private int maxQpsPerRegionServer; + private int maxPutsPerSec; + private long totalNumInserts; + private int numWriteStatusWithInserts; + Map fileIdPartitionMap = new HashMap<>(); Review comment: Yeah, we don't need a class variable for this. I moved it inside the updateLocation() method.
[jira] [Created] (HUDI-856) Supporting multiple insert/upsert operations before commit()
Nishith Agarwal created HUDI-856: Summary: Supporting multiple insert/upsert operations before commit() Key: HUDI-856 URL: https://issues.apache.org/jira/browse/HUDI-856 Project: Apache Hudi (incubating) Issue Type: Bug Components: Common Core Reporter: Nishith Agarwal The HoodieWriteClient APIs are designed around the following sequence of operations: 1) startCommit() 2) perform operations (upsert, insert) 3) commit(). In the past, Hudi supported multiple insert and upsert operations before committing. With MOR inflight data (for rollbacks) and the new state machine for timeline metadata, this no longer works. Opening this ticket to discuss the recommended way of using these APIs. [~varadarb] [~vinoth] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [incubator-hudi] tooptoop4 opened a new issue #1579: [SUPPORT] https://github.com/YotpoLtd/metorikku/issues/290
tooptoop4 opened a new issue #1579: URL: https://github.com/apache/incubator-hudi/issues/1579 Please help with https://github.com/YotpoLtd/metorikku/issues/290
[jira] [Assigned] (HUDI-852) Add validation to check Table name when Append Mode is used in DataSource writer
[ https://issues.apache.org/jira/browse/HUDI-852?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bhavani Sudha reassigned HUDI-852: -- Assignee: (was: Bhavani Sudha) > Add validation to check Table name when Append Mode is used in DataSource > writer > > > Key: HUDI-852 > URL: https://issues.apache.org/jira/browse/HUDI-852 > Project: Apache Hudi (incubating) > Issue Type: Bug > Components: newbie, Writer Core >Reporter: Bhavani Sudha >Priority: Minor > Fix For: 0.6.0 > > > Copied from user's description in mailing list: > Table name is not respected when inserting records with a different table name > in Append mode > > {code:java} > // While running commands from the Hudi quick start guide, I found that the > library does not check the table name in the request against the table > name in the metadata available in the base path. I think it should throw > TableAlreadyExist. In case of Save mode *overwrite*, it warns. > *spark-2.4.4-bin-hadoop2.7/bin/spark-shell --packages > org.apache.hudi:hudi-spark-bundle_2.11:0.5.1-incubating,org.apache.spark:spark-avro_2.11:2.4.4 > --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'* > scala> df.write.format("hudi"). > | options(getQuickstartWriteConfigs). > | option(PRECOMBINE_FIELD_OPT_KEY, "ts"). > | option(RECORDKEY_FIELD_OPT_KEY, "uuid"). > | option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). > * | option(TABLE_NAME, "test_table").* > | mode(*Append*). > | save(basePath) > 20/04/29 17:23:42 WARN DefaultSource: Snapshot view not supported yet via > data source, for MERGE_ON_READ tables. Please query the Hive table > registered using Spark SQL. > scala> > No exception is thrown if we run this: > scala> df.write.format("hudi"). > | options(getQuickstartWriteConfigs). > | option(PRECOMBINE_FIELD_OPT_KEY, "ts"). > | option(RECORDKEY_FIELD_OPT_KEY, "uuid"). > | option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). > * | option(TABLE_NAME, "foo_table").* > | mode(*Append*).
> | save(basePath) > 20/04/29 17:24:37 WARN DefaultSource: Snapshot view not supported yet via > data source, for MERGE_ON_READ tables. Please query the Hive table > registered using Spark SQL. > scala> > scala> df.write.format("hudi"). > | options(getQuickstartWriteConfigs). > | option(PRECOMBINE_FIELD_OPT_KEY, "ts"). > | option(RECORDKEY_FIELD_OPT_KEY, "uuid"). > | option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). > | option(TABLE_NAME, *tableName*). > | mode(*Overwrite*). > | save(basePath) > *20/04/29 22:25:16 WARN HoodieSparkSqlWriter$: hoodie table at > file:/tmp/hudi_trips_cow already exists. Deleting existing data & > overwriting with new data.* > 20/04/29 22:25:18 WARN DefaultSource: Snapshot view not supported yet via > data source, for MERGE_ON_READ tables. Please query the Hive table > registered using Spark SQL. > scala> > {code}
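The validation requested in HUDI-852 amounts to comparing the requested table name with the one recorded for the existing table before an Append-mode write. In this hypothetical Java sketch, the existing name is assumed to be stored under hoodie.table.name in the table's hoodie.properties, and IllegalArgumentException stands in for the TableAlreadyExist-style error the reporter suggests; it is not the eventual fix in Hudi.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// Hypothetical sketch: refuse an Append-mode write whose table name differs from the
// name recorded for the table already present at the base path.
final class TableNameValidationSketch {

  // Assumption: the existing table's name lives under this key in hoodie.properties.
  static final String TABLE_NAME_KEY = "hoodie.table.name";

  static void validateTableName(Properties existingTableProps, String requestedTableName) {
    String existing = existingTableProps.getProperty(TABLE_NAME_KEY);
    if (existing != null && !existing.equals(requestedTableName)) {
      throw new IllegalArgumentException("Table at base path is named '" + existing
          + "', but the write requested table name '" + requestedTableName + "'");
    }
  }

  // Parses properties text, e.g. the contents of hoodie.properties.
  static Properties parse(String propertiesText) throws IOException {
    Properties props = new Properties();
    props.load(new StringReader(propertiesText));
    return props;
  }
}
```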
[jira] [Updated] (HUDI-852) Add validation to check Table name when Append Mode is used in DataSource writer
[ https://issues.apache.org/jira/browse/HUDI-852?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Bhavani Sudha updated HUDI-852:
---
    Status: Open  (was: New)

> Add validation to check Table name when Append Mode is used in DataSource writer
>
>
> Key: HUDI-852
> URL: https://issues.apache.org/jira/browse/HUDI-852
> Project: Apache Hudi (incubating)
> Issue Type: Bug
> Components: newbie, Writer Core
> Reporter: Bhavani Sudha
> Assignee: Bhavani Sudha
> Priority: Minor
> Fix For: 0.6.0
>
>
> Copied from the user's description on the mailing list:
> The table name is not respected when inserting records with a different table name in Append mode.
>
> {code:java}
> // While running commands from the Hudi quick start guide, I found that the library does not check the table name in the request against the table name in the metadata available in the base path. I think it should throw TableAlreadyExist. With save mode Overwrite, it does warn.
> spark-2.4.4-bin-hadoop2.7/bin/spark-shell --packages org.apache.hudi:hudi-spark-bundle_2.11:0.5.1-incubating,org.apache.spark:spark-avro_2.11:2.4.4 --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
> scala> df.write.format("hudi").
>      | options(getQuickstartWriteConfigs).
>      | option(PRECOMBINE_FIELD_OPT_KEY, "ts").
>      | option(RECORDKEY_FIELD_OPT_KEY, "uuid").
>      | option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
>      | option(TABLE_NAME, "test_table").
>      | mode(Append).
>      | save(basePath)
> 20/04/29 17:23:42 WARN DefaultSource: Snapshot view not supported yet via data source, for MERGE_ON_READ tables. Please query the Hive table registered using Spark SQL.
> scala>
> No exception is thrown if we run this:
> scala> df.write.format("hudi").
>      | options(getQuickstartWriteConfigs).
>      | option(PRECOMBINE_FIELD_OPT_KEY, "ts").
>      | option(RECORDKEY_FIELD_OPT_KEY, "uuid").
>      | option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
>      | option(TABLE_NAME, "foo_table").
>      | mode(Append).
>      | save(basePath)
> 20/04/29 17:24:37 WARN DefaultSource: Snapshot view not supported yet via data source, for MERGE_ON_READ tables. Please query the Hive table registered using Spark SQL.
> scala>
> scala> df.write.format("hudi").
>      | options(getQuickstartWriteConfigs).
>      | option(PRECOMBINE_FIELD_OPT_KEY, "ts").
>      | option(RECORDKEY_FIELD_OPT_KEY, "uuid").
>      | option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
>      | option(TABLE_NAME, tableName).
>      | mode(Overwrite).
>      | save(basePath)
> 20/04/29 22:25:16 WARN HoodieSparkSqlWriter$: hoodie table at file:/tmp/hudi_trips_cow already exists. Deleting existing data & overwriting with new data.
> 20/04/29 22:25:18 WARN DefaultSource: Snapshot view not supported yet via data source, for MERGE_ON_READ tables. Please query the Hive table registered using Spark SQL.
> scala>
> {code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
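The validation requested in HUDI-852 amounts to comparing the table name supplied on an Append write with the name already recorded for the table at the base path. A minimal sketch, assuming the existing name has already been read from the table's metadata under the base path; `TableNameValidation`, `validateTableName`, and `WriteMode` are hypothetical names, and `IllegalArgumentException` is a placeholder for whatever exception (the user suggests something like TableAlreadyExist) the real fix would throw.

```java
import java.util.Objects;

/** Hypothetical sketch of the HUDI-852 check; not Hudi's actual API. */
final class TableNameValidation {

  /** Stand-in for the Spark save modes relevant to this check (assumption). */
  enum WriteMode { APPEND, OVERWRITE }

  private TableNameValidation() {
  }

  /**
   * Rejects an Append write whose table name differs from the name already
   * recorded at the base path. Overwrite replaces the table, so it is skipped.
   *
   * @param existingTableName name stored with the existing table, or null if none exists yet
   * @param requestedTableName name passed through the writer's TABLE_NAME option
   */
  static void validateTableName(String existingTableName, String requestedTableName, WriteMode mode) {
    if (existingTableName == null || mode == WriteMode.OVERWRITE) {
      return; // nothing to compare against, or the table is being replaced
    }
    if (!Objects.equals(existingTableName, requestedTableName)) {
      throw new IllegalArgumentException("Table name mismatch: base path already holds table '"
          + existingTableName + "', but the write requested '" + requestedTableName + "'");
    }
  }

  public static void main(String[] args) {
    validateTableName("test_table", "test_table", WriteMode.APPEND); // same name: accepted
    try {
      validateTableName("test_table", "foo_table", WriteMode.APPEND); // the case from the report
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

Skipping the check for Overwrite matches the behavior described in the report, where Overwrite already warns and replaces the existing data.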
[GitHub] [incubator-hudi] codecov-io commented on pull request #1578: Add Hudi changes for Presto MOR query support
codecov-io commented on pull request #1578:
URL: https://github.com/apache/incubator-hudi/pull/1578#issuecomment-622103750

# [Codecov](https://codecov.io/gh/apache/incubator-hudi/pull/1578?src=pr=h1) Report
> Merging [#1578](https://codecov.io/gh/apache/incubator-hudi/pull/1578?src=pr=desc) into [master](https://codecov.io/gh/apache/incubator-hudi/commit/c4b71622b90fc66f20f361d4c083b0a396572b75=desc) will **increase** coverage by `0.03%`.
> The diff coverage is `n/a`.

[![Impacted file tree graph](https://codecov.io/gh/apache/incubator-hudi/pull/1578/graphs/tree.svg?width=650=150=pr=VTTXabwbs2)](https://codecov.io/gh/apache/incubator-hudi/pull/1578?src=pr=tree)

```diff
@@            Coverage Diff             @@
##           master    #1578      +/-   ##
+ Coverage   71.81%   71.85%   +0.03%
  Complexity    294      294
  Files         385      385
  Lines       16542    16542
  Branches     1661     1661
+ Hits        11880    11886       +6
+ Misses       3932     3925       -7
- Partials      730      731       +1
```

| [Impacted Files](https://codecov.io/gh/apache/incubator-hudi/pull/1578?src=pr=tree) | Coverage Δ | Complexity Δ | |
|---|---|---|---|
| [...oop/realtime/HoodieParquetRealtimeInputFormat.java](https://codecov.io/gh/apache/incubator-hudi/pull/1578/diff?src=pr=tree#diff-aHVkaS1oYWRvb3AtbXIvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvaGFkb29wL3JlYWx0aW1lL0hvb2RpZVBhcnF1ZXRSZWFsdGltZUlucHV0Rm9ybWF0LmphdmE=) | `72.34% <ø> (ø)` | `0.00 <0.00> (ø)` | |
| [...src/main/java/org/apache/hudi/metrics/Metrics.java](https://codecov.io/gh/apache/incubator-hudi/pull/1578/diff?src=pr=tree#diff-aHVkaS1jbGllbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvbWV0cmljcy9NZXRyaWNzLmphdmE=) | `67.56% <0.00%> (+10.81%)` | `0.00% <0.00%> (ø%)` | |
| [...g/apache/hudi/metrics/InMemoryMetricsReporter.java](https://codecov.io/gh/apache/incubator-hudi/pull/1578/diff?src=pr=tree#diff-aHVkaS1jbGllbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvbWV0cmljcy9Jbk1lbW9yeU1ldHJpY3NSZXBvcnRlci5qYXZh) | `80.00% <0.00%> (+40.00%)` | `0.00% <0.00%> (ø%)` | |

--
[Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-hudi/pull/1578?src=pr=continue).
> **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
> `Δ = absolute (impact)`, `ø = not affected`, `? = missing data`
> Powered by [Codecov](https://codecov.io/gh/apache/incubator-hudi/pull/1578?src=pr=footer). Last update [c4b7162...c8b4297](https://codecov.io/gh/apache/incubator-hudi/pull/1578?src=pr=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
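The percentages in these Codecov reports can be reproduced from the raw line counts. The sketch below assumes coverage = hits / (hits + misses + partials), which is how Codecov describes its coverage ratio, and that displayed values are rounded down (floored) to two decimals. The flooring is an inference that happens to match every number in these reports (for example, 11880 / 16542 is about 71.817%, shown as `71.81%`); the reports themselves do not state it. `CoverageMath` is a hypothetical helper.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

/** Reproduces Codecov-style percentages from hit/miss/partial line counts (sketch). */
final class CoverageMath {

  private CoverageMath() {
  }

  /** Coverage percentage at high precision; partial lines count against coverage. */
  static BigDecimal exactPercent(long hits, long misses, long partials) {
    long tracked = hits + misses + partials;
    return BigDecimal.valueOf(hits).multiply(BigDecimal.valueOf(100))
        .divide(BigDecimal.valueOf(tracked), 10, RoundingMode.FLOOR);
  }

  /** Coverage floored to two decimals, as the report appears to display it. */
  static BigDecimal displayedPercent(long hits, long misses, long partials) {
    return exactPercent(hits, misses, partials).setScale(2, RoundingMode.FLOOR);
  }

  /** Change from base to head ({hits, misses, partials}), computed exactly, then floored. */
  static BigDecimal displayedDelta(long[] base, long[] head) {
    BigDecimal delta = exactPercent(head[0], head[1], head[2])
        .subtract(exactPercent(base[0], base[1], base[2]));
    return delta.setScale(2, RoundingMode.FLOOR);
  }

  public static void main(String[] args) {
    long[] master = {11880, 3932, 730}; // master @ c4b7162: hits, misses, partials
    long[] pr1578 = {11886, 3925, 731}; // #1578 head
    System.out.println(displayedPercent(master[0], master[1], master[2])); // 71.81
    System.out.println(displayedPercent(pr1578[0], pr1578[1], pr1578[2])); // 71.85
    System.out.println(displayedDelta(master, pr1578));                    // 0.03
  }
}
```

The same rule appears to explain why the #1559 report's header says `0.08%` while its table shows `-0.09%`: the exact change is about -0.083 points, which floors to -0.09, while its magnitude floors to 0.08.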
[GitHub] [incubator-hudi] bvaradar commented on pull request #1576: [HUDI-850] Avoid unnecessary listings in incremental cleaning mode
bvaradar commented on pull request #1576:
URL: https://github.com/apache/incubator-hudi/pull/1576#issuecomment-622084509

@vinothchandar: Ready for review.
[GitHub] [incubator-hudi] bschell opened a new pull request #1578: Add Hudi changes for Presto MOR query support
bschell opened a new pull request #1578:
URL: https://github.com/apache/incubator-hudi/pull/1578

Adds the changes needed in Hudi to support Presto queries on the realtime view of Hudi merge-on-read tables. For the associated follow-up changes in Presto, see this branch: https://github.com/bschell/presto/commit/a3fb658c1cd70fd72f0a3021b3d994fe383303aa

## Brief change log

- *Add @UseRecordReaderFromInputFormat annotation for HoodieParquetRealtimeInputFormat*

## Verify this pull request

Tested with the associated Presto patch to successfully query Hudi MOR realtime tables. The hudi-hadoop-mr tests pass.
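The change log adds a marker annotation to `HoodieParquetRealtimeInputFormat` so that a query engine can recognize the input format and use its record reader. The sketch below illustrates the general pattern of a runtime-retained marker annotation that is detected through reflection by its simple name. All class names here are hypothetical stand-ins, and matching by simple name (which avoids a compile-time dependency on the class declaring the annotation) is an assumption about how the Presto side detects it, not something taken from the linked patch.

```java
import java.lang.annotation.Annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.Arrays;

final class MarkerAnnotationSketch {

  /** Hypothetical marker; RUNTIME retention is required for reflection to see it. */
  @Retention(RetentionPolicy.RUNTIME)
  @Target(ElementType.TYPE)
  @interface UseRecordReaderFromInputFormat {
  }

  /** Hypothetical input format carrying the marker. */
  @UseRecordReaderFromInputFormat
  static class RealtimeInputFormatStub {
  }

  /** Hypothetical input format without the marker. */
  static class PlainInputFormatStub {
  }

  /** Looks for the marker by simple name, so the caller needs no compile-time dependency on it. */
  static boolean usesRecordReaderFromInputFormat(Class<?> inputFormatClass) {
    return Arrays.stream(inputFormatClass.getAnnotations())
        .map(Annotation::annotationType)
        .map(Class::getSimpleName)
        .anyMatch("UseRecordReaderFromInputFormat"::equals);
  }

  public static void main(String[] args) {
    System.out.println(usesRecordReaderFromInputFormat(RealtimeInputFormatStub.class)); // true
    System.out.println(usesRecordReaderFromInputFormat(PlainInputFormatStub.class));    // false
  }
}
```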
[GitHub] [incubator-hudi] codecov-io commented on pull request #1576: [HUDI-850] Avoid unnecessary listings in incremental cleaning mode
codecov-io commented on pull request #1576:
URL: https://github.com/apache/incubator-hudi/pull/1576#issuecomment-622069471

# [Codecov](https://codecov.io/gh/apache/incubator-hudi/pull/1576?src=pr=h1) Report
> Merging [#1576](https://codecov.io/gh/apache/incubator-hudi/pull/1576?src=pr=desc) into [master](https://codecov.io/gh/apache/incubator-hudi/commit/c4b71622b90fc66f20f361d4c083b0a396572b75=desc) will **decrease** coverage by `0.00%`.
> The diff coverage is `80.00%`.

[![Impacted file tree graph](https://codecov.io/gh/apache/incubator-hudi/pull/1576/graphs/tree.svg?width=650=150=pr=VTTXabwbs2)](https://codecov.io/gh/apache/incubator-hudi/pull/1576?src=pr=tree)

```diff
@@            Coverage Diff             @@
##           master    #1576      +/-   ##
- Coverage   71.81%   71.81%   -0.01%
  Complexity    294      294
  Files         385      385
  Lines       16542    16549       +7
  Branches     1661     1661
+ Hits        11880    11885       +5
+ Misses       3932     3931       -1
- Partials      730      733       +3
```

| [Impacted Files](https://codecov.io/gh/apache/incubator-hudi/pull/1576?src=pr=tree) | Coverage Δ | Complexity Δ | |
|---|---|---|---|
| [...g/apache/hudi/table/action/clean/CleanPlanner.java](https://codecov.io/gh/apache/incubator-hudi/pull/1576/diff?src=pr=tree#diff-aHVkaS1jbGllbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdGFibGUvYWN0aW9uL2NsZWFuL0NsZWFuUGxhbm5lci5qYXZh) | `86.13% <80.00%> (-1.57%)` | `0.00 <0.00> (ø)` | |
| [...le/action/rollback/BaseRollbackActionExecutor.java](https://codecov.io/gh/apache/incubator-hudi/pull/1576/diff?src=pr=tree#diff-aHVkaS1jbGllbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdGFibGUvYWN0aW9uL3JvbGxiYWNrL0Jhc2VSb2xsYmFja0FjdGlvbkV4ZWN1dG9yLmphdmE=) | `70.83% <0.00%> (-6.95%)` | `0.00% <0.00%> (ø%)` | |
| [...n/java/org/apache/hudi/index/hbase/HBaseIndex.java](https://codecov.io/gh/apache/incubator-hudi/pull/1576/diff?src=pr=tree#diff-aHVkaS1jbGllbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvaW5kZXgvaGJhc2UvSEJhc2VJbmRleC5qYXZh) | `82.77% <0.00%> (-0.48%)` | `0.00% <0.00%> (ø%)` | |
| [...able/action/restore/BaseRestoreActionExecutor.java](https://codecov.io/gh/apache/incubator-hudi/pull/1576/diff?src=pr=tree#diff-aHVkaS1jbGllbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdGFibGUvYWN0aW9uL3Jlc3RvcmUvQmFzZVJlc3RvcmVBY3Rpb25FeGVjdXRvci5qYXZh) | `93.75% <0.00%> (+3.12%)` | `0.00% <0.00%> (ø%)` | |
| [...src/main/java/org/apache/hudi/metrics/Metrics.java](https://codecov.io/gh/apache/incubator-hudi/pull/1576/diff?src=pr=tree#diff-aHVkaS1jbGllbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvbWV0cmljcy9NZXRyaWNzLmphdmE=) | `67.56% <0.00%> (+10.81%)` | `0.00% <0.00%> (ø%)` | |
| [...g/apache/hudi/metrics/InMemoryMetricsReporter.java](https://codecov.io/gh/apache/incubator-hudi/pull/1576/diff?src=pr=tree#diff-aHVkaS1jbGllbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvbWV0cmljcy9Jbk1lbW9yeU1ldHJpY3NSZXBvcnRlci5qYXZh) | `80.00% <0.00%> (+40.00%)` | `0.00% <0.00%> (ø%)` | |

--
[Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-hudi/pull/1576?src=pr=continue).
> **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
> `Δ = absolute (impact)`, `ø = not affected`, `? = missing data`
> Powered by [Codecov](https://codecov.io/gh/apache/incubator-hudi/pull/1576?src=pr=footer). Last update [c4b7162...44ee5e7](https://codecov.io/gh/apache/incubator-hudi/pull/1576?src=pr=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
[GitHub] [incubator-hudi] bvaradar commented on pull request #1576: [HUDI-850] Avoid unnecessary listings in incremental cleaning mode
bvaradar commented on pull request #1576:
URL: https://github.com/apache/incubator-hudi/pull/1576#issuecomment-622020739

Let me resolve the conflicts and ping back.
[GitHub] [incubator-hudi] bvaradar commented on a change in pull request #1576: [HUDI-850] Avoid unnecessary listings in incremental cleaning mode
bvaradar commented on a change in pull request #1576:
URL: https://github.com/apache/incubator-hudi/pull/1576#discussion_r418199354

## File path: hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java
## @@ -128,10 +128,8 @@ private void insertFirstBigBatchForClientCleanerTest(HoodieWriteConfig cfg, Hood
     assertFalse(table.getCompletedCommitsTimeline().empty());
     String instantTime = table.getCompletedCommitsTimeline().getInstants().findFirst().get().getTimestamp();
+    // We no longer write empty cleaner plans when there are not enough commits present
Review comment:
Yes, this particular check runs when the first batch of data is inserted. Other checks validate the effect of valid clean operations.

## File path: hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
## @@ -111,10 +111,15 @@ public CleanPlanner(HoodieTable hoodieTable, HoodieWriteConfig config) {
    * @throws IOException when underlying file-system throws this exception
    */
   public List getPartitionPathsToClean(Option newInstantToRetain) throws IOException {
-    if (config.incrementalCleanerModeEnabled() && newInstantToRetain.isPresent()
+
+    if (!newInstantToRetain.isPresent() && (HoodieCleaningPolicy.KEEP_LATEST_COMMITS == config.getCleanerPolicy())) {
Review comment:
Done

## File path: hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
## @@ -111,10 +111,15 @@ public CleanPlanner(HoodieTable hoodieTable, HoodieWriteConfig config) {
    * @throws IOException when underlying file-system throws this exception
    */
   public List getPartitionPathsToClean(Option newInstantToRetain) throws IOException {
-    if (config.incrementalCleanerModeEnabled() && newInstantToRetain.isPresent()
+
+    if (!newInstantToRetain.isPresent() && (HoodieCleaningPolicy.KEEP_LATEST_COMMITS == config.getCleanerPolicy())) {
+      LOG.info("No earliest commit to retain. No need to scan partitions !!");
+      return new ArrayList<>();
Review comment:
Done
[incubator-hudi] branch master updated: [MINOR] Reorder HoodieTimeline#compareTimestamp arguments for better readability (#1575)
This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git

The following commit(s) were added to refs/heads/master by this push:
     new c4b7162  [MINOR] Reorder HoodieTimeline#compareTimestamp arguments for better readability (#1575)

c4b7162 is described below

commit c4b71622b90fc66f20f361d4c083b0a396572b75
Author: vinoth chandar
AuthorDate: Thu Apr 30 09:19:39 2020 -0700

    [MINOR] Reorder HoodieTimeline#compareTimestamp arguments for better readability (#1575)

     - reads nicely as (instantTime1, GREATER_THAN_OR_EQUALS, instantTime2) etc
---
 .../org/apache/hudi/cli/commands/CommitsCommand.java | 2 +-
 .../apache/hudi/cli/commands/FileSystemViewCommand.java | 4 ++--
 .../org/apache/hudi/cli/commands/HoodieSyncCommand.java | 2 +-
 .../java/org/apache/hudi/client/HoodieWriteClient.java | 4 ++--
 .../java/org/apache/hudi/index/hbase/HBaseIndex.java | 4 ++--
 .../org/apache/hudi/table/HoodieTimelineArchiveLog.java | 6 +++---
 .../org/apache/hudi/table/action/clean/CleanPlanner.java | 16 +---
 .../action/compact/ScheduleCompactionActionExecutor.java | 4 ++--
 .../table/action/restore/BaseRestoreActionExecutor.java | 2 +-
 .../rollback/MergeOnReadRollbackActionExecutor.java | 4 ++--
 .../table/action/savepoint/SavepointActionExecutor.java | 2 +-
 .../src/test/java/org/apache/hudi/table/TestCleaner.java | 2 +-
 .../java/org/apache/hudi/table/TestMergeOnReadTable.java | 4 ++--
 .../org/apache/hudi/common/model/HoodieFileGroup.java | 6 +++---
 .../common/table/log/AbstractHoodieLogRecordScanner.java | 4 ++--
 .../hudi/common/table/timeline/HoodieActiveTimeline.java | 2 +-
 .../common/table/timeline/HoodieDefaultTimeline.java | 6 +++---
 .../hudi/common/table/timeline/HoodieTimeline.java | 16
 .../hudi/common/table/timeline/TimelineDiffHelper.java | 2 +-
 .../common/table/view/AbstractTableFileSystemView.java | 6 +++---
 .../common/table/view/RocksDbBasedFileSystemView.java | 6 +++---
 .../common/table/view/TestIncrementalFSViewSync.java | 6 +++---
 .../org/apache/hudi/utilities/HoodieSnapshotCopier.java | 4 ++--
 .../apache/hudi/utilities/HoodieSnapshotExporter.java | 4 ++--
 .../hudi/utilities/sources/helpers/IncrSourceHelper.java | 4 ++--
 .../apache/hudi/utilities/TestHoodieDeltaStreamer.java | 4 ++--
 26 files changed, 64 insertions(+), 62 deletions(-)

diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java
index 9c63d29..4288c2a 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java
@@ -357,7 +357,7 @@ public class CommitsCommand implements CommandMarker {
         sourceTimeline.getInstants().iterator().hasNext() ? "0" : sourceTimeline.lastInstant().get().getTimestamp();
     if (sourceLatestCommit != null
-        && HoodieTimeline.compareTimestamps(targetLatestCommit, sourceLatestCommit, HoodieTimeline.GREATER)) {
+        && HoodieTimeline.compareTimestamps(targetLatestCommit, HoodieTimeline.GREATER_THAN, sourceLatestCommit)) {
       // source is behind the target
       List commitsToCatchup = targetTimeline.findInstantsAfter(sourceLatestCommit, Integer.MAX_VALUE)
           .getInstants().map(HoodieInstant::getTimestamp).collect(Collectors.toList());
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/FileSystemViewCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/FileSystemViewCommand.java
index cf86184..2e32515 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/FileSystemViewCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/FileSystemViewCommand.java
@@ -260,9 +260,9 @@ public class FileSystemViewCommand implements CommandMarker {
     if (!maxInstant.isEmpty()) {
       final BiPredicate predicate;
       if (includeMaxInstant) {
-        predicate = HoodieTimeline.GREATER_OR_EQUAL;
+        predicate = HoodieTimeline.GREATER_THAN_OR_EQUALS;
       } else {
-        predicate = HoodieTimeline.GREATER;
+        predicate = HoodieTimeline.GREATER_THAN;
       }
       instantsStream = instantsStream.filter(is -> predicate.test(maxInstant, is.getTimestamp()));
     }
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieSyncCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieSyncCommand.java
index c77a924..e0ceb2e 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieSyncCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieSyncCommand.java
@@ -79,7 +79,7 @@ public class HoodieSyncCommand implements CommandMarker {
         sourceTimeline.getInstants().iterator().hasNext() ? "0" :
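This commit reorders `HoodieTimeline#compareTimestamps` so that calls read left to right as (instantTime1, operator, instantTime2). The sketch below mimics that calling convention with `BiPredicate` constants named after the ones in the diff. It assumes instant times are fixed-width timestamp strings (such as `yyyyMMddHHmmss`) that order correctly under plain string comparison; `TimestampCompareSketch` and the example timestamps are hypothetical, not Hudi's `HoodieTimeline` source.

```java
import java.util.function.BiPredicate;

final class TimestampCompareSketch {

  // Each operator compares the left instant time with the right one.
  static final BiPredicate<String, String> GREATER_THAN = (t1, t2) -> t1.compareTo(t2) > 0;
  static final BiPredicate<String, String> GREATER_THAN_OR_EQUALS = (t1, t2) -> t1.compareTo(t2) >= 0;

  private TimestampCompareSketch() {
  }

  /** Reads as "instantTime1 <operator> instantTime2". */
  static boolean compareTimestamps(String instantTime1, BiPredicate<String, String> operator, String instantTime2) {
    return operator.test(instantTime1, instantTime2);
  }

  public static void main(String[] args) {
    String targetLatestCommit = "20200430091939"; // example values only
    String sourceLatestCommit = "20200429172342";
    // Old form: compareTimestamps(target, source, GREATER); the new form reads as a sentence.
    System.out.println(compareTimestamps(targetLatestCommit, GREATER_THAN, sourceLatestCommit)); // true
  }
}
```

The constants also work as plain predicates, as in the `FileSystemViewCommand` hunk: with `includeMaxInstant` set, `predicate.test(maxInstant, is.getTimestamp())` keeps instants whose timestamp is at or before `maxInstant`.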
[jira] [Commented] (HUDI-853) Deprecate/Remove Clean_by_versions functionality in Hudi
[ https://issues.apache.org/jira/browse/HUDI-853?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17096660#comment-17096660 ]

Vinoth Chandar commented on HUDI-853:
---

There were a couple of people on the ML who wanted to keep this, IIRC :/ Might be worth punting it down the line, unless you feel strongly.

> Deprecate/Remove Clean_by_versions functionality in Hudi
> ---
>
> Key: HUDI-853
> URL: https://issues.apache.org/jira/browse/HUDI-853
> Project: Apache Hudi (incubating)
> Issue Type: Improvement
> Components: Cleaner
> Reporter: Balaji Varadarajan
> Priority: Major
> Fix For: 0.6.0
>
>
> Cleaner by version is not used, and it also does not lend itself well to incremental cleaning.
> Can we go ahead and deprecate it in 0.6.0?
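For context on what would be deprecated: cleaning by file versions (`KEEP_LATEST_FILE_VERSIONS` in Hudi's cleaning policies) keeps the newest N versions of each file group, regardless of which commits produced them. The sketch below shows only that retention rule for a single file group. Representing a file group as a list of commit times is a simplifying assumption, and `VersionRetentionSketch` is a hypothetical helper, not Hudi's cleaner code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

final class VersionRetentionSketch {

  private VersionRetentionSketch() {
  }

  /**
   * Given the commit times of the file versions in one file group, returns the
   * versions that a "keep latest N versions" cleaner would delete, newest first.
   */
  static List<String> versionsToClean(List<String> versionCommitTimes, int versionsToRetain) {
    List<String> newestFirst = new ArrayList<>(versionCommitTimes);
    newestFirst.sort(Comparator.reverseOrder()); // fixed-width instant times sort lexicographically
    if (newestFirst.size() <= versionsToRetain) {
      return new ArrayList<>(); // nothing beyond the retained versions
    }
    return new ArrayList<>(newestFirst.subList(versionsToRetain, newestFirst.size()));
  }

  public static void main(String[] args) {
    List<String> versions = Arrays.asList("001", "003", "002", "004");
    System.out.println(versionsToClean(versions, 2)); // [002, 001]
  }
}
```

Because the rule is expressed per file group rather than relative to an earliest commit to retain, it has no natural commit boundary to limit which partitions need to be examined.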
[GitHub] [incubator-hudi] vinothchandar commented on a change in pull request #1577: [WIP] [HUDI-855] Run Auto Cleaner in parallel with ingestion
vinothchandar commented on a change in pull request #1577:
URL: https://github.com/apache/incubator-hudi/pull/1577#discussion_r418092315

## File path: hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
## @@ -339,8 +352,13 @@ protected void postCommit(HoodieCommitMetadata metadata, String instantTime,
       archiveLog.archiveIfRequired(jsc);
       if (config.isAutoClean()) {
         // Call clean to cleanup if there is anything to cleanup after the commit,
-        LOG.info("Auto cleaning is enabled. Running cleaner now");
-        clean(instantTime);
+        if (config.isRunParallelAutoClean()) {
Review comment:
Let's move this into its own method for readability?

## File path: hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
## @@ -677,4 +714,27 @@ private void rollbackPendingCommits() {
     });
     return compactionInstantTimeOpt;
   }
+
+  /**
+   * Auto Clean service running concurrently.
+   */
+  private static class AutoCleanerService extends AbstractAsyncService {
Review comment:
Let's move this to its own class?

## File path: hudi-common/src/main/java/org/apache/hudi/common/async/AbstractAsyncService.java
## @@ -16,7 +16,7 @@
  * limitations under the License.
  */
-package org.apache.hudi.utilities.deltastreamer;
+package org.apache.hudi.common.async;
Review comment:
Does this really belong in hudi-common, or can it stay in hudi-client (execution package?)

## File path: hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
## @@ -677,4 +714,27 @@ private void rollbackPendingCommits() {
     });
     return compactionInstantTimeOpt;
   }
+
+  /**
+   * Auto Clean service running concurrently.
+   */
+  private static class AutoCleanerService extends AbstractAsyncService {
+
+    private final HoodieWriteClient writeClient;
+    private final String cleanInstant;
+
+    private AutoCleanerService(HoodieWriteClient writeClient, String cleanInstant) {
+      this.writeClient = writeClient;
+      this.cleanInstant = cleanInstant;
+    }
+
+    @Override
+    protected Pair startService() {
+      ExecutorService executor = Executors.newFixedThreadPool(1);
Review comment:
Move this to the constructor and reuse it for the lifetime of a HoodieWriteClient?

## File path: hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
## @@ -111,10 +111,15 @@ public CleanPlanner(HoodieTable hoodieTable, HoodieWriteConfig config) {
    * @throws IOException when underlying file-system throws this exception
    */
   public List getPartitionPathsToClean(Option newInstantToRetain) throws IOException {
-    if (config.incrementalCleanerModeEnabled() && newInstantToRetain.isPresent()
+
+    if (!newInstantToRetain.isPresent() && (HoodieCleaningPolicy.KEEP_LATEST_COMMITS == config.getCleanerPolicy())) {
Review comment:
This is from the other PR.

## File path: hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
## @@ -677,4 +714,27 @@ private void rollbackPendingCommits() {
     });
     return compactionInstantTimeOpt;
   }
+
+  /**
+   * Auto Clean service running concurrently.
+   */
+  private static class AutoCleanerService extends AbstractAsyncService {
Review comment:
And even move `spawnAutoCleanerIfEnabled` and `waitForAutoCleanerToShutdown` into this class as static helpers (trying to avoid code creep inside HoodieWriteClient :))
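The review suggests creating the executor once and reusing it for the lifetime of the write client, instead of building a new thread pool each time the cleaner starts. A minimal sketch of that shape, assuming a `Runnable` stands in for the call to `clean(instantTime)`. `AsyncCleanerSketch`, `startClean`, and `waitForCompletion` are hypothetical names; this is not the PR's `AutoCleanerService` and does not use Hudi's `AbstractAsyncService` API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Runs cleaning off the ingestion thread with one executor that lives as long as the client (sketch). */
final class AsyncCleanerSketch implements AutoCloseable {

  // Created once and reused for every clean, as suggested in the review.
  private final ExecutorService executor = Executors.newSingleThreadExecutor(runnable -> {
    Thread thread = new Thread(runnable, "async-cleaner");
    thread.setDaemon(true); // do not keep the JVM alive just for the cleaner
    return thread;
  });

  private volatile CompletableFuture<Void> pending = CompletableFuture.completedFuture(null);

  /** Starts a clean in the background and returns immediately so ingestion can continue. */
  void startClean(Runnable cleanAction) {
    pending = CompletableFuture.runAsync(cleanAction, executor);
  }

  /** Blocks until the most recently started clean has finished, e.g. before the next commit. */
  void waitForCompletion() {
    pending.join();
  }

  @Override
  public void close() throws InterruptedException {
    executor.shutdown();
    executor.awaitTermination(1, TimeUnit.MINUTES);
  }

  public static void main(String[] args) throws Exception {
    AtomicInteger cleans = new AtomicInteger();
    try (AsyncCleanerSketch cleaner = new AsyncCleanerSketch()) {
      cleaner.startClean(cleans::incrementAndGet); // clean after the first commit
      cleaner.waitForCompletion();
      cleaner.startClean(cleans::incrementAndGet); // same executor reused
      cleaner.waitForCompletion();
    }
    System.out.println(cleans.get()); // 2
  }
}
```

Implementing `AutoCloseable` ties the executor's shutdown to the owner's lifetime, which is the main benefit of hoisting it out of `startService()`.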
[GitHub] [incubator-hudi] vinothchandar commented on a change in pull request #1576: [HUDI-850] Avoid unnecessary listings in incremental cleaning mode
vinothchandar commented on a change in pull request #1576:
URL: https://github.com/apache/incubator-hudi/pull/1576#discussion_r418090411

## File path: hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java
## @@ -128,10 +128,8 @@ private void insertFirstBigBatchForClientCleanerTest(HoodieWriteConfig cfg, Hood
     assertFalse(table.getCompletedCommitsTimeline().empty());
     String instantTime = table.getCompletedCommitsTimeline().getInstants().findFirst().get().getTimestamp();
+    // We no longer write empty cleaner plans when there are not enough commits present
Review comment:
Do we have enough tests that now cover the case of a valid cleaner plan?

## File path: hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
## @@ -111,10 +111,15 @@ public CleanPlanner(HoodieTable hoodieTable, HoodieWriteConfig config) {
    * @throws IOException when underlying file-system throws this exception
    */
   public List getPartitionPathsToClean(Option newInstantToRetain) throws IOException {
-    if (config.incrementalCleanerModeEnabled() && newInstantToRetain.isPresent()
+
+    if (!newInstantToRetain.isPresent() && (HoodieCleaningPolicy.KEEP_LATEST_COMMITS == config.getCleanerPolicy())) {
+      LOG.info("No earliest commit to retain. No need to scan partitions !!");
+      return new ArrayList<>();
Review comment:
Collections.emptyList()?

## File path: hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
## @@ -111,10 +111,15 @@ public CleanPlanner(HoodieTable hoodieTable, HoodieWriteConfig config) {
    * @throws IOException when underlying file-system throws this exception
    */
   public List getPartitionPathsToClean(Option newInstantToRetain) throws IOException {
-    if (config.incrementalCleanerModeEnabled() && newInstantToRetain.isPresent()
+
+    if (!newInstantToRetain.isPresent() && (HoodieCleaningPolicy.KEEP_LATEST_COMMITS == config.getCleanerPolicy())) {
Review comment:
Can we break this method up into two, based on the policy? It feels very overloaded now.
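The two review suggestions on `getPartitionPathsToClean` (return `Collections.emptyList()` instead of a fresh `ArrayList`, and split the method by cleaning policy) could look roughly like the sketch below. It is hypothetical: `java.util.Optional` stands in for Hudi's `Option`, the enum lists only the two policies discussed in this thread, and the partition listing is a placeholder that always returns every partition, whereas a real planner in incremental mode could list only partitions touched since the last clean.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

final class CleanPlannerSketch {

  enum CleaningPolicy { KEEP_LATEST_COMMITS, KEEP_LATEST_FILE_VERSIONS }

  private final CleaningPolicy policy;
  private final List<String> allPartitions; // placeholder for a full file-system listing

  CleanPlannerSketch(CleaningPolicy policy, List<String> allPartitions) {
    this.policy = policy;
    this.allPartitions = allPartitions;
  }

  /** Dispatches on the policy so each branch stays small. */
  List<String> getPartitionPathsToClean(Optional<String> earliestInstantToRetain) {
    switch (policy) {
      case KEEP_LATEST_COMMITS:
        return getPartitionPathsForCleanByCommits(earliestInstantToRetain);
      case KEEP_LATEST_FILE_VERSIONS:
        return getPartitionPathsForFullCleaning();
      default:
        throw new IllegalArgumentException("Unknown cleaning policy " + policy);
    }
  }

  private List<String> getPartitionPathsForCleanByCommits(Optional<String> earliestInstantToRetain) {
    if (!earliestInstantToRetain.isPresent()) {
      // Not enough commits yet: nothing can be cleaned, so skip listing partitions entirely.
      return Collections.emptyList();
    }
    return getPartitionPathsForFullCleaning(); // an incremental listing would go here
  }

  private List<String> getPartitionPathsForFullCleaning() {
    return allPartitions;
  }

  public static void main(String[] args) {
    List<String> partitions = Arrays.asList("2020/04/29", "2020/04/30");
    CleanPlannerSketch byCommits = new CleanPlannerSketch(CleaningPolicy.KEEP_LATEST_COMMITS, partitions);
    System.out.println(byCommits.getPartitionPathsToClean(Optional.empty()));   // []
    System.out.println(byCommits.getPartitionPathsToClean(Optional.of("002"))); // [2020/04/29, 2020/04/30]
  }
}
```

One trade-off: `Collections.emptyList()` avoids an allocation but is immutable, so it only fits if callers never add to the returned list.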
[GitHub] [incubator-hudi] reste85 edited a comment on issue #143: Tracking ticket for folks to be added to slack group
reste85 edited a comment on issue #143:
URL: https://github.com/apache/incubator-hudi/issues/143#issuecomment-621917992

Please add: teore...@gmail.com
[GitHub] [incubator-hudi] reste85 commented on issue #143: Tracking ticket for folks to be added to slack group
reste85 commented on issue #143:
URL: https://github.com/apache/incubator-hudi/issues/143#issuecomment-621917992

Please add: mreste...@cuebiq.com
[GitHub] [incubator-hudi] codecov-io commented on pull request #1559: [HUDI-838] Support schema from HoodieCommitMetadata for HiveSync
codecov-io commented on pull request #1559:
URL: https://github.com/apache/incubator-hudi/pull/1559#issuecomment-621759888

# [Codecov](https://codecov.io/gh/apache/incubator-hudi/pull/1559?src=pr=h1) Report
> Merging [#1559](https://codecov.io/gh/apache/incubator-hudi/pull/1559?src=pr=desc) into [master](https://codecov.io/gh/apache/incubator-hudi/commit/06dae30297ea02ab122c9029a54f7927e8212039=desc) will **decrease** coverage by `0.08%`.
> The diff coverage is `53.84%`.

[![Impacted file tree graph](https://codecov.io/gh/apache/incubator-hudi/pull/1559/graphs/tree.svg?width=650=150=pr=VTTXabwbs2)](https://codecov.io/gh/apache/incubator-hudi/pull/1559?src=pr=tree)

```diff
@@            Coverage Diff             @@
##           master    #1559      +/-   ##
- Coverage   71.85%   71.76%   -0.09%
  Complexity    294      294
  Files         385      385
  Lines       16540    16562      +22
  Branches     1661     1666       +5
+ Hits        11884    11886       +2
- Misses       3925     3943      +18
- Partials      731      733       +2
```

| [Impacted Files](https://codecov.io/gh/apache/incubator-hudi/pull/1559?src=pr=tree) | Coverage Δ | Complexity Δ | |
|---|---|---|---|
| [...ain/java/org/apache/hudi/avro/HoodieAvroUtils.java](https://codecov.io/gh/apache/incubator-hudi/pull/1559/diff?src=pr=tree#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvYXZyby9Ib29kaWVBdnJvVXRpbHMuamF2YQ==) | `79.83% <0.00%> (-4.99%)` | `0.00 <0.00> (ø)` | |
| [.../apache/hudi/common/table/TableSchemaResolver.java](https://codecov.io/gh/apache/incubator-hudi/pull/1559/diff?src=pr=tree#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL3RhYmxlL1RhYmxlU2NoZW1hUmVzb2x2ZXIuamF2YQ==) | `56.71% <70.58%> (-5.47%)` | `0.00 <0.00> (ø)` | |
| [...c/main/java/org/apache/hudi/table/HoodieTable.java](https://codecov.io/gh/apache/incubator-hudi/pull/1559/diff?src=pr=tree#diff-aHVkaS1jbGllbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdGFibGUvSG9vZGllVGFibGUuamF2YQ==) | `83.59% <100.00%> (ø)` | `0.00 <0.00> (ø)` | |
| [...in/java/org/apache/hudi/hive/HoodieHiveClient.java](https://codecov.io/gh/apache/incubator-hudi/pull/1559/diff?src=pr=tree#diff-aHVkaS1oaXZlLXN5bmMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvaGl2ZS9Ib29kaWVIaXZlQ2xpZW50LmphdmE=) | `74.63% <100.00%> (ø)` | `0.00 <0.00> (ø)` | |

--
[Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-hudi/pull/1559?src=pr=continue).
> **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
> `Δ = absolute (impact)`, `ø = not affected`, `? = missing data`
> Powered by [Codecov](https://codecov.io/gh/apache/incubator-hudi/pull/1559?src=pr=footer). Last update [06dae30...bbf94c0](https://codecov.io/gh/apache/incubator-hudi/pull/1559?src=pr=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
[GitHub] [incubator-hudi] pratyakshsharma commented on issue #528: Does hudi support composite primary key upserts?
pratyakshsharma commented on issue #528:
URL: https://github.com/apache/incubator-hudi/issues/528#issuecomment-621758691

> you can build a composite key by specifying a list of fields.. cc @pratyakshsharma as FYI.. this seems like something we should doc more along with your new CustomKeyGenerator :).. Naming being simple and docs to find this will go a long way for our users ?

Definitely! Let me address the set of comments on that PR, and then I can quickly start on the documentation as well. :)
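As a rough illustration of "building a composite key by specifying a list of fields": each configured field contributes a `field:value` pair, and the pairs are joined with commas. This format is assumed to mirror what Hudi's `ComplexKeyGenerator` produces, so check it against the key generator documentation before relying on it. `CompositeKeySketch` and `buildRecordKey` are hypothetical, and a real key generator treats null or empty values differently than this sketch, which simply rejects missing fields.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

final class CompositeKeySketch {

  private CompositeKeySketch() {
  }

  /** Joins the configured record-key fields as "field1:value1,field2:value2". */
  static String buildRecordKey(Map<String, Object> record, List<String> keyFields) {
    return keyFields.stream()
        .map(field -> {
          Object value = record.get(field);
          if (value == null) {
            throw new IllegalArgumentException("Record key field '" + field + "' is missing");
          }
          return field + ":" + value;
        })
        .collect(Collectors.joining(","));
  }

  public static void main(String[] args) {
    Map<String, Object> record = new HashMap<>();
    record.put("uuid", "a1b2");
    record.put("region", "us-east");
    record.put("ts", 1588195422L); // not part of the key
    System.out.println(buildRecordKey(record, Arrays.asList("uuid", "region"))); // uuid:a1b2,region:us-east
  }
}
```

Including the field name in each pair keeps keys unambiguous when two key fields could hold the same value.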
[jira] [Updated] (HUDI-855) Run Auto Cleaner in parallel with ingestion
[ https://issues.apache.org/jira/browse/HUDI-855?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated HUDI-855:
---
    Labels: pull-request-available  (was: )

> Run Auto Cleaner in parallel with ingestion
> ---
>
> Key: HUDI-855
> URL: https://issues.apache.org/jira/browse/HUDI-855
> Project: Apache Hudi (incubating)
> Issue Type: Improvement
> Components: Cleaner
> Reporter: Balaji Varadarajan
> Priority: Major
> Labels: pull-request-available
> Fix For: 0.6.0
>
>
> Currently, auto clean runs synchronously after ingestion finishes. Since cleaning and ingestion can safely happen in parallel, we can take advantage of this and schedule them to run concurrently.
[GitHub] [incubator-hudi] bvaradar opened a new pull request #1577: [WIP] [HUDI-855] Run Auto Cleaner in parallel with ingestion
bvaradar opened a new pull request #1577:
URL: https://github.com/apache/incubator-hudi/pull/1577
[jira] [Updated] (HUDI-855) Run Auto Cleaner in parallel with ingestion
[ https://issues.apache.org/jira/browse/HUDI-855?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Balaji Varadarajan updated HUDI-855:
---
    Status: Open  (was: New)

> Run Auto Cleaner in parallel with ingestion
> ---
>
> Key: HUDI-855
> URL: https://issues.apache.org/jira/browse/HUDI-855
> Project: Apache Hudi (incubating)
> Issue Type: Improvement
> Components: Cleaner
> Reporter: Balaji Varadarajan
> Priority: Major
> Fix For: 0.6.0
>
>
> Currently, auto clean runs synchronously after ingestion finishes. Since cleaning and ingestion can safely happen in parallel, we can take advantage of this and schedule them to run concurrently.
[incubator-hudi] branch hudi_test_suite_refactor updated (7db66af -> 33590b7)
This is an automated email from the ASF dual-hosted git repository.

vinoyang pushed a change to branch hudi_test_suite_refactor
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git.

    from 7db66af  [HUDI-394] Provide a basic implementation of test suite
     add 33590b7  [MINOR] Code cleanup for DeltaConfig

No new revisions were added by this update.

Summary of changes:
 .../hudi/testsuite/configuration/DeltaConfig.java | 23 +++---
 1 file changed, 11 insertions(+), 12 deletions(-)
[GitHub] [incubator-hudi] yanghua commented on a change in pull request #1100: [HUDI-289] Implement a test suite to support long running test for Hudi writing and querying end-end
yanghua commented on a change in pull request #1100: URL: https://github.com/apache/incubator-hudi/pull/1100#discussion_r417860719 ## File path: hudi-test-suite/src/main/java/org/apache/hudi/testsuite/DFSDeltaWriterAdapter.java ## @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.testsuite; + +import org.apache.hudi.testsuite.writer.FileDeltaInputWriter; +import org.apache.hudi.testsuite.writer.WriteStats; + +import org.apache.avro.generic.GenericRecord; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +/** + * {@link org.apache.hadoop.hdfs.DistributedFileSystem} (or {@link org.apache.hadoop.fs.LocalFileSystem}) based delta + * generator. 
+ */ +public class DFSDeltaWriterAdapter implements DeltaWriterAdapter { + + private FileDeltaInputWriter deltaInputGenerator; + private List metrics = new ArrayList<>(); + + public DFSDeltaWriterAdapter(FileDeltaInputWriter deltaInputGenerator) { +this.deltaInputGenerator = deltaInputGenerator; + } + + @Override + public List write(Iterator input) throws IOException { +deltaInputGenerator.open(); +while (input.hasNext()) { + if (this.deltaInputGenerator.canWrite()) { +this.deltaInputGenerator.writeData(input.next()); + } else if (input.hasNext()) { +rollOver(); + } +} +close(); +return this.metrics; + } + + public void rollOver() throws IOException { +close(); Review comment: Could we let `DeltaInputWriter` implement `AutoCloseable`, so that we can use try-with-resources and avoid calling `close` in so many places? ## File path: hudi-test-suite/src/main/java/org/apache/hudi/testsuite/DeltaWriterFactory.java ## @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.hudi.testsuite; + +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.testsuite.configuration.DFSDeltaConfig; +import org.apache.hudi.testsuite.configuration.DeltaConfig; +import org.apache.hudi.testsuite.writer.AvroDeltaInputWriter; +import org.apache.hudi.testsuite.writer.FileDeltaInputWriter; + +import org.apache.avro.generic.GenericRecord; + +import java.io.IOException; + +/** + * A factory to help instantiate different {@link DeltaWriterAdapter}s depending on the {@link DeltaOutputType} and + * {@link DeltaInputFormat}. + */ +public class DeltaWriterFactory { + + private DeltaWriterFactory() { + } + + public static DeltaWriterAdapter getDeltaWriterAdapter(DeltaConfig config, Integer batchId) throws IOException { +switch (config.getDeltaOutputType()) { + case DFS: +switch (config.getDeltaInputFormat()) { + case AVRO: +DFSDeltaConfig dfsDeltaConfig = (DFSDeltaConfig) config; +dfsDeltaConfig.setBatchId(batchId); +FileDeltaInputWriter fileDeltaInputGenerator = new AvroDeltaInputWriter( +dfsDeltaConfig.getConfiguration(), +StringUtils +.join(new String[]{dfsDeltaConfig.getDeltaBasePath(), dfsDeltaConfig.getBatchId().toString()}, +"/"), dfsDeltaConfig.getSchemaStr(), dfsDeltaConfig.getMaxFileSize()); +DFSDeltaWriterAdapter workloadSink = new DFSDeltaWriterAdapter(fileDeltaInputGenerator); Review comment: just `return new
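The reviewer's `AutoCloseable` suggestion can be sketched as follows, assuming a simplified in-memory writer. `SketchWriter`, `writeAll`, and `RollOverSketch` are hypothetical names, not the test-suite API. The idea is that each writer's lifetime is bounded by a try-with-resources block, so "roll over" becomes "let the current writer close and open a new one", with no explicit `close()` calls scattered through the loop.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class RollOverSketch {

  // Hypothetical stand-in for a delta input writer; NOT the Hudi test-suite class.
  // It "writes" records into memory and records the finished file on close().
  static final class SketchWriter implements AutoCloseable {
    private final int maxRecords;
    private final List<String> records = new ArrayList<>();
    private final List<List<String>> closedFiles;

    SketchWriter(int maxRecords, List<List<String>> closedFiles) {
      this.maxRecords = maxRecords;
      this.closedFiles = closedFiles;
    }

    boolean canWrite() {
      return records.size() < maxRecords;
    }

    void writeData(String record) {
      records.add(record);
    }

    // Narrowing the throws clause of AutoCloseable.close() is allowed,
    // so callers need no catch block in this sketch.
    @Override
    public void close() {
      closedFiles.add(new ArrayList<>(records));
    }
  }

  // Writes all input, opening a fresh writer whenever the current one is full.
  // try-with-resources guarantees close() runs once per writer, even if writeData throws.
  static List<List<String>> writeAll(Iterator<String> input, int maxRecordsPerFile) {
    if (maxRecordsPerFile <= 0) {
      throw new IllegalArgumentException("maxRecordsPerFile must be positive");
    }
    List<List<String>> closedFiles = new ArrayList<>();
    while (input.hasNext()) {
      try (SketchWriter writer = new SketchWriter(maxRecordsPerFile, closedFiles)) {
        while (input.hasNext() && writer.canWrite()) {
          writer.writeData(input.next());
        }
      }
    }
    return closedFiles;
  }

  public static void main(String[] args) {
    List<List<String>> files = writeAll(Arrays.asList("a", "b", "c", "d", "e").iterator(), 2);
    System.out.println(files); // [[a, b], [c, d], [e]]
  }
}
```

A side effect of this structure is that a new writer is only opened while input remains, so no empty trailing file is produced.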
[jira] [Commented] (HUDI-538) Restructuring hudi client module for multi engine support
[ https://issues.apache.org/jira/browse/HUDI-538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17096380#comment-17096380 ] Hong Shen commented on HUDI-538: [~yanghua] please @ me if needed. > Restructuring hudi client module for multi engine support > - > > Key: HUDI-538 > URL: https://issues.apache.org/jira/browse/HUDI-538 > Project: Apache Hudi (incubating) > Issue Type: Improvement > Components: Code Cleanup >Reporter: vinoyang >Assignee: vinoyang >Priority: Major > Fix For: 0.6.0 > > > Hudi is currently tightly coupled with the Spark framework, which makes > integration with other computing engines more difficult. We plan to decouple > it from Spark. This umbrella issue is used to track this work. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (HUDI-854) Incremental Cleaning should not revert to brute force all-partition scanning in any cases
[ https://issues.apache.org/jira/browse/HUDI-854?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Balaji Varadarajan updated HUDI-854: Status: Open (was: New) > Incremental Cleaning should not revert to brute force all-partition scanning > in any cases > - > > Key: HUDI-854 > URL: https://issues.apache.org/jira/browse/HUDI-854 > Project: Apache Hudi (incubating) > Issue Type: New Feature > Components: Cleaner >Reporter: Balaji Varadarajan >Priority: Major > Fix For: 0.6.0 > > > Even after [https://github.com/apache/incubator-hudi/pull/1576], incremental > cleaning would still resort to a full partition scan when no previous clean > operation has been done on the dataset. This ticket is to design and implement a > safe solution that avoids full scanning in all cases.
[jira] [Created] (HUDI-854) Incremental Cleaning should not revert to brute force all-partition scanning in any cases
Balaji Varadarajan created HUDI-854: --- Summary: Incremental Cleaning should not revert to brute force all-partition scanning in any cases Key: HUDI-854 URL: https://issues.apache.org/jira/browse/HUDI-854 Project: Apache Hudi (incubating) Issue Type: New Feature Components: Cleaner Reporter: Balaji Varadarajan Fix For: 0.6.0 Even after [https://github.com/apache/incubator-hudi/pull/1576], incremental cleaning would still resort to a full partition scan when no previous clean operation has been done on the dataset. This ticket is to design and implement a safe solution that avoids full scanning in all cases.
[jira] [Commented] (HUDI-853) Deprecate/Remove Clean_by_versions functionality in Hudi
[ https://issues.apache.org/jira/browse/HUDI-853?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17096343#comment-17096343 ] Balaji Varadarajan commented on HUDI-853: - [~vinoth]: Let me know your thoughts on this. > Deprecate/Remove Clean_by_versions functionality in Hudi > - > > Key: HUDI-853 > URL: https://issues.apache.org/jira/browse/HUDI-853 > Project: Apache Hudi (incubating) > Issue Type: Improvement > Components: Cleaner >Reporter: Balaji Varadarajan >Priority: Major > Fix For: 0.6.0 > > > Cleaning by versions is not used, and it also does not lend itself well to > incremental cleaning. > Can we go ahead and deprecate it in 0.6.0?
[jira] [Created] (HUDI-853) Deprecate/Remove Clean_by_versions functionality in Hudi
Balaji Varadarajan created HUDI-853: --- Summary: Deprecate/Remove Clean_by_versions functionality in Hudi Key: HUDI-853 URL: https://issues.apache.org/jira/browse/HUDI-853 Project: Apache Hudi (incubating) Issue Type: Improvement Components: Cleaner Reporter: Balaji Varadarajan Fix For: 0.6.0 Cleaning by versions is not used, and it also does not lend itself well to incremental cleaning. Can we go ahead and deprecate it in 0.6.0?
[jira] [Created] (HUDI-852) Add validation to check Table name when Append Mode is used in DataSource writer
Bhavani Sudha created HUDI-852: -- Summary: Add validation to check Table name when Append Mode is used in DataSource writer Key: HUDI-852 URL: https://issues.apache.org/jira/browse/HUDI-852 Project: Apache Hudi (incubating) Issue Type: Bug Components: newbie, Writer Core Reporter: Bhavani Sudha Assignee: Bhavani Sudha Fix For: 0.6.0 Copied from a user's description on the mailing list: The table name is not respected when inserting records with a different table name in Append mode.
{code:java}
// While running commands from the Hudi quick start guide, I found that the library does not check the table name in the request against the table name in the metadata available in the base path. I think it should throw TableAlreadyExist. In case of Save mode Overwrite, it warns.

spark-2.4.4-bin-hadoop2.7/bin/spark-shell --packages org.apache.hudi:hudi-spark-bundle_2.11:0.5.1-incubating,org.apache.spark:spark-avro_2.11:2.4.4 --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'

scala> df.write.format("hudi").
     | options(getQuickstartWriteConfigs).
     | option(PRECOMBINE_FIELD_OPT_KEY, "ts").
     | option(RECORDKEY_FIELD_OPT_KEY, "uuid").
     | option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
     | option(TABLE_NAME, "test_table").
     | mode(Append).
     | save(basePath)
20/04/29 17:23:42 WARN DefaultSource: Snapshot view not supported yet via data source, for MERGE_ON_READ tables. Please query the Hive table registered using Spark SQL.

scala>

// No exception is thrown if we run this:
scala> df.write.format("hudi").
     | options(getQuickstartWriteConfigs).
     | option(PRECOMBINE_FIELD_OPT_KEY, "ts").
     | option(RECORDKEY_FIELD_OPT_KEY, "uuid").
     | option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
     | option(TABLE_NAME, "foo_table").
     | mode(Append).
     | save(basePath)
20/04/29 17:24:37 WARN DefaultSource: Snapshot view not supported yet via data source, for MERGE_ON_READ tables. Please query the Hive table registered using Spark SQL.

scala>

scala> df.write.format("hudi").
     | options(getQuickstartWriteConfigs).
     | option(PRECOMBINE_FIELD_OPT_KEY, "ts").
     | option(RECORDKEY_FIELD_OPT_KEY, "uuid").
     | option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
     | option(TABLE_NAME, tableName).
     | mode(Overwrite).
     | save(basePath)
20/04/29 22:25:16 WARN HoodieSparkSqlWriter$: hoodie table at file:/tmp/hudi_trips_cow already exists. Deleting existing data & overwriting with new data.
20/04/29 22:25:18 WARN DefaultSource: Snapshot view not supported yet via data source, for MERGE_ON_READ tables. Please query the Hive table registered using Spark SQL.

scala>
{code}
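The validation requested in HUDI-852 could look roughly like the sketch below. It assumes the existing table name has already been read from the table's metadata under the base path (for example the `hoodie.table.name` entry in `hoodie.properties`). `TableNameValidationSketch`, `WriteMode`, `TableNameMismatchException`, and `validateTableName` are all hypothetical; the ticket does not say which exception Hudi should throw, and this is not the actual writer code.

```java
import java.util.Objects;

public class TableNameValidationSketch {

  // Hypothetical local mirror of Spark's SaveMode values, so the sketch is self-contained.
  enum WriteMode { APPEND, OVERWRITE, ERROR_IF_EXISTS, IGNORE }

  // Hypothetical exception type; the ticket only suggests something like TableAlreadyExist.
  static final class TableNameMismatchException extends RuntimeException {
    TableNameMismatchException(String message) {
      super(message);
    }
  }

  /**
   * Rejects an Append-mode write whose table name differs from the name already recorded
   * for the table at the base path. existingTableName is null when no table exists yet.
   */
  static void validateTableName(String existingTableName, String requestedTableName, WriteMode mode) {
    if (existingTableName == null || mode != WriteMode.APPEND) {
      return; // new table, or a mode handled elsewhere (Overwrite already warns and replaces data)
    }
    if (!Objects.equals(existingTableName, requestedTableName)) {
      throw new TableNameMismatchException("Requested table name '" + requestedTableName
          + "' does not match existing table name '" + existingTableName + "'");
    }
  }

  public static void main(String[] args) {
    validateTableName("test_table", "test_table", WriteMode.APPEND); // same name: accepted
    try {
      validateTableName("test_table", "foo_table", WriteMode.APPEND); // the reported case
    } catch (TableNameMismatchException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

Limiting the check to Append keeps the existing Overwrite behaviour shown in the report (a warning, then the data is replaced) unchanged.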
[jira] [Updated] (HUDI-850) Avoid unnecessary listings in incremental cleaning mode
[ https://issues.apache.org/jira/browse/HUDI-850?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated HUDI-850: Labels: pull-request-available (was: ) > Avoid unnecessary listings in incremental cleaning mode > --- > > Key: HUDI-850 > URL: https://issues.apache.org/jira/browse/HUDI-850 > Project: Apache Hudi (incubating) > Issue Type: Improvement > Components: Cleaner, Performance >Reporter: Vinoth Chandar >Assignee: Balaji Varadarajan >Priority: Major > Labels: pull-request-available > Fix For: 0.6.0 > > > Came up during https://github.com/apache/incubator-hudi/issues/1552 . > Even with incremental cleaning turned on, we can hit a scenario where > there are no commits to clean yet, but we still end up listing needlessly.
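The three cases discussed in HUDI-850 and HUDI-854 (nothing to clean, incremental cleaning, full-scan fallback) can be modelled with a small decision function. This is a simplified sketch under stated assumptions, not Hudi's cleaner planning code: it assumes clean-by-commits with a "commits retained" count, and the exact commit window used for incremental mode is an assumption. `IncrementalCleanSketch`, `Plan`, `choosePlan`, and `partitionsToClean` are hypothetical names.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;

public class IncrementalCleanSketch {

  enum Plan { SKIP, INCREMENTAL, FULL_SCAN }

  /**
   * Hypothetical planning step.
   * commits: completed commit instants (ascending) mapped to the partitions each one wrote.
   * commitsRetained: how many of the latest commits the cleaner keeps.
   * lastCleanEarliestRetained: earliest retained instant recorded by the previous clean,
   * or empty if the table has never been cleaned.
   */
  static Plan choosePlan(Map<String, Set<String>> commits, int commitsRetained,
                         Optional<String> lastCleanEarliestRetained) {
    if (commits.size() <= commitsRetained) {
      return Plan.SKIP; // nothing is outside the retention window yet, so list nothing (HUDI-850)
    }
    // Without a previous clean there is no starting point: the full scan HUDI-854 wants to avoid.
    return lastCleanEarliestRetained.isPresent() ? Plan.INCREMENTAL : Plan.FULL_SCAN;
  }

  /**
   * Incremental mode: only partitions written by commits in
   * [lastCleanEarliestRetained, newEarliestRetained) are candidates (window bounds assumed).
   * Instant times are fixed-width timestamps, so string order is chronological order.
   */
  static Set<String> partitionsToClean(Map<String, Set<String>> commits,
                                       String lastCleanEarliestRetained, String newEarliestRetained) {
    Set<String> partitions = new TreeSet<>();
    for (Map.Entry<String, Set<String>> entry : commits.entrySet()) {
      String instant = entry.getKey();
      if (instant.compareTo(lastCleanEarliestRetained) >= 0
          && instant.compareTo(newEarliestRetained) < 0) {
        partitions.addAll(entry.getValue());
      }
    }
    return partitions;
  }

  public static void main(String[] args) {
    Map<String, Set<String>> commits = new LinkedHashMap<>();
    commits.put("20200401000000", Collections.singleton("2020/04/01"));
    commits.put("20200402000000", Collections.singleton("2020/04/02"));
    commits.put("20200403000000", Collections.singleton("2020/04/01"));
    System.out.println(choosePlan(commits, 5, Optional.of("20200401000000"))); // SKIP
    System.out.println(choosePlan(commits, 2, Optional.empty()));              // FULL_SCAN
    System.out.println(choosePlan(commits, 2, Optional.of("20200401000000"))); // INCREMENTAL
    System.out.println(partitionsToClean(commits, "20200401000000", "20200402000000")); // [2020/04/01]
  }
}
```

The point of the first check is that it needs only the timeline (the commit list), so the "no commits to clean yet" case never touches the file system.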
[GitHub] [incubator-hudi] bvaradar opened a new pull request #1576: [HUDI-850] Avoid unnecessary listings in incremental cleaning mode
bvaradar opened a new pull request #1576: URL: https://github.com/apache/incubator-hudi/pull/1576 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [incubator-hudi] codecov-io edited a comment on pull request #1409: [HUDI-714]Add javadoc and comments to hudi write method link
codecov-io edited a comment on pull request #1409: URL: https://github.com/apache/incubator-hudi/pull/1409#issuecomment-599323873

# [Codecov](https://codecov.io/gh/apache/incubator-hudi/pull/1409?src=pr=h1) Report
> Merging [#1409](https://codecov.io/gh/apache/incubator-hudi/pull/1409?src=pr=desc) into [master](https://codecov.io/gh/apache/incubator-hudi/commit/9059bce977cee98e2d65769622c46a1941c563dd=desc) will **increase** coverage by `0.03%`.
> The diff coverage is `75.00%`.

[![Impacted file tree graph](https://codecov.io/gh/apache/incubator-hudi/pull/1409/graphs/tree.svg?width=650=150=pr=VTTXabwbs2)](https://codecov.io/gh/apache/incubator-hudi/pull/1409?src=pr=tree)

```diff
@@            Coverage Diff            @@
##             master    #1409   +/-   ##
+ Coverage     71.81%   71.85%   +0.03%
  Complexity      294      294
  Files           385      385
  Lines         16540    16541      +1
  Branches       1661     1659      -2
+ Hits          11878    11885      +7
+ Misses         3932     3925      -7
- Partials        730      731      +1
```

| [Impacted Files](https://codecov.io/gh/apache/incubator-hudi/pull/1409?src=pr=tree) | Coverage Δ | Complexity Δ | |
|---|---|---|---|
| [.../apache/hudi/client/AbstractHoodieWriteClient.java](https://codecov.io/gh/apache/incubator-hudi/pull/1409/diff?src=pr=tree#diff-aHVkaS1jbGllbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY2xpZW50L0Fic3RyYWN0SG9vZGllV3JpdGVDbGllbnQuamF2YQ==) | `77.35% <ø> (ø)` | `0.00 <0.00> (ø)` | |
| [...in/java/org/apache/hudi/table/WorkloadProfile.java](https://codecov.io/gh/apache/incubator-hudi/pull/1409/diff?src=pr=tree#diff-aHVkaS1jbGllbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdGFibGUvV29ya2xvYWRQcm9maWxlLmphdmE=) | `100.00% <ø> (ø)` | `0.00 <0.00> (ø)` | |
| [...n/java/org/apache/hudi/common/model/HoodieKey.java](https://codecov.io/gh/apache/incubator-hudi/pull/1409/diff?src=pr=tree#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL21vZGVsL0hvb2RpZUtleS5qYXZh) | `88.88% <ø> (ø)` | `0.00 <0.00> (ø)` | |
| [...pache/hudi/common/table/HoodieTableMetaClient.java](https://codecov.io/gh/apache/incubator-hudi/pull/1409/diff?src=pr=tree#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL3RhYmxlL0hvb2RpZVRhYmxlTWV0YUNsaWVudC5qYXZh) | `83.11% <ø> (ø)` | `0.00 <0.00> (ø)` | |
| [...src/main/java/org/apache/hudi/DataSourceUtils.java](https://codecov.io/gh/apache/incubator-hudi/pull/1409/diff?src=pr=tree#diff-aHVkaS1zcGFyay9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvaHVkaS9EYXRhU291cmNlVXRpbHMuamF2YQ==) | `56.70% <ø> (ø)` | `0.00 <0.00> (ø)` | |
| [...i/utilities/deltastreamer/HoodieDeltaStreamer.java](https://codecov.io/gh/apache/incubator-hudi/pull/1409/diff?src=pr=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL2RlbHRhc3RyZWFtZXIvSG9vZGllRGVsdGFTdHJlYW1lci5qYXZh) | `80.00% <0.00%> (ø)` | `11.00 <0.00> (ø)` | |
| [...apache/hudi/utilities/deltastreamer/DeltaSync.java](https://codecov.io/gh/apache/incubator-hudi/pull/1409/diff?src=pr=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL2RlbHRhc3RyZWFtZXIvRGVsdGFTeW5jLmphdmE=) | `72.58% <75.00%> (+0.13%)` | `37.00 <0.00> (ø)` | |
| [...s/deltastreamer/HoodieMultiTableDeltaStreamer.java](https://codecov.io/gh/apache/incubator-hudi/pull/1409/diff?src=pr=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL2RlbHRhc3RyZWFtZXIvSG9vZGllTXVsdGlUYWJsZURlbHRhU3RyZWFtZXIuamF2YQ==) | `78.39% <100.00%> (ø)` | `18.00 <0.00> (ø)` | |
| [...src/main/java/org/apache/hudi/metrics/Metrics.java](https://codecov.io/gh/apache/incubator-hudi/pull/1409/diff?src=pr=tree#diff-aHVkaS1jbGllbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvbWV0cmljcy9NZXRyaWNzLmphdmE=) | `67.56% <0.00%> (+10.81%)` | `0.00% <0.00%> (ø%)` | |
| [...g/apache/hudi/metrics/InMemoryMetricsReporter.java](https://codecov.io/gh/apache/incubator-hudi/pull/1409/diff?src=pr=tree#diff-aHVkaS1jbGllbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvbWV0cmljcy9Jbk1lbW9yeU1ldHJpY3NSZXBvcnRlci5qYXZh) | `80.00% <0.00%> (+40.00%)` | `0.00% <0.00%> (ø%)` | |

-- [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-hudi/pull/1409?src=pr=continue).
> **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
> `Δ = absolute (impact)`, `ø = not affected`, `? = missing data`
> Powered by [Codecov](https://codecov.io/gh/apache/incubator-hudi/pull/1409?src=pr=footer). Last update [9059bce...1c07619](https://codecov.io/gh/apache/incubator-hudi/pull/1409?src=pr=lastupdated). Read the [comment
[jira] [Assigned] (HUDI-851) Add Documentation on partitioning data with examples and details on how to sync to Hive
[ https://issues.apache.org/jira/browse/HUDI-851?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bhavani Sudha reassigned HUDI-851: -- Assignee: Bhavani Sudha (was: vinoyang) > Add Documentation on partitioning data with examples and details on how to > sync to Hive > --- > > Key: HUDI-851 > URL: https://issues.apache.org/jira/browse/HUDI-851 > Project: Apache Hudi (incubating) > Issue Type: Improvement > Components: Docs, docs-chinese >Reporter: Bhavani Sudha >Assignee: Bhavani Sudha >Priority: Minor >
[jira] [Created] (HUDI-851) Add Documentation on partitioning data with examples and details on how to sync to Hive
Bhavani Sudha created HUDI-851: -- Summary: Add Documentation on partitioning data with examples and details on how to sync to Hive Key: HUDI-851 URL: https://issues.apache.org/jira/browse/HUDI-851 Project: Apache Hudi (incubating) Issue Type: Improvement Components: Docs, docs-chinese Reporter: Bhavani Sudha Assignee: vinoyang
[GitHub] [incubator-hudi] vinothchandar commented on issue #528: Does hudi support composite primary key upserts?
vinothchandar commented on issue #528: URL: https://github.com/apache/incubator-hudi/issues/528#issuecomment-621645502 @Murthy-Palla you can configure your key generator: using the `ComplexKeyGenerator` (https://github.com/apache/incubator-hudi/blob/master/hudi-spark/src/main/java/org/apache/hudi/keygen/ComplexKeyGenerator.java), you can build a composite key by specifying a list of fields. cc @pratyakshsharma as FYI: this seems like something we should document more, along with your new CustomKeyGenerator :). Simple naming and easy-to-find docs will go a long way for our users.
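For reference, a composite-key write is configured through DataSource write options. The sketch below just builds that option map in Java; the key generator class name is taken from the link in the comment, and the option keys are the Hudi 0.5.x DataSource write options as I understand them. The column names (`order_id`, `line_no`, `order_date`) and the `CompositeKeyOptionsSketch` helper are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class CompositeKeyOptionsSketch {

  // Builds DataSource write options for a composite record key.
  // recordKeyFields is a comma-separated list of record key columns.
  static Map<String, String> compositeKeyOptions(String recordKeyFields, String partitionPathField) {
    Map<String, String> options = new LinkedHashMap<>();
    options.put("hoodie.datasource.write.keygenerator.class",
        "org.apache.hudi.keygen.ComplexKeyGenerator");
    options.put("hoodie.datasource.write.recordkey.field", recordKeyFields);
    options.put("hoodie.datasource.write.partitionpath.field", partitionPathField);
    return options;
  }

  public static void main(String[] args) {
    // Hypothetical columns: the key is the combination of order_id and line_no.
    compositeKeyOptions("order_id,line_no", "order_date")
        .forEach((key, value) -> System.out.println(key + "=" + value));
  }
}
```

The resulting map can be passed to Spark's `DataFrameWriter.options(java.util.Map<String, String>)`, together with the usual table name and precombine field settings.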
[GitHub] [incubator-hudi] vinothchandar opened a new pull request #1575: [MINOR] Reorder HoodieTimeline#compareTimestamp arguments for better readability
vinothchandar opened a new pull request #1575: URL: https://github.com/apache/incubator-hudi/pull/1575 - Calls now read naturally, e.g. (instantTime1, GREATER_OR_EQUAL, instantTime2). ## *Tips* - *Thank you very much for contributing to Apache Hudi.* - *Please review https://hudi.apache.org/contributing.html before opening a pull request.* ## What is the purpose of the pull request *(For example: This pull request adds quick-start document.)* ## Brief change log *(for example:)* - *Modify AnnotationLocation checkstyle rule in checkstyle.xml* ## Verify this pull request *(Please pick either of the following options)* This pull request is a trivial rework / code cleanup without any test coverage. *(or)* This pull request is already covered by existing tests, such as *(please describe tests)*. (or) This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end.* - *Added HoodieClientWriteTest to verify the change.* - *Manually verified the change by running a job locally.* ## Committer checklist - [ ] Has a corresponding JIRA in PR title & commit - [ ] Commit message is descriptive of the change - [ ] CI is green - [ ] Necessary doc changes done or have another open PR - [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.
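The readability argument in #1575 is easiest to see side by side. Below is a minimal sketch of the (instant1, predicate, instant2) ordering, assuming instant times are fixed-width `yyyyMMddHHmmss` strings so that lexicographic order equals chronological order. The class and predicate fields are hypothetical stand-ins named after the constant in the PR description, not `HoodieTimeline` itself.

```java
import java.util.function.BiPredicate;

public class InstantCompareSketch {

  // Hypothetical predicates. Fixed-width timestamps compare correctly as plain strings.
  static final BiPredicate<String, String> GREATER_OR_EQUAL = (a, b) -> a.compareTo(b) >= 0;
  static final BiPredicate<String, String> LESSER = (a, b) -> a.compareTo(b) < 0;

  // Putting the predicate between the operands makes a call read like an infix expression:
  // "instant1 GREATER_OR_EQUAL instant2".
  static boolean compareTimestamps(String instant1, BiPredicate<String, String> predicate,
                                   String instant2) {
    return predicate.test(instant1, instant2);
  }

  public static void main(String[] args) {
    System.out.println(compareTimestamps("20200430101010", GREATER_OR_EQUAL, "20200429235959")); // true
    System.out.println(compareTimestamps("20200430101010", LESSER, "20200429235959"));           // false
  }
}
```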
[jira] [Assigned] (HUDI-850) Avoid unnecessary listings in incremental cleaning mode
[ https://issues.apache.org/jira/browse/HUDI-850?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Balaji Varadarajan reassigned HUDI-850: --- Assignee: Balaji Varadarajan (was: Vinoth Chandar) > Avoid unnecessary listings in incremental cleaning mode > --- > > Key: HUDI-850 > URL: https://issues.apache.org/jira/browse/HUDI-850 > Project: Apache Hudi (incubating) > Issue Type: Improvement > Components: Cleaner, Performance >Reporter: Vinoth Chandar >Assignee: Balaji Varadarajan >Priority: Major > Fix For: 0.6.0 > > > Came up during https://github.com/apache/incubator-hudi/issues/1552 . > Even with incremental cleaning turned on, we can hit a scenario where > there are no commits to clean yet, but we still end up listing needlessly.