This is an automated email from the ASF dual-hosted git repository.

leesf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new f4e3b94  [HUDI-1742] Improve table level config priority for HoodieMultiTableDeltaStreamer (#2744)
f4e3b94 is described below

commit f4e3b949714aceaf8823fd1659e44d3b7e98089a
Author: Nick Young <72905543+nickyoungp...@users.noreply.github.com>
AuthorDate: Mon Apr 26 22:05:06 2021 +0800

    [HUDI-1742] Improve table level config priority for HoodieMultiTableDeltaStreamer (#2744)
---
 .../deltastreamer/HoodieMultiTableDeltaStreamer.java   |  4 +++-
 .../utilities/functional/TestHoodieDeltaStreamer.java  |  7 +++++++
 .../functional/TestHoodieMultiTableDeltaStreamer.java  | 18 ++++++++++++++++++
 .../short_trip_uber_config.properties                  |  3 ++-
 4 files changed, 30 insertions(+), 2 deletions(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
index a39b973..8e557f1 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
@@ -118,7 +118,9 @@ public class HoodieMultiTableDeltaStreamer {
       checkIfTableConfigFileExists(configFolder, fs, configFilePath);
       TypedProperties tableProperties = UtilHelpers.readConfig(fs, new 
Path(configFilePath), new ArrayList<>()).getConfig();
       properties.forEach((k, v) -> {
-        tableProperties.setProperty(k.toString(), v.toString());
+        if (tableProperties.get(k) == null) {
+          tableProperties.setProperty(k.toString(), v.toString());
+        }
       });
       final HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
       //copy all the values from config to cfg
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
index 7d4db2c..362a294 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
@@ -1631,6 +1631,13 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
     }
   }
 
+  public static class TestTableLevelGenerator extends SimpleKeyGenerator {
+
+    public TestTableLevelGenerator(TypedProperties props) {
+      super(props);
+    }
+  }
+
   public static class DummyAvroPayload extends OverwriteWithLatestAvroPayload {
 
     public DummyAvroPayload(GenericRecord gr, Comparable orderingVal) {
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java
index 7b5ce9d..17450a0 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java
@@ -213,6 +213,24 @@ public class TestHoodieMultiTableDeltaStreamer extends TestHoodieDeltaStreamer {
     }
   }
 
+  @Test
+  public void testTableLevelProperties() throws IOException {
+    HoodieMultiTableDeltaStreamer.Config cfg = 
TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1, dfsBasePath + "/config", 
TestDataSource.class.getName(), false);
+    HoodieMultiTableDeltaStreamer streamer = new 
HoodieMultiTableDeltaStreamer(cfg, jsc);
+    List<TableExecutionContext> tableExecutionContexts = 
streamer.getTableExecutionContexts();
+    tableExecutionContexts.forEach(tableExecutionContext -> {
+      switch (tableExecutionContext.getTableName()) {
+        case "dummy_table_short_trip":
+          String tableLevelKeyGeneratorClass = 
tableExecutionContext.getProperties().getString(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY());
+          
assertEquals(TestHoodieDeltaStreamer.TestTableLevelGenerator.class.getName(), 
tableLevelKeyGeneratorClass);
+          break;
+        default:
+          String defaultKeyGeneratorClass = 
tableExecutionContext.getProperties().getString(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY());
+          assertEquals(TestHoodieDeltaStreamer.TestGenerator.class.getName(), 
defaultKeyGeneratorClass);
+      }
+    });
+  }
+
   private String populateCommonPropsAndWriteToFile() throws IOException {
     TypedProperties commonProps = new TypedProperties();
     populateCommonProps(commonProps);
diff --git a/hudi-utilities/src/test/resources/delta-streamer-config/short_trip_uber_config.properties b/hudi-utilities/src/test/resources/delta-streamer-config/short_trip_uber_config.properties
index 52d39ba..243afc9 100644
--- a/hudi-utilities/src/test/resources/delta-streamer-config/short_trip_uber_config.properties
+++ b/hudi-utilities/src/test/resources/delta-streamer-config/short_trip_uber_config.properties
@@ -21,4 +21,5 @@ hoodie.datasource.write.partitionpath.field=created_at
 hoodie.deltastreamer.source.kafka.topic=topic2
 hoodie.deltastreamer.keygen.timebased.timestamp.type=UNIX_TIMESTAMP
 hoodie.deltastreamer.keygen.timebased.input.dateformat=yyyy-MM-dd HH:mm:ss.S
-hoodie.datasource.hive_sync.table=short_trip_uber_hive_dummy_table
\ No newline at end of file
+hoodie.datasource.hive_sync.table=short_trip_uber_hive_dummy_table
+hoodie.datasource.write.keygenerator.class=org.apache.hudi.utilities.functional.TestHoodieDeltaStreamer$TestTableLevelGenerator
\ No newline at end of file

Reply via email to