This is an automated email from the ASF dual-hosted git repository.

leesf pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new f4e3b94  [HUDI-1742] Improve table level config priority for HoodieMultiTableDeltaStreamer (#2744)
f4e3b94 is described below

commit f4e3b949714aceaf8823fd1659e44d3b7e98089a
Author: Nick Young <72905543+nickyoungp...@users.noreply.github.com>
AuthorDate: Mon Apr 26 22:05:06 2021 +0800

    [HUDI-1742] Improve table level config priority for HoodieMultiTableDeltaStreamer (#2744)
---
 .../deltastreamer/HoodieMultiTableDeltaStreamer.java   |  4 +++-
 .../utilities/functional/TestHoodieDeltaStreamer.java  |  7 +++++++
 .../functional/TestHoodieMultiTableDeltaStreamer.java  | 18 ++++++++++++++++++
 .../short_trip_uber_config.properties                  |  3 ++-
 4 files changed, 30 insertions(+), 2 deletions(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
index a39b973..8e557f1 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
@@ -118,7 +118,9 @@ public class HoodieMultiTableDeltaStreamer {
       checkIfTableConfigFileExists(configFolder, fs, configFilePath);
       TypedProperties tableProperties = UtilHelpers.readConfig(fs, new Path(configFilePath), new ArrayList<>()).getConfig();
       properties.forEach((k, v) -> {
-        tableProperties.setProperty(k.toString(), v.toString());
+        if (tableProperties.get(k) == null) {
+          tableProperties.setProperty(k.toString(), v.toString());
+        }
       });
       final HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
       //copy all the values from config to cfg
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
index 7d4db2c..362a294 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
@@ -1631,6 +1631,13 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
     }
   }

+  public static class TestTableLevelGenerator extends SimpleKeyGenerator {
+
+    public TestTableLevelGenerator(TypedProperties props) {
+      super(props);
+    }
+  }
+
   public static class DummyAvroPayload extends OverwriteWithLatestAvroPayload {

     public DummyAvroPayload(GenericRecord gr, Comparable orderingVal) {
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java
index 7b5ce9d..17450a0 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java
@@ -213,6 +213,24 @@ public class TestHoodieMultiTableDeltaStreamer extends TestHoodieDeltaStreamer {
     }
   }

+  @Test
+  public void testTableLevelProperties() throws IOException {
+    HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1, dfsBasePath + "/config", TestDataSource.class.getName(), false);
+    HoodieMultiTableDeltaStreamer streamer = new HoodieMultiTableDeltaStreamer(cfg, jsc);
+    List<TableExecutionContext> tableExecutionContexts = streamer.getTableExecutionContexts();
+    tableExecutionContexts.forEach(tableExecutionContext -> {
+      switch (tableExecutionContext.getTableName()) {
+        case "dummy_table_short_trip":
+          String tableLevelKeyGeneratorClass = tableExecutionContext.getProperties().getString(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY());
+          assertEquals(TestHoodieDeltaStreamer.TestTableLevelGenerator.class.getName(), tableLevelKeyGeneratorClass);
+          break;
+        default:
+          String defaultKeyGeneratorClass = tableExecutionContext.getProperties().getString(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY());
+          assertEquals(TestHoodieDeltaStreamer.TestGenerator.class.getName(), defaultKeyGeneratorClass);
+      }
+    });
+  }
+
   private String populateCommonPropsAndWriteToFile() throws IOException {
     TypedProperties commonProps = new TypedProperties();
     populateCommonProps(commonProps);
diff --git a/hudi-utilities/src/test/resources/delta-streamer-config/short_trip_uber_config.properties b/hudi-utilities/src/test/resources/delta-streamer-config/short_trip_uber_config.properties
index 52d39ba..243afc9 100644
--- a/hudi-utilities/src/test/resources/delta-streamer-config/short_trip_uber_config.properties
+++ b/hudi-utilities/src/test/resources/delta-streamer-config/short_trip_uber_config.properties
@@ -21,4 +21,5 @@ hoodie.datasource.write.partitionpath.field=created_at
 hoodie.deltastreamer.source.kafka.topic=topic2
 hoodie.deltastreamer.keygen.timebased.timestamp.type=UNIX_TIMESTAMP
 hoodie.deltastreamer.keygen.timebased.input.dateformat=yyyy-MM-dd HH:mm:ss.S
-hoodie.datasource.hive_sync.table=short_trip_uber_hive_dummy_table
\ No newline at end of file
+hoodie.datasource.hive_sync.table=short_trip_uber_hive_dummy_table
+hoodie.datasource.write.keygenerator.class=org.apache.hudi.utilities.functional.TestHoodieDeltaStreamer$TestTableLevelGenerator
\ No newline at end of file