[jira] [Closed] (HUDI-814) Migrate hudi-client tests to JUnit 5

2020-04-28 Thread vinoyang (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-814?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang closed HUDI-814.
-
Resolution: Done

Done via master branch: 69b16309c8c46f831c8b9be42de8b2e29c74f03e

> Migrate hudi-client tests to JUnit 5
> 
>
> Key: HUDI-814
> URL: https://issues.apache.org/jira/browse/HUDI-814
> Project: Apache Hudi (incubating)
>  Issue Type: Test
>  Components: Testing
>Reporter: Raymond Xu
>Assignee: Raymond Xu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 0.6.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[incubator-hudi] branch master updated: [HUDI-814] Migrate hudi-client tests to JUnit 5 (#1570)

2020-04-28 Thread vinoyang
This is an automated email from the ASF dual-hosted git repository.

vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
 new 69b1630  [HUDI-814] Migrate hudi-client tests to JUnit 5 (#1570)
69b1630 is described below

commit 69b16309c8c46f831c8b9be42de8b2e29c74f03e
Author: Raymond Xu <2701446+xushi...@users.noreply.github.com>
AuthorDate: Tue Apr 28 22:57:28 2020 -0700

[HUDI-814] Migrate hudi-client tests to JUnit 5 (#1570)
---
 .../hudi/common/config/TestHoodieWriteConfig.java  |  8 ++--
 .../bloom/TestBucketizedBloomCheckPartitioner.java | 24 +-
 .../hudi/index/bloom/TestKeyRangeLookupTree.java   |  4 +-
 .../strategy/TestHoodieCompactionStrategy.java | 54 +++---
 4 files changed, 44 insertions(+), 46 deletions(-)

diff --git 
a/hudi-client/src/test/java/org/apache/hudi/common/config/TestHoodieWriteConfig.java
 
b/hudi-client/src/test/java/org/apache/hudi/common/config/TestHoodieWriteConfig.java
index 3516a6a..a1904a5 100644
--- 
a/hudi-client/src/test/java/org/apache/hudi/common/config/TestHoodieWriteConfig.java
+++ 
b/hudi-client/src/test/java/org/apache/hudi/common/config/TestHoodieWriteConfig.java
@@ -22,7 +22,7 @@ import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.config.HoodieWriteConfig.Builder;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -32,7 +32,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
 
-import static org.junit.Assert.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 
 public class TestHoodieWriteConfig {
 
@@ -52,8 +52,8 @@ public class TestHoodieWriteConfig {
   inputStream.close();
 }
 HoodieWriteConfig config = builder.build();
-assertEquals(config.getMaxCommitsToKeep(), 5);
-assertEquals(config.getMinCommitsToKeep(), 2);
+assertEquals(5, config.getMaxCommitsToKeep());
+assertEquals(2, config.getMinCommitsToKeep());
   }
 
  private ByteArrayOutputStream saveParamsIntoOutputStream(Map<String, String> params) throws IOException {
diff --git 
a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestBucketizedBloomCheckPartitioner.java
 
b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestBucketizedBloomCheckPartitioner.java
index 3ad5a99..e946450 100644
--- 
a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestBucketizedBloomCheckPartitioner.java
+++ 
b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestBucketizedBloomCheckPartitioner.java
@@ -20,7 +20,7 @@ package org.apache.hudi.index.bloom;
 
 import org.apache.hudi.common.util.collection.Pair;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.util.HashMap;
 import java.util.List;
@@ -28,9 +28,9 @@ import java.util.Map;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class TestBucketizedBloomCheckPartitioner {
 
@@ -45,12 +45,12 @@ public class TestBucketizedBloomCheckPartitioner {
 };
 BucketizedBloomCheckPartitioner p = new BucketizedBloomCheckPartitioner(4, fileToComparisons, 10);
 Map<String, List<Integer>> assignments = p.getFileGroupToPartitions();
-assertEquals("f1 should have 4 buckets", 4, assignments.get("f1").size());
-assertEquals("f2 should have 4 buckets", 4, assignments.get("f2").size());
-assertEquals("f3 should have 2 buckets", 2, assignments.get("f3").size());
-assertArrayEquals("f1 spread across 3 partitions", new Integer[] {0, 0, 1, 3}, assignments.get("f1").toArray());
-assertArrayEquals("f2 spread across 3 partitions", new Integer[] {1, 2, 2, 0}, assignments.get("f2").toArray());
-assertArrayEquals("f3 spread across 2 partitions", new Integer[] {3, 1}, assignments.get("f3").toArray());
+assertEquals(4, assignments.get("f1").size(), "f1 should have 4 buckets");
+assertEquals(4, assignments.get("f2").size(), "f2 should have 4 buckets");
+assertEquals(2, assignments.get("f3").size(), "f3 should have 2 buckets");
+assertArrayEquals(new Integer[] {0, 0, 1, 3}, assignments.get("f1").toArray(), "f1 spread across 3 partitions");
+assertArrayEquals(new Integer[] {1, 2, 2, 0}, assignments.get("f2").toArray(), "f2 spread across 3 partitions");
+assertArrayEquals(new Integer[] {3, 1}, assignments.get("f3").toArray(), "f3 spread across 2 partitions");
   }
 
   @Test
@@ -78,7 +78,7 @@ public class TestBucketizedBloomCheckPartitioner {
   }
 };
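
The assertion changes in this commit all follow the same JUnit 4 to JUnit 5 pattern: imports move from org.junit.Assert to org.junit.jupiter.api.Assertions, and the failure message moves from the first argument to the last. A minimal, hypothetical test (not part of the commit) showing the migrated form:

    import org.junit.jupiter.api.Test;

    import static org.junit.jupiter.api.Assertions.assertEquals;

    class AssertionOrderExample {
      // JUnit 4: assertEquals("f1 should have 4 buckets", 4, actualBuckets);
      // JUnit 5: expected and actual come first, the message comes last.
      @Test
      void messageIsLastArgument() {
        int actualBuckets = 4;
        assertEquals(4, actualBuckets, "f1 should have 4 buckets");
      }
    }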
 

[jira] [Updated] (HUDI-845) Allow parallel writing and move the pending rollback work into cleaner

2020-04-28 Thread Vinoth Chandar (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-845?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vinoth Chandar updated HUDI-845:

Description: 
Things to think about:
 * Commit time has to be unique across writers.
 * Parallel writers can finish commits out of order, i.e. c2 commits before c1.
 * MOR log blocks fence uncommitted data.
 * Cleaner should loudly complain if it cannot finish cleaning up partial writes.

P.S.: think about what is left for the general case: log files may arrive in a different order, inserts may violate the uniqueness constraint.

  was:
Things to think about 
 * Commit time has to be unique across writers 
 * Parallel writers can finish commits out of order i.e c2 commits before c1.
 * MOR log blocks fence uncommited data.. 
 * Cleaner should loudly complain if it cannot finish cleaning up partial 
writes.  


> Allow parallel writing and move the pending rollback work into cleaner
> --
>
> Key: HUDI-845
> URL: https://issues.apache.org/jira/browse/HUDI-845
> Project: Apache Hudi (incubating)
>  Issue Type: Improvement
>  Components: Writer Core
>Reporter: Vinoth Chandar
>Priority: Major
> Fix For: 0.6.0
>
>
> Things to think about 
>  * Commit time has to be unique across writers 
>  * Parallel writers can finish commits out of order i.e c2 commits before c1.
>  * MOR log blocks fence uncommited data.. 
>  * Cleaner should loudly complain if it cannot finish cleaning up partial 
> writes.  
>  
> P.S: think about what is left for the general thing : log files may have 
> different order, inserts may violate uniqueness constraint



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (HUDI-845) Allow parallel writing and move the pending rollback work into cleaner

2020-04-28 Thread Vinoth Chandar (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-845?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vinoth Chandar updated HUDI-845:

Description: 
Things to think about:
 * Commit time has to be unique across writers.
 * Parallel writers can finish commits out of order, i.e. c2 commits before c1.
 * MOR log blocks fence uncommitted data.
 * Cleaner should loudly complain if it cannot finish cleaning up partial writes.

  was:
Things to think about 
 * Commit time has to be unique across writers 
 * Parallel writers can finish commits out of order i.e c2 commits before c1.
 * MOR log blocks fence uncommited data.. 
 * Cleaner should loudly complain if it cannot finish cleaning up partial 
writes. 

 

 

 

 


> Allow parallel writing and move the pending rollback work into cleaner
> --
>
> Key: HUDI-845
> URL: https://issues.apache.org/jira/browse/HUDI-845
> Project: Apache Hudi (incubating)
>  Issue Type: Improvement
>  Components: Writer Core
>Reporter: Vinoth Chandar
>Priority: Major
> Fix For: 0.6.0
>
>
> Things to think about 
>  * Commit time has to be unique across writers 
>  * Parallel writers can finish commits out of order i.e c2 commits before c1.
>  * MOR log blocks fence uncommited data.. 
>  * Cleaner should loudly complain if it cannot finish cleaning up partial 
> writes.  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (HUDI-845) Allow parallel writing and move the pending rollback work into cleaner

2020-04-28 Thread Vinoth Chandar (Jira)
Vinoth Chandar created HUDI-845:
---

 Summary: Allow parallel writing and move the pending rollback work 
into cleaner
 Key: HUDI-845
 URL: https://issues.apache.org/jira/browse/HUDI-845
 Project: Apache Hudi (incubating)
  Issue Type: Improvement
  Components: Writer Core
Reporter: Vinoth Chandar
 Fix For: 0.6.0


Things to think about 
 * Commit time has to be unique across writers 
 * Parallel writers can finish commits out of order i.e c2 commits before c1.

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Build failed in Jenkins: hudi-snapshot-deployment-0.5 #262

2020-04-28 Thread Apache Jenkins Server
See 


Changes:


--
[...truncated 2.36 KB...]
/home/jenkins/tools/maven/apache-maven-3.5.4/conf:
logging
settings.xml
toolchains.xml

/home/jenkins/tools/maven/apache-maven-3.5.4/conf/logging:
simplelogger.properties

/home/jenkins/tools/maven/apache-maven-3.5.4/lib:
aopalliance-1.0.jar
cdi-api-1.0.jar
cdi-api.license
commons-cli-1.4.jar
commons-cli.license
commons-io-2.5.jar
commons-io.license
commons-lang3-3.5.jar
commons-lang3.license
ext
guava-20.0.jar
guice-4.2.0-no_aop.jar
jansi-1.17.1.jar
jansi-native
javax.inject-1.jar
jcl-over-slf4j-1.7.25.jar
jcl-over-slf4j.license
jsr250-api-1.0.jar
jsr250-api.license
maven-artifact-3.5.4.jar
maven-artifact.license
maven-builder-support-3.5.4.jar
maven-builder-support.license
maven-compat-3.5.4.jar
maven-compat.license
maven-core-3.5.4.jar
maven-core.license
maven-embedder-3.5.4.jar
maven-embedder.license
maven-model-3.5.4.jar
maven-model-builder-3.5.4.jar
maven-model-builder.license
maven-model.license
maven-plugin-api-3.5.4.jar
maven-plugin-api.license
maven-repository-metadata-3.5.4.jar
maven-repository-metadata.license
maven-resolver-api-1.1.1.jar
maven-resolver-api.license
maven-resolver-connector-basic-1.1.1.jar
maven-resolver-connector-basic.license
maven-resolver-impl-1.1.1.jar
maven-resolver-impl.license
maven-resolver-provider-3.5.4.jar
maven-resolver-provider.license
maven-resolver-spi-1.1.1.jar
maven-resolver-spi.license
maven-resolver-transport-wagon-1.1.1.jar
maven-resolver-transport-wagon.license
maven-resolver-util-1.1.1.jar
maven-resolver-util.license
maven-settings-3.5.4.jar
maven-settings-builder-3.5.4.jar
maven-settings-builder.license
maven-settings.license
maven-shared-utils-3.2.1.jar
maven-shared-utils.license
maven-slf4j-provider-3.5.4.jar
maven-slf4j-provider.license
org.eclipse.sisu.inject-0.3.3.jar
org.eclipse.sisu.inject.license
org.eclipse.sisu.plexus-0.3.3.jar
org.eclipse.sisu.plexus.license
plexus-cipher-1.7.jar
plexus-cipher.license
plexus-component-annotations-1.7.1.jar
plexus-component-annotations.license
plexus-interpolation-1.24.jar
plexus-interpolation.license
plexus-sec-dispatcher-1.4.jar
plexus-sec-dispatcher.license
plexus-utils-3.1.0.jar
plexus-utils.license
slf4j-api-1.7.25.jar
slf4j-api.license
wagon-file-3.1.0.jar
wagon-file.license
wagon-http-3.1.0-shaded.jar
wagon-http.license
wagon-provider-api-3.1.0.jar
wagon-provider-api.license

/home/jenkins/tools/maven/apache-maven-3.5.4/lib/ext:
README.txt

/home/jenkins/tools/maven/apache-maven-3.5.4/lib/jansi-native:
freebsd32
freebsd64
linux32
linux64
osx
README.txt
windows32
windows64

/home/jenkins/tools/maven/apache-maven-3.5.4/lib/jansi-native/freebsd32:
libjansi.so

/home/jenkins/tools/maven/apache-maven-3.5.4/lib/jansi-native/freebsd64:
libjansi.so

/home/jenkins/tools/maven/apache-maven-3.5.4/lib/jansi-native/linux32:
libjansi.so

/home/jenkins/tools/maven/apache-maven-3.5.4/lib/jansi-native/linux64:
libjansi.so

/home/jenkins/tools/maven/apache-maven-3.5.4/lib/jansi-native/osx:
libjansi.jnilib

/home/jenkins/tools/maven/apache-maven-3.5.4/lib/jansi-native/windows32:
jansi.dll

/home/jenkins/tools/maven/apache-maven-3.5.4/lib/jansi-native/windows64:
jansi.dll
Finished /home/jenkins/tools/maven/apache-maven-3.5.4 Directory Listing :
Detected current version as: 
'HUDI_home=
0.6.0-SNAPSHOT'
[INFO] Scanning for projects...
[WARNING] 
[WARNING] Some problems were encountered while building the effective model for 
org.apache.hudi:hudi-spark_2.11:jar:0.6.0-SNAPSHOT
[WARNING] 'artifactId' contains an expression but should be a constant. @ 
org.apache.hudi:hudi-spark_${scala.binary.version}:[unknown-version], 

 line 26, column 15
[WARNING] 
[WARNING] Some problems were encountered while building the effective model for 
org.apache.hudi:hudi-timeline-service:jar:0.6.0-SNAPSHOT
[WARNING] 'build.plugins.plugin.(groupId:artifactId)' must be unique but found 
duplicate declaration of plugin org.jacoco:jacoco-maven-plugin @ 
org.apache.hudi:hudi-timeline-service:[unknown-version], 

 line 58, column 15
[WARNING] 
[WARNING] Some problems were encountered while building the effective model for 
org.apache.hudi:hudi-utilities_2.11:jar:0.6.0-SNAPSHOT
[WARNING] 'artifactId' contains an expression but should be a constant. @ 
org.apache.hudi:hudi-utilities_${scala.binary.version}:[unknown-version], 

 line 26, column 15
[WARNING] 
[WARNING] Some problems were encountered while building the effective model for 
org.apache.hudi:hudi-spark-bundle_2.11:jar:0.6.0-SNAPSHOT
[WARNING] 'artifactId' contains an expression but should be a constant. @ 

[GitHub] [incubator-hudi] nsivabalan commented on a change in pull request #1402: [HUDI-407] Adding Simple Index

2020-04-28 Thread GitBox


nsivabalan commented on a change in pull request #1402:
URL: https://github.com/apache/incubator-hudi/pull/1402#discussion_r417046515



##
File path: 
hudi-client/src/main/java/org/apache/hudi/index/HoodieGlobalSimpleIndex.java
##
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import scala.Tuple2;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.hudi.index.HoodieIndexUtils.loadLatestDataFilesForAllPartitions;
+
+/**
+ * A global simple index which reads interested fields (record key and partition path) from base files and
+ * joins with incoming records to find the tagged location.
+ *
+ * @param <T>
+ */
+public class HoodieGlobalSimpleIndex<T extends HoodieRecordPayload> extends HoodieSimpleIndex<T> {
+
+  public HoodieGlobalSimpleIndex(HoodieWriteConfig config) {
+    super(config);
+  }
+
+  @Override
+  public JavaRDD<HoodieRecord<T>> tagLocation(JavaRDD<HoodieRecord<T>> recordRDD, JavaSparkContext jsc,
+                                              HoodieTable<T> hoodieTable) {
+    return tagLocationInternal(recordRDD, jsc, hoodieTable);
+  }
+
+  /**
+   * Tags records location for incoming records.
+   *
+   * @param recordRDD   {@link JavaRDD} of incoming records
+   * @param jsc instance of {@link JavaSparkContext} to use
+   * @param hoodieTable instance of {@link HoodieTable} to use
+   * @return {@link JavaRDD} of records with record locations set
+   */
+  protected JavaRDD<HoodieRecord<T>> tagLocationInternal(JavaRDD<HoodieRecord<T>> recordRDD, JavaSparkContext jsc,
+                                                         HoodieTable<T> hoodieTable) {
+    JavaPairRDD incomingRecords = recordRDD.mapToPair(entry -> new Tuple2<>(entry.getKey(), entry));
+
+    JavaPairRDD existingRecords = fetchRecordLocations(incomingRecords.keys(), jsc, hoodieTable,

Review comment:
   I have tried my best to simplify; can you take a look? But I wanted to remind you that we need to join on recordKey rather than HoodieKey, since the partition path of an existing key may differ from that of the incoming record. So, from the return value of fetchRecordLocations, I had to do another mapToPair.
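
   To illustrate the point in plain Java (a hypothetical sketch, not the PR's code): a global index must look up existing locations by record key alone, because the incoming record may carry a different partition path than the one the record was originally written to.

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Optional;

    // Hypothetical illustration only; names and types are not Hudi's.
    public class GlobalIndexKeyingSketch {
      public static void main(String[] args) {
        // Existing locations keyed by record key only (what a *global* index needs).
        Map<String, String> existingLocationByRecordKey = new HashMap<>();
        existingLocationByRecordKey.put("uuid-1", "2020/04/27/file-group-7");

        // Incoming upsert: same record key, but a different partition path.
        String incomingRecordKey = "uuid-1";
        String incomingPartitionPath = "2020/04/28";

        // A join keyed on (recordKey, partitionPath), i.e. the full HoodieKey, would miss this match.
        Optional<String> existing = Optional.ofNullable(existingLocationByRecordKey.get(incomingRecordKey));
        System.out.println(existing
            .map(loc -> "tag as update against " + loc + " (incoming partition " + incomingPartitionPath + ")")
            .orElse("no existing location found: treat as insert"));
      }
    }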





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [incubator-hudi] nsivabalan commented on pull request #1469: [HUDI-686] Implement BloomIndexV2 that does not depend on memory caching

2020-04-28 Thread GitBox


nsivabalan commented on pull request #1469:
URL: https://github.com/apache/incubator-hudi/pull/1469#issuecomment-620971906


   > @vinothchandar I think the work can be finished this week, will ping you 
when finished : )
   > 
   > > the fetchRecordLocation() API and global indexing is not implemented..Do 
you plan to work on them as well?
   > 
   > | work | status |
   > | --- | --- |
   > | `fetchRecordLocation() API` | working |
   > | unit tests | working |
   > | global simple index | https://issues.apache.org/jira/browse/HUDI-787 |
   
   @lamber-ken : Just to confirm we are on the same page: we wanted a global index version of HoodieBloomIndexV2. The global simple index you quoted is the global version of SimpleIndex.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [incubator-hudi] hddong commented on a change in pull request #1567: [HUDI-840]Clean blank file created by HoodieLogFormatWriter

2020-04-28 Thread GitBox


hddong commented on a change in pull request #1567:
URL: https://github.com/apache/incubator-hudi/pull/1567#discussion_r417047280



##
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java
##
@@ -210,6 +210,14 @@ public void close() throws IOException {
 flush();
 output.close();
 output = null;
+Path path = logFile.getPath();

Review comment:
   > @hddong : Any possible reasons why blank file is created in the first 
place ?
   
   A blank file is created for `appendBlock` when a new `HoodieLogFormatWriter` is constructed:
   
https://github.com/apache/incubator-hudi/blob/f1592be629c3f9762f62d4e1dbf3be54f213d92d/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java#L105-L108
   But there is a special case: when rollover kicks in (the block size is past the threshold), we close the old writer and create a new one. If we then close a writer that was just created by `rolloverIfNeeded` without appending anything, a blank file is left behind.
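
   For intuition, the cleanup being discussed amounts to deleting a log file that was created but never appended to. A standalone sketch of that idea (plain java.nio, not Hudi's writer code):

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;

    // Hypothetical illustration only: after closing a writer, drop the file if nothing was written.
    public class BlankLogFileCleanupSketch {
      public static void main(String[] args) throws IOException {
        Path logFile = Files.createTempFile("hoodie-log-sketch", ".log"); // stands in for the rolled-over log file
        // ... imagine the writer was closed here without appending a single block ...
        if (Files.exists(logFile) && Files.size(logFile) == 0) {
          Files.delete(logFile); // avoid leaving a blank file behind
        }
        System.out.println("blank file left behind: " + Files.exists(logFile));
      }
    }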





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [incubator-hudi] nsivabalan commented on a change in pull request #1402: [HUDI-407] Adding Simple Index

2020-04-28 Thread GitBox


nsivabalan commented on a change in pull request #1402:
URL: https://github.com/apache/incubator-hudi/pull/1402#discussion_r417046732



##
File path: 
hudi-client/src/main/java/org/apache/hudi/index/HoodieGlobalSimpleIndex.java
##
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import scala.Tuple2;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.hudi.index.HoodieIndexUtils.loadLatestDataFilesForAllPartitions;
+
+/**
+ * A global simple index which reads interested fields (record key and partition path) from base files and
+ * joins with incoming records to find the tagged location.
+ *
+ * @param <T>
+ */
+public class HoodieGlobalSimpleIndex<T extends HoodieRecordPayload> extends HoodieSimpleIndex<T> {
+
+  public HoodieGlobalSimpleIndex(HoodieWriteConfig config) {
+    super(config);
+  }
+
+  @Override
+  public JavaRDD<HoodieRecord<T>> tagLocation(JavaRDD<HoodieRecord<T>> recordRDD, JavaSparkContext jsc,
+                                              HoodieTable<T> hoodieTable) {
+    return tagLocationInternal(recordRDD, jsc, hoodieTable);
+  }
+
+  /**
+   * Tags records location for incoming records.
+   *
+   * @param recordRDD   {@link JavaRDD} of incoming records
+   * @param jsc instance of {@link JavaSparkContext} to use
+   * @param hoodieTable instance of {@link HoodieTable} to use
+   * @return {@link JavaRDD} of records with record locations set
+   */
+  protected JavaRDD<HoodieRecord<T>> tagLocationInternal(JavaRDD<HoodieRecord<T>> recordRDD, JavaSparkContext jsc,
+                                                         HoodieTable<T> hoodieTable) {
+    JavaPairRDD incomingRecords = recordRDD.mapToPair(entry -> new Tuple2<>(entry.getKey(), entry));
+
+    JavaPairRDD existingRecords = fetchRecordLocations(incomingRecords.keys(), jsc, hoodieTable,

Review comment:
   Wrt fetchRecordLocations(), we have overloaded 
loadAllFilesForPartitions() in HoodieGlobalSimpleIndex. So, we are loading all 
partitions. 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [incubator-hudi] nsivabalan commented on a change in pull request #1402: [HUDI-407] Adding Simple Index

2020-04-28 Thread GitBox


nsivabalan commented on a change in pull request #1402:
URL: https://github.com/apache/incubator-hudi/pull/1402#discussion_r417046515



##
File path: 
hudi-client/src/main/java/org/apache/hudi/index/HoodieGlobalSimpleIndex.java
##
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import scala.Tuple2;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.hudi.index.HoodieIndexUtils.loadLatestDataFilesForAllPartitions;
+
+/**
+ * A global simple index which reads interested fields (record key and partition path) from base files and
+ * joins with incoming records to find the tagged location.
+ *
+ * @param <T>
+ */
+public class HoodieGlobalSimpleIndex<T extends HoodieRecordPayload> extends HoodieSimpleIndex<T> {
+
+  public HoodieGlobalSimpleIndex(HoodieWriteConfig config) {
+    super(config);
+  }
+
+  @Override
+  public JavaRDD<HoodieRecord<T>> tagLocation(JavaRDD<HoodieRecord<T>> recordRDD, JavaSparkContext jsc,
+                                              HoodieTable<T> hoodieTable) {
+    return tagLocationInternal(recordRDD, jsc, hoodieTable);
+  }
+
+  /**
+   * Tags records location for incoming records.
+   *
+   * @param recordRDD   {@link JavaRDD} of incoming records
+   * @param jsc instance of {@link JavaSparkContext} to use
+   * @param hoodieTable instance of {@link HoodieTable} to use
+   * @return {@link JavaRDD} of records with record locations set
+   */
+  protected JavaRDD<HoodieRecord<T>> tagLocationInternal(JavaRDD<HoodieRecord<T>> recordRDD, JavaSparkContext jsc,
+                                                         HoodieTable<T> hoodieTable) {
+    JavaPairRDD incomingRecords = recordRDD.mapToPair(entry -> new Tuple2<>(entry.getKey(), entry));
+
+    JavaPairRDD existingRecords = fetchRecordLocations(incomingRecords.keys(), jsc, hoodieTable,

Review comment:
   I have tried to do a bit of simplifying. But I wanted to remind you that we need to join on recordKey rather than HoodieKey, since the partition path of an existing key may differ from that of the incoming record. So, from the return value of fetchRecordLocations, I had to do another mapToPair.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (HUDI-812) Migrate hudi-common tests to JUnit 5

2020-04-28 Thread Raymond Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-812?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Raymond Xu updated HUDI-812:

Status: In Progress  (was: Open)

> Migrate hudi-common tests to JUnit 5
> 
>
> Key: HUDI-812
> URL: https://issues.apache.org/jira/browse/HUDI-812
> Project: Apache Hudi (incubating)
>  Issue Type: Test
>  Components: Testing
>Reporter: Raymond Xu
>Assignee: Raymond Xu
>Priority: Major
> Fix For: 0.6.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (HUDI-813) Migrate hudi-utilities tests to JUnit 5

2020-04-28 Thread Raymond Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-813?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Raymond Xu updated HUDI-813:

Status: In Progress  (was: Open)

> Migrate hudi-utilities tests to JUnit 5
> ---
>
> Key: HUDI-813
> URL: https://issues.apache.org/jira/browse/HUDI-813
> Project: Apache Hudi (incubating)
>  Issue Type: Test
>  Components: Testing
>Reporter: Raymond Xu
>Assignee: Raymond Xu
>Priority: Major
> Fix For: 0.6.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [incubator-hudi] xushiyan commented on pull request #1570: [HUDI-814] Migrate hudi-client tests to JUnit 5

2020-04-28 Thread GitBox


xushiyan commented on pull request #1570:
URL: https://github.com/apache/incubator-hudi/pull/1570#issuecomment-620965031


   @yanghua This PR migrates all remaining test cases in hudi-client that are 
not subclasses of HoodieClientTestHarness. It is ready for review.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (HUDI-812) Migrate hudi-common tests to JUnit 5

2020-04-28 Thread Raymond Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-812?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Raymond Xu updated HUDI-812:

Status: Open  (was: New)

> Migrate hudi-common tests to JUnit 5
> 
>
> Key: HUDI-812
> URL: https://issues.apache.org/jira/browse/HUDI-812
> Project: Apache Hudi (incubating)
>  Issue Type: Test
>  Components: Testing
>Reporter: Raymond Xu
>Assignee: Raymond Xu
>Priority: Major
> Fix For: 0.6.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (HUDI-813) Migrate hudi-utilities tests to JUnit 5

2020-04-28 Thread Raymond Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-813?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Raymond Xu updated HUDI-813:

Status: Open  (was: New)

> Migrate hudi-utilities tests to JUnit 5
> ---
>
> Key: HUDI-813
> URL: https://issues.apache.org/jira/browse/HUDI-813
> Project: Apache Hudi (incubating)
>  Issue Type: Test
>  Components: Testing
>Reporter: Raymond Xu
>Assignee: Raymond Xu
>Priority: Major
> Fix For: 0.6.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [incubator-hudi] xushiyan opened a new pull request #1570: [HUDI-814] Migrate hudi-client tests to JUnit 5

2020-04-28 Thread GitBox


xushiyan opened a new pull request #1570:
URL: https://github.com/apache/incubator-hudi/pull/1570


   Migrate the test cases in hudi-client to JUnit 5.
   
   Follows #1553 
   
   ### Migration status (after merging)
   
   | Package | JUnit 5 lib | API migration | Restructure packages |
   | --- | --- | --- | --- |
   | `hudi-cli` | ✅ | ✅ | - |
   | `hudi-client` | ✅ | ✅ | - |
   | `hudi-common` | ✅ |  |  |
   | `hudi-hadoop-mr` | ✅ | ✅ | - |
   | `hudi-hive-sync` | ✅ | ✅ | - |
   | `hudi-integ-test` | ✅ | ✅  | N.A. |
   | `hudi-spark` | ✅ | ✅ | - |
   | `hudi-timeline-service` | ✅ | ✅ | - |
   | `hudi-utilities` | ✅ |  | - |
   
   ## Committer checklist
   
- [ ] Has a corresponding JIRA in PR title & commit

- [ ] Commit message is descriptive of the change

- [ ] CI is green
   
- [ ] Necessary doc changes done or have another open PR
  
- [ ] For large changes, please consider breaking it into sub-tasks under 
an umbrella JIRA.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (HUDI-814) Migrate hudi-client tests to JUnit 5

2020-04-28 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-814?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated HUDI-814:

Labels: pull-request-available  (was: )

> Migrate hudi-client tests to JUnit 5
> 
>
> Key: HUDI-814
> URL: https://issues.apache.org/jira/browse/HUDI-814
> Project: Apache Hudi (incubating)
>  Issue Type: Test
>  Components: Testing
>Reporter: Raymond Xu
>Assignee: Raymond Xu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 0.6.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (HUDI-558) Introduce ability to compress bloom filters while storing in parquet

2020-04-28 Thread liwei (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-558?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

liwei reassigned HUDI-558:
--

Assignee: liwei

> Introduce ability to compress bloom filters while storing in parquet
> 
>
> Key: HUDI-558
> URL: https://issues.apache.org/jira/browse/HUDI-558
> Project: Apache Hudi (incubating)
>  Issue Type: Improvement
>  Components: Index, Performance
>Reporter: Balaji Varadarajan
>Assignee: liwei
>Priority: Blocker
>  Labels: help-wanted, pull-request-available
> Fix For: 0.6.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Based on performance study 
> [https://docs.google.com/spreadsheets/d/1KCmmdgaFTWBmpOk9trePdQ2m6wPVj2G328fTcRnQP1M/edit?usp=sharing]
>  we found that there is benefit in compressing bloom filters when storing in 
> parquet. As this is an experimental feature, we will need to disable this 
> feature by default.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [incubator-hudi] umehrot2 commented on a change in pull request #1559: [HUDI-838] Support schema from HoodieCommitMetadata for HiveSync

2020-04-28 Thread GitBox


umehrot2 commented on a change in pull request #1559:
URL: https://github.com/apache/incubator-hudi/pull/1559#discussion_r417020450



##
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
##
@@ -178,6 +193,17 @@ public Schema convertParquetSchemaToAvro(MessageType 
parquetSchema) {
 return avroSchemaConverter.convert(parquetSchema);
   }
 
+  /**
+   * Convert a avro scheme to the parquet format.
+   *
+   * @param schema The avro schema to convert
+   * @return The converted parquet schema
+   */
+  public MessageType convertAvroSchemaToParquet(Schema schema) {

Review comment:
   There is nothing really that we can re-use from ParquetUtils for the 
purpose of this PR. The APIs in ParquetUtils class accept a file path from 
which to read. However, here it first needs to go through the active timeline 
and find out the file path and then read the schema. The reading from file 
functions of this class can internally re-use some of the APIs from 
ParquetUtils but I don't think we should touch it as part of this PR.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [incubator-hudi] umehrot2 commented on a change in pull request #1559: [HUDI-838] Support schema from HoodieCommitMetadata for HiveSync

2020-04-28 Thread GitBox


umehrot2 commented on a change in pull request #1559:
URL: https://github.com/apache/incubator-hudi/pull/1559#discussion_r417020636



##
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
##
@@ -145,23 +146,37 @@ public MessageType getDataSchema() throws Exception {
* @return Avro schema for this table
* @throws Exception
*/
-  public Schema getTableSchema() throws Exception {
-    return convertParquetSchemaToAvro(getDataSchema());
+  public Schema getTableSchemaInAvroFormat() throws Exception {
+    Option<Schema> schemaFromCommitMetadata = getTableSchemaFromCommitMetadata();
+    return schemaFromCommitMetadata.isPresent() ? schemaFromCommitMetadata.get() :
+           convertParquetSchemaToAvro(getDataSchema());
+  }
+
+  /**
+   * Gets the schema for a hoodie table in Parquet format.
+   *
+   * @return Parquet schema for the table
+   * @throws Exception
+   */
+  public MessageType getTableSchemaInParquetFormat() throws Exception {
+    Option<Schema> schemaFromCommitMetadata = getTableSchemaFromCommitMetadata();
+    return schemaFromCommitMetadata.isPresent() ? convertAvroSchemaToParquet(schemaFromCommitMetadata.get()) :
+           getDataSchema();
   }
 
   /**
    * Gets the schema for a hoodie table in Avro format from the HoodieCommitMetadata of the last commit.
    *
    * @return Avro schema for this table
-   * @throws Exception
    */
-  public Schema getTableSchemaFromCommitMetadata() throws Exception {
+  private Option<Schema> getTableSchemaFromCommitMetadata() {
     try {
       HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
       byte[] data = timeline.getInstantDetails(timeline.lastInstant().get()).get();
       HoodieCommitMetadata metadata = HoodieCommitMetadata.fromBytes(data, HoodieCommitMetadata.class);
       String existingSchemaStr = metadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY);
-      return new Schema.Parser().parse(existingSchemaStr);
+      return StringUtils.isNullOrEmpty(existingSchemaStr) ? Option.empty() :

Review comment:
   Agreed. This would be a good addition and make it cleaner.
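
   The shape of the lookup in the diff above is "prefer the schema recorded in commit metadata, otherwise fall back to reading it from a data file". A self-contained sketch of that pattern (hypothetical names, plain java.util.Optional rather than Hudi's Option):

    import java.util.Optional;
    import java.util.function.Supplier;

    // Hypothetical illustration of the commit-metadata-first schema lookup.
    public class SchemaFallbackSketch {
      static Optional<String> schemaFromCommitMetadata() {
        return Optional.empty(); // pretend the last commit carried no schema entry
      }

      static String resolveSchema(Supplier<String> readFromDataFile) {
        // Only touch the data file when the commit metadata has nothing recorded.
        return schemaFromCommitMetadata().orElseGet(readFromDataFile);
      }

      public static void main(String[] args) {
        System.out.println(resolveSchema(() -> "schema read from a parquet footer"));
      }
    }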





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Closed] (HUDI-827) Translation error

2020-04-28 Thread leesf (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-827?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

leesf closed HUDI-827.
--

> Translation error
> -
>
> Key: HUDI-827
> URL: https://issues.apache.org/jira/browse/HUDI-827
> Project: Apache Hudi (incubating)
>  Issue Type: Bug
>  Components: docs-chinese
>Affects Versions: 0.5.2
>Reporter: Lisheng Wang
>Assignee: Lisheng Wang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 0.6.0
>
>
> found translation error in 
> [https://hudi.apache.org/cn/docs/writing_data.html], 
> "如优化文件大小之类后", should be "如优化文件大小之后" 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [incubator-hudi] umehrot2 commented on a change in pull request #1559: [HUDI-838] Support schema from HoodieCommitMetadata for HiveSync

2020-04-28 Thread GitBox


umehrot2 commented on a change in pull request #1559:
URL: https://github.com/apache/incubator-hudi/pull/1559#discussion_r417019322



##
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
##
@@ -145,23 +146,37 @@ public MessageType getDataSchema() throws Exception {
* @return Avro schema for this table
* @throws Exception
*/
-  public Schema getTableSchema() throws Exception {
-return convertParquetSchemaToAvro(getDataSchema());
+  public Schema getTableSchemaInAvroFormat() throws Exception {

Review comment:
   Done.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [incubator-hudi] umehrot2 commented on a change in pull request #1559: [HUDI-838] Support schema from HoodieCommitMetadata for HiveSync

2020-04-28 Thread GitBox


umehrot2 commented on a change in pull request #1559:
URL: https://github.com/apache/incubator-hudi/pull/1559#discussion_r417019457



##
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
##
@@ -145,23 +146,37 @@ public MessageType getDataSchema() throws Exception {
* @return Avro schema for this table
* @throws Exception
*/
-  public Schema getTableSchema() throws Exception {
-return convertParquetSchemaToAvro(getDataSchema());
+  public Schema getTableSchemaInAvroFormat() throws Exception {
+Option schemaFromCommitMetadata = 
getTableSchemaFromCommitMetadata();
+return schemaFromCommitMetadata.isPresent() ? 
schemaFromCommitMetadata.get() :
+   convertParquetSchemaToAvro(getDataSchema());
+  }
+
+  /**
+   * Gets the schema for a hoodie table in Parquet format.
+   *
+   * @return Parquet schema for the table
+   * @throws Exception
+   */
+  public MessageType getTableSchemaInParquetFormat() throws Exception {

Review comment:
   Added.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (HUDI-842) Implementation plan for RFC 15 (File Listing and Query Planning Improvements))

2020-04-28 Thread Prashant Wason (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-842?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17094944#comment-17094944
 ] 

Prashant Wason commented on HUDI-842:
-

[~vinoth]  This is the master ticket for the RFC 15.

> Implementation plan for RFC 15 (File Listing and Query Planning Improvements))
> --
>
> Key: HUDI-842
> URL: https://issues.apache.org/jira/browse/HUDI-842
> Project: Apache Hudi (incubating)
>  Issue Type: New Feature
>Reporter: Prashant Wason
>Assignee: Prashant Wason
>Priority: Major
>   Original Estimate: 1m
>  Remaining Estimate: 1m
>
> This is an umbrella task which tracks the implementation of [RFC 15 - File 
> Listing and Query Planning 
> Improvements|[https://cwiki.apache.org/confluence/display/HUDI/RFC+-+15%3A+HUDI+File+Listing+and+Query+Planning+Improvements]]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [incubator-hudi] lamber-ken edited a comment on issue #1563: [SUPPORT] When I package according to the package command in GitHub, I always report an error, such as

2020-04-28 Thread GitBox


lamber-ken edited a comment on issue #1563:
URL: https://github.com/apache/incubator-hudi/issues/1563#issuecomment-620894265


   hello, what's your maven version? @GSHF 
   
   I tested mvn from `3.3.9` to `3.5.3`, all worked fine.
   
   > in idea of Windows version.
   
   A Unix system is recommended.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [incubator-hudi] lamber-ken commented on issue #1563: [SUPPORT] When I package according to the package command in GitHub, I always report an error, such as

2020-04-28 Thread GitBox


lamber-ken commented on issue #1563:
URL: https://github.com/apache/incubator-hudi/issues/1563#issuecomment-620894265


   hello, what's your maven version? @GSHF 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [incubator-hudi] lamber-ken commented on issue #1552: Time taken for upserting hudi table is increasing with increase in number of partitions

2020-04-28 Thread GitBox


lamber-ken commented on issue #1552:
URL: https://github.com/apache/incubator-hudi/issues/1552#issuecomment-620893610


    BUG status
   
   | bug | status | way |
   | :-- | :-- | :-: |
   | upsert long time first time | fixed | upgrade version (0.5.0 to master) |
   | FileStatusExt: Metadata Entry doesn't exist | fixed | using a whole new base path |
   | java.io.FileNotFoundException: No such file or directory | fixed | but don't know the root cause |
   | 17 minutes lag between HoodieActiveTimeline and CleanActionExecutor | fighting | xxx |



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [incubator-hudi] lamber-ken commented on pull request #1469: [HUDI-686] Implement BloomIndexV2 that does not depend on memory caching

2020-04-28 Thread GitBox


lamber-ken commented on pull request #1469:
URL: https://github.com/apache/incubator-hudi/pull/1469#issuecomment-620880356


   @vinothchandar I think the work can be finished this week. : )
   
   > the fetchRecordLocation() API and global indexing is not implemented..Do 
you plan to work on them as well?
   
   |  work   |  status  |
   | --- | --- |
   | `fetchRecordLocation() API `  | working |
   | unit tests | working |
   | global simple index | https://issues.apache.org/jira/browse/HUDI-787 |
   
   
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [incubator-hudi] lamber-ken edited a comment on pull request #1469: [HUDI-686] Implement BloomIndexV2 that does not depend on memory caching

2020-04-28 Thread GitBox


lamber-ken edited a comment on pull request #1469:
URL: https://github.com/apache/incubator-hudi/pull/1469#issuecomment-620880356


   @vinothchandar I think the work can be finished this week, will ping you 
when finished : )
   
   > the fetchRecordLocation() API and global indexing is not implemented..Do 
you plan to work on them as well?
   
   |  work   |  status  |
   | --- | --- |
   | `fetchRecordLocation() API `  | working |
   | unit tests | working |
   | global simple index | https://issues.apache.org/jira/browse/HUDI-787 |
   
   
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [incubator-hudi] lamber-ken edited a comment on issue #1564: update hudi meta in hive with no partition

2020-04-28 Thread GitBox


lamber-ken edited a comment on issue #1564:
URL: https://github.com/apache/incubator-hudi/issues/1564#issuecomment-620876765


   > @lamber-ken : Will take care of this ticket.
   
    no problem



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [incubator-hudi] lamber-ken commented on issue #1564: update hudi meta in hive with no partition

2020-04-28 Thread GitBox


lamber-ken commented on issue #1564:
URL: https://github.com/apache/incubator-hudi/issues/1564#issuecomment-620876765


   > @lamber-ken : Will take care of this ticket.
   
    



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [incubator-hudi] bvaradar commented on a change in pull request #1516: [HUDI-784] Adressing issue with log reader on GCS

2020-04-28 Thread GitBox


bvaradar commented on a change in pull request #1516:
URL: https://github.com/apache/incubator-hudi/pull/1516#discussion_r416935621



##
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
##
@@ -79,6 +79,11 @@
   this.inputStream = fsDataInputStream;
 }
 
+// Defensive measure to make sure nothing advanced the stream.
+if (fsDataInputStream.getPos() != 0) {

Review comment:
   @afilipchik : Can we revert this change then and try as it is likely the 
issue with static vs non-static ? 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [incubator-hudi] bvaradar commented on a change in pull request #1567: [HUDI-840]Clean blank file created by HoodieLogFormatWriter

2020-04-28 Thread GitBox


bvaradar commented on a change in pull request #1567:
URL: https://github.com/apache/incubator-hudi/pull/1567#discussion_r416923914



##
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java
##
@@ -210,6 +210,14 @@ public void close() throws IOException {
 flush();
 output.close();
 output = null;
+Path path = logFile.getPath();

Review comment:
   @hddong : Any possible reasons why blank file is created in the first 
place ? 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [incubator-hudi] bvaradar commented on pull request #1524: [HUDI-801] Adding a way to post process schema after it is fetched

2020-04-28 Thread GitBox


bvaradar commented on pull request #1524:
URL: https://github.com/apache/incubator-hudi/pull/1524#issuecomment-620846353


   @afilipchik : Doesn't look like the rebase worked fine as I see other folk's 
commits in the PR. Can you rebase again ? 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [incubator-hudi] bvaradar commented on a change in pull request #1532: [HUDI-794]: implemented optional use of --config-folder option in HoodieDeltaStreamer

2020-04-28 Thread GitBox


bvaradar commented on a change in pull request #1532:
URL: https://github.com/apache/incubator-hudi/pull/1532#discussion_r416909810



##
File path: 
hudi-utilities/src/main/java/org/apache/hudi/utilities/DeltaStreamerUtility.java
##
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
+import org.apache.hudi.utilities.deltastreamer.HoodieMultiTableDeltaStreamer;
+import org.apache.hudi.utilities.deltastreamer.TableExecutionContext;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+public class DeltaStreamerUtility {
+
+  public static String getDefaultConfigFilePath(String configFolder, String 
database, String currentTable) {
+return configFolder + Constants.FILEDELIMITER + database + 
Constants.UNDERSCORE + currentTable + Constants.DEFAULT_CONFIG_FILE_NAME_SUFFIX;
+  }
+
+  public static String getTableWithDatabase(TableExecutionContext context) {
+return context.getDatabase() + Constants.DELIMITER + 
context.getTableName();
+  }
+
+  public static void checkIfPropsFileAndConfigFolderExist(String 
commonPropsFile, String configFolder, FileSystem fs) throws IOException {
+if (!fs.exists(new Path(commonPropsFile))) {
+  throw new IllegalArgumentException("Please provide valid common config 
file path!");
+}
+
+if (!fs.exists(new Path(configFolder))) {
+  fs.mkdirs(new Path(configFolder));
+}
+  }
+
+  public static void checkIfTableConfigFileExists(String configFolder, 
FileSystem fs, String configFilePath) throws IOException {
+if (!fs.exists(new Path(configFilePath)) || !fs.isFile(new 
Path(configFilePath))) {
+  throw new IllegalArgumentException("Please provide valid table config 
file path!");
+}
+
+Path path = new Path(configFilePath);
+Path filePathInConfigFolder = new Path(configFolder, path.getName());
+if (!fs.exists(filePathInConfigFolder)) {
+  FileUtil.copy(fs, path, fs, filePathInConfigFolder, false, fs.getConf());
+}
+  }
+
+  public static TypedProperties 
getTablePropertiesFromConfigFolder(HoodieDeltaStreamer.Config cfg, FileSystem 
fs) throws IOException {

Review comment:
   @pratyakshsharma : I am rethinking this feature again. Looking at the changes and the added config options, I feel we are overcomplicating things by trying to make the config structure meant for MultiDeltaStreamer fit DeltaStreamer. If all we want is to run HoodieDeltaStreamer with a config structure specific to HoodieMultiDeltaStreamer, can we just instantiate HoodieMultiDeltaStreamer with one source?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [incubator-hudi] bvaradar commented on a change in pull request #1559: [HUDI-838] Support schema from HoodieCommitMetadata for HiveSync

2020-04-28 Thread GitBox


bvaradar commented on a change in pull request #1559:
URL: https://github.com/apache/incubator-hudi/pull/1559#discussion_r416891281



##
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
##
@@ -145,23 +146,37 @@ public MessageType getDataSchema() throws Exception {
* @return Avro schema for this table
* @throws Exception
*/
-  public Schema getTableSchema() throws Exception {
-    return convertParquetSchemaToAvro(getDataSchema());
+  public Schema getTableSchemaInAvroFormat() throws Exception {
+    Option<Schema> schemaFromCommitMetadata = getTableSchemaFromCommitMetadata();
+    return schemaFromCommitMetadata.isPresent() ? schemaFromCommitMetadata.get() :
+           convertParquetSchemaToAvro(getDataSchema());
+  }
+
+  /**
+   * Gets the schema for a hoodie table in Parquet format.
+   *
+   * @return Parquet schema for the table
+   * @throws Exception
+   */
+  public MessageType getTableSchemaInParquetFormat() throws Exception {
+    Option<Schema> schemaFromCommitMetadata = getTableSchemaFromCommitMetadata();
+    return schemaFromCommitMetadata.isPresent() ? convertAvroSchemaToParquet(schemaFromCommitMetadata.get()) :
+           getDataSchema();
   }
 
   /**
    * Gets the schema for a hoodie table in Avro format from the HoodieCommitMetadata of the last commit.
    *
    * @return Avro schema for this table
-   * @throws Exception
    */
-  public Schema getTableSchemaFromCommitMetadata() throws Exception {
+  private Option<Schema> getTableSchemaFromCommitMetadata() {
     try {
       HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
       byte[] data = timeline.getInstantDetails(timeline.lastInstant().get()).get();
       HoodieCommitMetadata metadata = HoodieCommitMetadata.fromBytes(data, HoodieCommitMetadata.class);
       String existingSchemaStr = metadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY);
-      return new Schema.Parser().parse(existingSchemaStr);
+      return StringUtils.isNullOrEmpty(existingSchemaStr) ? Option.empty() :

Review comment:
   On a related note: as we start to rely on the avro schema being present in commit metadata, we should store the avro schema as a first-level entity in the commit metadata instead of keeping it in the extra-metadata map, and handle upgrade/downgrade (added https://jira.apache.org/jira/browse/HUDI-844).

##
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
##
 @@ -145,23 +146,37 @@ public MessageType getDataSchema() throws Exception {
    * @return Avro schema for this table
    * @throws Exception
    */
-  public Schema getTableSchema() throws Exception {
-    return convertParquetSchemaToAvro(getDataSchema());
+  public Schema getTableSchemaInAvroFormat() throws Exception {
+    Option<Schema> schemaFromCommitMetadata = getTableSchemaFromCommitMetadata();
+    return schemaFromCommitMetadata.isPresent() ? schemaFromCommitMetadata.get() :
+           convertParquetSchemaToAvro(getDataSchema());
+  }
+
+  /**
+   * Gets the schema for a hoodie table in Parquet format.
+   *
+   * @return Parquet schema for the table
+   * @throws Exception
+   */
+  public MessageType getTableSchemaInParquetFormat() throws Exception {
Review comment:
   You can introduce a getTableAvroSchemaFromDataFile method to return the schema in avro format. 
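   A minimal sketch of what that could look like, reusing the existing getDataSchema()/convertParquetSchemaToAvro() helpers (an illustration of the suggestion, not code from the PR):
   ```
   /**
    * Gets the table's Avro schema by converting the Parquet schema of the latest data file,
    * ignoring commit metadata entirely.
    */
   public Schema getTableAvroSchemaFromDataFile() throws Exception {
     return convertParquetSchemaToAvro(getDataSchema());
   }
   ```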

##
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
##
 @@ -178,6 +193,17 @@ public Schema convertParquetSchemaToAvro(MessageType parquetSchema) {
     return avroSchemaConverter.convert(parquetSchema);
   }
 
+  /**
+   * Convert an avro schema to the parquet format.
+   *
+   * @param schema The avro schema to convert
+   * @return The converted parquet schema
+   */
+  public MessageType convertAvroSchemaToParquet(Schema schema) {

Review comment:
   Please check ParquetUtils class for similar APIs
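   For reference, parquet-avro's AvroSchemaConverter already handles this direction as well; a minimal standalone sketch (not the PR's code):
   ```
   import org.apache.avro.Schema;
   import org.apache.parquet.avro.AvroSchemaConverter;
   import org.apache.parquet.schema.MessageType;

   class SchemaConversionSketch {
     static MessageType toParquet(Schema avroSchema) {
       // convert(Schema) yields a Parquet MessageType, the mirror of the convert(MessageType) -> Schema call used above.
       return new AvroSchemaConverter().convert(avroSchema);
     }
   }
   ```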





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (HUDI-844) Store Avro schema string as first-level entity in commit metadata

2020-04-28 Thread Balaji Varadarajan (Jira)
Balaji Varadarajan created HUDI-844:
---

 Summary: Store Avro schema string as first-level entity in commit 
metadata
 Key: HUDI-844
 URL: https://issues.apache.org/jira/browse/HUDI-844
 Project: Apache Hudi (incubating)
  Issue Type: Improvement
  Components: Common Core
Reporter: Balaji Varadarajan
 Fix For: 0.6.0


Currently, we store the avro schema string in commit metadata inside a map structure, extraMetadata. We are building logic that expects this avro schema to be present in the metadata. It would be cleaner to store the avro schema the same way we store write-stats in commit metadata.

We need to use the MigrationHandler framework to handle the upgrade.
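For illustration, a rough sketch of the difference (only the extraMetadata path exists today; the first-level accessor named below is hypothetical):

```
// Today: the schema rides along in the generic extraMetadata map.
Schema schema = new Schema.Parser().parse(avroSchemaJson); // avroSchemaJson: the table schema as a JSON string
HoodieCommitMetadata metadata = new HoodieCommitMetadata();
metadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, schema.toString());
String schemaStr = metadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY); // may be absent or empty

// Proposed: a first-level member, analogous to the partition-to-write-stats map, e.g.
// metadata.setAvroSchema(schema.toString()); // hypothetical API, needs MigrationHandler-based upgrade/downgrade
```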



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [incubator-hudi] nandini57 commented on issue #1569: [SUPPORT] Audit Feature In A PartitionPath

2020-04-28 Thread GitBox


nandini57 commented on issue #1569:
URL: https://github.com/apache/incubator-hudi/issues/1569#issuecomment-620824966


   Yes, so far the requirement is to keep all record changes. In future, we may need to upsert as well. Thanks guys for the help!



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [incubator-hudi] bvaradar commented on a change in pull request #1559: [HUDI-838] Support schema from HoodieCommitMetadata for HiveSync

2020-04-28 Thread GitBox


bvaradar commented on a change in pull request #1559:
URL: https://github.com/apache/incubator-hudi/pull/1559#discussion_r416879633



##
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
##
@@ -145,23 +146,37 @@ public MessageType getDataSchema() throws Exception {
* @return Avro schema for this table
* @throws Exception
*/
-  public Schema getTableSchema() throws Exception {
-return convertParquetSchemaToAvro(getDataSchema());
+  public Schema getTableSchemaInAvroFormat() throws Exception {

Review comment:
   Rename to just getTableAvroSchema ?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [incubator-hudi] bvaradar commented on issue #1564: update hudi meta in hive with no partition

2020-04-28 Thread GitBox


bvaradar commented on issue #1564:
URL: https://github.com/apache/incubator-hudi/issues/1564#issuecomment-620814862


   @lamber-ken : Will take care of this ticket.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [incubator-hudi] bvaradar commented on issue #1564: update hudi meta in hive with no partition

2020-04-28 Thread GitBox


bvaradar commented on issue #1564:
URL: https://github.com/apache/incubator-hudi/issues/1564#issuecomment-620808367


   @zhangxia1030 : IIUC, you are seeing issues when hive-syncing a non-partitioned table. Please look at this issue https://github.com/apache/incubator-hudi/issues/1568#issuecomment-620292854 for the problem and a possible solution. 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [incubator-hudi] bvaradar commented on issue #1569: [SUPPORT] Audit Feature In A PartitionPath

2020-04-28 Thread GitBox


bvaradar commented on issue #1569:
URL: https://github.com/apache/incubator-hudi/issues/1569#issuecomment-620806109


   @nandini57, You can prefix with a timestamp like 
"" to get ordering benefits. 
   From your description, it looks like you essentially want the table to be a log of all record changes, and you are simply inserting new records with no updates. Right? In this case, you can simply use the bulk-insert/insert APIs, which would avoid record-key index lookups in the first place. 
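   In case it helps, a hedged sketch of that insert path through the Spark datasource (option keys are the standard hoodie.datasource.* properties; column names "ts", "key", "partition", the table name and basePath are placeholders for your setup):
   ```
   Dataset<Row> input = df.withColumn("record_key",
       functions.concat_ws("_", functions.date_format(functions.col("ts"), "yyyyMMddHHmmss"), functions.col("key")));

   input.write().format("org.apache.hudi")
       .option("hoodie.datasource.write.operation", "bulk_insert")         // or "insert"; skips the record-key index lookup done by upsert
       .option("hoodie.datasource.write.recordkey.field", "record_key")
       .option("hoodie.datasource.write.partitionpath.field", "partition")
       .option("hoodie.table.name", "audit_table")
       .mode(SaveMode.Append)
       .save(basePath);
   ```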



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [incubator-hudi] bvaradar commented on issue #1531: run example

2020-04-28 Thread GitBox


bvaradar commented on issue #1531:
URL: https://github.com/apache/incubator-hudi/issues/1531#issuecomment-620798845


   @c-f-cooper : You can use any of 0.5.1/0.5.2 or master to see the fix.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [incubator-hudi] nandini57 commented on issue #1569: [SUPPORT] Audit Feature In A PartitionPath

2020-04-28 Thread GitBox


nandini57 commented on issue #1569:
URL: https://github.com/apache/incubator-hudi/issues/1569#issuecomment-620786548


   Great, thanks Vinoth. Is a Murmur hash of my business keys a good choice then?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [incubator-hudi] vinothchandar commented on issue #1552: Time taken for upserting hudi table is increasing with increase in number of partitions

2020-04-28 Thread GitBox


vinothchandar commented on issue #1552:
URL: https://github.com/apache/incubator-hudi/issues/1552#issuecomment-620781382


   @harshi2506 On master, there is no `CopyOnWriteLazyInsertIterable` 
   https://github.com/apache/incubator-hudi/tree/master/hudi-client/src/main/java/org/apache/hudi/execution
   Wondering if you are behind on master. 
   
   In any case, are you running with the consistency check enabled? That error seems to indicate that close() cannot find the file it just wrote. 
   
   https://hudi.apache.org/docs/configurations.html#withConsistencyCheckEnabled 
   This has been done many times, so it must be some other issue..
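   In case it helps, a hedged sketch of turning that on (the builder method name matches the config page linked above; the equivalent property name is my assumption, please double-check against your version):
   ```
   HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
       .withPath(basePath)
       .withConsistencyCheckEnabled(true)   // hoodie.consistency.check.enabled
       .build();
   ```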
   
   
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [incubator-hudi] vinothchandar commented on issue #1569: [SUPPORT] Audit Feature In A PartitionPath

2020-04-28 Thread GitBox


vinothchandar commented on issue #1569:
URL: https://github.com/apache/incubator-hudi/issues/1569#issuecomment-620776648


   @nandini57 It just has to be ordered, increasing/decreasing does not 
matter.. can be non-contiguous.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [incubator-hudi] vinothchandar commented on issue #1531: run example

2020-04-28 Thread GitBox


vinothchandar commented on issue #1531:
URL: https://github.com/apache/incubator-hudi/issues/1531#issuecomment-620775850


   could you try master or 0.5.2? 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [incubator-hudi] vinothchandar commented on issue #1228: No FileSystem for scheme: abfss

2020-04-28 Thread GitBox


vinothchandar commented on issue #1228:
URL: https://github.com/apache/incubator-hudi/issues/1228#issuecomment-620775308


   Closing in favor of JIRA 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [incubator-hudi] nandini57 edited a comment on issue #1569: [SUPPORT] Audit Feature In A PartitionPath

2020-04-28 Thread GitBox


nandini57 edited a comment on issue #1569:
URL: https://github.com/apache/incubator-hudi/issues/1569#issuecomment-620749380


   Thanks Balaji. Yesterday, I changed the parameter to retain 40 commits and changed the _hoodie_record_key to include my business batch id column along with one of the other columns. Instead of OverwriteRecordPayload, I am using a custom payload which just adds the records in each commit instead of removing them from disk. The business batch id increments with every ingestion, and I can audit based on commit time to get a view of the data at a particular point in the past.
   spark.sql("select * from hoodie_ro where cast(_hoodie_commit_time as long) <=" + Long.valueOf(commitTime)).show();
   
   Is it a good idea to conceive the _hoodie_record_key as 123_1, 123_4, ... or does it have to be monotonically increasing to help indexing?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [incubator-hudi] nandini57 edited a comment on issue #1569: [SUPPORT] Audit Feature In A PartitionPath

2020-04-28 Thread GitBox


nandini57 edited a comment on issue #1569:
URL: https://github.com/apache/incubator-hudi/issues/1569#issuecomment-620749380


   Thanks Balaji. Yesterday, I changed the parameter to retain 40 commits and changed the _hoodie_record_key to include my business batch id column along with one of the other columns. Instead of EmptyRecordPayload, I am using a custom payload which just adds the records in each commit instead of removing them from disk. The business batch id increments with every ingestion, and I can audit based on commit time to get a view of the data at a particular point in the past.
   spark.sql("select * from hoodie_ro where cast(_hoodie_commit_time as long) <=" + Long.valueOf(commitTime)).show();
   
   Is it a good idea to conceive the _hoodie_record_key as 123_1, 123_4, ... or does it have to be monotonically increasing to help indexing?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [incubator-hudi] nandini57 edited a comment on issue #1569: [SUPPORT] Audit Feature In A PartitionPath

2020-04-28 Thread GitBox


nandini57 edited a comment on issue #1569:
URL: https://github.com/apache/incubator-hudi/issues/1569#issuecomment-620749380


   Thanks Balaji. Yesterday, I changed the parameter to retain 40 commits and changed the _hoodie_record_key to include my business batch id column along with one of the other columns. Instead of EmptyRecordPayload, I am using a custom payload which just adds the records in each commit instead of removing them from disk. The business batch id increments with every ingestion, and I can audit based on commit time to go back in time.
   spark.sql("select * from hoodie_ro where cast(_hoodie_commit_time as long) <=" + Long.valueOf(commitTime)).show();
   
   Is it a good idea to conceive the _hoodie_record_key as 123_1, 123_4, ... or does it have to be monotonically increasing to help indexing?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [incubator-hudi] nandini57 commented on issue #1569: [SUPPORT] Audit Feature In A PartitionPath

2020-04-28 Thread GitBox


nandini57 commented on issue #1569:
URL: https://github.com/apache/incubator-hudi/issues/1569#issuecomment-620749380


   Thanks Balaji. Yesterday, I changed the parameter to retain 40 commits and changed the record key to include my business batch id column along with one of the other columns. The business batch id increments with every ingestion, and I can audit based on batch id or commit time:
   spark.sql("select * from hoodie_ro where cast(_hoodie_commit_time as long) <=" + Long.valueOf(commitTime)).show();
   
   Is it a good idea to conceive record keys as 123_1, 123_4, ... or should they be monotonically increasing to help indexing?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (HUDI-843) Support different time units in TimestampBasedKeyGenerator

2020-04-28 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-843?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated HUDI-843:

Labels: pull-request-available  (was: )

> Support different time units in TimestampBasedKeyGenerator
> --
>
> Key: HUDI-843
> URL: https://issues.apache.org/jira/browse/HUDI-843
> Project: Apache Hudi (incubating)
>  Issue Type: Improvement
>Reporter: Alexander Filipchik
>Priority: Major
>  Labels: pull-request-available
> Fix For: 0.6.0
>
>
> TimestampBasedKeyGenerator only supports UNIX_TIMESTAMP and EPOCHMILLISECONDS 
> as a scalar source for partitions. 
>  
> we should introduce more flexible configuration to allow users to specify 
> other time units (days, nanoseconds, ...)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [incubator-hudi] afilipchik commented on pull request #1541: [HUDI-843] Add ability to specify time unit for TimestampBasedKeyGenerator

2020-04-28 Thread GitBox


afilipchik commented on pull request #1541:
URL: https://github.com/apache/incubator-hudi/pull/1541#issuecomment-620719213


   addressed comments



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [incubator-hudi] vinothchandar commented on a change in pull request #1151: [WIP][HUDI-476] Add hudi-examples module

2020-04-28 Thread GitBox


vinothchandar commented on a change in pull request #1151:
URL: https://github.com/apache/incubator-hudi/pull/1151#discussion_r416731242



##
File path: hudi-examples/src/main/scripts/delta-streamer-cluster
##
@@ -0,0 +1,33 @@
+#!/usr/bin/env bash
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+JAR_FILE="$(echo packaging/hudi-utilities-bundle/target/hudi-utilities-bundle*-SNAPSHOT.jar | tr ' ' ',')"
+EXAMPLES_JARS="$(echo hudi-examples/target/hudi-examples-*-SNAPSHOT.jar)"
+
+exec "${SPARK_HOME}"/bin/spark-submit \
+  --master yarn \

Review comment:
   should master/deploy-mode be configurable via env vars, with yarn/cluster being the default? 

##
File path: hudi-examples/pom.xml
##
@@ -0,0 +1,198 @@
+
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <parent>
+    <artifactId>hudi</artifactId>
+    <groupId>org.apache.hudi</groupId>
+    <version>0.6.0-SNAPSHOT</version>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>hudi-examples</artifactId>
+  <packaging>jar</packaging>
+
+  <properties>
+    <main.basedir>${project.parent.basedir}</main.basedir>
+  </properties>
+
+  <build>
+    <resources>
+      <resource>
+        <directory>src/main/resources</directory>
+      </resource>
+    </resources>
+
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-dependency-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>copy-dependencies</id>
+            <phase>prepare-package</phase>
+            <goals>
+              <goal>copy-dependencies</goal>
+            </goals>
+            <configuration>
+              <outputDirectory>${project.build.directory}/lib</outputDirectory>
+              <overWriteReleases>true</overWriteReleases>
+              <overWriteSnapshots>true</overWriteSnapshots>
+              <overWriteIfNewer>true</overWriteIfNewer>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>net.alchim31.maven</groupId>
+        <artifactId>scala-maven-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>scala-compile-first</id>
+            <phase>process-resources</phase>
+            <goals>
+              <goal>add-source</goal>
+              <goal>compile</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>compile</id>
+            <goals>
+              <goal>compile</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+            <phase>test-compile</phase>
+          </execution>
+        </executions>
+        <configuration>
+          <skipIfEmpty>false</skipIfEmpty>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.rat</groupId>
+        <artifactId>apache-rat-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+
+  <dependencies>
+
+    <dependency>
+      <groupId>org.scala-lang</groupId>
+      <artifactId>scala-library</artifactId>
+      <version>${scala.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hudi</groupId>
+      <artifactId>hudi-common</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hudi</groupId>
+      <artifactId>hudi-cli</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hudi</groupId>
+      <artifactId>hudi-client</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hudi</groupId>
+      <artifactId>hudi-utilities_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hudi</groupId>
+      <artifactId>hudi-spark_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hudi</groupId>
+      <artifactId>hudi-hadoop-mr</artifactId>
+      <version>${project.version}</version>

Review comment:
   these versions here may not be necessary? and can be just inherited from 
parent ? 

##
File path: 
hudi-examples/src/main/java/org/apache/hudi/examples/common/HoodieExampleDataGenerator.java
##
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.examples.common;
+
+import org.apache.hudi.common.model.HoodieAvroPayload;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import 

[GitHub] [incubator-hudi] yanghua commented on pull request #1100: [HUDI-289] Implement a test suite to support long running test for Hudi writing and querying end-end

2020-04-28 Thread GitBox


yanghua commented on pull request #1100:
URL: https://github.com/apache/incubator-hudi/pull/1100#issuecomment-620691500


   > @yanghua can you go through this PR and approve it ?
   
   Thanks for the great work. Will review it tomorrow.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [incubator-hudi] yanghua commented on a change in pull request #1522: [HUDI-702]Add test for HoodieLogFileCommand

2020-04-28 Thread GitBox


yanghua commented on a change in pull request #1522:
URL: https://github.com/apache/incubator-hudi/pull/1522#discussion_r416723032



##
File path: 
hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
##
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cli.commands;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.cli.AbstractShellIntegrationTest;
+import org.apache.hudi.cli.HoodieCLI;
+import org.apache.hudi.cli.HoodieTableHeaderFields;
+import org.apache.hudi.cli.HoodiePrintHelper;
+import org.apache.hudi.cli.TableHeader;
+import org.apache.hudi.cli.common.HoodieTestCommitMetadataGenerator;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.log.HoodieLogFormat;
+import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
+import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
+import org.apache.hudi.common.table.log.block.HoodieLogBlock;
+import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.SchemaTestUtil;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieMemoryConfig;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.springframework.shell.core.CommandResult;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.util.SchemaTestUtil.getSimpleSchema;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test Cases for {@link HoodieLogFileCommand}.
+ */
+public class TestHoodieLogFileCommand extends AbstractShellIntegrationTest {
+
+  private String partitionPath;
+  private HoodieAvroDataBlock dataBlock;
+  private String tablePath;
+
+  private static final String INSTANT_TIME = "100";
+
+  @Before
+  public void init() throws IOException, InterruptedException, URISyntaxException {
+    HoodieCLI.conf = jsc.hadoopConfiguration();
+
+    // Create table and connect
+    String tableName = "test_table";
+    tablePath = basePath + File.separator + tableName;
+    partitionPath = tablePath + File.separator + HoodieTestCommitMetadataGenerator.DEFAULT_FIRST_PARTITION_PATH;
+    new TableCommand().createTable(
+        tablePath, tableName, HoodieTableType.MERGE_ON_READ.name(),
+        "", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload");
+
+    new File(partitionPath).mkdirs();

Review comment:
   Would replacing it with `Files.createDirectories` be a better choice?
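   Something along these lines (a sketch of the suggested replacement):
   ```
   import java.nio.file.Files;
   import java.nio.file.Paths;

   // Instead of new File(partitionPath).mkdirs(), which only signals failure via a boolean:
   Files.createDirectories(Paths.get(partitionPath)); // throws IOException on failure, already declared by init()
   ```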

##
File path: 
hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
##
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language 

[GitHub] [incubator-hudi] vinothchandar commented on pull request #1469: [HUDI-686] Implement BloomIndexV2 that does not depend on memory caching

2020-04-28 Thread GitBox


vinothchandar commented on pull request #1469:
URL: https://github.com/apache/incubator-hudi/pull/1469#issuecomment-620688721


   @lamber-ken the fetchRecordLocation() API and global indexing is not 
implemented..Do you plan to work on them as well?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [incubator-hudi] vinothchandar commented on a change in pull request #1402: [HUDI-407] Adding Simple Index

2020-04-28 Thread GitBox


vinothchandar commented on a change in pull request #1402:
URL: https://github.com/apache/incubator-hudi/pull/1402#discussion_r416354734



##
File path: hudi-client/src/main/java/org/apache/hudi/index/HoodieIndex.java
##
 @@ -128,9 +133,26 @@ protected HoodieIndex(HoodieWriteConfig config) {
   /**
    * Each index type should implement it's own logic to release any resources acquired during the process.
    */
-  public void close() {}
+  public void close() {
+  }
+
+  protected HoodieRecord<T> getTaggedRecord(HoodieRecord<T> inputRecord, Option<HoodieRecordLocation> location) {

Review comment:
   this can very well go into `HoodieIndexUtils`? There is no instance state accessed in this method IIUC
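   For example, something along these lines could live in HoodieIndexUtils (a rough sketch of the suggested refactor, with the body paraphrased from what location tagging typically does, not copied from the PR):
   ```
   import org.apache.hudi.common.model.HoodieRecord;
   import org.apache.hudi.common.model.HoodieRecordLocation;
   import org.apache.hudi.common.model.HoodieRecordPayload;
   import org.apache.hudi.common.util.Option;

   public class HoodieIndexUtils {

     public static <R extends HoodieRecordPayload> HoodieRecord<R> getTaggedRecord(
         HoodieRecord<R> record, Option<HoodieRecordLocation> location) {
       if (location.isPresent()) {
         // Sets the known location on the record; it stays untagged (treated as an insert) otherwise.
         record.unseal();
         record.setCurrentLocation(location.get());
         record.seal();
       }
       return record;
     }
   }
   ```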

##
File path: 
hudi-client/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
##
 @@ -92,6 +100,9 @@
   public static final String BLOOM_INDEX_UPDATE_PARTITION_PATH = "hoodie.bloom.index.update.partition.path";
   public static final String DEFAULT_BLOOM_INDEX_UPDATE_PARTITION_PATH = "false";
 
+  public static final String GLOBAL_SIMPLE_INDEX_UPDATE_PARTITION_PATH = "hoodie.global.simple.index.update.partition.path";

Review comment:
   just `hoodie.simple.index.update.partition.path` to be consistent with 
the other config?

##
File path: 
hudi-client/src/main/java/org/apache/hudi/index/HoodieGlobalSimpleIndex.java
##
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import scala.Tuple2;
+
+import static java.util.stream.Collectors.toList;
+import static 
org.apache.hudi.index.HoodieIndexUtils.loadLatestDataFilesForAllPartitions;
+
+/**
+ * A global simple index which reads interested fields(record key and 
partition path) from base files and
+ * joins with incoming records to find the tagged location.
+ *
+ * @param <T>
+ */
+public class HoodieGlobalSimpleIndex<T extends HoodieRecordPayload> extends HoodieSimpleIndex<T> {
+
+  public HoodieGlobalSimpleIndex(HoodieWriteConfig config) {
+super(config);
+  }
+
+  @Override
+  public JavaRDD<HoodieRecord<T>> tagLocation(JavaRDD<HoodieRecord<T>> recordRDD, JavaSparkContext jsc,
+      HoodieTable<T> hoodieTable) {
+    return tagLocationInternal(recordRDD, jsc, hoodieTable);
+  }
+
+  /**
+   * Tags records location for incoming records.
+   *
+   * @param recordRDD   {@link JavaRDD} of incoming records
+   * @param jsc instance of {@link JavaSparkContext} to use
+   * @param hoodieTable instance of {@link HoodieTable} to use
+   * @return {@link JavaRDD} of records with record locations set
+   */
+  protected JavaRDD<HoodieRecord<T>> tagLocationInternal(JavaRDD<HoodieRecord<T>> recordRDD, JavaSparkContext jsc,
+                                                         HoodieTable<T> hoodieTable) {
+    JavaPairRDD<HoodieKey, HoodieRecord<T>> incomingRecords = recordRDD.mapToPair(entry -> new Tuple2<>(entry.getKey(), entry));
+
+    JavaPairRDD<HoodieKey, HoodieRecordLocation> existingRecords = fetchRecordLocations(incomingRecords.keys(), jsc, hoodieTable,

Review comment:
   @nsivabalan This is problematic.. since we don't cache (rightfully so) 
the input here for Global index, 
   
   ```
   JavaPairRDD<String, String> partitionRecordKeyPairRDD = hoodieKeys.mapToPair(entry -> new Tuple2<>(entry.getPartitionPath(), entry.getRecordKey()));
   List<String> affectedPartitionPathList = partitionRecordKeyPairRDD.map(tuple -> tuple._1).distinct().collect();
   

[jira] [Closed] (HUDI-810) Migrate HoodieClientTestHarness to JUnit 5

2020-04-28 Thread vinoyang (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-810?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang closed HUDI-810.
-
Resolution: Done

Done via master branch: 06dae30297ea02ab122c9029a54f7927e8212039

> Migrate HoodieClientTestHarness to JUnit 5
> --
>
> Key: HUDI-810
> URL: https://issues.apache.org/jira/browse/HUDI-810
> Project: Apache Hudi (incubating)
>  Issue Type: Test
>  Components: Testing
>Reporter: Raymond Xu
>Assignee: Raymond Xu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 0.6.0
>
>
> Once HoodieClientTestHarness is migrated, the whole hudi-cli package can be migrated all together.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [incubator-hudi] pratyakshsharma commented on pull request #1566: [HUDI-603]: DeltaStreamer can now fetch schema before every run in continuous mode

2020-04-28 Thread GitBox


pratyakshsharma commented on pull request #1566:
URL: https://github.com/apache/incubator-hudi/pull/1566#issuecomment-620679844


   > if we redid the schema provider implementations such that the schema is 
read each time from SR (schema registry)
   
   Do you have some plan around how to achieve this? @vinothchandar 
   
   I was thinking of maintaining the subject and version information with us from the very beginning (it should be easy to get from the schema-registry url provided by the user), and then comparing it against the serialized schema id fetched from the incoming messages. Whenever we see a version update, we can refresh the schemas at that point and continue consuming the incoming messages. This would need writing our own custom AvroDeserializer. 
   
   Also, the above plan only works for the combination of AvroKafkaSource and SchemaRegistryProvider. Thoughts? 
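   For reference, a minimal sketch of peeking at the serialized schema id, assuming the Confluent wire format (one magic byte followed by a 4-byte schema id) is what the messages carry:
   ```
   import java.nio.ByteBuffer;

   class SchemaIdSketch {
     /** Returns the embedded schema id, or -1 if the payload is not Confluent-framed. */
     static int extractSchemaId(byte[] payload) {
       ByteBuffer buffer = ByteBuffer.wrap(payload);
       if (buffer.remaining() < 5 || buffer.get() != 0x0) {
         return -1;
       }
       return buffer.getInt(); // compare with the last seen id; refresh schemas when it changes
     }
   }
   ```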



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [incubator-hudi] pratyakshsharma commented on a change in pull request #1566: [HUDI-603]: DeltaStreamer can now fetch schema before every run in continuous mode

2020-04-28 Thread GitBox


pratyakshsharma commented on a change in pull request #1566:
URL: https://github.com/apache/incubator-hudi/pull/1566#discussion_r416694602



##
File path: 
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
##
 @@ -162,18 +162,23 @@ public DeltaSync(HoodieDeltaStreamer.Config cfg, SparkSession sparkSession, Sche
     this.fs = fs;
     this.onInitializingHoodieWriteClient = onInitializingHoodieWriteClient;
     this.props = props;
-    this.schemaProvider = schemaProvider;
 
     refreshTimeline();
-
     this.transformer = UtilHelpers.createTransformer(cfg.transformerClassNames);
     this.keyGenerator = DataSourceUtils.createKeyGenerator(props);
-
-    this.formatAdapter = new SourceFormatAdapter(
-        UtilHelpers.createSource(cfg.sourceClassName, props, jssc, sparkSession, schemaProvider));
-
     this.conf = conf;
+    refreshSchemaProvider(schemaProvider);
+  }
 
+  /**
+   * Very useful when DeltaStreamer is running in continuous mode.
+   * @param schemaProvider
+   * @throws IOException
+   */
+  public void refreshSchemaProvider(SchemaProvider schemaProvider) throws IOException {

Review comment:
   > It looks like refreshSchemaProvider not only refreshes schema-provider 
but also recreates Source and setup WriteClient
   
   Do you see any side effects of doing this? @bvaradar 
   
   > have delta-streamer call this ? 
   
   This call will happen exactly at the same point where I am calling 
refreshSchemaProvider in delta-streamer, right? 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [incubator-hudi] pratyakshsharma edited a comment on pull request #1566: [HUDI-603]: DeltaStreamer can now fetch schema before every run in continuous mode

2020-04-28 Thread GitBox


pratyakshsharma edited a comment on pull request #1566:
URL: https://github.com/apache/incubator-hudi/pull/1566#issuecomment-620661941


   > I think it is not enough. Example: when consuming from kafka, schema might 
change midway. Example: we are reading 1 messages, schema will be fetched 
on start. Starting from 9000 message new column is added, but we will not see 
it. As a result we just lost data.
   
   Yeah true but that was not the purpose of this PR :)



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [incubator-hudi] pratyakshsharma commented on pull request #1566: [HUDI-603]: DeltaStreamer can now fetch schema before every run in continuous mode

2020-04-28 Thread GitBox


pratyakshsharma commented on pull request #1566:
URL: https://github.com/apache/incubator-hudi/pull/1566#issuecomment-620661941


   > I think it is not enough. Example: when consuming from kafka, schema might 
change midway. Example: we are reading 1 messages, schema will be fetched 
on start. Starting from 9000 message new column is added, but we will not see 
it. As a result we just lost data.
   
   Yeah true. that was not the purpose of this PR but I guess I should include 
it as well :)



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [incubator-hudi] pratyakshsharma commented on a change in pull request #1565: [HUDI-73]: implemented vanilla AvroKafkaSource

2020-04-28 Thread GitBox


pratyakshsharma commented on a change in pull request #1565:
URL: https://github.com/apache/incubator-hudi/pull/1565#discussion_r416667228



##
File path: 
hudi-utilities/src/main/java/org/apache/hudi/utilities/serde/AbstractHoodieKafkaAvroDeserializer.java
##
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.serde;
+
+import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
+import 
org.apache.hudi.utilities.serde.config.HoodieKafkaAvroDeserializationConfig;
+
+import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
+import kafka.utils.VerifiableProperties;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.kafka.common.errors.SerializationException;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class AbstractHoodieKafkaAvroDeserializer {
+
+  private final DecoderFactory decoderFactory = DecoderFactory.get();
+  private boolean useSpecificAvroReader = false;
+  private Schema sourceSchema;
+
+  public AbstractHoodieKafkaAvroDeserializer(VerifiableProperties properties) {
+this.sourceSchema = new 
Schema.Parser().parse(properties.props().getProperty(FilebasedSchemaProvider.Config.SOURCE_SCHEMA_PROP));
+  }
+
+  protected void configure(HoodieKafkaAvroDeserializationConfig config) {
+useSpecificAvroReader = config
+  .getBoolean(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG);
+  }
+
+  protected Object deserialize(byte[] payload) throws SerializationException {
+return deserialize(null, null, payload, sourceSchema);
+  }
+
+  protected Object deserialize(String topic, Boolean isKey, byte[] payload, 
Schema readerSchema) {
+try {
+  ByteBuffer buffer = this.getByteBuffer(payload);
+  int id = buffer.getInt();

Review comment:
   @vinothchandar @afilipchik I explored this a bit. The deserialization process has to mirror exactly how the data was serialized. Also, working with avro and kafka without a schema-registry is highly discouraged [1][2][3].
   That said, if someone wants to use the vanilla AvroKafkaSource, they will have to serialize the data without an actual schema-registry setup. Hence, as a matter of practice, a good alternative can be mandating use of the MockSchemaRegistryClient.java class for this, in which case the code written above specific to schema-registry should be fine. 
   
   I am suggesting use of the MockSchemaRegistryClient.java class since Confluent uses it for writing their test cases, and this would bring a standard practice to follow when using the HoodieKafkaAvroDecoder.java class introduced with this PR. 
   
   If we agree on this, I will mention the same as a comment in the code above. 
   
   [1] https://github.com/confluentinc/confluent-kafka-dotnet/issues/530
   [2] 
https://medium.com/@igorvlahek1/no-need-for-schema-registry-in-your-spring-kafka-tests-a5b81468a0e1
   [3] 
https://stackoverflow.com/questions/45635726/kafkaavroserializer-for-serializing-avro-without-schema-registry-url
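   A rough sketch of that setup, assuming Confluent's MockSchemaRegistryClient and KafkaAvroSerializer are on the classpath (exact usage may differ by version):
   ```
   import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
   import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
   import io.confluent.kafka.serializers.KafkaAvroSerializer;
   import org.apache.avro.Schema;
   import org.apache.avro.generic.GenericRecord;

   class MockRegistrySketch {
     static byte[] serializeWithMockRegistry(Schema schema, GenericRecord record) throws Exception {
       SchemaRegistryClient client = new MockSchemaRegistryClient();
       client.register("my-topic-value", schema);            // subject name is just an example
       KafkaAvroSerializer serializer = new KafkaAvroSerializer(client);
       return serializer.serialize("my-topic", record);      // Confluent-framed bytes, no running registry needed
     }
   }
   ```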





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [incubator-hudi] pratyakshsharma commented on a change in pull request #1562: [HUDI-837]: implemented custom deserializer for AvroKafkaSource

2020-04-28 Thread GitBox


pratyakshsharma commented on a change in pull request #1562:
URL: https://github.com/apache/incubator-hudi/pull/1562#discussion_r416563158



##
File path: 
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/serde/HoodieAvroKafkaDeserializer.java
##
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.serde;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.utilities.UtilHelpers;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+
+import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer;
+import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
+import kafka.serializer.Decoder;
+import kafka.utils.VerifiableProperties;
+import org.apache.avro.Schema;
+import org.apache.kafka.common.errors.SerializationException;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+
+/**
+ * This is a custom implementation of kafka.serializer.Decoder which aims 
at deserializing all the incoming messages
+ * with same schema (which is latest).
+ */
+public class HoodieAvroKafkaDeserializer extends AbstractKafkaAvroDeserializer 
implements Decoder {
+
+  private final Schema sourceSchema;
+  private static final String SCHEMA_PROVIDER_CLASS_PROP = 
"hoodie.deltastreamer.schemaprovider.class";
+
+  public HoodieAvroKafkaDeserializer(VerifiableProperties properties) {
+this.configure(new KafkaAvroDeserializerConfig(properties.props()));
+TypedProperties typedProperties = new TypedProperties();
+copyProperties(typedProperties, properties.props());
+try {
+  SchemaProvider schemaProvider = UtilHelpers.createSchemaProvider(

Review comment:
   > also, in general, it is safer to deserialize avro using both writer's 
and reader's schema
   
   Yes the flow here uses both of them for deserializing. :) 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [incubator-hudi] pratyakshsharma commented on a change in pull request #1565: [HUDI-73]: implemented vanilla AvroKafkaSource

2020-04-28 Thread GitBox


pratyakshsharma commented on a change in pull request #1565:
URL: https://github.com/apache/incubator-hudi/pull/1565#discussion_r416505404



##
File path: 
hudi-utilities/src/main/java/org/apache/hudi/utilities/serde/AbstractHoodieKafkaAvroDeserializer.java
##
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.serde;
+
+import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
+import 
org.apache.hudi.utilities.serde.config.HoodieKafkaAvroDeserializationConfig;
+
+import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
+import kafka.utils.VerifiableProperties;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.kafka.common.errors.SerializationException;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class AbstractHoodieKafkaAvroDeserializer {
+
+  private final DecoderFactory decoderFactory = DecoderFactory.get();
+  private boolean useSpecificAvroReader = false;
+  private Schema sourceSchema;
+
+  public AbstractHoodieKafkaAvroDeserializer(VerifiableProperties properties) {
+this.sourceSchema = new 
Schema.Parser().parse(properties.props().getProperty(FilebasedSchemaProvider.Config.SOURCE_SCHEMA_PROP));
+  }
+
+  protected void configure(HoodieKafkaAvroDeserializationConfig config) {
+useSpecificAvroReader = config
+  .getBoolean(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG);
+  }
+
+  protected Object deserialize(byte[] payload) throws SerializationException {
+return deserialize(null, null, payload, sourceSchema);

Review comment:
   So basically what I want to understand here is: what specific advantages do you get by having separate writer and reader schemas, apart from being able to handle field renames using aliases? I tried to go through the avro library, but I am still not convinced about this. If you could point me to some useful documentation or a blog regarding this, that would be great. 
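   One concrete case beyond aliases is schema evolution with defaults: bytes written with an old writer schema can be read with a newer reader schema, and an added field picks up its default instead of the read failing or misinterpreting the payload. A minimal self-contained sketch:
   ```
   import java.io.ByteArrayOutputStream;
   import org.apache.avro.Schema;
   import org.apache.avro.generic.GenericDatumReader;
   import org.apache.avro.generic.GenericDatumWriter;
   import org.apache.avro.generic.GenericRecord;
   import org.apache.avro.io.BinaryEncoder;
   import org.apache.avro.io.DecoderFactory;
   import org.apache.avro.io.EncoderFactory;

   class WriterReaderSchemaSketch {
     static GenericRecord roundTrip(Schema writerSchema, Schema readerSchema, GenericRecord record) throws Exception {
       ByteArrayOutputStream out = new ByteArrayOutputStream();
       BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
       new GenericDatumWriter<GenericRecord>(writerSchema).write(record, encoder);
       encoder.flush();
       // Resolving with both schemas handles added/removed fields, defaults and type promotions.
       GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(writerSchema, readerSchema);
       return reader.read(null, DecoderFactory.get().binaryDecoder(out.toByteArray(), null));
     }
   }
   ```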





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [incubator-hudi] pratyakshsharma commented on a change in pull request #1562: [HUDI-837]: implemented custom deserializer for AvroKafkaSource

2020-04-28 Thread GitBox


pratyakshsharma commented on a change in pull request #1562:
URL: https://github.com/apache/incubator-hudi/pull/1562#discussion_r416527261



##
File path: 
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/serde/HoodieAvroKafkaDeserializer.java
##
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.serde;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.utilities.UtilHelpers;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+
+import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer;
+import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
+import kafka.serializer.Decoder;
+import kafka.utils.VerifiableProperties;
+import org.apache.avro.Schema;
+import org.apache.kafka.common.errors.SerializationException;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+
+/**
+ * This is a custom implementation of kafka.serializer.Decoder which aims 
at deserializing all the incoming messages
+ * with same schema (which is latest).
+ */
+public class HoodieAvroKafkaDeserializer extends AbstractKafkaAvroDeserializer 
implements Decoder {
+
+  private final Schema sourceSchema;
+  private static final String SCHEMA_PROVIDER_CLASS_PROP = 
"hoodie.deltastreamer.schemaprovider.class";
+
+  public HoodieAvroKafkaDeserializer(VerifiableProperties properties) {
+this.configure(new KafkaAvroDeserializerConfig(properties.props()));
+TypedProperties typedProperties = new TypedProperties();
+copyProperties(typedProperties, properties.props());
+try {
+  SchemaProvider schemaProvider = UtilHelpers.createSchemaProvider(

Review comment:
   As per my understanding, the Deserializer comes into the picture after the offsets have been determined, since it is called internally by kafka. So whatever sourceSchema we have at that point in time is going to be the latest for the incoming batch. Please correct me if I am missing something here. 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [incubator-hudi] pratyakshsharma commented on a change in pull request #1565: [HUDI-73]: implemented vanilla AvroKafkaSource

2020-04-28 Thread GitBox


pratyakshsharma commented on a change in pull request #1565:
URL: https://github.com/apache/incubator-hudi/pull/1565#discussion_r416503101



##
File path: 
hudi-utilities/src/main/java/org/apache/hudi/utilities/serde/AbstractHoodieKafkaAvroDeserializer.java
##
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.serde;
+
+import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
+import 
org.apache.hudi.utilities.serde.config.HoodieKafkaAvroDeserializationConfig;
+
+import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
+import kafka.utils.VerifiableProperties;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.kafka.common.errors.SerializationException;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class AbstractHoodieKafkaAvroDeserializer {
+
+  private final DecoderFactory decoderFactory = DecoderFactory.get();
+  private boolean useSpecificAvroReader = false;
+  private Schema sourceSchema;
+
+  public AbstractHoodieKafkaAvroDeserializer(VerifiableProperties properties) {
+this.sourceSchema = new 
Schema.Parser().parse(properties.props().getProperty(FilebasedSchemaProvider.Config.SOURCE_SCHEMA_PROP));
+  }
+
+  protected void configure(HoodieKafkaAvroDeserializationConfig config) {
+useSpecificAvroReader = config
+  .getBoolean(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG);
+  }
+
+  protected Object deserialize(byte[] payload) throws SerializationException {
+return deserialize(null, null, payload, sourceSchema);
+  }
+
+  protected Object deserialize(String topic, Boolean isKey, byte[] payload, 
Schema readerSchema) {
+try {
+  ByteBuffer buffer = this.getByteBuffer(payload);
+  int id = buffer.getInt();

Review comment:
   > this looks specific to confluent schema registry. Is it a good idea to 
assume all the messages will start with magic int?
   
   Ok, the actual purpose of this PR was to support kafka installations without 
schema-registry. I messed it up at this point. Thank you for pointing this out. 
Will handle this accordingly. 
   
   > any reason it it not inspired by the latest version?
   
   No specific reason. The implementation is inspired by the kafka-avro-serializer version we are using currently. :) 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [incubator-hudi] pratyakshsharma commented on a change in pull request #1565: [HUDI-73]: implemented vanilla AvroKafkaSource

2020-04-28 Thread GitBox


pratyakshsharma commented on a change in pull request #1565:
URL: https://github.com/apache/incubator-hudi/pull/1565#discussion_r416505404



##
File path: 
hudi-utilities/src/main/java/org/apache/hudi/utilities/serde/AbstractHoodieKafkaAvroDeserializer.java
##
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.serde;
+
+import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
+import 
org.apache.hudi.utilities.serde.config.HoodieKafkaAvroDeserializationConfig;
+
+import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
+import kafka.utils.VerifiableProperties;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.kafka.common.errors.SerializationException;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class AbstractHoodieKafkaAvroDeserializer {
+
+  private final DecoderFactory decoderFactory = DecoderFactory.get();
+  private boolean useSpecificAvroReader = false;
+  private Schema sourceSchema;
+
+  public AbstractHoodieKafkaAvroDeserializer(VerifiableProperties properties) {
+this.sourceSchema = new 
Schema.Parser().parse(properties.props().getProperty(FilebasedSchemaProvider.Config.SOURCE_SCHEMA_PROP));
+  }
+
+  protected void configure(HoodieKafkaAvroDeserializationConfig config) {
+useSpecificAvroReader = config
+  .getBoolean(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG);
+  }
+
+  protected Object deserialize(byte[] payload) throws SerializationException {
+return deserialize(null, null, payload, sourceSchema);

Review comment:
   So basically what I want to understand here is: what specific 
advantages do you get by having separate writer and reader schemas? I tried to go 
through the Avro library, but I am still not convinced about this. If you could point 
me to some useful documentation or a blog regarding this, that would be great.
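   
   For readers of the thread, a self-contained sketch of what the separate schemas buy; the 
two schemas below are made up for illustration. Data encoded with an older writer schema can 
still be decoded against a newer reader schema, because Avro schema resolution fills in 
defaulted fields and skips removed ones.
   
   ```
   import java.io.ByteArrayOutputStream;
   import org.apache.avro.Schema;
   import org.apache.avro.generic.GenericData;
   import org.apache.avro.generic.GenericDatumReader;
   import org.apache.avro.generic.GenericDatumWriter;
   import org.apache.avro.generic.GenericRecord;
   import org.apache.avro.io.BinaryDecoder;
   import org.apache.avro.io.BinaryEncoder;
   import org.apache.avro.io.DecoderFactory;
   import org.apache.avro.io.EncoderFactory;

   public class SchemaResolutionSketch {
     public static void main(String[] args) throws Exception {
       Schema writerSchema = new Schema.Parser().parse(
           "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
               + "{\"name\":\"id\",\"type\":\"long\"}]}");
       Schema readerSchema = new Schema.Parser().parse(
           "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
               + "{\"name\":\"id\",\"type\":\"long\"},"
               + "{\"name\":\"country\",\"type\":\"string\",\"default\":\"unknown\"}]}");

       // Serialize a record using only the (older) writer schema.
       GenericRecord user = new GenericData.Record(writerSchema);
       user.put("id", 42L);
       ByteArrayOutputStream out = new ByteArrayOutputStream();
       BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
       new GenericDatumWriter<GenericRecord>(writerSchema).write(user, encoder);
       encoder.flush();

       // Deserialize with both schemas: the missing field is resolved via its default.
       BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
       GenericRecord decoded =
           new GenericDatumReader<GenericRecord>(writerSchema, readerSchema).read(null, decoder);
       System.out.println(decoded); // {"id": 42, "country": "unknown"}
     }
   }
   ```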





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [incubator-hudi] harshi2506 commented on issue #1552: Time taken for upserting hudi table is increasing with increase in number of partitions

2020-04-28 Thread GitBox


harshi2506 commented on issue #1552:
URL: https://github.com/apache/incubator-hudi/issues/1552#issuecomment-620512620


   @vinothchandar, I tried building the jar from the master branch and loading a 
snapshot; it fails every time with:
   
   ```
   20/04/28 09:40:14 WARN TaskSetManager: Lost task 178.2 in stage 4.0 (TID 
37246, ip-10-0-61-179.ec2.internal, executor 17): java.lang.RuntimeException: 
org.apache.hudi.exception.HoodieException: 
org.apache.hudi.exception.HoodieException: 
java.util.concurrent.ExecutionException: 
org.apache.hudi.exception.HoodieInsertException: Failed to close the Insert 
Handle for path 
/2018/07/03/6452abed-658d-4840-92cf-1b7b970d2b61-30_178-4-37246_20200428084112.parquet
at 
org.apache.hudi.client.utils.LazyIterableIterator.next(LazyIterableIterator.java:121)
at 
scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:43)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
at 
org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
at 
org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:349)
at 
org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1182)
at 
org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
at 
org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
at 
org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at 
org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
   Caused by: org.apache.hudi.exception.HoodieException: 
org.apache.hudi.exception.HoodieException: 
java.util.concurrent.ExecutionException: 
org.apache.hudi.exception.HoodieInsertException: Failed to close the Insert 
Handle for path 
/2018/07/03/6452abed-658d-4840-92cf-1b7b970d2b61-30_178-4-37246_20200428084112.parquet
at 
org.apache.hudi.execution.CopyOnWriteLazyInsertIterable.computeNext(CopyOnWriteLazyInsertIterable.java:110)
at 
org.apache.hudi.execution.CopyOnWriteLazyInsertIterable.computeNext(CopyOnWriteLazyInsertIterable.java:46)
at 
org.apache.hudi.client.utils.LazyIterableIterator.next(LazyIterableIterator.java:119)
... 23 more
   Caused by: org.apache.hudi.exception.HoodieException: 
java.util.concurrent.ExecutionException: 
org.apache.hudi.exception.HoodieInsertException: Failed to close the Insert 
Handle for path 
/2018/07/03/6452abed-658d-4840-92cf-1b7b970d2b61-30_178-4-37246_20200428084112.parquet
at 
org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:143)
at 
org.apache.hudi.execution.CopyOnWriteLazyInsertIterable.computeNext(CopyOnWriteLazyInsertIterable.java:106)
... 25 more
   Caused by: java.util.concurrent.ExecutionException: 
org.apache.hudi.exception.HoodieInsertException: Failed to close the Insert 
Handle for path 
/2018/07/03/6452abed-658d-4840-92cf-1b7b970d2b61-30_178-4-37246_20200428084112.parquet
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at 
org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:141)
... 26 more
   Caused by: org.apache.hudi.exception.HoodieInsertException: Failed to close 
the Insert Handle for path 
/2018/07/03/6452abed-658d-4840-92cf-1b7b970d2b61-30_178-4-37246_20200428084112.parquet
at 
org.apache.hudi.io.HoodieCreateHandle.close(HoodieCreateHandle.java:183)
at 
org.apache.hudi.execution.CopyOnWriteLazyInsertIterable$CopyOnWriteInsertHandler.consumeOneRecord(CopyOnWriteLazyInsertIterable.java:152)
at 
org.apache.hudi.execution.CopyOnWriteLazyInsertIterable$CopyOnWriteInsertHandler.consumeOneRecord(CopyOnWriteLazyInsertIterable.java:132)
at 

[GitHub] [incubator-hudi] pratyakshsharma commented on a change in pull request #1565: [HUDI-73]: implemented vanilla AvroKafkaSource

2020-04-28 Thread GitBox


pratyakshsharma commented on a change in pull request #1565:
URL: https://github.com/apache/incubator-hudi/pull/1565#discussion_r416489455



##
File path: 
hudi-utilities/src/main/java/org/apache/hudi/utilities/serde/AbstractHoodieKafkaAvroDeserializer.java
##
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.serde;
+
+import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
+import 
org.apache.hudi.utilities.serde.config.HoodieKafkaAvroDeserializationConfig;
+
+import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
+import kafka.utils.VerifiableProperties;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.kafka.common.errors.SerializationException;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class AbstractHoodieKafkaAvroDeserializer {
+
+  private final DecoderFactory decoderFactory = DecoderFactory.get();
+  private boolean useSpecificAvroReader = false;
+  private Schema sourceSchema;
+
+  public AbstractHoodieKafkaAvroDeserializer(VerifiableProperties properties) {
+this.sourceSchema = new 
Schema.Parser().parse(properties.props().getProperty(FilebasedSchemaProvider.Config.SOURCE_SCHEMA_PROP));
+  }
+
+  protected void configure(HoodieKafkaAvroDeserializationConfig config) {
+useSpecificAvroReader = config
+  .getBoolean(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG);
+  }
+
+  protected Object deserialize(byte[] payload) throws SerializationException {
+return deserialize(null, null, payload, sourceSchema);
+  }
+
+  protected Object deserialize(String topic, Boolean isKey, byte[] payload, 
Schema readerSchema) {
+try {
+  ByteBuffer buffer = this.getByteBuffer(payload);
+  int id = buffer.getInt();
+  int length = buffer.limit() - 1 - 4;
+  Object result;
+  if (sourceSchema.getType().equals(Schema.Type.BYTES)) {
+byte[] bytes = new byte[length];
+buffer.get(bytes, 0, length);
+result = bytes;
+  } else {
+int start = buffer.position() + buffer.arrayOffset();
+DatumReader reader = this.getDatumReader(sourceSchema, readerSchema);

Review comment:
   They are both the same. :)





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [incubator-hudi] pratyakshsharma commented on a change in pull request #1565: [HUDI-73]: implemented vanilla AvroKafkaSource

2020-04-28 Thread GitBox


pratyakshsharma commented on a change in pull request #1565:
URL: https://github.com/apache/incubator-hudi/pull/1565#discussion_r416489207



##
File path: 
hudi-utilities/src/main/java/org/apache/hudi/utilities/serde/AbstractHoodieKafkaAvroDeserializer.java
##
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.serde;
+
+import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
+import 
org.apache.hudi.utilities.serde.config.HoodieKafkaAvroDeserializationConfig;
+
+import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
+import kafka.utils.VerifiableProperties;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.kafka.common.errors.SerializationException;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class AbstractHoodieKafkaAvroDeserializer {
+
+  private final DecoderFactory decoderFactory = DecoderFactory.get();
+  private boolean useSpecificAvroReader = false;
+  private Schema sourceSchema;
+
+  public AbstractHoodieKafkaAvroDeserializer(VerifiableProperties properties) {
+this.sourceSchema = new 
Schema.Parser().parse(properties.props().getProperty(FilebasedSchemaProvider.Config.SOURCE_SCHEMA_PROP));
+  }
+
+  protected void configure(HoodieKafkaAvroDeserializationConfig config) {
+useSpecificAvroReader = config
+  .getBoolean(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG);
+  }
+
+  protected Object deserialize(byte[] payload) throws SerializationException {
+return deserialize(null, null, payload, sourceSchema);

Review comment:
   Did not get this point. What do you mean by writer schema getting lost? 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [incubator-hudi] hddong commented on a change in pull request #1522: [HUDI-702]Add test for HoodieLogFileCommand

2020-04-28 Thread GitBox


hddong commented on a change in pull request #1522:
URL: https://github.com/apache/incubator-hudi/pull/1522#discussion_r416482802



##
File path: 
hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
##
@@ -173,7 +176,11 @@ public String showLogFileRecords(
 HoodieTableMetaClient client = HoodieCLI.getTableMetaClient();
 FileSystem fs = client.getFs();
 List<String> logFilePaths = Arrays.stream(fs.globStatus(new 
Path(logFilePathPattern)))
-.map(status -> 
status.getPath().toString()).collect(Collectors.toList());
+.map(status -> 
status.getPath().toString()).sorted(Comparator.reverseOrder())

Review comment:
   > Do we need to sort this data set?
   
   IMO, it should be sorted. The default return value is unordered, so we may hit an empty 
log file when calling `readSchemaFromLogFile`.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [incubator-hudi] pratyakshsharma commented on a change in pull request #1558: [HUDI-796]: added deduping logic for upserts case

2020-04-28 Thread GitBox


pratyakshsharma commented on a change in pull request #1558:
URL: https://github.com/apache/incubator-hudi/pull/1558#discussion_r416469107



##
File path: hudi-cli/src/main/scala/org/apache/hudi/cli/DedupeSparkJob.scala
##
@@ -103,24 +105,51 @@ class DedupeSparkJob(basePath: String,
 // Mark all files except the one with latest commits for deletion
 dupeMap.foreach(rt => {
   val (key, rows) = rt
-  var maxCommit = -1L
-
-  rows.foreach(r => {
-val c = r(3).asInstanceOf[String].toLong
-if (c > maxCommit)
-  maxCommit = c
-  })
-
-  rows.foreach(r => {
-val c = r(3).asInstanceOf[String].toLong
-if (c != maxCommit) {
-  val f = r(2).asInstanceOf[String].split("_")(0)
-  if (!fileToDeleteKeyMap.contains(f)) {
-fileToDeleteKeyMap(f) = HashSet[String]()
+
+  if (useCommitTimeForDedupe) {
+/*
+This corresponds to the case where duplicates got created due to 
INSERT and have never been updated.
+ */
+var maxCommit = -1L
+
+rows.foreach(r => {
+  val c = r(3).asInstanceOf[String].toLong
+  if (c > maxCommit)
+maxCommit = c
+})
+rows.foreach(r => {
+  val c = r(3).asInstanceOf[String].toLong
+  if (c != maxCommit) {
+val f = r(2).asInstanceOf[String].split("_")(0)
+if (!fileToDeleteKeyMap.contains(f)) {
+  fileToDeleteKeyMap(f) = HashSet[String]()
+}
+fileToDeleteKeyMap(f).add(key)
   }
-  fileToDeleteKeyMap(f).add(key)
+})
+  } else {
+/*
+This corresponds to the case where duplicates have been updated at 
least once.
+Once updated, duplicates are bound to have same commit time unless 
forcefully modified.
+ */
+val size = rows.size - 1
+var i = 0
+val loop = new Breaks
+loop.breakable {

Review comment:
   Right. Thank you for suggesting this, I am not hands on at scala 
properly. :) 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [incubator-hudi] yanghua commented on a change in pull request #1522: [HUDI-702]Add test for HoodieLogFileCommand

2020-04-28 Thread GitBox


yanghua commented on a change in pull request #1522:
URL: https://github.com/apache/incubator-hudi/pull/1522#discussion_r416402996



##
File path: 
hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
##
@@ -173,7 +176,11 @@ public String showLogFileRecords(
 HoodieTableMetaClient client = HoodieCLI.getTableMetaClient();
 FileSystem fs = client.getFs();
 List<String> logFilePaths = Arrays.stream(fs.globStatus(new 
Path(logFilePathPattern)))
-.map(status -> 
status.getPath().toString()).collect(Collectors.toList());
+.map(status -> 
status.getPath().toString()).sorted(Comparator.reverseOrder())

Review comment:
   Do we need to sort this data set?

##
File path: 
hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
##
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cli.commands;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.cli.AbstractShellIntegrationTest;
+import org.apache.hudi.cli.HoodieCLI;
+import org.apache.hudi.cli.HoodieTableHeaderFields;
+import org.apache.hudi.cli.HoodiePrintHelper;
+import org.apache.hudi.cli.TableHeader;
+import org.apache.hudi.cli.common.HoodieTestCommitMetadataGenerator;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.log.HoodieLogFormat;
+import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
+import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
+import org.apache.hudi.common.table.log.block.HoodieLogBlock;
+import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.SchemaTestUtil;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieMemoryConfig;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.springframework.shell.core.CommandResult;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.util.SchemaTestUtil.getSimpleSchema;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test Cases for {@link HoodieLogFileCommand}.
+ */
+public class TestHoodieLogFileCommand extends AbstractShellIntegrationTest {
+
+  private String partitionPath;
+  private HoodieAvroDataBlock dataBlock;
+  private String tablePath;
+
+  private static final String INSTANT_TIME = "100";
+
+  @Before
+  public void init() throws IOException, InterruptedException, 
URISyntaxException {
+HoodieCLI.conf = jsc.hadoopConfiguration();
+
+// Create table and connect
+String tableName = "test_table";
+tablePath = basePath + File.separator + tableName;
+partitionPath = tablePath + File.separator + 
HoodieTestCommitMetadataGenerator.DEFAULT_FIRST_PARTITION_PATH;
+new TableCommand().createTable(
+tablePath, tableName, HoodieTableType.MERGE_ON_READ.name(),
+"", TimelineLayoutVersion.VERSION_1, 
"org.apache.hudi.common.model.HoodieAvroPayload");
+
+new File(partitionPath).mkdirs();
+
+HoodieLogFormat.Writer writer = null;
+try {
+  writer =
+  HoodieLogFormat.newWriterBuilder().onParentPath(new 
Path(partitionPath))
+  .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
+  
.withFileId("test-log-fileid1").overBaseCommit("100").withFs(fs).build();
+
+  // write data to file
+  List records = 

[GitHub] [incubator-hudi] pratyakshsharma commented on a change in pull request #1558: [HUDI-796]: added deduping logic for upserts case

2020-04-28 Thread GitBox


pratyakshsharma commented on a change in pull request #1558:
URL: https://github.com/apache/incubator-hudi/pull/1558#discussion_r416462293



##
File path: 
hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java
##
@@ -64,11 +64,15 @@ public String deduplicate(
   @CliOption(key = {"repairedOutputPath"}, help = "Location to place the 
repaired files",
   mandatory = true) final String repairedOutputPath,
   @CliOption(key = {"sparkProperties"}, help = "Spark Properties File 
Path",
-  mandatory = true) final String sparkPropertiesPath)
+  mandatory = true) final String sparkPropertiesPath,
+  @CliOption(key = {"useCommitTimeForDedupe"}, help = "Set it to true if 
duplicates have never been updated",
+unspecifiedDefaultValue = "true") final boolean useCommitTimeForDedupe,
+  @CliOption(key = {"dryrun"}, help = "Should we actually add or just 
print what would be done",

Review comment:
   Done. 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [incubator-hudi] lamber-ken commented on a change in pull request #1512: [HUDI-763] Add hoodie.table.base.file.format option to hoodie.properties file

2020-04-28 Thread GitBox


lamber-ken commented on a change in pull request #1512:
URL: https://github.com/apache/incubator-hudi/pull/1512#discussion_r416452010



##
File path: hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala
##
@@ -142,6 +143,16 @@ object DataSourceWriteOptions {
   val MOR_TABLE_TYPE_OPT_VAL = HoodieTableType.MERGE_ON_READ.name
   val DEFAULT_TABLE_TYPE_OPT_VAL = COW_TABLE_TYPE_OPT_VAL
 
+  /**
+* The table base file format for the underlying data, for this write.
+* Note that this can't change across writes.
+*
+* Default: PARQUET
+*/
+  val TABLE_FILE_FORMAT_OPT_KEY = "hoodie.table.base.file.format"

Review comment:
   Done, thanks





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [incubator-hudi] afilipchik commented on a change in pull request #1516: [HUDI-784] Adressing issue with log reader on GCS

2020-04-28 Thread GitBox


afilipchik commented on a change in pull request #1516:
URL: https://github.com/apache/incubator-hudi/pull/1516#discussion_r411560192



##
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
##
@@ -79,6 +79,7 @@
   this.inputStream = fsDataInputStream;
 }
 
+fsDataInputStream.seek(0);

Review comment:
   The magicBuffer check was failing on file open, as it couldn't find the 
beginning of the HUDI block in the log file. 
   That was throwing an exception that was killing compaction. While debugging, it 
appeared that the content of magicBuffer was incorrect and the stream offsets were off. 
This only happened when more than one file was scheduled to be processed. I 
didn't test all the variations originally (non-static magicBuffer without seek, 
and seek with the static buffer) as I was in a hurry to fix it, so I am not sure which one 
actually fixes the issue. Agree that seek(0) is weird and the non-static buffer is probably 
the fix. Added a comment and an additional check.
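   
   For context, a minimal sketch of the sharing hazard described above, assuming the failing 
check compares a scratch buffer against the log-block magic marker; the class and constant 
below are hypothetical, not the actual HoodieLogFileReader internals.
   
   ```
   import java.io.DataInputStream;
   import java.io.IOException;
   import java.util.Arrays;

   // Illustrative only.
   public class MagicHeaderCheckSketch {

     private static final byte[] MAGIC = "#HUDI#".getBytes(); // assumed log-block marker

     // Hazard: a single static buffer shared by every reader instance means two readers
     // opened around the same time overwrite each other's bytes mid-check.
     // private static final byte[] SHARED_MAGIC_BUFFER = new byte[MAGIC.length];

     // Safer: each reader owns its scratch buffer, so concurrent readers cannot interfere.
     private final byte[] magicBuffer = new byte[MAGIC.length];

     boolean readMagicHeader(DataInputStream in) throws IOException {
       in.readFully(magicBuffer); // fills this reader's buffer only
       return Arrays.equals(magicBuffer, MAGIC);
     }
   }
   ```
   
   Keeping the buffer per-instance removes the cross-reader interference regardless of 
whether the extra seek(0) turns out to be necessary.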





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [incubator-hudi] afilipchik commented on a change in pull request #1518: [HUDI-723] Register avro schema if infered from SQL transformation

2020-04-28 Thread GitBox


afilipchik commented on a change in pull request #1518:
URL: https://github.com/apache/incubator-hudi/pull/1518#discussion_r416381330



##
File path: 
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
##
@@ -460,8 +471,17 @@ private void syncHive() {
* this constraint.
*/
   public void setupWriteClient() {
+setupWriteClient(schemaProvider, false);
+  }
+
+  /**
+   * Note that depending on configs and source-type, schemaProvider could 
either be eagerly or lazily created.
+   * SchemaProvider creation is a precursor to HoodieWriteClient and 
AsyncCompactor creation. This method takes care of
+   * this constraint.
+   */
+  private void setupWriteClient(SchemaProvider schemaProvider, boolean 
forceRecreate) {
 LOG.info("Setting up Hoodie Write Client");
-if ((null != schemaProvider) && (null == writeClient)) {
+if (forceRecreate || (null != schemaProvider) && (null == writeClient)) {

Review comment:
   That is a good question. @bvaradar, any suggestions? 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (HUDI-843) Support different time units in TimestampBasedKeyGenerator

2020-04-28 Thread Alexander Filipchik (Jira)
Alexander Filipchik created HUDI-843:


 Summary: Support different time units in TimestampBasedKeyGenerator
 Key: HUDI-843
 URL: https://issues.apache.org/jira/browse/HUDI-843
 Project: Apache Hudi (incubating)
  Issue Type: Improvement
Reporter: Alexander Filipchik
 Fix For: 0.6.0


TimestampBasedKeyGenerator only supports UNIX_TIMESTAMP and EPOCHMILLISECONDS 
as a scalar source for partitions. 

 

We should introduce a more flexible configuration to allow users to specify other 
time units (days, nanoseconds, ...).
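
As an illustration of the proposal, a minimal sketch of the unit normalization, assuming a 
hypothetical configuration key that names a java.util.concurrent.TimeUnit; the key generator 
could convert any such scalar to epoch milliseconds before formatting the partition path.

```
import java.util.concurrent.TimeUnit;

// Illustrative only: the configuration key that would select the TimeUnit is hypothetical.
public class TimestampUnitSketch {

  /** Normalizes a scalar timestamp expressed in any TimeUnit to epoch milliseconds. */
  static long toEpochMillis(long value, TimeUnit unit) {
    return unit.toMillis(value);
  }

  public static void main(String[] args) {
    System.out.println(toEpochMillis(18380L, TimeUnit.DAYS));         // days since epoch
    System.out.println(toEpochMillis(1588089600L, TimeUnit.SECONDS)); // unix seconds
    System.out.println(toEpochMillis(1_588_089_600_000_000_000L, TimeUnit.NANOSECONDS));
  }
}
```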



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [incubator-hudi] bhasudha commented on issue #1568: [SUPPORT] java.lang.reflect.InvocationTargetException when upsert

2020-04-28 Thread GitBox


bhasudha commented on issue #1568:
URL: https://github.com/apache/incubator-hudi/issues/1568#issuecomment-620407768


   @tieke1121  are you setting these configs
   ```
 --hiveconf hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat \
 --hiveconf hive.stats.autogather=false
   ``` 
   when using beeline? 
   For example - 
https://hudi.apache.org/docs/docker_demo.html#step-4-a-run-hive-queries 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [incubator-hudi] afilipchik commented on a change in pull request #1562: [HUDI-837]: implemented custom deserializer for AvroKafkaSource

2020-04-28 Thread GitBox


afilipchik commented on a change in pull request #1562:
URL: https://github.com/apache/incubator-hudi/pull/1562#discussion_r416350367



##
File path: 
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/serde/HoodieAvroKafkaDeserializer.java
##
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.serde;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.utilities.UtilHelpers;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+
+import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer;
+import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
+import kafka.serializer.Decoder;
+import kafka.utils.VerifiableProperties;
+import org.apache.avro.Schema;
+import org.apache.kafka.common.errors.SerializationException;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+
+/**
+ * This is a custom implementation of kafka.serializer.Decoder which aims 
at deserializing all the incoming messages
+ * with same schema (which is latest).
+ */
+public class HoodieAvroKafkaDeserializer extends AbstractKafkaAvroDeserializer 
implements Decoder {
+
+  private final Schema sourceSchema;
+  private static final String SCHEMA_PROVIDER_CLASS_PROP = 
"hoodie.deltastreamer.schemaprovider.class";
+
+  public HoodieAvroKafkaDeserializer(VerifiableProperties properties) {
+this.configure(new KafkaAvroDeserializerConfig(properties.props()));
+TypedProperties typedProperties = new TypedProperties();
+copyProperties(typedProperties, properties.props());
+try {
+  SchemaProvider schemaProvider = UtilHelpers.createSchemaProvider(

Review comment:
   also, in general, it is safer to deserialize avro using both writer's 
and reader's schema

##
File path: 
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/serde/HoodieAvroKafkaDeserializer.java
##
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.serde;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.utilities.UtilHelpers;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+
+import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer;
+import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
+import kafka.serializer.Decoder;
+import kafka.utils.VerifiableProperties;
+import org.apache.avro.Schema;
+import org.apache.kafka.common.errors.SerializationException;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+
+/**
+ * This is a custom implementation of kafka.serializer.Decoder which aims 
at deserializing all the incoming messages
+ * with same schema (which is latest).
+ */
+public class HoodieAvroKafkaDeserializer extends AbstractKafkaAvroDeserializer 
implements Decoder {
+
+  private final Schema sourceSchema;
+  private static final String SCHEMA_PROVIDER_CLASS_PROP = 
"hoodie.deltastreamer.schemaprovider.class";
+
+  public HoodieAvroKafkaDeserializer(VerifiableProperties properties) {
+this.configure(new KafkaAvroDeserializerConfig(properties.props()));
+TypedProperties typedProperties = new TypedProperties();
+copyProperties(typedProperties, properties.props());
+try {
+  SchemaProvider schemaProvider = 

[GitHub] [incubator-hudi] afilipchik commented on a change in pull request #1565: [HUDI-73]: implemented vanilla AvroKafkaSource

2020-04-28 Thread GitBox


afilipchik commented on a change in pull request #1565:
URL: https://github.com/apache/incubator-hudi/pull/1565#discussion_r416344601



##
File path: 
hudi-utilities/src/main/java/org/apache/hudi/utilities/serde/AbstractHoodieKafkaAvroDeserializer.java
##
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.serde;
+
+import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
+import 
org.apache.hudi.utilities.serde.config.HoodieKafkaAvroDeserializationConfig;
+
+import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
+import kafka.utils.VerifiableProperties;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.kafka.common.errors.SerializationException;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class AbstractHoodieKafkaAvroDeserializer {
+
+  private final DecoderFactory decoderFactory = DecoderFactory.get();
+  private boolean useSpecificAvroReader = false;
+  private Schema sourceSchema;
+
+  public AbstractHoodieKafkaAvroDeserializer(VerifiableProperties properties) {
+this.sourceSchema = new 
Schema.Parser().parse(properties.props().getProperty(FilebasedSchemaProvider.Config.SOURCE_SCHEMA_PROP));
+  }
+
+  protected void configure(HoodieKafkaAvroDeserializationConfig config) {
+useSpecificAvroReader = config
+  .getBoolean(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG);
+  }
+
+  protected Object deserialize(byte[] payload) throws SerializationException {
+return deserialize(null, null, payload, sourceSchema);
+  }
+
+  protected Object deserialize(String topic, Boolean isKey, byte[] payload, 
Schema readerSchema) {
+try {
+  ByteBuffer buffer = this.getByteBuffer(payload);
+  int id = buffer.getInt();

Review comment:
   this looks specific to confluent schema registry. Is it a good idea to 
assume all the messages will start with magic int? 

##
File path: 
hudi-utilities/src/main/java/org/apache/hudi/utilities/serde/AbstractHoodieKafkaAvroDeserializer.java
##
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.serde;
+
+import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
+import 
org.apache.hudi.utilities.serde.config.HoodieKafkaAvroDeserializationConfig;
+
+import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
+import kafka.utils.VerifiableProperties;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.kafka.common.errors.SerializationException;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class AbstractHoodieKafkaAvroDeserializer {
+
+  private final DecoderFactory decoderFactory = DecoderFactory.get();
+  private boolean useSpecificAvroReader = false;
+  private Schema sourceSchema;
+
+  public AbstractHoodieKafkaAvroDeserializer(VerifiableProperties properties) {
+this.sourceSchema = new 
Schema.Parser().parse(properties.props().getProperty(FilebasedSchemaProvider.Config.SOURCE_SCHEMA_PROP));
+  }
+
+  protected void