This is an automated email from the ASF dual-hosted git repository.

vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new 5b53b0d  [HUDI-731] Add ChainedTransformer (#1440)
5b53b0d is described below

commit 5b53b0d85e0d60a37c37941b5a653b0718534e7b
Author:     Raymond Xu <2701446+xushi...@users.noreply.github.com>
AuthorDate: Wed Apr 1 08:21:31 2020 -0700

    [HUDI-731] Add ChainedTransformer (#1440)

    * [HUDI-731] Add ChainedTransformer
---
 .../org/apache/hudi/utilities/UtilHelpers.java     |  13 ++-
 .../hudi/utilities/deltastreamer/DeltaSync.java    |   8 +-
 .../deltastreamer/HoodieDeltaStreamer.java         |  21 ++++-
 .../utilities/transform/ChainedTransformer.java    |  54 +++++++++++
 .../hudi/utilities/TestHoodieDeltaStreamer.java    |  67 +++++++-------
 .../org/apache/hudi/utilities/TestUtilHelpers.java | 101 +++++++++++++++++++++
 .../transform/TestChainedTransformer.java          |  92 +++++++++++++++++++
 .../{ => transform}/TestFlatteningTransformer.java |   4 +-
 8 files changed, 314 insertions(+), 46 deletions(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
index 8930084..222a391 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
@@ -34,6 +34,7 @@ import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.Source;
+import org.apache.hudi.utilities.transform.ChainedTransformer;
 import org.apache.hudi.utilities.transform.Transformer;
 
 import org.apache.avro.Schema;
@@ -67,7 +68,9 @@ import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.List;
@@ -102,11 +105,15 @@ public class UtilHelpers {
     }
   }
 
-  public static Transformer createTransformer(String transformerClass) throws IOException {
+  public static Option<Transformer> createTransformer(List<String> classNames) throws IOException {
     try {
-      return transformerClass == null ? null : (Transformer) ReflectionUtils.loadClass(transformerClass);
+      List<Transformer> transformers = new ArrayList<>();
+      for (String className : Option.ofNullable(classNames).orElse(Collections.emptyList())) {
+        transformers.add(ReflectionUtils.loadClass(className));
+      }
+      return transformers.isEmpty() ? Option.empty() : Option.of(new ChainedTransformer(transformers));
     } catch (Throwable e) {
-      throw new IOException("Could not load transformer class " + transformerClass, e);
+      throw new IOException("Could not load transformer class(es) " + classNames, e);
     }
   }
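[Illustrative note, not part of the commit] After this change, callers of UtilHelpers.createTransformer receive an
Option<Transformer> that is empty when no transformer classes are configured, instead of a nullable Transformer.
A minimal caller sketch in Java; the Option import path and the two stock transformer class names below are
assumptions taken from this patch and from Hudi's common utilities:

    import java.util.Arrays;

    import org.apache.hudi.common.util.Option;  // assumed location of Hudi's Option type
    import org.apache.hudi.utilities.UtilHelpers;
    import org.apache.hudi.utilities.transform.Transformer;

    public class CreateTransformerExample {
      public static void main(String[] args) throws java.io.IOException {
        // Two stock transformers referenced elsewhere in this patch, chained in list order.
        Option<Transformer> transformer = UtilHelpers.createTransformer(Arrays.asList(
            "org.apache.hudi.utilities.transform.FlatteningTransformer",
            "org.apache.hudi.utilities.transform.SqlQueryBasedTransformer"));
        // A null (or empty) class list yields Option.empty(), i.e. no transformation is applied.
        System.out.println("transformer present: " + transformer.isPresent());
      }
    }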
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index 99cb497..5cc33ee 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -106,7 +106,7 @@ public class DeltaSync implements Serializable {
   /**
    * Allows transforming source to target table before writing.
    */
-  private transient Transformer transformer;
+  private transient Option<Transformer> transformer;
 
   /**
    * Extract the key for the target table.
@@ -173,7 +173,7 @@ public class DeltaSync implements Serializable {
 
     refreshTimeline();
 
-    this.transformer = UtilHelpers.createTransformer(cfg.transformerClassName);
+    this.transformer = UtilHelpers.createTransformer(cfg.transformerClassNames);
     this.keyGenerator = DataSourceUtils.createKeyGenerator(props);
 
     this.formatAdapter = new SourceFormatAdapter(
@@ -281,14 +281,14 @@ public class DeltaSync implements Serializable {
     final Option<JavaRDD<GenericRecord>> avroRDDOptional;
     final String checkpointStr;
     final SchemaProvider schemaProvider;
-    if (transformer != null) {
+    if (transformer.isPresent()) {
       // Transformation is needed. Fetch New rows in Row Format, apply transformation and then convert them
       // to generic records for writing
       InputBatch<Dataset<Row>> dataAndCheckpoint = formatAdapter.fetchNewDataInRowFormat(resumeCheckpointStr, cfg.sourceLimit);
 
       Option<Dataset<Row>> transformed =
-          dataAndCheckpoint.getBatch().map(data -> transformer.apply(jssc, sparkSession, data, props));
+          dataAndCheckpoint.getBatch().map(data -> transformer.get().apply(jssc, sparkSession, data, props));
       checkpointStr = dataAndCheckpoint.getCheckpointForNextBatch();
       if (this.schemaProvider != null && this.schemaProvider.getTargetSchema() != null) {
         // If the target schema is specified through Avro schema,
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
index 948033f..bc4c85d 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
@@ -54,6 +54,7 @@ import org.apache.spark.sql.SparkSession;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
@@ -64,6 +65,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 /**
@@ -148,6 +150,17 @@ public class HoodieDeltaStreamer implements Serializable {
     }
   }
 
+  private static class TransformersConverter implements IStringConverter<List<String>> {
+
+    @Override
+    public List<String> convert(String value) throws ParameterException {
+      return value == null ? null : Arrays.stream(value.split(","))
+          .map(String::trim)
+          .filter(s -> !s.isEmpty())
+          .collect(Collectors.toList());
+    }
+  }
+
   public static class Config implements Serializable {
 
     @Parameter(names = {"--target-base-path"},
@@ -196,11 +209,13 @@ public class HoodieDeltaStreamer implements Serializable {
     public String schemaProviderClassName = null;
 
     @Parameter(names = {"--transformer-class"},
-        description = "subclass of org.apache.hudi.utilities.transform.Transformer"
+        description = "A subclass or a list of subclasses of org.apache.hudi.utilities.transform.Transformer"
             + ". Allows transforming raw source Dataset to a target Dataset (conforming to target schema) before "
             + "writing. Default : Not set. E:g - org.apache.hudi.utilities.transform.SqlQueryBasedTransformer (which "
-            + "allows a SQL query templated to be passed as a transformation function)")
-    public String transformerClassName = null;
+            + "allows a SQL query templated to be passed as a transformation function). "
+            + "Pass a comma-separated list of subclass names to chain the transformations.",
+        converter = TransformersConverter.class)
+    public List<String> transformerClassNames = null;
 
     @Parameter(names = {"--source-limit"}, description = "Maximum amount of data to read from source. "
         + "Default: No limit For e.g: DFS-Source => max bytes to read, Kafka-Source => max events to read")
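[Illustrative note, not part of the commit] TransformersConverter is what lets --transformer-class accept a
comma-separated list. The snippet below reproduces its split/trim/filter parsing with plain JDK calls to show
what a CLI value resolves to; the class names are just sample input:

    import java.util.Arrays;
    import java.util.List;
    import java.util.stream.Collectors;

    public class TransformerFlagParsing {
      public static void main(String[] args) {
        // A value as it might be passed to --transformer-class (note the spaces and trailing comma).
        String flagValue = "org.apache.hudi.utilities.transform.FlatteningTransformer, "
            + "org.apache.hudi.utilities.transform.SqlQueryBasedTransformer,";
        List<String> classNames = Arrays.stream(flagValue.split(","))
            .map(String::trim)
            .filter(s -> !s.isEmpty())   // blank segments from trailing commas are dropped
            .collect(Collectors.toList());
        System.out.println(classNames);  // the two class names, in the order given
      }
    }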
" + + "Pass a comma-separated list of subclass names to chain the transformations.", + converter = TransformersConverter.class) + public List<String> transformerClassNames = null; @Parameter(names = {"--source-limit"}, description = "Maximum amount of data to read from source. " + "Default: No limit For e.g: DFS-Source => max bytes to read, Kafka-Source => max events to read") diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/ChainedTransformer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/ChainedTransformer.java new file mode 100644 index 0000000..1161a73 --- /dev/null +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/ChainedTransformer.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utilities.transform; + +import org.apache.hudi.common.config.TypedProperties; + +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; + +import java.util.List; +import java.util.stream.Collectors; + +/** + * A {@link Transformer} to chain other {@link Transformer}s and apply sequentially. 
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java
index 5ada456..b7323d4 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java
@@ -78,6 +78,7 @@ import org.junit.Test;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
 import java.util.Random;
@@ -183,39 +184,39 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
   static class TestHelpers {
 
     static HoodieDeltaStreamer.Config makeDropAllConfig(String basePath, Operation op) {
-      return makeConfig(basePath, op, DropAllTransformer.class.getName());
+      return makeConfig(basePath, op, Collections.singletonList(DropAllTransformer.class.getName()));
     }
 
     static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op) {
-      return makeConfig(basePath, op, TripsWithDistanceTransformer.class.getName());
+      return makeConfig(basePath, op, Collections.singletonList(TripsWithDistanceTransformer.class.getName()));
     }
 
-    static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op, String transformerClassName) {
-      return makeConfig(basePath, op, transformerClassName, PROPS_FILENAME_TEST_SOURCE, false);
+    static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op, List<String> transformerClassNames) {
+      return makeConfig(basePath, op, transformerClassNames, PROPS_FILENAME_TEST_SOURCE, false);
     }
 
-    static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op, String transformerClassName,
-        String propsFilename, boolean enableHiveSync) {
-      return makeConfig(basePath, op, transformerClassName, propsFilename, enableHiveSync, true,
-          false, null, null);
+    static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op, List<String> transformerClassNames,
+        String propsFilename, boolean enableHiveSync) {
+      return makeConfig(basePath, op, transformerClassNames, propsFilename, enableHiveSync, true,
+          false, null, null);
     }
 
-    static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op, String transformerClassName,
+    static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op, List<String> transformerClassNames,
         String propsFilename, boolean enableHiveSync, boolean useSchemaProviderClass, boolean updatePayloadClass,
-        String payloadClassName, String tableType) {
-      return makeConfig(basePath, op, TestDataSource.class.getName(), transformerClassName, propsFilename, enableHiveSync,
+        String payloadClassName, String tableType) {
+      return makeConfig(basePath, op, TestDataSource.class.getName(), transformerClassNames, propsFilename, enableHiveSync,
           useSchemaProviderClass, 1000, updatePayloadClass, payloadClassName, tableType, "timestamp");
     }
 
     static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op, String sourceClassName,
-        String transformerClassName, String propsFilename, boolean enableHiveSync, boolean useSchemaProviderClass,
+        List<String> transformerClassNames, String propsFilename, boolean enableHiveSync, boolean useSchemaProviderClass,
         int sourceLimit, boolean updatePayloadClass, String payloadClassName, String tableType,
         String sourceOrderingField) {
       HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
       cfg.targetBasePath = basePath;
       cfg.targetTableName = "hoodie_trips";
       cfg.tableType = tableType == null ? "COPY_ON_WRITE" : tableType;
       cfg.sourceClassName = sourceClassName;
-      cfg.transformerClassName = transformerClassName;
+      cfg.transformerClassNames = transformerClassNames;
       cfg.operation = op;
       cfg.enableHiveSync = enableHiveSync;
       cfg.sourceOrderingField = sourceOrderingField;
@@ -339,7 +340,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
       String tableBasePath = dfsBasePath + "/test_table";
       HoodieDeltaStreamer deltaStreamer =
           new HoodieDeltaStreamer(TestHelpers.makeConfig(tableBasePath, Operation.BULK_INSERT,
-              TripsWithDistanceTransformer.class.getName(), PROPS_FILENAME_TEST_INVALID, false), jsc);
+              Collections.singletonList(TripsWithDistanceTransformer.class.getName()), PROPS_FILENAME_TEST_INVALID, false), jsc);
       deltaStreamer.sync();
       fail("Should error out when setting the key generator class property to an invalid value");
     } catch (IOException e) {
@@ -451,7 +452,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
 
     // Initial bulk insert to ingest to first hudi table
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, Operation.BULK_INSERT,
-        SqlQueryBasedTransformer.class.getName(), PROPS_FILENAME_TEST_SOURCE, true);
+        Collections.singletonList(SqlQueryBasedTransformer.class.getName()), PROPS_FILENAME_TEST_SOURCE, true);
     new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf()).sync();
     TestHelpers.assertRecordCount(1000, tableBasePath + "/*/*.parquet", sqlContext);
     TestHelpers.assertDistanceCount(1000, tableBasePath + "/*/*.parquet", sqlContext);
@@ -524,7 +525,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
   public void testNullSchemaProvider() throws Exception {
     String tableBasePath = dfsBasePath + "/test_table";
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, Operation.BULK_INSERT,
-        SqlQueryBasedTransformer.class.getName(), PROPS_FILENAME_TEST_SOURCE, true,
+        Collections.singletonList(SqlQueryBasedTransformer.class.getName()), PROPS_FILENAME_TEST_SOURCE, true,
         false, false, null, null);
     try {
       new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf()).sync();
@@ -539,15 +540,15 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
   public void testPayloadClassUpdate() throws Exception {
     String dataSetBasePath = dfsBasePath + "/test_dataset_mor";
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(dataSetBasePath, Operation.BULK_INSERT,
-        SqlQueryBasedTransformer.class.getName(), PROPS_FILENAME_TEST_SOURCE, true,
+        Collections.singletonList(SqlQueryBasedTransformer.class.getName()), PROPS_FILENAME_TEST_SOURCE, true,
         true, false, null, "MERGE_ON_READ");
     new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf()).sync();
     TestHelpers.assertRecordCount(1000, dataSetBasePath + "/*/*.parquet", sqlContext);
 
     //now create one more deltaStreamer instance and update payload class
     cfg = TestHelpers.makeConfig(dataSetBasePath, Operation.BULK_INSERT,
-        SqlQueryBasedTransformer.class.getName(), PROPS_FILENAME_TEST_SOURCE, true,
-        true, true, DummyAvroPayload.class.getName(), "MERGE_ON_READ");
+        Collections.singletonList(SqlQueryBasedTransformer.class.getName()), PROPS_FILENAME_TEST_SOURCE, true,
+        true, true, DummyAvroPayload.class.getName(), "MERGE_ON_READ");
     new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf());
 
     //now assert that hoodie.properties file now has updated payload class name
@@ -565,14 +566,14 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
   public void testPayloadClassUpdateWithCOWTable() throws Exception {
     String dataSetBasePath = dfsBasePath + "/test_dataset_cow";
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(dataSetBasePath, Operation.BULK_INSERT,
-        SqlQueryBasedTransformer.class.getName(), PROPS_FILENAME_TEST_SOURCE, true,
+        Collections.singletonList(SqlQueryBasedTransformer.class.getName()), PROPS_FILENAME_TEST_SOURCE, true,
         true, false, null, null);
     new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf()).sync();
     TestHelpers.assertRecordCount(1000, dataSetBasePath + "/*/*.parquet", sqlContext);
 
     //now create one more deltaStreamer instance and update payload class
     cfg = TestHelpers.makeConfig(dataSetBasePath, Operation.BULK_INSERT,
-        SqlQueryBasedTransformer.class.getName(), PROPS_FILENAME_TEST_SOURCE, true,
+        Collections.singletonList(SqlQueryBasedTransformer.class.getName()), PROPS_FILENAME_TEST_SOURCE, true,
         true, true, DummyAvroPayload.class.getName(), null);
     new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf());
@@ -668,12 +669,12 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
     UtilitiesTestBase.Helpers.savePropsToDFS(parquetProps, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_PARQUET);
   }
 
-  private void testParquetDFSSource(boolean useSchemaProvider, String transformerClassName) throws Exception {
-    prepareParquetDFSSource(useSchemaProvider, transformerClassName != null);
+  private void testParquetDFSSource(boolean useSchemaProvider, List<String> transformerClassNames) throws Exception {
+    prepareParquetDFSSource(useSchemaProvider, transformerClassNames != null);
     String tableBasePath = dfsBasePath + "/test_parquet_table" + testNum;
     HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
         TestHelpers.makeConfig(tableBasePath, Operation.INSERT, ParquetDFSSource.class.getName(),
-            transformerClassName, PROPS_FILENAME_TEST_PARQUET, false,
+            transformerClassNames, PROPS_FILENAME_TEST_PARQUET, false,
             useSchemaProvider, 100000, false, null, null, "timestamp"), jsc);
     deltaStreamer.sync();
     TestHelpers.assertRecordCount(PARQUET_NUM_RECORDS, tableBasePath + "/*/*.parquet", sqlContext);
@@ -687,7 +688,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
 
   @Test
   public void testParquetDFSSourceWithoutSchemaProviderAndTransformer() throws Exception {
-    testParquetDFSSource(false, TripsWithDistanceTransformer.class.getName());
+    testParquetDFSSource(false, Collections.singletonList(TripsWithDistanceTransformer.class.getName()));
   }
 
   @Test
@@ -697,7 +698,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
 
   @Test
   public void testParquetDFSSourceWithSchemaFilesAndTransformer() throws Exception {
-    testParquetDFSSource(true, TripsWithDistanceTransformer.class.getName());
+    testParquetDFSSource(true, Collections.singletonList(TripsWithDistanceTransformer.class.getName()));
   }
 
   private void prepareCsvDFSSource(
@@ -740,14 +741,14 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
   }
 
   private void testCsvDFSSource(
-      boolean hasHeader, char sep, boolean useSchemaProvider, String transformerClassName) throws Exception {
-    prepareCsvDFSSource(hasHeader, sep, useSchemaProvider, transformerClassName != null);
+      boolean hasHeader, char sep, boolean useSchemaProvider, List<String> transformerClassNames) throws Exception {
+    prepareCsvDFSSource(hasHeader, sep, useSchemaProvider, transformerClassNames != null);
     String tableBasePath = dfsBasePath + "/test_csv_table" + testNum;
     String sourceOrderingField = (hasHeader || useSchemaProvider) ? "timestamp" : "_c0";
     HoodieDeltaStreamer deltaStreamer =
         new HoodieDeltaStreamer(TestHelpers.makeConfig(
             tableBasePath, Operation.INSERT, CsvDFSSource.class.getName(),
-            transformerClassName, PROPS_FILENAME_TEST_CSV, false,
+            transformerClassNames, PROPS_FILENAME_TEST_CSV, false,
             useSchemaProvider, 1000, false, null, null, sourceOrderingField), jsc);
     deltaStreamer.sync();
     TestHelpers.assertRecordCount(CSV_NUM_RECORDS, tableBasePath + "/*/*.parquet", sqlContext);
@@ -785,7 +786,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
     // No schema provider is specified, transformer is applied
     // In this case, the source schema comes from the inferred schema of the CSV files.
     // Target schema is determined based on the Dataframe after transformation
-    testCsvDFSSource(true, '\t', false, TripsWithDistanceTransformer.class.getName());
+    testCsvDFSSource(true, '\t', false, Collections.singletonList(TripsWithDistanceTransformer.class.getName()));
   }
 
   @Test
@@ -793,7 +794,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
     // The CSV files have header, the columns are separated by '\t'
     // File schema provider is used, transformer is applied
     // In this case, the source and target schema come from the Avro schema files
-    testCsvDFSSource(true, '\t', true, TripsWithDistanceTransformer.class.getName());
+    testCsvDFSSource(true, '\t', true, Collections.singletonList(TripsWithDistanceTransformer.class.getName()));
   }
 
   @Test
@@ -824,7 +825,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
     // No CSV header and no schema provider at the same time are not recommended,
    // as the transformer behavior may be unexpected
     try {
-      testCsvDFSSource(false, '\t', false, TripsWithDistanceTransformer.class.getName());
+      testCsvDFSSource(false, '\t', false, Collections.singletonList(TripsWithDistanceTransformer.class.getName()));
       fail("Should error out when doing the transformation.");
     } catch (AnalysisException e) {
       LOG.error("Expected error during transformation", e);
@@ -837,7 +838,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
     // The CSV files do not have header, the columns are separated by '\t'
     // File schema provider is used, transformer is applied
     // In this case, the source and target schema come from the Avro schema files
-    testCsvDFSSource(false, '\t', true, TripsWithDistanceTransformer.class.getName());
+    testCsvDFSSource(false, '\t', true, Collections.singletonList(TripsWithDistanceTransformer.class.getName()));
   }
 
   /**
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestUtilHelpers.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestUtilHelpers.java
new file mode 100644
index 0000000..f49e750
--- /dev/null
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestUtilHelpers.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.utilities.transform.ChainedTransformer;
+import org.apache.hudi.utilities.transform.Transformer;
+
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(Enclosed.class)
+public class TestUtilHelpers {
+
+  public static class TestCreateTransformer {
+
+    public static class TransformerFoo implements Transformer {
+
+      @Override
+      public Dataset apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset<Row> rowDataset, TypedProperties properties) {
+        return null;
+      }
+    }
+
+    public static class TransformerBar implements Transformer {
+
+      @Override
+      public Dataset apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset<Row> rowDataset, TypedProperties properties) {
+        return null;
+      }
+    }
+
+    @Rule
+    public ExpectedException exceptionRule = ExpectedException.none();
+
+    @Test
+    public void testCreateTransformerNotPresent() throws IOException {
+      assertFalse(UtilHelpers.createTransformer(null).isPresent());
+    }
+
+    @Test
+    public void testCreateTransformerLoadOneClass() throws IOException {
+      Transformer transformer = UtilHelpers.createTransformer(Collections.singletonList(TransformerFoo.class.getName())).get();
+      assertTrue(transformer instanceof ChainedTransformer);
+      List<String> transformerNames = ((ChainedTransformer) transformer).getTransformersNames();
+      assertEquals(1, transformerNames.size());
+      assertEquals(TransformerFoo.class.getName(), transformerNames.get(0));
+    }
+
+    @Test
+    public void testCreateTransformerLoadMultipleClasses() throws IOException {
+      List<String> classNames = Arrays.asList(TransformerFoo.class.getName(), TransformerBar.class.getName());
+      Transformer transformer = UtilHelpers.createTransformer(classNames).get();
+      assertTrue(transformer instanceof ChainedTransformer);
+      List<String> transformerNames = ((ChainedTransformer) transformer).getTransformersNames();
+      assertEquals(2, transformerNames.size());
+      assertEquals(TransformerFoo.class.getName(), transformerNames.get(0));
+      assertEquals(TransformerBar.class.getName(), transformerNames.get(1));
+    }
+
+    @Test
+    public void testCreateTransformerThrowsException() throws IOException {
+      exceptionRule.expect(IOException.class);
+      exceptionRule.expectMessage("Could not load transformer class(es) [foo, bar]");
+      UtilHelpers.createTransformer(Arrays.asList("foo", "bar"));
+    }
+  }
+}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/TestChainedTransformer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/TestChainedTransformer.java
new file mode 100644
index 0000000..dd5b8b9
--- /dev/null
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/TestChainedTransformer.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.transform;
+
+import org.apache.hudi.utilities.UtilHelpers;
+
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.spark.sql.types.DataTypes.IntegerType;
+import static org.apache.spark.sql.types.DataTypes.StringType;
+import static org.apache.spark.sql.types.DataTypes.createStructField;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+public class TestChainedTransformer {
+
+  private JavaSparkContext jsc;
+  private SparkSession sparkSession;
+
+  @Before
+  public void setUp() {
+    jsc = UtilHelpers.buildSparkContext(this.getClass().getName() + "-hoodie", "local[2]");
+    sparkSession = SparkSession.builder().config(jsc.getConf()).getOrCreate();
+  }
+
+  @After
+  public void tearDown() {
+    jsc.stop();
+  }
+
+  @Test
+  public void testChainedTransformation() {
+    StructType schema = DataTypes.createStructType(
+        new StructField[] {
+            createStructField("foo", StringType, false)
+        });
+    Row r1 = RowFactory.create("100");
+    Row r2 = RowFactory.create("200");
+    Dataset<Row> original = sparkSession.sqlContext().createDataFrame(Arrays.asList(r1, r2), schema);
+
+    Transformer t1 = (jsc, sparkSession, dataset, properties) -> dataset.withColumnRenamed("foo", "bar");
+    Transformer t2 = (jsc, sparkSession, dataset, properties) -> dataset.withColumn("bar", dataset.col("bar").cast(IntegerType));
+    ChainedTransformer transformer = new ChainedTransformer(Arrays.asList(t1, t2));
+    Dataset<Row> transformed = transformer.apply(jsc, sparkSession, original, null);
+
+    assertEquals(2, transformed.count());
+    assertArrayEquals(new String[] {"bar"}, transformed.columns());
+    List<Row> rows = transformed.collectAsList();
+    assertEquals(100, rows.get(0).getInt(0));
+    assertEquals(200, rows.get(1).getInt(0));
+  }
+
+  @Test
+  public void testGetTransformersNames() {
+    Transformer t1 = (jsc, sparkSession, dataset, properties) -> dataset.withColumnRenamed("foo", "bar");
+    Transformer t2 = (jsc, sparkSession, dataset, properties) -> dataset.withColumn("bar", dataset.col("bar").cast(IntegerType));
+    ChainedTransformer transformer = new ChainedTransformer(Arrays.asList(t1, t2));
+    List<String> classNames = transformer.getTransformersNames();
+    assertEquals(t1.getClass().getName(), classNames.get(0));
+    assertEquals(t2.getClass().getName(), classNames.get(1));
+  }
+}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestFlatteningTransformer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/TestFlatteningTransformer.java
similarity index 95%
rename from hudi-utilities/src/test/java/org/apache/hudi/utilities/TestFlatteningTransformer.java
rename to hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/TestFlatteningTransformer.java
index d119102..bb95629 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestFlatteningTransformer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/TestFlatteningTransformer.java
@@ -16,9 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.utilities;
-
-import org.apache.hudi.utilities.transform.FlatteningTransformer;
+package org.apache.hudi.utilities.transform;
 
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.Metadata;