This is an automated email from the ASF dual-hosted git repository.

leesf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git

The following commit(s) were added to refs/heads/master by this push:
     new 779edc0  [HUDI-344] Add partitioner param to Exporter (#1405)
779edc0 is described below

commit 779edc068865898049569da0fe750574f93a0dca
Author: Raymond Xu <2701446+xushi...@users.noreply.github.com>
AuthorDate: Wed Mar 18 04:24:04 2020 -0700

    [HUDI-344] Add partitioner param to Exporter (#1405)
---
 .../hudi/utilities/HoodieSnapshotExporter.java     | 126 +++++++++++++--------
 .../hudi/utilities/TestHoodieSnapshotExporter.java | 110 ++++++++++++++++--
 2 files changed, 178 insertions(+), 58 deletions(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java
index b58b5d3..c39daa7 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java
@@ -18,16 +18,9 @@
 
 package org.apache.hudi.utilities;
 
-import com.beust.jcommander.JCommander;
-import com.beust.jcommander.Parameter;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.Path;
-import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.SerializableConfiguration;
 import org.apache.hudi.common.model.HoodiePartitionMetadata;
+import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.HoodieTimeline;
@@ -36,6 +29,18 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.util.FSUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.StringUtils;
+
+import com.beust.jcommander.IValueValidator;
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.ParameterException;
+import com.google.common.collect.ImmutableList;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -45,41 +50,66 @@ import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SaveMode;
 import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.execution.datasources.DataSource;
-
-import scala.Tuple2;
-import scala.collection.JavaConversions;
 
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 import java.util.stream.Collectors;
 
+import scala.Tuple2;
+import scala.collection.JavaConversions;
+
 /**
  * Export the latest records of Hudi dataset to a set of external files (e.g., plain parquet files).
  *
  * @experimental This export is an experimental tool. If you want to export hudi to hudi, please use HoodieSnapshotCopier.
  */
 public class HoodieSnapshotExporter {
+
+  @FunctionalInterface
+  public interface Partitioner {
+
+    DataFrameWriter<Row> partition(Dataset<Row> source);
+
+  }
+
   private static final Logger LOG = LogManager.getLogger(HoodieSnapshotExporter.class);
 
+  public static class OutputFormatValidator implements IValueValidator<String> {
+
+    static final String HUDI = "hudi";
+    static final List<String> FORMATS = ImmutableList.of("json", "parquet", HUDI);
+
+    @Override
+    public void validate(String name, String value) {
+      if (value == null || !FORMATS.contains(value)) {
+        throw new ParameterException(
+            String.format("Invalid output format: value:%s: supported formats:%s", value, FORMATS));
+      }
+    }
+  }
+
   public static class Config implements Serializable {
+
     @Parameter(names = {"--source-base-path"}, description = "Base path for the source Hudi dataset to be snapshotted", required = true)
-    String sourceBasePath = null;
+    String sourceBasePath;
 
-    @Parameter(names = {"--target-base-path"}, description = "Base path for the target output files (snapshots)", required = true)
-    String targetOutputPath = null;
+    @Parameter(names = {"--target-output-path"}, description = "Base path for the target output files (snapshots)", required = true)
+    String targetOutputPath;
 
-    @Parameter(names = {"--output-format"}, description = "e.g. Hudi or Parquet", required = true)
+    @Parameter(names = {"--output-format"}, description = "Output format for the exported dataset; accept these values: json|parquet|hudi", required = true,
+        validateValueWith = OutputFormatValidator.class)
     String outputFormat;
 
     @Parameter(names = {"--output-partition-field"}, description = "A field to be used by Spark repartitioning")
-    String outputPartitionField;
+    String outputPartitionField = null;
+
+    @Parameter(names = {"--output-partitioner"}, description = "A class to facilitate custom repartitioning")
+    String outputPartitioner = null;
   }
 
-  public int export(SparkSession spark, Config cfg) throws IOException {
+  public void export(SparkSession spark, Config cfg) throws IOException {
     JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
     FileSystem fs = FSUtils.getFs(cfg.sourceBasePath, jsc.hadoopConfiguration());
 
@@ -92,7 +122,7 @@ public class HoodieSnapshotExporter {
         tableMetadata.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant();
     if (!latestCommit.isPresent()) {
       LOG.error("No commits present. Nothing to snapshot");
-      return -1;
+      return;
     }
     final String latestCommitTimestamp = latestCommit.get().getTimestamp();
     LOG.info(String.format("Starting to snapshot latest version files which are also no-late-than %s.",
@@ -106,28 +136,8 @@ public class HoodieSnapshotExporter {
         dataFiles.addAll(fsView.getLatestBaseFilesBeforeOrOn(partition, latestCommitTimestamp).map(f -> f.getPath()).collect(Collectors.toList()));
       }
 
-      try {
-        DataSource.lookupDataSource(cfg.outputFormat, spark.sessionState().conf());
-      } catch (Exception e) {
-        LOG.error(String.format("The %s output format is not supported! ", cfg.outputFormat));
-        return -1;
-      }
-      if (!cfg.outputFormat.equalsIgnoreCase("hudi")) {
-        // Do transformation
-        // A field to do simple Spark repartitioning
-        DataFrameWriter<Row> write = null;
-        Dataset<Row> original = spark.read().parquet(JavaConversions.asScalaIterator(dataFiles.iterator()).toSeq());
-        List<Column> needColumns = Arrays.asList(original.columns()).stream().filter(col -> !col.startsWith("_hoodie_")).map(col -> new Column(col)).collect(Collectors.toList());
-        Dataset<Row> reader = original.select(JavaConversions.asScalaIterator(needColumns.iterator()).toSeq());
-        if (!StringUtils.isNullOrEmpty(cfg.outputPartitionField)) {
-          write = reader.repartition(new Column(cfg.outputPartitionField))
-              .write().partitionBy(cfg.outputPartitionField);
-        } else {
-          write = reader.write();
-        }
-        write.format(cfg.outputFormat)
-            .mode(SaveMode.Overwrite)
-            .save(cfg.targetOutputPath);
+      if (!cfg.outputFormat.equals(OutputFormatValidator.HUDI)) {
+        exportAsNonHudi(spark, cfg, dataFiles);
       } else {
         // No transformation is needed for output format "HUDI", just copy the original files.
         copySnapshot(jsc, fs, cfg, partitions, dataFiles, latestCommitTimestamp, serConf);
@@ -136,16 +146,34 @@ public class HoodieSnapshotExporter {
     } else {
       LOG.info("The job has 0 partition to copy.");
     }
-    return 0;
   }
 
+  private void exportAsNonHudi(SparkSession spark, Config cfg, List<String> dataFiles) {
+    Partitioner defaultPartitioner = dataset -> {
+      Dataset<Row> hoodieDroppedDataset = dataset.drop(JavaConversions.asScalaIterator(HoodieRecord.HOODIE_META_COLUMNS.iterator()).toSeq());
+      return StringUtils.isNullOrEmpty(cfg.outputPartitionField)
+          ? hoodieDroppedDataset.write()
+          : hoodieDroppedDataset.repartition(new Column(cfg.outputPartitionField)).write().partitionBy(cfg.outputPartitionField);
+    };
+
+    Partitioner partitioner = StringUtils.isNullOrEmpty(cfg.outputPartitioner)
+        ? defaultPartitioner
+        : ReflectionUtils.loadClass(cfg.outputPartitioner);
+
+    Dataset<Row> sourceDataset = spark.read().parquet(JavaConversions.asScalaIterator(dataFiles.iterator()).toSeq());
+    partitioner.partition(sourceDataset)
+        .format(cfg.outputFormat)
+        .mode(SaveMode.Overwrite)
+        .save(cfg.targetOutputPath);
+  }
+
   private void copySnapshot(JavaSparkContext jsc,
-      FileSystem fs,
-      Config cfg,
-      List<String> partitions,
-      List<String> dataFiles,
-      String latestCommitTimestamp,
-      SerializableConfiguration serConf) throws IOException {
+                            FileSystem fs,
+                            Config cfg,
+                            List<String> partitions,
+                            List<String> dataFiles,
+                            String latestCommitTimestamp,
+                            SerializableConfiguration serConf) throws IOException {
     // Make sure the output directory is empty
     Path outputPath = new Path(cfg.targetOutputPath);
     if (fs.exists(outputPath)) {
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieSnapshotExporter.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieSnapshotExporter.java
index f624247..6eb15a2 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieSnapshotExporter.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieSnapshotExporter.java
@@ -29,13 +29,20 @@ import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.index.HoodieIndex.IndexType;
 import org.apache.hudi.utilities.HoodieSnapshotExporter.Config;
+import org.apache.hudi.utilities.HoodieSnapshotExporter.OutputFormatValidator;
+import org.apache.hudi.utilities.HoodieSnapshotExporter.Partitioner;
 
+import com.beust.jcommander.ParameterException;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.DataFrameWriter;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 import org.junit.After;
 import org.junit.Before;
@@ -52,6 +59,7 @@ import java.util.List;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 @RunWith(Enclosed.class)
@@ -62,7 +70,7 @@ public class TestHoodieSnapshotExporter {
     static final Logger LOG = LogManager.getLogger(ExporterTestHarness.class);
     static final int NUM_RECORDS = 100;
     static final String COMMIT_TIME = "20200101000000";
-    static final String PARTITION_PATH = "2020/01/01";
+    static final String PARTITION_PATH = "2020";
     static final String TABLE_NAME = "testing";
     String sourcePath;
     String targetPath;
@@ -119,12 +127,19 @@ public class TestHoodieSnapshotExporter {
 
   public static class TestHoodieSnapshotExporterForHudi extends ExporterTestHarness {
 
-    @Test
-    public void testExportAsHudi() throws IOException {
-      HoodieSnapshotExporter.Config cfg = new Config();
+    private HoodieSnapshotExporter.Config cfg;
+
+    @Before
+    public void setUp() throws Exception {
+      super.setUp();
+      cfg = new Config();
       cfg.sourceBasePath = sourcePath;
       cfg.targetOutputPath = targetPath;
-      cfg.outputFormat = "hudi";
+      cfg.outputFormat = OutputFormatValidator.HUDI;
+    }
+
+    @Test
+    public void testExportAsHudi() throws IOException {
       new HoodieSnapshotExporter().export(SparkSession.builder().config(jsc.getConf()).getOrCreate(), cfg);
 
       // Check results
@@ -151,10 +166,6 @@ public class TestHoodieSnapshotExporter {
       dfs.delete(new Path(sourcePath + "/" + PARTITION_PATH), true);
 
       // export
-      HoodieSnapshotExporter.Config cfg = new Config();
-      cfg.sourceBasePath = sourcePath;
-      cfg.targetOutputPath = targetPath;
-      cfg.outputFormat = "hudi";
       new HoodieSnapshotExporter().export(SparkSession.builder().config(jsc.getConf()).getOrCreate(), cfg);
 
       // Check results
@@ -185,4 +196,85 @@ public class TestHoodieSnapshotExporter {
       assertTrue(dfs.exists(new Path(targetPath + "/_SUCCESS")));
     }
   }
+
+  public static class TestHoodieSnapshotExporterForRepartitioning extends ExporterTestHarness {
+
+    private static final String PARTITION_NAME = "year";
+
+    public static class UserDefinedPartitioner implements Partitioner {
+
+      @Override
+      public DataFrameWriter<Row> partition(Dataset<Row> source) {
+        return source
+            .withColumnRenamed(HoodieRecord.PARTITION_PATH_METADATA_FIELD, PARTITION_NAME)
+            .repartition(new Column(PARTITION_NAME))
+            .write()
+            .partitionBy(PARTITION_NAME);
+      }
+    }
+
+    private HoodieSnapshotExporter.Config cfg;
+
+    @Before
+    public void setUp() throws Exception {
+      super.setUp();
+      cfg = new Config();
+      cfg.sourceBasePath = sourcePath;
+      cfg.targetOutputPath = targetPath;
+      cfg.outputFormat = "json";
+    }
+
+    @Test
+    public void testExportWithPartitionField() throws IOException {
+      // `driver` field is set in HoodieTestDataGenerator
+      cfg.outputPartitionField = "driver";
+      new HoodieSnapshotExporter().export(SparkSession.builder().config(jsc.getConf()).getOrCreate(), cfg);
+
+      assertEquals(NUM_RECORDS, sqlContext.read().format("json").load(targetPath).count());
+      assertTrue(dfs.exists(new Path(targetPath + "/_SUCCESS")));
+      assertTrue(dfs.listStatus(new Path(targetPath)).length > 1);
+    }
+
+    @Test
+    public void testExportForUserDefinedPartitioner() throws IOException {
+      cfg.outputPartitioner = UserDefinedPartitioner.class.getName();
+      new HoodieSnapshotExporter().export(SparkSession.builder().config(jsc.getConf()).getOrCreate(), cfg);
+
+      assertEquals(NUM_RECORDS, sqlContext.read().format("json").load(targetPath).count());
+      assertTrue(dfs.exists(new Path(targetPath + "/_SUCCESS")));
+      assertTrue(dfs.exists(new Path(String.format("%s/%s=%s", targetPath, PARTITION_NAME, PARTITION_PATH))));
+    }
+  }
+
+  @RunWith(Parameterized.class)
+  public static class TestHoodieSnapshotExporterInputValidation {
+
+    @Parameters
+    public static Iterable<Object[]> data() {
+      return Arrays.asList(new Object[][] {
+          {"json", true}, {"parquet", true}, {"hudi", true},
+          {"JSON", false}, {"foo", false}, {null, false}, {"", false}
+      });
+    }
+
+    @Parameter
+    public String format;
+    @Parameter(1)
+    public boolean isValid;
+
+    @Test
+    public void testValidateOutputFormat() {
+      Throwable t = null;
+      try {
+        new OutputFormatValidator().validate(null, format);
+      } catch (Exception e) {
+        t = e;
+      }
+      if (isValid) {
+        assertNull(t);
+      } else {
+        assertTrue(t instanceof ParameterException);
+      }
    }
  }
 }
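
For illustration, a minimal driver sketch that exercises the new options (not part of the commit above). It assumes a local SparkSession and made-up paths, and it sits in the org.apache.hudi.utilities package because the Config fields are package-private; MyPartitioner stands in for any user-supplied Partitioner implementation.

package org.apache.hudi.utilities;

import org.apache.spark.sql.SparkSession;

public class SnapshotExporterExample {

  public static void main(String[] args) throws Exception {
    // Local session for the sketch; any configured SparkSession works.
    SparkSession spark = SparkSession.builder()
        .appName("hudi-snapshot-exporter-example") // hypothetical app name
        .master("local[2]")
        .getOrCreate();

    HoodieSnapshotExporter.Config cfg = new HoodieSnapshotExporter.Config();
    cfg.sourceBasePath = "/tmp/hudi/source-table"; // hypothetical source dataset
    cfg.targetOutputPath = "/tmp/hudi/snapshot";   // hypothetical output location
    cfg.outputFormat = "parquet";                  // validated against json|parquet|hudi

    // Option 1: simple repartitioning by a single field.
    cfg.outputPartitionField = "driver";

    // Option 2: a custom Partitioner loaded by reflection; when set, it
    // replaces the default partitioner entirely (hypothetical class name).
    // cfg.outputPartitioner = MyPartitioner.class.getName();

    new HoodieSnapshotExporter().export(spark, cfg);
    spark.stop();
  }
}

Per exportAsNonHudi above, the default partitioner drops the _hoodie_* meta columns and, if outputPartitionField is set, repartitions and partitions the output by that field; a custom outputPartitioner bypasses that default, so outputPartitionField is ignored when both are given.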