This is an automated email from the ASF dual-hosted git repository. leesf pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new 8e434b816dc [HUDI-6832] Ensure other table services with correct path are not affected by table with wrong path. (#9652)
8e434b816dc is described below

commit 8e434b816dc81dd6902699ac972ef83f0d307b02
Author: majian <47964462+majian1...@users.noreply.github.com>
AuthorDate: Mon Oct 9 19:10:01 2023 +0800

    [HUDI-6832] Ensure other table services with correct path are not affected by table with wrong path. (#9652)
---
 .../multitable/HoodieMultiTableServicesMain.java   |  2 +-
 .../multitable/MultiTableServiceUtils.java         | 34 +++++++++++++++-
 .../TestHoodieMultiTableServicesMain.java          | 46 ++++++++++++++++++++++
 3 files changed, 79 insertions(+), 3 deletions(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/HoodieMultiTableServicesMain.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/HoodieMultiTableServicesMain.java
index d745a502e04..10258876fbe 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/HoodieMultiTableServicesMain.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/HoodieMultiTableServicesMain.java
@@ -116,7 +116,7 @@ public class HoodieMultiTableServicesMain {
           .flatMap(p -> MultiTableServiceUtils.findHoodieTablesUnderPath(jsc, p).stream())
           .collect(Collectors.toList());
     } else {
-      tablePaths = MultiTableServiceUtils.getTablesToBeServedFromProps(props);
+      tablePaths = MultiTableServiceUtils.getTablesToBeServedFromProps(jsc, props);
     }
     LOG.info("All table paths: " + String.join(",", tablePaths));
     if (cfg.batch) {
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/MultiTableServiceUtils.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/MultiTableServiceUtils.java
index 8188cd2c703..f9a761ea6b8 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/MultiTableServiceUtils.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/MultiTableServiceUtils.java
@@ -24,12 +24,15 @@ import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.TableNotFoundException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -44,22 +47,49 @@ import static org.apache.hudi.common.table.HoodieTableMetaClient.METAFOLDER_NAME
  * Utils for executing multi-table services.
  */
 public class MultiTableServiceUtils {
+  private static final Logger LOG = LoggerFactory.getLogger(MultiTableServiceUtils.class);
 
   public static class Constants {
     public static final String TABLES_TO_BE_SERVED_PROP = "hoodie.tableservice.tablesToServe";
 
+    public static final String TABLES_SKIP_WRONG_PATH = "hoodie.tableservice.skipNonHudiTable";
+
     public static final String COMMA_SEPARATOR = ",";
 
     private static final int DEFAULT_LISTING_PARALLELISM = 1500;
   }
 
-  public static List<String> getTablesToBeServedFromProps(TypedProperties properties) {
+  public static List<String> getTablesToBeServedFromProps(JavaSparkContext jsc, TypedProperties properties) {
+    SerializableConfiguration conf = new SerializableConfiguration(jsc.hadoopConfiguration());
     String combinedTablesString = properties.getString(Constants.TABLES_TO_BE_SERVED_PROP);
+    boolean skipWrongPath = properties.getBoolean(Constants.TABLES_SKIP_WRONG_PATH, false);
     if (combinedTablesString == null) {
       return new ArrayList<>();
     }
     String[] tablesArray = combinedTablesString.split(Constants.COMMA_SEPARATOR);
-    return Arrays.asList(tablesArray);
+
+    List<String> tablePaths;
+    if (skipWrongPath) {
+      tablePaths = Arrays.stream(tablesArray)
+          .filter(tablePath -> {
+            if (isHoodieTable(new Path(tablePath), conf.get())) {
+              return true;
+            } else {
+              // Log the wrong path in console.
+              LOG.warn("Hoodie table not found in path {}, skip", tablePath);
+              return false;
+            }
+          }).collect(Collectors.toList());
+    } else {
+      tablePaths = Arrays.asList(tablesArray);
+      tablePaths.stream()
+          .filter(tablePath -> !isHoodieTable(new Path(tablePath), conf.get()))
+          .findFirst()
+          .ifPresent(tablePath -> {
+            throw new TableNotFoundException("Table not found: " + tablePath);
+          });
+    }
+    return tablePaths;
   }
 
   /**
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/multitable/TestHoodieMultiTableServicesMain.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/multitable/TestHoodieMultiTableServicesMain.java
index ac5ca743e85..3b47ea82db3 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/multitable/TestHoodieMultiTableServicesMain.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/multitable/TestHoodieMultiTableServicesMain.java
@@ -38,6 +38,7 @@ import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieLayoutConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.TableNotFoundException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 import org.apache.hudi.table.action.commit.SparkBucketIndexPartitioner;
@@ -68,6 +69,8 @@ import java.util.Properties;
 import java.util.concurrent.ExecutionException;
 
 import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
+import static org.apache.hudi.utilities.multitable.MultiTableServiceUtils.Constants.TABLES_SKIP_WRONG_PATH;
+import static org.apache.hudi.utilities.multitable.MultiTableServiceUtils.Constants.TABLES_TO_BE_SERVED_PROP;
 
 /**
  * Tests for HoodieMultiTableServicesMain
@@ -166,6 +169,49 @@ class TestHoodieMultiTableServicesMain extends HoodieCommonTestHarness implement
     Assertions.assertEquals(4, metaClient2.reloadActiveTimeline().getCommitsTimeline().countInstants());
   }
 
+  @Test
+  public void testRunMultiTableServicesWithOneWrongPath() throws IOException {
+    // batch run table service
+    HoodieMultiTableServicesMain.Config cfg = getHoodieMultiServiceConfig();
+    cfg.autoDiscovery = false;
+    cfg.batch = true;
+    HoodieTableMetaClient metaClient1 = getMetaClient("table1");
+    cfg.configs.add(String.format("%s=%s", TABLES_SKIP_WRONG_PATH, "true"));
+    cfg.configs.add(String.format("%s=%s", TABLES_TO_BE_SERVED_PROP, metaClient1.getBasePathV2() + ",file:///fakepath"));
+    HoodieMultiTableServicesMain main = new HoodieMultiTableServicesMain(jsc, cfg);
+    try {
+      main.startServices();
+    } catch (Exception e) {
+      Assertions.assertFalse(e instanceof TableNotFoundException);
+    }
+
+    // stream run table service
+    cfg.batch = false;
+    new Thread(() -> {
+      try {
+        Thread.sleep(10000);
+        LOG.info("Shutdown the table services");
+        main.cancel();
+      } catch (InterruptedException e) {
+        LOG.warn("InterruptedException: ", e);
+      }
+    }).start();
+    try {
+      main.startServices();
+    } catch (Exception e) {
+      Assertions.assertFalse(e instanceof TableNotFoundException);
+    }
+
+    // When we disable the skip wrong path, throw the exception.
+    cfg.batch = true;
+    cfg.configs.add(String.format("%s=%s", TABLES_SKIP_WRONG_PATH, "false"));
+    try {
+      main.startServices();
+    } catch (Exception e) {
+      Assertions.assertTrue(e instanceof TableNotFoundException);
+    }
+  }
+
   private void prepareData() throws IOException {
     initTestDataGenerator();
     HoodieTableMetaClient metaClient1 = getMetaClient("table1");