SureshK-T2S opened a new issue #2406:
URL: https://github.com/apache/hudi/issues/2406


   I am attempting to create a Hudi table from a Parquet file on S3. The motivation for this approach is based on this Hudi blog post:
   
https://cwiki.apache.org/confluence/display/HUDI/2020/01/20/Change+Capture+Using+AWS+Database+Migration+Service+and+Hudi
   
   As a first test of using DeltaStreamer to ingest a full initial batch load, I used the Parquet files from the following AWS blog, located at s3://athena-examples-us-west-2/elb/parquet/year=2015/month=1/day=1/:
   
https://aws.amazon.com/blogs/aws/new-insert-update-delete-data-on-s3-with-amazon-emr-and-apache-hudi/
   
   First, I used the Spark shell on EMR to load the data into a DataFrame and view it; this works with no issues:
   
   
![image](https://user-images.githubusercontent.com/17935082/103654795-59783d00-4f8c-11eb-86bd-a1f0cd7db3f3.png)
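   
   The spark-shell step was roughly the following (the screenshot above shows the resulting DataFrame):
   
   ```
   // spark-shell on EMR: read the sample ELB Parquet data and inspect it
   val df = spark.read.parquet("s3://athena-examples-us-west-2/elb/parquet/year=2015/month=1/day=1/")
   df.printSchema()  // request_timestamp is one of the columns
   df.show(5)
   ```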
   
   I then attempted to use Hudi DeltaStreamer as per my understanding of the documentation; however, I ran into a couple of issues.
   
   
   **Steps to reproduce the behavior:**
   
   1. I ran the following:
   ```
   spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
     --packages org.apache.hudi:hudi-spark-bundle_2.11:0.6.0,org.apache.spark:spark-avro_2.11:2.4.4 \
     --master yarn --deploy-mode client \
     /usr/lib/hudi/hudi-utilities-bundle.jar --table-type MERGE_ON_READ \
     --source-ordering-field request_timestamp \
     --source-class org.apache.hudi.utilities.sources.ParquetDFSSource \
     --target-base-path s3://mysqlcdc-stream-prod/hudi_tryout/hudi_aws_test --target-table hudi_aws_test \
     --hoodie-conf hoodie.datasource.write.recordkey.field=request_timestamp,hoodie.deltastreamer.source.dfs.root=s3://athena-examples-us-west-2/elb/parquet/year=2015/month=1/day=1,hoodie.datasource.write.partitionpath.field=request_timestamp:TIMESTAMP
   ```
   Stacktrace:
   ```
   Exception in thread "main" java.io.IOException: Could not load key generator class org.apache.hudi.keygen.SimpleKeyGenerator
        at org.apache.hudi.DataSourceUtils.createKeyGenerator(DataSourceUtils.java:94)
        at org.apache.hudi.utilities.deltastreamer.DeltaSync.<init>(DeltaSync.java:190)
        at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.<init>(HoodieDeltaStreamer.java:552)
        at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.<init>(HoodieDeltaStreamer.java:129)
        at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.<init>(HoodieDeltaStreamer.java:99)
        at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:464)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:853)
        at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
        at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
        at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
        at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:928)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:937)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
   Caused by: org.apache.hudi.exception.HoodieException: Unable to instantiate class
        at org.apache.hudi.common.util.ReflectionUtils.loadClass(ReflectionUtils.java:89)
        at org.apache.hudi.common.util.ReflectionUtils.loadClass(ReflectionUtils.java:98)
        at org.apache.hudi.DataSourceUtils.createKeyGenerator(DataSourceUtils.java:92)
        ... 17 more
   Caused by: java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at org.apache.hudi.common.util.ReflectionUtils.loadClass(ReflectionUtils.java:87)
        ... 19 more
   Caused by: java.lang.IllegalArgumentException: Property hoodie.datasource.write.partitionpath.field not found
        at org.apache.hudi.common.config.TypedProperties.checkKey(TypedProperties.java:42)
        at org.apache.hudi.common.config.TypedProperties.getString(TypedProperties.java:47)
        at org.apache.hudi.keygen.SimpleKeyGenerator.<init>(SimpleKeyGenerator.java:36)
        ... 24 more
   ```
   
   2. I understand that for a timestamp-based partition field it is recommended to use a CustomKeyGenerator, so I ran the following:
   ```
   spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
     --packages org.apache.hudi:hudi-spark-bundle_2.11:0.6.0,org.apache.spark:spark-avro_2.11:2.4.4 \
     --master yarn --deploy-mode client \
     /usr/lib/hudi/hudi-utilities-bundle.jar --table-type MERGE_ON_READ \
     --source-ordering-field request_timestamp \
     --source-class org.apache.hudi.utilities.sources.ParquetDFSSource \
     --target-base-path s3://mysqlcdc-stream-prod/hudi_tryout/hudi_aws_test --target-table hudi_aws_test \
     --hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.CustomKeyGenerator,hoodie.datasource.write.recordkey.field=request_timestamp,hoodie.deltastreamer.source.dfs.root=s3://athena-examples-us-west-2/elb/parquet/year=2015/month=1/day=1,hoodie.datasource.write.partitionpath.field=request_timestamp:TIMESTAMP
   ```
   
   This gives rise to a different error; note that the entire comma-separated --hoodie-conf value is now being treated as the key generator class name:
   ```
   Exception in thread "main" java.io.IOException: Could not load key generator class org.apache.hudi.keygen.CustomKeyGenerator,hoodie.datasource.write.recordkey.field=request_timestamp,hoodie.deltastreamer.source.dfs.root=s3://athena-examples-us-west-2/elb/parquet/year=2015/month=1/day=1,hoodie.datasource.write.partitionpath.field=request_timestamp:TIMESTAMP
        at org.apache.hudi.DataSourceUtils.createKeyGenerator(DataSourceUtils.java:94)
        at org.apache.hudi.utilities.deltastreamer.DeltaSync.<init>(DeltaSync.java:190)
        at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.<init>(HoodieDeltaStreamer.java:552)
        at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.<init>(HoodieDeltaStreamer.java:129)
        at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.<init>(HoodieDeltaStreamer.java:99)
        at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:464)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:853)
        at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
        at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
        at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
        at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:928)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:937)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
   Caused by: org.apache.hudi.exception.HoodieException: Unable to load class
        at org.apache.hudi.common.util.ReflectionUtils.getClass(ReflectionUtils.java:56)
        at org.apache.hudi.common.util.ReflectionUtils.loadClass(ReflectionUtils.java:87)
        at org.apache.hudi.common.util.ReflectionUtils.loadClass(ReflectionUtils.java:98)
        at org.apache.hudi.DataSourceUtils.createKeyGenerator(DataSourceUtils.java:92)
        ... 17 more
   Caused by: java.lang.ClassNotFoundException: org.apache.hudi.keygen.CustomKeyGenerator,hoodie.datasource.write.recordkey.field=request_timestamp,hoodie.deltastreamer.source.dfs.root=s3://athena-examples-us-west-2/elb/parquet/year=2015/month=1/day=1,hoodie.datasource.write.partitionpath.field=request_timestamp:TIMESTAMP
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:264)
        at org.apache.hudi.common.util.ReflectionUtils.getClass(ReflectionUtils.java:53)
        ... 20 more
   ```
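   
   In the first run, the partition path property is reported missing even though it appears in my --hoodie-conf value; in the second, the entire comma-separated value shows up as the class name. This makes me suspect that the --hoodie-conf value is not being split on commas, and that each property needs its own --hoodie-conf flag. If so, I assume the invocation would look roughly like this (an untested sketch of the same command):
   
   ```
   spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
     --packages org.apache.hudi:hudi-spark-bundle_2.11:0.6.0,org.apache.spark:spark-avro_2.11:2.4.4 \
     --master yarn --deploy-mode client \
     /usr/lib/hudi/hudi-utilities-bundle.jar --table-type MERGE_ON_READ \
     --source-ordering-field request_timestamp \
     --source-class org.apache.hudi.utilities.sources.ParquetDFSSource \
     --target-base-path s3://mysqlcdc-stream-prod/hudi_tryout/hudi_aws_test --target-table hudi_aws_test \
     --hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.CustomKeyGenerator \
     --hoodie-conf hoodie.datasource.write.recordkey.field=request_timestamp \
     --hoodie-conf hoodie.deltastreamer.source.dfs.root=s3://athena-examples-us-west-2/elb/parquet/year=2015/month=1/day=1 \
     --hoodie-conf hoodie.datasource.write.partitionpath.field=request_timestamp:TIMESTAMP
   ```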
   
   
   **Expected behavior**
   I have clearly specified the partition path field via hoodie.datasource.write.partitionpath.field=request_timestamp:TIMESTAMP, yet this consistently fails for me, even with other Parquet files. I assumed the problem might be that the property needs to be added to the dfs-source.properties file, so I added the following to it:
   ```
   include=base.properties
   hoodie.datasource.write.recordkey.field=request_timestamp
   hoodie.datasource.write.partitionpath.field=request_timestamp
   ```
   However, that didn't fix anything. I also passed the location of the file via --props, but DeltaStreamer could not find the file, even though I can display its contents in the terminal using cat.
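   
   For completeness, here is the props-file setup as I understood it (a sketch based on my reading; the :TIMESTAMP type suffix is kept on the partition path field, and paths are illustrative):
   
   ```
   # dfs-source.properties (sketch; paths illustrative)
   include=base.properties
   hoodie.datasource.write.recordkey.field=request_timestamp
   hoodie.datasource.write.partitionpath.field=request_timestamp:TIMESTAMP
   hoodie.deltastreamer.source.dfs.root=s3://athena-examples-us-west-2/elb/parquet/year=2015/month=1/day=1
   ```
   
   My guess is that --props may need a fully qualified URI, e.g. --props file:///home/hadoop/dfs-source.properties (path illustrative), since a bare local path might be resolved against the cluster's default filesystem (HDFS on EMR) rather than the local disk.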
   
   Suspecting the choice of key generator to be the issue, I tried several other key generators, including the Custom, Complex, and TimeBased ones; however, the class could not be loaded for any of them.
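   
   For the TimeBased attempt specifically, these are the additional properties I understood TimestampBasedKeyGenerator to require; the format values below are my guesses for this dataset and may well be wrong:
   
   ```
   # key generator configs for the TimeBased attempt (format values are guesses)
   hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.TimestampBasedKeyGenerator
   hoodie.datasource.write.recordkey.field=request_timestamp
   hoodie.datasource.write.partitionpath.field=request_timestamp
   hoodie.deltastreamer.keygen.timebased.timestamp.type=DATE_STRING
   hoodie.deltastreamer.keygen.timebased.input.dateformat=yyyy-MM-dd'T'HH:mm:ss
   hoodie.deltastreamer.keygen.timebased.output.dateformat=yyyy/MM/dd
   ```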
   
   Please let me know if I am doing anything wrong here.
   
   **Environment Description**
   
   * Hudi version : 0.6.0
   
   * Spark version : 2.4.7-amzn-0 (Scala 2.11.12)
   
   * Hive version :
   
   * Hadoop version : 2.10.1-amzn-0
   
   * Storage (HDFS/S3/GCS..) : S3
   
   * Running on Docker? (yes/no) : no
   
   
   
   
   

