This is an automated email from the ASF dual-hosted git repository.

vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 5b53b0d  [HUDI-731] Add ChainedTransformer (#1440)
5b53b0d is described below

commit 5b53b0d85e0d60a37c37941b5a653b0718534e7b
Author: Raymond Xu <2701446+xushi...@users.noreply.github.com>
AuthorDate: Wed Apr 1 08:21:31 2020 -0700

    [HUDI-731] Add ChainedTransformer (#1440)
    
    * [HUDI-731] Add ChainedTransformer
---
 .../org/apache/hudi/utilities/UtilHelpers.java     |  13 ++-
 .../hudi/utilities/deltastreamer/DeltaSync.java    |   8 +-
 .../deltastreamer/HoodieDeltaStreamer.java         |  21 ++++-
 .../utilities/transform/ChainedTransformer.java    |  54 +++++++++++
 .../hudi/utilities/TestHoodieDeltaStreamer.java    |  67 +++++++-------
 .../org/apache/hudi/utilities/TestUtilHelpers.java | 101 +++++++++++++++++++++
 .../transform/TestChainedTransformer.java          |  92 +++++++++++++++++++
 .../{ => transform}/TestFlatteningTransformer.java |   4 +-
 8 files changed, 314 insertions(+), 46 deletions(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
index 8930084..222a391 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
@@ -34,6 +34,7 @@ import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.Source;
+import org.apache.hudi.utilities.transform.ChainedTransformer;
 import org.apache.hudi.utilities.transform.Transformer;
 
 import org.apache.avro.Schema;
@@ -67,7 +68,9 @@ import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.List;
@@ -102,11 +105,15 @@ public class UtilHelpers {
     }
   }
 
-  public static Transformer createTransformer(String transformerClass) throws IOException {
+  public static Option<Transformer> createTransformer(List<String> classNames) throws IOException {
     try {
-      return transformerClass == null ? null : (Transformer) ReflectionUtils.loadClass(transformerClass);
+      List<Transformer> transformers = new ArrayList<>();
+      for (String className : Option.ofNullable(classNames).orElse(Collections.emptyList())) {
+        transformers.add(ReflectionUtils.loadClass(className));
+      }
+      return transformers.isEmpty() ? Option.empty() : Option.of(new ChainedTransformer(transformers));
     } catch (Throwable e) {
-      throw new IOException("Could not load transformer class " + transformerClass, e);
+      throw new IOException("Could not load transformer class(es) " + classNames, e);
     }
   }
 
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index 99cb497..5cc33ee 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -106,7 +106,7 @@ public class DeltaSync implements Serializable {
   /**
    * Allows transforming source to target table before writing.
    */
-  private transient Transformer transformer;
+  private transient Option<Transformer> transformer;
 
   /**
    * Extract the key for the target table.
@@ -173,7 +173,7 @@ public class DeltaSync implements Serializable {
 
     refreshTimeline();
 
-    this.transformer = UtilHelpers.createTransformer(cfg.transformerClassName);
+    this.transformer = UtilHelpers.createTransformer(cfg.transformerClassNames);
     this.keyGenerator = DataSourceUtils.createKeyGenerator(props);
 
     this.formatAdapter = new SourceFormatAdapter(
@@ -281,14 +281,14 @@ public class DeltaSync implements Serializable {
     final Option<JavaRDD<GenericRecord>> avroRDDOptional;
     final String checkpointStr;
     final SchemaProvider schemaProvider;
-    if (transformer != null) {
+    if (transformer.isPresent()) {
      // Transformation is needed. Fetch New rows in Row Format, apply transformation and then convert them
       // to generic records for writing
       InputBatch<Dataset<Row>> dataAndCheckpoint =
          formatAdapter.fetchNewDataInRowFormat(resumeCheckpointStr, cfg.sourceLimit);
 
       Option<Dataset<Row>> transformed =
-          dataAndCheckpoint.getBatch().map(data -> transformer.apply(jssc, sparkSession, data, props));
+          dataAndCheckpoint.getBatch().map(data -> transformer.get().apply(jssc, sparkSession, data, props));
       checkpointStr = dataAndCheckpoint.getCheckpointForNextBatch();
      if (this.schemaProvider != null && this.schemaProvider.getTargetSchema() != null) {
         // If the target schema is specified through Avro schema,
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
index 948033f..bc4c85d 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
@@ -54,6 +54,7 @@ import org.apache.spark.sql.SparkSession;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
@@ -64,6 +65,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 /**
@@ -148,6 +150,17 @@ public class HoodieDeltaStreamer implements Serializable {
     }
   }
 
+  private static class TransformersConverter implements IStringConverter<List<String>> {
+
+    @Override
+    public List<String> convert(String value) throws ParameterException {
+      return value == null ? null : Arrays.stream(value.split(","))
+          .map(String::trim)
+          .filter(s -> !s.isEmpty())
+          .collect(Collectors.toList());
+    }
+  }
+
   public static class Config implements Serializable {
 
     @Parameter(names = {"--target-base-path"},
@@ -196,11 +209,13 @@ public class HoodieDeltaStreamer implements Serializable {
     public String schemaProviderClassName = null;
 
     @Parameter(names = {"--transformer-class"},
-        description = "subclass of 
org.apache.hudi.utilities.transform.Transformer"
+        description = "A subclass or a list of subclasses of 
org.apache.hudi.utilities.transform.Transformer"
             + ". Allows transforming raw source Dataset to a target Dataset 
(conforming to target schema) before "
             + "writing. Default : Not set. E:g - 
org.apache.hudi.utilities.transform.SqlQueryBasedTransformer (which "
-            + "allows a SQL query templated to be passed as a transformation 
function)")
-    public String transformerClassName = null;
+            + "allows a SQL query templated to be passed as a transformation 
function). "
+            + "Pass a comma-separated list of subclass names to chain the 
transformations.",
+        converter = TransformersConverter.class)
+    public List<String> transformerClassNames = null;
 
     @Parameter(names = {"--source-limit"}, description = "Maximum amount of 
data to read from source. "
         + "Default: No limit For e.g: DFS-Source => max bytes to read, 
Kafka-Source => max events to read")
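
For illustration only (not part of this commit's diff): the new TransformersConverter splits a comma-separated --transformer-class value into trimmed, non-empty class names before they are loaded. A minimal standalone sketch of that conversion follows; the sample class names are hypothetical placeholders, not classes in this commit.

    import java.util.Arrays;
    import java.util.List;
    import java.util.stream.Collectors;

    public class TransformerFlagSketch {
      public static void main(String[] args) {
        // Hypothetical flag value: --transformer-class "com.example.FirstTransformer, com.example.SecondTransformer"
        String flagValue = "com.example.FirstTransformer, com.example.SecondTransformer";
        // Same split/trim/filter steps as the converter in the hunk above.
        List<String> classNames = Arrays.stream(flagValue.split(","))
            .map(String::trim)
            .filter(s -> !s.isEmpty())
            .collect(Collectors.toList());
        // Prints: [com.example.FirstTransformer, com.example.SecondTransformer]
        System.out.println(classNames);
      }
    }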
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/ChainedTransformer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/ChainedTransformer.java
new file mode 100644
index 0000000..1161a73
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/ChainedTransformer.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.transform;
+
+import org.apache.hudi.common.config.TypedProperties;
+
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * A {@link Transformer} to chain other {@link Transformer}s and apply sequentially.
+ */
+public class ChainedTransformer implements Transformer {
+
+  private List<Transformer> transformers;
+
+  public ChainedTransformer(List<Transformer> transformers) {
+    this.transformers = transformers;
+  }
+
+  public List<String> getTransformersNames() {
+    return transformers.stream().map(t -> t.getClass().getName()).collect(Collectors.toList());
+  }
+
+  @Override
+  public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset<Row> rowDataset, TypedProperties properties) {
+    Dataset<Row> dataset = rowDataset;
+    for (Transformer t : transformers) {
+      dataset = t.apply(jsc, sparkSession, dataset, properties);
+    }
+    return dataset;
+  }
+}
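
A minimal usage sketch (not part of this diff) of how the Option<Transformer> returned by UtilHelpers.createTransformer is meant to be consumed, mirroring the DeltaSync change above. The lambda transformers and the wrapper class are hypothetical, and the import paths for Hudi's Option are assumed rather than shown in this commit.

    import org.apache.hudi.common.config.TypedProperties;
    import org.apache.hudi.common.util.Option;
    import org.apache.hudi.utilities.transform.ChainedTransformer;
    import org.apache.hudi.utilities.transform.Transformer;

    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    import java.util.Arrays;

    public class ChainedTransformerSketch {

      // Applies an optional transformer chain to the input, passing the data through unchanged when no chain is configured.
      static Dataset<Row> transformIfPresent(JavaSparkContext jsc, SparkSession spark,
          Dataset<Row> input, TypedProperties props) {
        // Placeholder transformers; a real pipeline would load Transformer subclasses by name.
        Transformer renameColumn = (ctx, session, dataset, p) -> dataset.withColumnRenamed("foo", "bar");
        Transformer castColumn = (ctx, session, dataset, p) -> dataset.withColumn("bar", dataset.col("bar").cast("int"));
        Option<Transformer> transformer = Option.of(new ChainedTransformer(Arrays.asList(renameColumn, castColumn)));
        return transformer.isPresent() ? transformer.get().apply(jsc, spark, input, props) : input;
      }
    }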
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java
index 5ada456..b7323d4 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java
@@ -78,6 +78,7 @@ import org.junit.Test;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
 import java.util.Random;
@@ -183,39 +184,39 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
   static class TestHelpers {
 
    static HoodieDeltaStreamer.Config makeDropAllConfig(String basePath, Operation op) {
-      return makeConfig(basePath, op, DropAllTransformer.class.getName());
+      return makeConfig(basePath, op, Collections.singletonList(DropAllTransformer.class.getName()));
     }
 
    static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op) {
-      return makeConfig(basePath, op, TripsWithDistanceTransformer.class.getName());
+      return makeConfig(basePath, op, Collections.singletonList(TripsWithDistanceTransformer.class.getName()));
     }
 
-    static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op, String transformerClassName) {
-      return makeConfig(basePath, op, transformerClassName, PROPS_FILENAME_TEST_SOURCE, false);
+    static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op, List<String> transformerClassNames) {
+      return makeConfig(basePath, op, transformerClassNames, PROPS_FILENAME_TEST_SOURCE, false);
     }
 
-    static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op, String transformerClassName,
-                                                 String propsFilename, boolean enableHiveSync) {
-      return makeConfig(basePath, op, transformerClassName, propsFilename, enableHiveSync, true,
-        false, null, null);
+    static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op, List<String> transformerClassNames,
+        String propsFilename, boolean enableHiveSync) {
+      return makeConfig(basePath, op, transformerClassNames, propsFilename, enableHiveSync, true,
+          false, null, null);
     }
 
-    static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op, String transformerClassName,
+    static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op, List<String> transformerClassNames,
        String propsFilename, boolean enableHiveSync, boolean useSchemaProviderClass, boolean updatePayloadClass,
-                                                 String payloadClassName, String tableType) {
-      return makeConfig(basePath, op, TestDataSource.class.getName(), transformerClassName, propsFilename, enableHiveSync,
+        String payloadClassName, String tableType) {
+      return makeConfig(basePath, op, TestDataSource.class.getName(), transformerClassNames, propsFilename, enableHiveSync,
          useSchemaProviderClass, 1000, updatePayloadClass, payloadClassName, tableType, "timestamp");
     }
 
    static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op, String sourceClassName,
-        String transformerClassName, String propsFilename, boolean enableHiveSync, boolean useSchemaProviderClass,
+        List<String> transformerClassNames, String propsFilename, boolean enableHiveSync, boolean useSchemaProviderClass,
        int sourceLimit, boolean updatePayloadClass, String payloadClassName, String tableType, String sourceOrderingField) {
       HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
       cfg.targetBasePath = basePath;
       cfg.targetTableName = "hoodie_trips";
       cfg.tableType = tableType == null ? "COPY_ON_WRITE" : tableType;
       cfg.sourceClassName = sourceClassName;
-      cfg.transformerClassName = transformerClassName;
+      cfg.transformerClassNames = transformerClassNames;
       cfg.operation = op;
       cfg.enableHiveSync = enableHiveSync;
       cfg.sourceOrderingField = sourceOrderingField;
@@ -339,7 +340,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
       String tableBasePath = dfsBasePath + "/test_table";
       HoodieDeltaStreamer deltaStreamer =
          new HoodieDeltaStreamer(TestHelpers.makeConfig(tableBasePath, Operation.BULK_INSERT,
-              TripsWithDistanceTransformer.class.getName(), PROPS_FILENAME_TEST_INVALID, false), jsc);
+              Collections.singletonList(TripsWithDistanceTransformer.class.getName()), PROPS_FILENAME_TEST_INVALID, false), jsc);
       deltaStreamer.sync();
       fail("Should error out when setting the key generator class property to 
an invalid value");
     } catch (IOException e) {
@@ -451,7 +452,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
 
     // Initial bulk insert to ingest to first hudi table
    HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, Operation.BULK_INSERT,
-        SqlQueryBasedTransformer.class.getName(), PROPS_FILENAME_TEST_SOURCE, true);
+        Collections.singletonList(SqlQueryBasedTransformer.class.getName()), PROPS_FILENAME_TEST_SOURCE, true);
     new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf()).sync();
    TestHelpers.assertRecordCount(1000, tableBasePath + "/*/*.parquet", sqlContext);
    TestHelpers.assertDistanceCount(1000, tableBasePath + "/*/*.parquet", sqlContext);
@@ -524,7 +525,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
   public void testNullSchemaProvider() throws Exception {
     String tableBasePath = dfsBasePath + "/test_table";
    HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, Operation.BULK_INSERT,
-        SqlQueryBasedTransformer.class.getName(), PROPS_FILENAME_TEST_SOURCE, true,
+        Collections.singletonList(SqlQueryBasedTransformer.class.getName()), PROPS_FILENAME_TEST_SOURCE, true,
         false, false, null, null);
     try {
       new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf()).sync();
@@ -539,15 +540,15 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
   public void testPayloadClassUpdate() throws Exception {
     String dataSetBasePath = dfsBasePath + "/test_dataset_mor";
    HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(dataSetBasePath, Operation.BULK_INSERT,
-        SqlQueryBasedTransformer.class.getName(), PROPS_FILENAME_TEST_SOURCE, true,
+        Collections.singletonList(SqlQueryBasedTransformer.class.getName()), PROPS_FILENAME_TEST_SOURCE, true,
         true, false, null, "MERGE_ON_READ");
     new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf()).sync();
    TestHelpers.assertRecordCount(1000, dataSetBasePath + "/*/*.parquet", sqlContext);
 
     //now create one more deltaStreamer instance and update payload class
     cfg = TestHelpers.makeConfig(dataSetBasePath, Operation.BULK_INSERT,
-      SqlQueryBasedTransformer.class.getName(), PROPS_FILENAME_TEST_SOURCE, true,
-      true, true, DummyAvroPayload.class.getName(), "MERGE_ON_READ");
+        Collections.singletonList(SqlQueryBasedTransformer.class.getName()), PROPS_FILENAME_TEST_SOURCE, true,
+        true, true, DummyAvroPayload.class.getName(), "MERGE_ON_READ");
     new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf());
 
     //now assert that hoodie.properties file now has updated payload class name
@@ -565,14 +566,14 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
   public void testPayloadClassUpdateWithCOWTable() throws Exception {
     String dataSetBasePath = dfsBasePath + "/test_dataset_cow";
    HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(dataSetBasePath, Operation.BULK_INSERT,
-        SqlQueryBasedTransformer.class.getName(), PROPS_FILENAME_TEST_SOURCE, true,
+        Collections.singletonList(SqlQueryBasedTransformer.class.getName()), PROPS_FILENAME_TEST_SOURCE, true,
         true, false, null, null);
     new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf()).sync();
    TestHelpers.assertRecordCount(1000, dataSetBasePath + "/*/*.parquet", sqlContext);
 
     //now create one more deltaStreamer instance and update payload class
     cfg = TestHelpers.makeConfig(dataSetBasePath, Operation.BULK_INSERT,
-        SqlQueryBasedTransformer.class.getName(), PROPS_FILENAME_TEST_SOURCE, true,
+        Collections.singletonList(SqlQueryBasedTransformer.class.getName()), PROPS_FILENAME_TEST_SOURCE, true,
         true, true, DummyAvroPayload.class.getName(), null);
     new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf());
 
@@ -668,12 +669,12 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
    UtilitiesTestBase.Helpers.savePropsToDFS(parquetProps, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_PARQUET);
   }
 
-  private void testParquetDFSSource(boolean useSchemaProvider, String transformerClassName) throws Exception {
-    prepareParquetDFSSource(useSchemaProvider, transformerClassName != null);
+  private void testParquetDFSSource(boolean useSchemaProvider, List<String> transformerClassNames) throws Exception {
+    prepareParquetDFSSource(useSchemaProvider, transformerClassNames != null);
     String tableBasePath = dfsBasePath + "/test_parquet_table" + testNum;
     HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
        TestHelpers.makeConfig(tableBasePath, Operation.INSERT, ParquetDFSSource.class.getName(),
-            transformerClassName, PROPS_FILENAME_TEST_PARQUET, false,
+            transformerClassNames, PROPS_FILENAME_TEST_PARQUET, false,
             useSchemaProvider, 100000, false, null, null, "timestamp"), jsc);
     deltaStreamer.sync();
    TestHelpers.assertRecordCount(PARQUET_NUM_RECORDS, tableBasePath + "/*/*.parquet", sqlContext);
@@ -687,7 +688,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
 
   @Test
  public void testParquetDFSSourceWithoutSchemaProviderAndTransformer() throws Exception {
-    testParquetDFSSource(false, TripsWithDistanceTransformer.class.getName());
+    testParquetDFSSource(false, Collections.singletonList(TripsWithDistanceTransformer.class.getName()));
   }
 
   @Test
@@ -697,7 +698,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
 
   @Test
  public void testParquetDFSSourceWithSchemaFilesAndTransformer() throws Exception {
-    testParquetDFSSource(true, TripsWithDistanceTransformer.class.getName());
+    testParquetDFSSource(true, Collections.singletonList(TripsWithDistanceTransformer.class.getName()));
   }
 
   private void prepareCsvDFSSource(
@@ -740,14 +741,14 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
   }
 
   private void testCsvDFSSource(
-      boolean hasHeader, char sep, boolean useSchemaProvider, String transformerClassName) throws Exception {
-    prepareCsvDFSSource(hasHeader, sep, useSchemaProvider, transformerClassName != null);
+      boolean hasHeader, char sep, boolean useSchemaProvider, List<String> transformerClassNames) throws Exception {
+    prepareCsvDFSSource(hasHeader, sep, useSchemaProvider, transformerClassNames != null);
     String tableBasePath = dfsBasePath + "/test_csv_table" + testNum;
    String sourceOrderingField = (hasHeader || useSchemaProvider) ? "timestamp" : "_c0";
     HoodieDeltaStreamer deltaStreamer =
         new HoodieDeltaStreamer(TestHelpers.makeConfig(
             tableBasePath, Operation.INSERT, CsvDFSSource.class.getName(),
-            transformerClassName, PROPS_FILENAME_TEST_CSV, false,
+            transformerClassNames, PROPS_FILENAME_TEST_CSV, false,
            useSchemaProvider, 1000, false, null, null, sourceOrderingField), jsc);
     deltaStreamer.sync();
    TestHelpers.assertRecordCount(CSV_NUM_RECORDS, tableBasePath + "/*/*.parquet", sqlContext);
@@ -785,7 +786,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
     // No schema provider is specified, transformer is applied
    // In this case, the source schema comes from the inferred schema of the CSV files.
     // Target schema is determined based on the Dataframe after transformation
-    testCsvDFSSource(true, '\t', false, TripsWithDistanceTransformer.class.getName());
+    testCsvDFSSource(true, '\t', false, Collections.singletonList(TripsWithDistanceTransformer.class.getName()));
   }
 
   @Test
@@ -793,7 +794,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
     // The CSV files have header, the columns are separated by '\t'
     // File schema provider is used, transformer is applied
    // In this case, the source and target schema come from the Avro schema files
-    testCsvDFSSource(true, '\t', true, TripsWithDistanceTransformer.class.getName());
+    testCsvDFSSource(true, '\t', true, Collections.singletonList(TripsWithDistanceTransformer.class.getName()));
   }
 
   @Test
@@ -824,7 +825,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
    // No CSV header and no schema provider at the same time are not recommended,
     // as the transformer behavior may be unexpected
     try {
-      testCsvDFSSource(false, '\t', false, TripsWithDistanceTransformer.class.getName());
+      testCsvDFSSource(false, '\t', false, Collections.singletonList(TripsWithDistanceTransformer.class.getName()));
       fail("Should error out when doing the transformation.");
     } catch (AnalysisException e) {
       LOG.error("Expected error during transformation", e);
@@ -837,7 +838,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
     // The CSV files do not have header, the columns are separated by '\t'
     // File schema provider is used, transformer is applied
    // In this case, the source and target schema come from the Avro schema files
-    testCsvDFSSource(false, '\t', true, TripsWithDistanceTransformer.class.getName());
+    testCsvDFSSource(false, '\t', true, Collections.singletonList(TripsWithDistanceTransformer.class.getName()));
   }
 
   /**
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestUtilHelpers.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestUtilHelpers.java
new file mode 100644
index 0000000..f49e750
--- /dev/null
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestUtilHelpers.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.utilities.transform.ChainedTransformer;
+import org.apache.hudi.utilities.transform.Transformer;
+
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(Enclosed.class)
+public class TestUtilHelpers {
+
+  public static class TestCreateTransformer {
+
+    public static class TransformerFoo implements Transformer {
+
+      @Override
+      public Dataset apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset<Row> rowDataset, TypedProperties properties) {
+        return null;
+      }
+    }
+
+    public static class TransformerBar implements Transformer {
+
+      @Override
+      public Dataset apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset<Row> rowDataset, TypedProperties properties) {
+        return null;
+      }
+    }
+
+    @Rule
+    public ExpectedException exceptionRule = ExpectedException.none();
+
+    @Test
+    public void testCreateTransformerNotPresent() throws IOException {
+      assertFalse(UtilHelpers.createTransformer(null).isPresent());
+    }
+
+    @Test
+    public void testCreateTransformerLoadOneClass() throws IOException {
+      Transformer transformer = UtilHelpers.createTransformer(Collections.singletonList(TransformerFoo.class.getName())).get();
+      assertTrue(transformer instanceof ChainedTransformer);
+      List<String> transformerNames = ((ChainedTransformer) transformer).getTransformersNames();
+      assertEquals(1, transformerNames.size());
+      assertEquals(TransformerFoo.class.getName(), transformerNames.get(0));
+    }
+
+    @Test
+    public void testCreateTransformerLoadMultipleClasses() throws IOException {
+      List<String> classNames = Arrays.asList(TransformerFoo.class.getName(), TransformerBar.class.getName());
+      Transformer transformer = UtilHelpers.createTransformer(classNames).get();
+      assertTrue(transformer instanceof ChainedTransformer);
+      List<String> transformerNames = ((ChainedTransformer) transformer).getTransformersNames();
+      assertEquals(2, transformerNames.size());
+      assertEquals(TransformerFoo.class.getName(), transformerNames.get(0));
+      assertEquals(TransformerBar.class.getName(), transformerNames.get(1));
+    }
+
+    @Test
+    public void testCreateTransformerThrowsException() throws IOException {
+      exceptionRule.expect(IOException.class);
+      exceptionRule.expectMessage("Could not load transformer class(es) [foo, 
bar]");
+      UtilHelpers.createTransformer(Arrays.asList("foo", "bar"));
+    }
+  }
+}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/TestChainedTransformer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/TestChainedTransformer.java
new file mode 100644
index 0000000..dd5b8b9
--- /dev/null
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/TestChainedTransformer.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.transform;
+
+import org.apache.hudi.utilities.UtilHelpers;
+
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.spark.sql.types.DataTypes.IntegerType;
+import static org.apache.spark.sql.types.DataTypes.StringType;
+import static org.apache.spark.sql.types.DataTypes.createStructField;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+public class TestChainedTransformer {
+
+  private JavaSparkContext jsc;
+  private SparkSession sparkSession;
+
+  @Before
+  public void setUp() {
+    jsc = UtilHelpers.buildSparkContext(this.getClass().getName() + "-hoodie", "local[2]");
+    sparkSession = SparkSession.builder().config(jsc.getConf()).getOrCreate();
+  }
+
+  @After
+  public void tearDown() {
+    jsc.stop();
+  }
+
+  @Test
+  public void testChainedTransformation() {
+    StructType schema = DataTypes.createStructType(
+        new StructField[] {
+            createStructField("foo", StringType, false)
+        });
+    Row r1 = RowFactory.create("100");
+    Row r2 = RowFactory.create("200");
+    Dataset<Row> original = sparkSession.sqlContext().createDataFrame(Arrays.asList(r1, r2), schema);
+
+    Transformer t1 = (jsc, sparkSession, dataset, properties) -> dataset.withColumnRenamed("foo", "bar");
+    Transformer t2 = (jsc, sparkSession, dataset, properties) -> dataset.withColumn("bar", dataset.col("bar").cast(IntegerType));
+    ChainedTransformer transformer = new ChainedTransformer(Arrays.asList(t1, t2));
+    Dataset<Row> transformed = transformer.apply(jsc, sparkSession, original, null);
+
+    assertEquals(2, transformed.count());
+    assertArrayEquals(new String[] {"bar"}, transformed.columns());
+    List<Row> rows = transformed.collectAsList();
+    assertEquals(100, rows.get(0).getInt(0));
+    assertEquals(200, rows.get(1).getInt(0));
+  }
+
+  @Test
+  public void testGetTransformersNames() {
+    Transformer t1 = (jsc, sparkSession, dataset, properties) -> dataset.withColumnRenamed("foo", "bar");
+    Transformer t2 = (jsc, sparkSession, dataset, properties) -> dataset.withColumn("bar", dataset.col("bar").cast(IntegerType));
+    ChainedTransformer transformer = new ChainedTransformer(Arrays.asList(t1, t2));
+    List<String> classNames = transformer.getTransformersNames();
+    assertEquals(t1.getClass().getName(), classNames.get(0));
+    assertEquals(t2.getClass().getName(), classNames.get(1));
+  }
+}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestFlatteningTransformer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/TestFlatteningTransformer.java
similarity index 95%
rename from hudi-utilities/src/test/java/org/apache/hudi/utilities/TestFlatteningTransformer.java
rename to hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/TestFlatteningTransformer.java
index d119102..bb95629 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestFlatteningTransformer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/TestFlatteningTransformer.java
@@ -16,9 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.utilities;
-
-import org.apache.hudi.utilities.transform.FlatteningTransformer;
+package org.apache.hudi.utilities.transform;
 
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.Metadata;
