SureshK-T2S opened a new issue #2406: URL: https://github.com/apache/hudi/issues/2406
I am attempting to create a Hudi table from a Parquet file on S3. The motivation for this approach is this Hudi blog: https://cwiki.apache.org/confluence/display/HUDI/2020/01/20/Change+Capture+Using+AWS+Database+Migration+Service+and+Hudi

As a first attempt at using DeltaStreamer to ingest a full initial batch load, I used the Parquet files from an AWS blog, located at s3://athena-examples-us-west-2/elb/parquet/year=2015/month=1/day=1/ — see https://aws.amazon.com/blogs/aws/new-insert-update-delete-data-on-s3-with-amazon-emr-and-apache-hudi/

First, I used the Spark shell on EMR to load the data into a DataFrame and view it; this works with no issues:

![image](https://user-images.githubusercontent.com/17935082/103654795-59783d00-4f8c-11eb-86bd-a1f0cd7db3f3.png)

I then attempted to use Hudi DeltaStreamer as per my understanding of the documentation, but ran into a couple of issues.

**Steps to reproduce the behavior:**

1. Ran the following:

```
spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
  --packages org.apache.hudi:hudi-spark-bundle_2.11:0.6.0,org.apache.spark:spark-avro_2.11:2.4.4 \
  --master yarn --deploy-mode client \
  /usr/lib/hudi/hudi-utilities-bundle.jar --table-type MERGE_ON_READ \
  --source-ordering-field request_timestamp \
  --source-class org.apache.hudi.utilities.sources.ParquetDFSSource \
  --target-base-path s3://mysqlcdc-stream-prod/hudi_tryout/hudi_aws_test --target-table hudi_aws_test \
  --hoodie-conf hoodie.datasource.write.recordkey.field=request_timestamp,hoodie.deltastreamer.source.dfs.root=s3://athena-examples-us-west-2/elb/parquet/year=2015/month=1/day=1,hoodie.datasource.write.partitionpath.field=request_timestamp:TIMESTAMP
```

Stacktrace:

```
Exception in thread "main" java.io.IOException: Could not load key generator class org.apache.hudi.keygen.SimpleKeyGenerator
	at org.apache.hudi.DataSourceUtils.createKeyGenerator(DataSourceUtils.java:94)
	at org.apache.hudi.utilities.deltastreamer.DeltaSync.<init>(DeltaSync.java:190)
	at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.<init>(HoodieDeltaStreamer.java:552)
	at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.<init>(HoodieDeltaStreamer.java:129)
	at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.<init>(HoodieDeltaStreamer.java:99)
	at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:464)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:853)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:928)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:937)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.apache.hudi.exception.HoodieException: Unable to instantiate class
	at org.apache.hudi.common.util.ReflectionUtils.loadClass(ReflectionUtils.java:89)
	at org.apache.hudi.common.util.ReflectionUtils.loadClass(ReflectionUtils.java:98)
	at org.apache.hudi.DataSourceUtils.createKeyGenerator(DataSourceUtils.java:92)
	... 17 more
Caused by: java.lang.reflect.InvocationTargetException
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at org.apache.hudi.common.util.ReflectionUtils.loadClass(ReflectionUtils.java:87)
	... 19 more
Caused by: java.lang.IllegalArgumentException: Property hoodie.datasource.write.partitionpath.field not found
	at org.apache.hudi.common.config.TypedProperties.checkKey(TypedProperties.java:42)
	at org.apache.hudi.common.config.TypedProperties.getString(TypedProperties.java:47)
	at org.apache.hudi.keygen.SimpleKeyGenerator.<init>(SimpleKeyGenerator.java:36)
	... 24 more
```

2. I understand that for a timestamp-based partition field it is recommended to use a CustomKeyGenerator:

```
spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
  --packages org.apache.hudi:hudi-spark-bundle_2.11:0.6.0,org.apache.spark:spark-avro_2.11:2.4.4 \
  --master yarn --deploy-mode client \
  /usr/lib/hudi/hudi-utilities-bundle.jar --table-type MERGE_ON_READ \
  --source-ordering-field request_timestamp \
  --source-class org.apache.hudi.utilities.sources.ParquetDFSSource \
  --target-base-path s3://mysqlcdc-stream-prod/hudi_tryout/hudi_aws_test --target-table hudi_aws_test \
  --hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.CustomKeyGenerator,hoodie.datasource.write.recordkey.field=request_timestamp,hoodie.deltastreamer.source.dfs.root=s3://athena-examples-us-west-2/elb/parquet/year=2015/month=1/day=1,hoodie.datasource.write.partitionpath.field=request_timestamp:TIMESTAMP
```

This gives rise to a different error:

```
Exception in thread "main" java.io.IOException: Could not load key generator class org.apache.hudi.keygen.CustomKeyGenerator,hoodie.datasource.write.recordkey.field=request_timestamp,hoodie.deltastreamer.source.dfs.root=s3://athena-examples-us-west-2/elb/parquet/year=2015/month=1/day=1,hoodie.datasource.write.partitionpath.field=request_timestamp:TIMESTAMP
	at org.apache.hudi.DataSourceUtils.createKeyGenerator(DataSourceUtils.java:94)
	at org.apache.hudi.utilities.deltastreamer.DeltaSync.<init>(DeltaSync.java:190)
	at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.<init>(HoodieDeltaStreamer.java:552)
	at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.<init>(HoodieDeltaStreamer.java:129)
	at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.<init>(HoodieDeltaStreamer.java:99)
	at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:464)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:853)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:928)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:937)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.apache.hudi.exception.HoodieException: Unable to load class
	at org.apache.hudi.common.util.ReflectionUtils.getClass(ReflectionUtils.java:56)
	at org.apache.hudi.common.util.ReflectionUtils.loadClass(ReflectionUtils.java:87)
	at org.apache.hudi.common.util.ReflectionUtils.loadClass(ReflectionUtils.java:98)
	at org.apache.hudi.DataSourceUtils.createKeyGenerator(DataSourceUtils.java:92)
	... 17 more
Caused by: java.lang.ClassNotFoundException: org.apache.hudi.keygen.CustomKeyGenerator,hoodie.datasource.write.recordkey.field=request_timestamp,hoodie.deltastreamer.source.dfs.root=s3://athena-examples-us-west-2/elb/parquet/year=2015/month=1/day=1,hoodie.datasource.write.partitionpath.field=request_timestamp:TIMESTAMP
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:264)
	at org.apache.hudi.common.util.ReflectionUtils.getClass(ReflectionUtils.java:53)
	... 20 more
```

**Expected behavior**

I've clearly specified the partition path field via hoodie.datasource.write.partitionpath.field=request_timestamp:TIMESTAMP, yet this consistently fails for me, even with other Parquet files. I assumed the problem might be that the property needs to be set in the dfs-source.properties file, so I added the following to that file:

```
include=base.properties
hoodie.datasource.write.recordkey.field=request_timestamp
hoodie.datasource.write.partitionpath.field=request_timestamp
```

However, that didn't fix anything. I also passed the location of the file via --props, but DeltaStreamer couldn't find the file, even though I can display its contents in the terminal using cat. Suspecting the choice of key generator, I also tried several others, including Custom, Complex, and TimeBased; none of their classes could be loaded either. Please let me know if I am doing anything wrong here.

**Environment Description**

* Hudi version : 0.6.0
* Spark version : 2.4.7-amzn-0 (Scala 2.11.12)
* Hive version :
* Hadoop version : 2.10.1-amzn-0
* Storage (HDFS/S3/GCS..) : S3
* Running on Docker? (yes/no) : no
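**Additional context:** the ClassNotFoundException above shows the entire comma-joined string being interpreted as the key generator class name, which makes me suspect that the comma-separated form of `--hoodie-conf` is parsed as a single key=value pair. A sketch of the invocation I would expect to work instead, assuming `--hoodie-conf` can be repeated once per property (I have not verified this):

```
spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
  --packages org.apache.hudi:hudi-spark-bundle_2.11:0.6.0,org.apache.spark:spark-avro_2.11:2.4.4 \
  --master yarn --deploy-mode client \
  /usr/lib/hudi/hudi-utilities-bundle.jar --table-type MERGE_ON_READ \
  --source-ordering-field request_timestamp \
  --source-class org.apache.hudi.utilities.sources.ParquetDFSSource \
  --target-base-path s3://mysqlcdc-stream-prod/hudi_tryout/hudi_aws_test --target-table hudi_aws_test \
  --hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.CustomKeyGenerator \
  --hoodie-conf hoodie.datasource.write.recordkey.field=request_timestamp \
  --hoodie-conf hoodie.deltastreamer.source.dfs.root=s3://athena-examples-us-west-2/elb/parquet/year=2015/month=1/day=1 \
  --hoodie-conf hoodie.datasource.write.partitionpath.field=request_timestamp:TIMESTAMP
```

This would also explain the first stack trace: if the whole comma-joined string is stored under hoodie.datasource.write.recordkey.field, then hoodie.datasource.write.partitionpath.field is never set, matching the "Property hoodie.datasource.write.partitionpath.field not found" error.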
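Alternatively, the same settings could live in the file passed via --props. A sketch of a self-contained properties file, one key=value per line; the time-based key generator settings at the bottom are my guess at what the TIMESTAMP partition type additionally requires, and the date formats are illustrative assumptions for this dataset, not verified:

```
include=base.properties
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.CustomKeyGenerator
hoodie.datasource.write.recordkey.field=request_timestamp
hoodie.datasource.write.partitionpath.field=request_timestamp:TIMESTAMP
hoodie.deltastreamer.source.dfs.root=s3://athena-examples-us-west-2/elb/parquet/year=2015/month=1/day=1
hoodie.deltastreamer.keygen.timebased.timestamp.type=DATE_STRING
hoodie.deltastreamer.keygen.timebased.input.dateformat=yyyy-MM-dd'T'HH:mm:ss
hoodie.deltastreamer.keygen.timebased.output.dateformat=yyyy/MM/dd
```

When passing this file via --props, I wonder whether the path needs a filesystem scheme prefix (e.g. file:// for a local file), which might explain why DeltaStreamer could not find a file that cat can read; that is only a guess on my part.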