This is an automated email from the ASF dual-hosted git repository.

vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 62b392b  [HUDI-1343] Add standard schema postprocessor which would rewrite the schema using spark-avro conversion (#2192)
62b392b is described below

commit 62b392b49c13455199e0372204dedf8a371b452c
Author: liujinhui <965147...@qq.com>
AuthorDate: Fri Dec 4 11:28:34 2020 +0800

    [HUDI-1343] Add standard schema postprocessor which would rewrite the schema using spark-avro conversion (#2192)
    
    Co-authored-by: liujh <li...@t3go.cn>
---
 .../org/apache/hudi/utilities/UtilHelpers.java     | 15 +++++--
 .../deltastreamer/HoodieDeltaStreamer.java         |  2 +-
 .../utilities/schema/SparkAvroPostProcessor.java   | 47 ++++++++++++++++++++++
 .../hudi/utilities/SparkAvroSchemaProvider.java    | 36 +++++++++++++++++
 .../hudi/utilities/TestSchemaPostProcessor.java    | 38 ++++++++++++++++-
 5 files changed, 133 insertions(+), 5 deletions(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
index 45171b3..fb42775 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
@@ -37,11 +37,12 @@ import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.utilities.checkpointing.InitialCheckPointProvider;
 import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
 import org.apache.hudi.utilities.schema.DelegatingSchemaProvider;
-import org.apache.hudi.utilities.schema.RowBasedSchemaProvider;
 import org.apache.hudi.utilities.schema.SchemaPostProcessor;
 import org.apache.hudi.utilities.schema.SchemaPostProcessor.Config;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.schema.SchemaProviderWithPostProcessor;
+import org.apache.hudi.utilities.schema.SparkAvroPostProcessor;
+import org.apache.hudi.utilities.schema.RowBasedSchemaProvider;
 import org.apache.hudi.utilities.sources.AvroKafkaSource;
 import org.apache.hudi.utilities.sources.JsonKafkaSource;
 import org.apache.hudi.utilities.sources.Source;
@@ -400,7 +401,7 @@ public class UtilHelpers {
   }
 
   public static SchemaProviderWithPostProcessor wrapSchemaProviderWithPostProcessor(SchemaProvider provider,
-      TypedProperties cfg, JavaSparkContext jssc) {
+      TypedProperties cfg, JavaSparkContext jssc, List<String> transformerClassNames) {
 
     if (provider == null) {
       return null;
@@ -409,7 +410,15 @@ public class UtilHelpers {
     if (provider instanceof  SchemaProviderWithPostProcessor) {
       return (SchemaProviderWithPostProcessor)provider;
     }
+
     String schemaPostProcessorClass = cfg.getString(Config.SCHEMA_POST_PROCESSOR_PROP, null);
+    boolean enableSparkAvroPostProcessor = Boolean.valueOf(cfg.getString(SparkAvroPostProcessor.Config.SPARK_AVRO_POST_PROCESSOR_PROP_ENABLE, "true"));
+
+    if (transformerClassNames != null && !transformerClassNames.isEmpty()
+            && enableSparkAvroPostProcessor && StringUtils.isNullOrEmpty(schemaPostProcessorClass)) {
+      schemaPostProcessorClass = SparkAvroPostProcessor.class.getName();
+    }
+
     return new SchemaProviderWithPostProcessor(provider,
         Option.ofNullable(createSchemaPostProcessor(schemaPostProcessorClass, cfg, jssc)));
   }
@@ -417,6 +426,6 @@ public class UtilHelpers {
   public static SchemaProvider createRowBasedSchemaProvider(StructType structType,
       TypedProperties cfg, JavaSparkContext jssc) {
     SchemaProvider rowSchemaProvider = new RowBasedSchemaProvider(structType);
-    return wrapSchemaProviderWithPostProcessor(rowSchemaProvider, cfg, jssc);
+    return wrapSchemaProviderWithPostProcessor(rowSchemaProvider, cfg, jssc, null);
   }
 }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
index 6d25f46..10d9453 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
@@ -555,7 +555,7 @@ public class HoodieDeltaStreamer implements Serializable {
       this.props = properties.get();
       LOG.info("Creating delta streamer with configs : " + props.toString());
       this.schemaProvider = UtilHelpers.wrapSchemaProviderWithPostProcessor(
-          UtilHelpers.createSchemaProvider(cfg.schemaProviderClassName, props, jssc), props, jssc);
+          UtilHelpers.createSchemaProvider(cfg.schemaProviderClassName, props, jssc), props, jssc, cfg.transformerClassNames);
 
       deltaSync = new DeltaSync(cfg, sparkSession, schemaProvider, props, jssc, fs, conf,
           this::onInitializingWriteClient);
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SparkAvroPostProcessor.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SparkAvroPostProcessor.java
new file mode 100644
index 0000000..9f71a7f
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SparkAvroPostProcessor.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.schema;
+
+import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.common.config.TypedProperties;
+
+import org.apache.avro.Schema;
+import org.apache.spark.api.java.JavaSparkContext;
+
+/**
+ * HUDI-1343:Add standard schema postprocessor which would rewrite the schema using spark-avro conversion.
+ */
+public class SparkAvroPostProcessor extends SchemaPostProcessor {
+
+  public static class Config {
+    public static final String SPARK_AVRO_POST_PROCESSOR_PROP_ENABLE =
+            "hoodie.deltastreamer.schemaprovider.spark_avro_post_processor.enable";
+  }
+
+  public SparkAvroPostProcessor(TypedProperties props, JavaSparkContext jssc) {
+    super(props, jssc);
+  }
+
+  @Override
+  public Schema processSchema(Schema schema) {
+    return AvroConversionUtils.convertStructTypeToAvroSchema(
+        AvroConversionUtils.convertAvroSchemaToStructType(schema), RowBasedSchemaProvider.HOODIE_RECORD_STRUCT_NAME,
+        RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE);
+  }
+}
\ No newline at end of file
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/SparkAvroSchemaProvider.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/SparkAvroSchemaProvider.java
new file mode 100644
index 0000000..90c0149
--- /dev/null
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/SparkAvroSchemaProvider.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities;
+
+import org.apache.avro.Schema;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.spark.api.java.JavaSparkContext;
+
+public class SparkAvroSchemaProvider extends SchemaProvider {
+
+  public SparkAvroSchemaProvider(TypedProperties props, JavaSparkContext jssc) {
+    super(props, jssc);
+  }
+
+  @Override
+  public Schema getSourceSchema() {
+    return new Schema.Parser().parse("{\"type\":\"record\",\"name\":\"test\",\"fields\":[{\"name\": \"day\", \"type\":\"string\"}]}");
+  }
+}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestSchemaPostProcessor.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestSchemaPostProcessor.java
index 39b8457..725743d 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestSchemaPostProcessor.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestSchemaPostProcessor.java
@@ -22,15 +22,20 @@ import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.utilities.schema.SchemaPostProcessor;
 import org.apache.hudi.utilities.schema.SchemaPostProcessor.Config;
 import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.hudi.utilities.schema.SparkAvroPostProcessor;
 import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
 
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Type;
 import org.apache.avro.SchemaBuilder;
+
+import org.apache.hudi.utilities.transform.FlatteningTransformer;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -39,13 +44,19 @@ public class TestSchemaPostProcessor extends UtilitiesTestBase {
 
   private TypedProperties properties = new TypedProperties();
 
+  private static String ORIGINAL_SCHEMA = "{\"name\":\"t3_biz_operation_t_driver\",\"type\":\"record\",\"fields\":[{\"name\":\"ums_id_\",\"type\":[\"null\",\"string\"],\"default\":null},"
+                                              + "{\"name\":\"ums_ts_\",\"type\":[\"null\",\"string\"],\"default\":null}]}";
+
+  private static String RESULT_SCHEMA = "{\"type\":\"record\",\"name\":\"hoodie_source\",\"namespace\":\"hoodie.source\",\"fields\":[{\"name\":\"ums_id_\",\"type\":[\"string\",\"null\"]},"
+                                            + "{\"name\":\"ums_ts_\",\"type\":[\"string\",\"null\"]}]}";
+
   @Test
   public void testPostProcessor() throws IOException {
     properties.put(Config.SCHEMA_POST_PROCESSOR_PROP, DummySchemaPostProcessor.class.getName());
     SchemaProvider provider =
         UtilHelpers.wrapSchemaProviderWithPostProcessor(
         UtilHelpers.createSchemaProvider(DummySchemaProvider.class.getName(), properties, jsc),
-            properties, jsc);
+            properties, jsc,null);
 
     Schema schema = provider.getSourceSchema();
     assertEquals(schema.getType(), Type.RECORD);
@@ -53,6 +64,31 @@ public class TestSchemaPostProcessor extends UtilitiesTestBase {
     assertNotNull(schema.getField("testString"));
   }
 
+  @Test
+  public void testSparkAvro() throws IOException {
+    properties.put(Config.SCHEMA_POST_PROCESSOR_PROP, SparkAvroPostProcessor.class.getName());
+    List<String> transformerClassNames = new ArrayList<>();
+    transformerClassNames.add(FlatteningTransformer.class.getName());
+
+    SchemaProvider provider =
+            UtilHelpers.wrapSchemaProviderWithPostProcessor(
+                    UtilHelpers.createSchemaProvider(SparkAvroSchemaProvider.class.getName(), properties, jsc),
+                    properties, jsc, transformerClassNames);
+
+    Schema schema = provider.getSourceSchema();
+    assertEquals(schema.getType(), Type.RECORD);
+    assertEquals(schema.getName(), "hoodie_source");
+    assertEquals(schema.getNamespace(), "hoodie.source");
+    assertNotNull(schema.getField("day"));
+  }
+
+  @Test
+  public void testSparkAvroSchema() throws IOException {
+    SparkAvroPostProcessor processor = new SparkAvroPostProcessor(properties, null);
+    Schema schema = new Schema.Parser().parse(ORIGINAL_SCHEMA);
+    assertEquals(processor.processSchema(schema).toString(), RESULT_SCHEMA);
+  }
+
   public static class DummySchemaPostProcessor extends SchemaPostProcessor {
 
     public DummySchemaPostProcessor(TypedProperties props, JavaSparkContext jssc) {

Reply via email to