This is an automated email from the ASF dual-hosted git repository.

leesf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new bc82e2b  [HUDI-711] Refactor exporter main logic (#1436)
bc82e2b is described below

commit bc82e2be6cf080ab99092758368e91f509a2004c
Author: Raymond Xu <2701446+xushi...@users.noreply.github.com>
AuthorDate: Wed Mar 25 03:02:24 2020 -0700

    [HUDI-711] Refactor exporter main logic (#1436)

    * Refactor exporter main logic
    * break main method into multiple readable methods
    * fix bug of passing wrong file list
    * avoid deleting output path when exists
    * throw exception to early abort on multiple cases
    * use JavaSparkContext instead of SparkSession
    * improve unit test for expected exceptions
---
 .../hudi/utilities/HoodieSnapshotExporter.java     | 161 +++++++++++----------
 .../exception/HoodieSnapshotExporterException.java |  10 ++
 .../hudi/utilities/TestHoodieSnapshotExporter.java |  71 +++++++--
 3 files changed, 154 insertions(+), 88 deletions(-)
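Note: per the commit message, the exporter entry point now takes a JavaSparkContext instead of a SparkSession. As a minimal sketch of driving the refactored API programmatically (assuming a class placed in the org.apache.hudi.utilities package so the Config fields are visible; the hdfs:// paths and app name are placeholders, not part of this commit):

    package org.apache.hudi.utilities;

    import org.apache.hudi.utilities.HoodieSnapshotExporter.Config;
    import org.apache.hudi.utilities.HoodieSnapshotExporter.OutputFormatValidator;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;

    public class SnapshotExportSketch {
      public static void main(String[] args) throws Exception {
        Config cfg = new Config();
        cfg.sourceBasePath = "hdfs:///tmp/hudi/source-table"; // placeholder source table
        cfg.targetOutputPath = "hdfs:///tmp/hudi/snapshot";   // must not already exist
        cfg.outputFormat = OutputFormatValidator.HUDI;        // copy files as-is; non-Hudi formats rewrite via Spark

        // Same Spark setup the refactored main() below uses.
        SparkConf sparkConf = new SparkConf().setAppName("snapshot-export-sketch");
        sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
        try {
          new HoodieSnapshotExporter().export(jsc, cfg);
        } finally {
          jsc.stop();
        }
      }
    }

As the diff below shows, export() now aborts with a HoodieSnapshotExporterException when the target path already exists, when the source has no completed commit, or when it has no partition, instead of silently returning.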
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java
index dfe3d68..7df630a 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java
@@ -19,18 +19,20 @@ package org.apache.hudi.utilities;
 
 import org.apache.hudi.common.SerializableConfiguration;
+import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodiePartitionMetadata;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.HoodieTimeline;
-import org.apache.hudi.common.table.TableFileSystemView;
+import org.apache.hudi.common.table.TableFileSystemView.BaseFileOnlyView;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.util.FSUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.utilities.exception.HoodieSnapshotExporterException;
 
 import com.beust.jcommander.IValueValidator;
 import com.beust.jcommander.JCommander;
@@ -43,19 +45,21 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
+import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Column;
 import org.apache.spark.sql.DataFrameWriter;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SQLContext;
 import org.apache.spark.sql.SaveMode;
-import org.apache.spark.sql.SparkSession;
 
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
-import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import scala.Tuple2;
 import scala.collection.JavaConversions;
@@ -109,46 +113,56 @@ public class HoodieSnapshotExporter {
     String outputPartitioner = null;
   }
 
-  public void export(SparkSession spark, Config cfg) throws IOException {
-    JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
+  public void export(JavaSparkContext jsc, Config cfg) throws IOException {
     FileSystem fs = FSUtils.getFs(cfg.sourceBasePath, jsc.hadoopConfiguration());
-    final SerializableConfiguration serConf = new SerializableConfiguration(jsc.hadoopConfiguration());
-    final HoodieTableMetaClient tableMetadata = new HoodieTableMetaClient(fs.getConf(), cfg.sourceBasePath);
-    final TableFileSystemView.BaseFileOnlyView fsView = new HoodieTableFileSystemView(tableMetadata,
-        tableMetadata.getActiveTimeline().getCommitsTimeline().filterCompletedInstants());
-    // Get the latest commit
-    Option<HoodieInstant> latestCommit =
-        tableMetadata.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant();
-    if (!latestCommit.isPresent()) {
-      LOG.error("No commits present. Nothing to snapshot");
-      return;
+    if (outputPathExists(fs, cfg)) {
+      throw new HoodieSnapshotExporterException("The target output path already exists.");
     }
-    final String latestCommitTimestamp = latestCommit.get().getTimestamp();
+
+    final String latestCommitTimestamp = getLatestCommitTimestamp(fs, cfg).<HoodieSnapshotExporterException>orElseThrow(() -> {
+      throw new HoodieSnapshotExporterException("No commits present. Nothing to snapshot.");
+    });
     LOG.info(String.format("Starting to snapshot latest version files which are also no-late-than %s.",
         latestCommitTimestamp));
 
-    List<String> partitions = FSUtils.getAllPartitionPaths(fs, cfg.sourceBasePath, false);
-    if (partitions.size() > 0) {
-      List<String> dataFiles = new ArrayList<>();
-
-      for (String partition : partitions) {
-        dataFiles.addAll(fsView.getLatestBaseFilesBeforeOrOn(partition, latestCommitTimestamp).map(f -> f.getPath()).collect(Collectors.toList()));
-      }
+    final List<String> partitions = getPartitions(fs, cfg);
+    if (partitions.isEmpty()) {
+      throw new HoodieSnapshotExporterException("The source dataset has 0 partition to snapshot.");
+    }
+    LOG.info(String.format("The job needs to export %d partitions.", partitions.size()));
 
-      if (!cfg.outputFormat.equals(OutputFormatValidator.HUDI)) {
-        exportAsNonHudi(spark, cfg, dataFiles);
-      } else {
-        // No transformation is needed for output format "HUDI", just copy the original files.
-        copySnapshot(jsc, fs, cfg, partitions, dataFiles, latestCommitTimestamp, serConf);
-      }
-      createSuccessTag(fs, cfg.targetOutputPath);
+    if (cfg.outputFormat.equals(OutputFormatValidator.HUDI)) {
+      exportAsHudi(jsc, cfg, partitions, latestCommitTimestamp);
     } else {
-      LOG.info("The job has 0 partition to copy.");
+      exportAsNonHudi(jsc, cfg, partitions, latestCommitTimestamp);
+    }
+    createSuccessTag(fs, cfg);
+  }
+
+  private boolean outputPathExists(FileSystem fs, Config cfg) throws IOException {
+    return fs.exists(new Path(cfg.targetOutputPath));
+  }
+
+  private Option<String> getLatestCommitTimestamp(FileSystem fs, Config cfg) {
+    final HoodieTableMetaClient tableMetadata = new HoodieTableMetaClient(fs.getConf(), cfg.sourceBasePath);
+    Option<HoodieInstant> latestCommit = tableMetadata.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant();
+    return latestCommit.isPresent() ? Option.of(latestCommit.get().getTimestamp()) : Option.empty();
+  }
+
+  private List<String> getPartitions(FileSystem fs, Config cfg) throws IOException {
+    return FSUtils.getAllPartitionPaths(fs, cfg.sourceBasePath, false);
+  }
+
+  private void createSuccessTag(FileSystem fs, Config cfg) throws IOException {
+    Path successTagPath = new Path(cfg.targetOutputPath + "/_SUCCESS");
+    if (!fs.exists(successTagPath)) {
+      LOG.info(String.format("Creating _SUCCESS under target output path: %s", cfg.targetOutputPath));
+      fs.createNewFile(successTagPath);
     }
   }
 
-  private void exportAsNonHudi(SparkSession spark, Config cfg, List<String> dataFiles) {
+  private void exportAsNonHudi(JavaSparkContext jsc, Config cfg, List<String> partitions, String latestCommitTimestamp) {
     Partitioner defaultPartitioner = dataset -> {
       Dataset<Row> hoodieDroppedDataset = dataset.drop(JavaConversions.asScalaIterator(HoodieRecord.HOODIE_META_COLUMNS.iterator()).toSeq());
       return StringUtils.isNullOrEmpty(cfg.outputPartitionField)
@@ -160,37 +174,35 @@
         ? defaultPartitioner
         : ReflectionUtils.loadClass(cfg.outputPartitioner);
 
-    Dataset<Row> sourceDataset = spark.read().parquet(JavaConversions.asScalaIterator(dataFiles.iterator()).toSeq());
+    final BaseFileOnlyView fsView = getBaseFileOnlyView(jsc, cfg);
+    Iterator<String> exportingFilePaths = jsc
+        .parallelize(partitions, partitions.size())
+        .flatMap(partition -> fsView
+            .getLatestBaseFilesBeforeOrOn(partition, latestCommitTimestamp)
+            .map(HoodieBaseFile::getPath).iterator())
+        .toLocalIterator();
+
+    Dataset<Row> sourceDataset = new SQLContext(jsc).read().parquet(JavaConversions.asScalaIterator(exportingFilePaths).toSeq());
     partitioner.partition(sourceDataset)
         .format(cfg.outputFormat)
         .mode(SaveMode.Overwrite)
        .save(cfg.targetOutputPath);
   }
 
-  private void copySnapshot(JavaSparkContext jsc,
-                            FileSystem fs,
-                            Config cfg,
-                            List<String> partitions,
-                            List<String> dataFiles,
-                            String latestCommitTimestamp,
-                            SerializableConfiguration serConf) throws IOException {
-    // Make sure the output directory is empty
-    Path outputPath = new Path(cfg.targetOutputPath);
-    if (fs.exists(outputPath)) {
-      LOG.warn(String.format("The output path %s targetBasePath already exists, deleting", outputPath));
-      fs.delete(new Path(cfg.targetOutputPath), true);
-    }
-
+  private void exportAsHudi(JavaSparkContext jsc, Config cfg, List<String> partitions, String latestCommitTimestamp) throws IOException {
+    final BaseFileOnlyView fsView = getBaseFileOnlyView(jsc, cfg);
+    final SerializableConfiguration serConf = new SerializableConfiguration(jsc.hadoopConfiguration());
     jsc.parallelize(partitions, partitions.size()).flatMap(partition -> {
       // Only take latest version files <= latestCommit.
-      FileSystem fs1 = FSUtils.getFs(cfg.sourceBasePath, serConf.newCopy());
       List<Tuple2<String, String>> filePaths = new ArrayList<>();
-      dataFiles.forEach(hoodieDataFile -> filePaths.add(new Tuple2<>(partition, hoodieDataFile)));
+      Stream<HoodieBaseFile> dataFiles = fsView.getLatestBaseFilesBeforeOrOn(partition, latestCommitTimestamp);
+      dataFiles.forEach(hoodieDataFile -> filePaths.add(new Tuple2<>(partition, hoodieDataFile.getPath())));
 
       // also need to copy over partition metadata
       Path partitionMetaFile =
           new Path(new Path(cfg.sourceBasePath, partition), HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE);
-      if (fs1.exists(partitionMetaFile)) {
+      FileSystem fs = FSUtils.getFs(cfg.sourceBasePath, serConf.newCopy());
+      if (fs.exists(partitionMetaFile)) {
         filePaths.add(new Tuple2<>(partition, partitionMetaFile.toString()));
       }
 
@@ -199,19 +211,20 @@
       String partition = tuple._1();
       Path sourceFilePath = new Path(tuple._2());
       Path toPartitionPath = new Path(cfg.targetOutputPath, partition);
-      FileSystem ifs = FSUtils.getFs(cfg.targetOutputPath, serConf.newCopy());
+      FileSystem fs = FSUtils.getFs(cfg.targetOutputPath, serConf.newCopy());
 
-      if (!ifs.exists(toPartitionPath)) {
-        ifs.mkdirs(toPartitionPath);
+      if (!fs.exists(toPartitionPath)) {
+        fs.mkdirs(toPartitionPath);
       }
-      FileUtil.copy(ifs, sourceFilePath, ifs, new Path(toPartitionPath, sourceFilePath.getName()), false,
-          ifs.getConf());
+      FileUtil.copy(fs, sourceFilePath, fs, new Path(toPartitionPath, sourceFilePath.getName()), false,
+          fs.getConf());
     });
 
     // Also copy the .commit files
     LOG.info(String.format("Copying .commit files which are no-late-than %s.", latestCommitTimestamp));
+    final FileSystem fileSystem = FSUtils.getFs(cfg.sourceBasePath, jsc.hadoopConfiguration());
     FileStatus[] commitFilesToCopy =
-        fs.listStatus(new Path(cfg.sourceBasePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME), (commitFilePath) -> {
+        fileSystem.listStatus(new Path(cfg.sourceBasePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME), (commitFilePath) -> {
           if (commitFilePath.getName().equals(HoodieTableConfig.HOODIE_PROPERTIES_FILE)) {
             return true;
           } else {
@@ -223,39 +236,37 @@
     for (FileStatus commitStatus : commitFilesToCopy) {
       Path targetFilePath =
          new Path(cfg.targetOutputPath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + commitStatus.getPath().getName());
-      if (!fs.exists(targetFilePath.getParent())) {
-        fs.mkdirs(targetFilePath.getParent());
+      if (!fileSystem.exists(targetFilePath.getParent())) {
+        fileSystem.mkdirs(targetFilePath.getParent());
      }
-      if (fs.exists(targetFilePath)) {
+      if (fileSystem.exists(targetFilePath)) {
        LOG.error(
            String.format("The target output commit file (%s targetBasePath) already exists.", targetFilePath));
      }
-      FileUtil.copy(fs, commitStatus.getPath(), fs, targetFilePath, false, fs.getConf());
+      FileUtil.copy(fileSystem, commitStatus.getPath(), fileSystem, targetFilePath, false, fileSystem.getConf());
    }
  }
 
-  private void createSuccessTag(FileSystem fs, String targetOutputPath) throws IOException {
-    Path successTagPath = new Path(targetOutputPath + "/_SUCCESS");
-    if (!fs.exists(successTagPath)) {
-      LOG.info(String.format("Creating _SUCCESS under target output path: %s", targetOutputPath));
-      fs.createNewFile(successTagPath);
-    }
+  private BaseFileOnlyView getBaseFileOnlyView(JavaSparkContext jsc, Config cfg) {
+    FileSystem fs = FSUtils.getFs(cfg.sourceBasePath, jsc.hadoopConfiguration());
+    HoodieTableMetaClient tableMetadata = new HoodieTableMetaClient(fs.getConf(), cfg.sourceBasePath);
+    return new HoodieTableFileSystemView(tableMetadata, tableMetadata
+        .getActiveTimeline().getCommitsTimeline().filterCompletedInstants());
  }
 
  public static void main(String[] args) throws IOException {
-    // Take input configs
    final Config cfg = new Config();
    new JCommander(cfg, null, args);
 
-    // Create a spark job to do the snapshot export
-    SparkSession spark = SparkSession.builder().appName("Hoodie-snapshot-exporter")
-        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").getOrCreate();
+    SparkConf sparkConf = new SparkConf().setAppName("Hoodie-snapshot-exporter");
+    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
+    JavaSparkContext jsc = new JavaSparkContext(sparkConf);
 
    LOG.info("Initializing spark job.");
-    HoodieSnapshotExporter hoodieSnapshotExporter = new HoodieSnapshotExporter();
-    hoodieSnapshotExporter.export(spark, cfg);
-
-    // Stop the job
-    spark.stop();
+    try {
+      new HoodieSnapshotExporter().export(jsc, cfg);
+    } finally {
+      jsc.stop();
+    }
  }
 }
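Note on the orElseThrow call in export() above: org.apache.hudi.common.util.Option mirrors java.util.Optional, and the explicit <HoodieSnapshotExporterException> type witness pins down the exception type for the compiler, with the supplier body throwing rather than returning. The same early-abort control flow, sketched with only the JDK so it runs anywhere (names here are illustrative, not Hudi API):

    import java.util.Optional;

    public class EarlyAbortSketch {

      // Stand-in for getLatestCommitTimestamp(fs, cfg).
      static Optional<String> latestCommitTimestamp(boolean hasCommits) {
        return hasCommits ? Optional.of("20200325030224") : Optional.empty();
      }

      public static void main(String[] args) {
        // Aborts before any output is written when no commit exists --
        // the same guard the exporter applies.
        String ts = latestCommitTimestamp(true).orElseThrow(
            () -> new IllegalStateException("No commits present. Nothing to snapshot."));
        System.out.println("Snapshotting files no later than " + ts);
      }
    }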
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/exception/HoodieSnapshotExporterException.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/exception/HoodieSnapshotExporterException.java
new file mode 100644
index 0000000..6fcb9df
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/exception/HoodieSnapshotExporterException.java
@@ -0,0 +1,10 @@
+package org.apache.hudi.utilities.exception;
+
+import org.apache.hudi.exception.HoodieException;
+
+public class HoodieSnapshotExporterException extends HoodieException {
+
+  public HoodieSnapshotExporterException(String msg) {
+    super(msg);
+  }
+}
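Note: the tests below (testExportForUserDefinedPartitioner) plug in a UserDefinedPartitioner whose body lies outside this diff. For illustration, a custom Partitioner for the non-Hudi output path could look like this sketch; the interface shape is inferred from the defaultPartitioner lambda in exportAsNonHudi above, and "driver" is just an example column:

    package org.apache.hudi.utilities;

    import org.apache.hudi.utilities.HoodieSnapshotExporter.Partitioner;

    import org.apache.spark.sql.DataFrameWriter;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;

    public class DriverColumnPartitioner implements Partitioner {

      @Override
      public DataFrameWriter<Row> partition(Dataset<Row> source) {
        // One output directory per distinct value of the "driver" column.
        return source.repartition(source.col("driver"))
            .write()
            .partitionBy("driver");
      }
    }

It is wired in via cfg.outputPartitioner = DriverColumnPartitioner.class.getName(); since ReflectionUtils.loadClass instantiates the class by name, an implementation is expected to have a public no-arg constructor.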
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieSnapshotExporter.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieSnapshotExporter.java
index 6eb15a2..730b9ec 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieSnapshotExporter.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieSnapshotExporter.java
@@ -31,8 +31,10 @@ import org.apache.hudi.index.HoodieIndex.IndexType;
 import org.apache.hudi.utilities.HoodieSnapshotExporter.Config;
 import org.apache.hudi.utilities.HoodieSnapshotExporter.OutputFormatValidator;
 import org.apache.hudi.utilities.HoodieSnapshotExporter.Partitioner;
+import org.apache.hudi.utilities.exception.HoodieSnapshotExporterException;
 
 import com.beust.jcommander.ParameterException;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
@@ -43,11 +45,12 @@ import org.apache.spark.sql.Column;
 import org.apache.spark.sql.DataFrameWriter;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
-import org.apache.spark.sql.SparkSession;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.runners.Enclosed;
+import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameter;
@@ -56,9 +59,9 @@ import org.junit.runners.Parameterized.Parameters;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
+import java.util.stream.Collectors;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
@@ -85,7 +88,6 @@ public class TestHoodieSnapshotExporter {
       sourcePath = dfsBasePath + "/source/";
       targetPath = dfsBasePath + "/target/";
       dfs.mkdirs(new Path(sourcePath));
-      dfs.mkdirs(new Path(targetPath));
 
       HoodieTableMetaClient
           .initTableType(jsc.hadoopConfiguration(), sourcePath, HoodieTableType.COPY_ON_WRITE, TABLE_NAME,
               HoodieAvroPayload.class.getName());
@@ -140,7 +142,7 @@
 
     @Test
     public void testExportAsHudi() throws IOException {
-      new HoodieSnapshotExporter().export(SparkSession.builder().config(jsc.getConf()).getOrCreate(), cfg);
+      new HoodieSnapshotExporter().export(jsc, cfg);
 
       // Check results
       assertTrue(dfs.exists(new Path(targetPath + "/.hoodie/" + COMMIT_TIME + ".clean")));
@@ -159,18 +161,61 @@
       assertTrue(dfs.exists(new Path(partition + "/.hoodie_partition_metadata")));
       assertTrue(dfs.exists(new Path(targetPath + "/_SUCCESS")));
     }
+  }
+
+  public static class TestHoodieSnapshotExporterForEarlyAbort extends ExporterTestHarness {
+
+    private HoodieSnapshotExporter.Config cfg;
+
+    @Rule
+    public ExpectedException exceptionRule = ExpectedException.none();
+
+    @Before
+    public void setUp() throws Exception {
+      super.setUp();
+      cfg = new Config();
+      cfg.sourceBasePath = sourcePath;
+      cfg.targetOutputPath = targetPath;
+      cfg.outputFormat = OutputFormatValidator.HUDI;
+    }
 
     @Test
-    public void testExportEmptyDataset() throws IOException {
+    public void testExportWhenTargetPathExists() throws IOException {
+      // make target output path present
+      dfs.mkdirs(new Path(targetPath));
+
+      // export
+      exceptionRule.expect(HoodieSnapshotExporterException.class);
+      exceptionRule.expectMessage("The target output path already exists.");
+      new HoodieSnapshotExporter().export(jsc, cfg);
+    }
+
+    @Test
+    public void testExportDatasetWithNoCommit() throws IOException {
+      // delete commit files
+      List<Path> commitFiles = Arrays.stream(dfs.listStatus(new Path(sourcePath + "/.hoodie")))
+          .map(FileStatus::getPath)
+          .filter(filePath -> filePath.getName().endsWith(".commit"))
+          .collect(Collectors.toList());
+      for (Path p : commitFiles) {
+        dfs.delete(p, false);
+      }
+
+      // export
+      exceptionRule.expect(HoodieSnapshotExporterException.class);
+      exceptionRule.expectMessage("No commits present. Nothing to snapshot.");
+      new HoodieSnapshotExporter().export(jsc, cfg);
+    }
+
+    @Test
+    public void testExportDatasetWithNoPartition() throws IOException {
       // delete all source data
       dfs.delete(new Path(sourcePath + "/" + PARTITION_PATH), true);
 
       // export
-      new HoodieSnapshotExporter().export(SparkSession.builder().config(jsc.getConf()).getOrCreate(), cfg);
-
-      // Check results
-      assertEquals("Target path should be empty.", 0, dfs.listStatus(new Path(targetPath)).length);
-      assertFalse(dfs.exists(new Path(targetPath + "/_SUCCESS")));
+      exceptionRule.expect(HoodieSnapshotExporterException.class);
+      exceptionRule.expectMessage("The source dataset has 0 partition to snapshot.");
+      new HoodieSnapshotExporter().export(jsc, cfg);
     }
   }
 
@@ -191,7 +236,7 @@
       cfg.sourceBasePath = sourcePath;
       cfg.targetOutputPath = targetPath;
       cfg.outputFormat = format;
-      new HoodieSnapshotExporter().export(SparkSession.builder().config(jsc.getConf()).getOrCreate(), cfg);
+      new HoodieSnapshotExporter().export(jsc, cfg);
       assertEquals(NUM_RECORDS, sqlContext.read().format(format).load(targetPath).count());
       assertTrue(dfs.exists(new Path(targetPath + "/_SUCCESS")));
     }
@@ -228,7 +273,7 @@
     public void testExportWithPartitionField() throws IOException {
       // `driver` field is set in HoodieTestDataGenerator
       cfg.outputPartitionField = "driver";
-      new HoodieSnapshotExporter().export(SparkSession.builder().config(jsc.getConf()).getOrCreate(), cfg);
+      new HoodieSnapshotExporter().export(jsc, cfg);
 
       assertEquals(NUM_RECORDS, sqlContext.read().format("json").load(targetPath).count());
       assertTrue(dfs.exists(new Path(targetPath + "/_SUCCESS")));
@@ -238,7 +283,7 @@
 
     @Test
     public void testExportForUserDefinedPartitioner() throws IOException {
       cfg.outputPartitioner = UserDefinedPartitioner.class.getName();
-      new HoodieSnapshotExporter().export(SparkSession.builder().config(jsc.getConf()).getOrCreate(), cfg);
+      new HoodieSnapshotExporter().export(jsc, cfg);
 
       assertEquals(NUM_RECORDS, sqlContext.read().format("json").load(targetPath).count());
       assertTrue(dfs.exists(new Path(targetPath + "/_SUCCESS")));
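Note: the ExpectedException rule used in the early-abort tests is the JUnit 4 rule-based idiom; on JUnit 4.13+ the same checks could be written with Assert.assertThrows, e.g. this sketch of the target-path case (assuming JUnit 4.13 is on the classpath):

    import static org.junit.Assert.assertThrows;
    import static org.junit.Assert.assertTrue;

    // Body of a @Test method, replacing the two exceptionRule lines:
    HoodieSnapshotExporterException ex = assertThrows(HoodieSnapshotExporterException.class,
        () -> new HoodieSnapshotExporter().export(jsc, cfg));
    assertTrue(ex.getMessage().contains("The target output path already exists."));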