This is an automated email from the ASF dual-hosted git repository.

leesf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 779edc0  [HUDI-344] Add partitioner param to Exporter (#1405)
779edc0 is described below

commit 779edc068865898049569da0fe750574f93a0dca
Author: Raymond Xu <2701446+xushi...@users.noreply.github.com>
AuthorDate: Wed Mar 18 04:24:04 2020 -0700

    [HUDI-344] Add partitioner param to Exporter (#1405)
---
 .../hudi/utilities/HoodieSnapshotExporter.java     | 126 +++++++++++++--------
 .../hudi/utilities/TestHoodieSnapshotExporter.java | 110 ++++++++++++++++--
 2 files changed, 178 insertions(+), 58 deletions(-)
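
The new --output-partitioner flag takes the fully qualified name of a class
implementing the HoodieSnapshotExporter.Partitioner interface added below; the
exporter loads it via ReflectionUtils, so the class is expected to have a
no-arg constructor. A minimal sketch of such an implementation, modeled on the
UserDefinedPartitioner in the test changes (the class name and the "region"
column are illustrative, not part of this commit):

    import org.apache.hudi.utilities.HoodieSnapshotExporter.Partitioner;
    import org.apache.spark.sql.Column;
    import org.apache.spark.sql.DataFrameWriter;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;

    public class RegionPartitioner implements Partitioner {

      @Override
      public DataFrameWriter<Row> partition(Dataset<Row> source) {
        // Repartition by the illustrative "region" column and mirror it in
        // the output directory layout (region=<value>/ subdirectories).
        return source.repartition(new Column("region"))
            .write()
            .partitionBy("region");
      }
    }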

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java
index b58b5d3..c39daa7 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java
@@ -18,16 +18,9 @@
 
 package org.apache.hudi.utilities;
 
-import com.beust.jcommander.JCommander;
-import com.beust.jcommander.Parameter;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.Path;
-import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.SerializableConfiguration;
 import org.apache.hudi.common.model.HoodiePartitionMetadata;
+import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.HoodieTimeline;
@@ -36,6 +29,18 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.util.FSUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.StringUtils;
+
+import com.beust.jcommander.IValueValidator;
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.ParameterException;
+import com.google.common.collect.ImmutableList;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -45,41 +50,66 @@ import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SaveMode;
 import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.execution.datasources.DataSource;
-
-import scala.Tuple2;
-import scala.collection.JavaConversions;
 
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 import java.util.stream.Collectors;
 
+import scala.Tuple2;
+import scala.collection.JavaConversions;
+
 /**
 * Export the latest records of Hudi dataset to a set of external files (e.g., plain parquet files).
  *
 * @experimental This export is an experimental tool. If you want to export hudi to hudi, please use HoodieSnapshotCopier.
  */
 public class HoodieSnapshotExporter {
+
+  @FunctionalInterface
+  public interface Partitioner {
+
+    DataFrameWriter<Row> partition(Dataset<Row> source);
+
+  }
+
   private static final Logger LOG = LogManager.getLogger(HoodieSnapshotExporter.class);
 
+  public static class OutputFormatValidator implements IValueValidator<String> {
+
+    static final String HUDI = "hudi";
+    static final List<String> FORMATS = ImmutableList.of("json", "parquet", HUDI);
+
+    @Override
+    public void validate(String name, String value) {
+      if (value == null || !FORMATS.contains(value)) {
+        throw new ParameterException(
+            String.format("Invalid output format: value:%s: supported formats:%s", value, FORMATS));
+      }
+    }
+  }
+
   public static class Config implements Serializable {
+
     @Parameter(names = {"--source-base-path"}, description = "Base path for 
the source Hudi dataset to be snapshotted", required = true)
-    String sourceBasePath = null;
+    String sourceBasePath;
 
-    @Parameter(names = {"--target-base-path"}, description = "Base path for 
the target output files (snapshots)", required = true)
-    String targetOutputPath = null;
+    @Parameter(names = {"--target-output-path"}, description = "Base path for 
the target output files (snapshots)", required = true)
+    String targetOutputPath;
 
-    @Parameter(names = {"--output-format"}, description = "e.g. Hudi or 
Parquet", required = true)
+    @Parameter(names = {"--output-format"}, description = "Output format for 
the exported dataset; accept these values: json|parquet|hudi", required = true,
+        validateValueWith = OutputFormatValidator.class)
     String outputFormat;
 
     @Parameter(names = {"--output-partition-field"}, description = "A field to 
be used by Spark repartitioning")
-    String outputPartitionField;
+    String outputPartitionField = null;
+
+    @Parameter(names = {"--output-partitioner"}, description = "A class to 
facilitate custom repartitioning")
+    String outputPartitioner = null;
   }
 
-  public int export(SparkSession spark, Config cfg) throws IOException {
+  public void export(SparkSession spark, Config cfg) throws IOException {
     JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
     FileSystem fs = FSUtils.getFs(cfg.sourceBasePath, jsc.hadoopConfiguration());
 
@@ -92,7 +122,7 @@ public class HoodieSnapshotExporter {
         tableMetadata.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant();
     if (!latestCommit.isPresent()) {
       LOG.error("No commits present. Nothing to snapshot");
-      return -1;
+      return;
     }
     final String latestCommitTimestamp = latestCommit.get().getTimestamp();
     LOG.info(String.format("Starting to snapshot latest version files which are also no-late-than %s.",
@@ -106,28 +136,8 @@ public class HoodieSnapshotExporter {
         dataFiles.addAll(fsView.getLatestBaseFilesBeforeOrOn(partition, latestCommitTimestamp).map(f -> f.getPath()).collect(Collectors.toList()));
       }
 
-      try {
-        DataSource.lookupDataSource(cfg.outputFormat, spark.sessionState().conf());
-      } catch (Exception e) {
-        LOG.error(String.format("The %s output format is not supported! ", cfg.outputFormat));
-        return -1;
-      }
-      if (!cfg.outputFormat.equalsIgnoreCase("hudi")) {
-        // Do transformation
-        // A field to do simple Spark repartitioning
-        DataFrameWriter<Row> write = null;
-        Dataset<Row> original = spark.read().parquet(JavaConversions.asScalaIterator(dataFiles.iterator()).toSeq());
-        List<Column> needColumns = Arrays.asList(original.columns()).stream().filter(col -> !col.startsWith("_hoodie_")).map(col -> new Column(col)).collect(Collectors.toList());
-        Dataset<Row> reader = original.select(JavaConversions.asScalaIterator(needColumns.iterator()).toSeq());
-        if (!StringUtils.isNullOrEmpty(cfg.outputPartitionField)) {
-          write = reader.repartition(new Column(cfg.outputPartitionField))
-              .write().partitionBy(cfg.outputPartitionField);
-        } else {
-          write = reader.write();
-        }
-        write.format(cfg.outputFormat)
-            .mode(SaveMode.Overwrite)
-            .save(cfg.targetOutputPath);
+      if (!cfg.outputFormat.equals(OutputFormatValidator.HUDI)) {
+        exportAsNonHudi(spark, cfg, dataFiles);
       } else {
         // No transformation is needed for output format "HUDI", just copy the original files.
         copySnapshot(jsc, fs, cfg, partitions, dataFiles, latestCommitTimestamp, serConf);
@@ -136,16 +146,34 @@ public class HoodieSnapshotExporter {
     } else {
       LOG.info("The job has 0 partition to copy.");
     }
-    return 0;
+  }
+
+  private void exportAsNonHudi(SparkSession spark, Config cfg, List<String> dataFiles) {
+    Partitioner defaultPartitioner = dataset -> {
+      Dataset<Row> hoodieDroppedDataset = dataset.drop(JavaConversions.asScalaIterator(HoodieRecord.HOODIE_META_COLUMNS.iterator()).toSeq());
+      return StringUtils.isNullOrEmpty(cfg.outputPartitionField)
+          ? hoodieDroppedDataset.write()
+          : hoodieDroppedDataset.repartition(new Column(cfg.outputPartitionField)).write().partitionBy(cfg.outputPartitionField);
+    };
+
+    Partitioner partitioner = StringUtils.isNullOrEmpty(cfg.outputPartitioner)
+        ? defaultPartitioner
+        : ReflectionUtils.loadClass(cfg.outputPartitioner);
+
+    Dataset<Row> sourceDataset = spark.read().parquet(JavaConversions.asScalaIterator(dataFiles.iterator()).toSeq());
+    partitioner.partition(sourceDataset)
+        .format(cfg.outputFormat)
+        .mode(SaveMode.Overwrite)
+        .save(cfg.targetOutputPath);
   }
 
   private void copySnapshot(JavaSparkContext jsc,
-                            FileSystem fs,
-                            Config cfg,
-                            List<String> partitions,
-                            List<String> dataFiles,
-                            String latestCommitTimestamp,
-                            SerializableConfiguration serConf) throws IOException {
+      FileSystem fs,
+      Config cfg,
+      List<String> partitions,
+      List<String> dataFiles,
+      String latestCommitTimestamp,
+      SerializableConfiguration serConf) throws IOException {
     // Make sure the output directory is empty
     Path outputPath = new Path(cfg.targetOutputPath);
     if (fs.exists(outputPath)) {
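
Note that where the old code probed Spark's DataSource registry at export
time, the new OutputFormatValidator rejects unsupported values at
argument-parsing time, and the match is case-sensitive. A small sketch of the
behavior the parameterized test below asserts (the demo class is illustrative):

    import com.beust.jcommander.ParameterException;
    import org.apache.hudi.utilities.HoodieSnapshotExporter.OutputFormatValidator;

    public class ValidatorDemo {
      public static void main(String[] args) {
        OutputFormatValidator validator = new OutputFormatValidator();
        validator.validate("--output-format", "parquet"); // accepted: exact lowercase match
        try {
          validator.validate("--output-format", "JSON"); // rejected: not in [json, parquet, hudi]
        } catch (ParameterException e) {
          System.out.println(e.getMessage());
        }
      }
    }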
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieSnapshotExporter.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieSnapshotExporter.java
index f624247..6eb15a2 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieSnapshotExporter.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieSnapshotExporter.java
@@ -29,13 +29,20 @@ import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.index.HoodieIndex.IndexType;
 import org.apache.hudi.utilities.HoodieSnapshotExporter.Config;
+import org.apache.hudi.utilities.HoodieSnapshotExporter.OutputFormatValidator;
+import org.apache.hudi.utilities.HoodieSnapshotExporter.Partitioner;
 
+import com.beust.jcommander.ParameterException;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.DataFrameWriter;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 import org.junit.After;
 import org.junit.Before;
@@ -52,6 +59,7 @@ import java.util.List;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 @RunWith(Enclosed.class)
@@ -62,7 +70,7 @@ public class TestHoodieSnapshotExporter {
     static final Logger LOG = LogManager.getLogger(ExporterTestHarness.class);
     static final int NUM_RECORDS = 100;
     static final String COMMIT_TIME = "20200101000000";
-    static final String PARTITION_PATH = "2020/01/01";
+    static final String PARTITION_PATH = "2020";
     static final String TABLE_NAME = "testing";
     String sourcePath;
     String targetPath;
@@ -119,12 +127,19 @@ public class TestHoodieSnapshotExporter {
 
   public static class TestHoodieSnapshotExporterForHudi extends ExporterTestHarness {
 
-    @Test
-    public void testExportAsHudi() throws IOException {
-      HoodieSnapshotExporter.Config cfg = new Config();
+    private HoodieSnapshotExporter.Config cfg;
+
+    @Before
+    public void setUp() throws Exception {
+      super.setUp();
+      cfg = new Config();
       cfg.sourceBasePath = sourcePath;
       cfg.targetOutputPath = targetPath;
-      cfg.outputFormat = "hudi";
+      cfg.outputFormat = OutputFormatValidator.HUDI;
+    }
+
+    @Test
+    public void testExportAsHudi() throws IOException {
       new HoodieSnapshotExporter().export(SparkSession.builder().config(jsc.getConf()).getOrCreate(), cfg);
 
       // Check results
@@ -151,10 +166,6 @@ public class TestHoodieSnapshotExporter {
       dfs.delete(new Path(sourcePath + "/" + PARTITION_PATH), true);
 
       // export
-      HoodieSnapshotExporter.Config cfg = new Config();
-      cfg.sourceBasePath = sourcePath;
-      cfg.targetOutputPath = targetPath;
-      cfg.outputFormat = "hudi";
       new HoodieSnapshotExporter().export(SparkSession.builder().config(jsc.getConf()).getOrCreate(), cfg);
 
       // Check results
@@ -185,4 +196,85 @@ public class TestHoodieSnapshotExporter {
       assertTrue(dfs.exists(new Path(targetPath + "/_SUCCESS")));
     }
   }
+
+  public static class TestHoodieSnapshotExporterForRepartitioning extends ExporterTestHarness {
+
+    private static final String PARTITION_NAME = "year";
+
+    public static class UserDefinedPartitioner implements Partitioner {
+
+      @Override
+      public DataFrameWriter<Row> partition(Dataset<Row> source) {
+        return source
+            .withColumnRenamed(HoodieRecord.PARTITION_PATH_METADATA_FIELD, PARTITION_NAME)
+            .repartition(new Column(PARTITION_NAME))
+            .write()
+            .partitionBy(PARTITION_NAME);
+      }
+    }
+
+    private HoodieSnapshotExporter.Config cfg;
+
+    @Before
+    public void setUp() throws Exception {
+      super.setUp();
+      cfg = new Config();
+      cfg.sourceBasePath = sourcePath;
+      cfg.targetOutputPath = targetPath;
+      cfg.outputFormat = "json";
+    }
+
+    @Test
+    public void testExportWithPartitionField() throws IOException {
+      // `driver` field is set in HoodieTestDataGenerator
+      cfg.outputPartitionField = "driver";
+      new HoodieSnapshotExporter().export(SparkSession.builder().config(jsc.getConf()).getOrCreate(), cfg);
+
+      assertEquals(NUM_RECORDS, sqlContext.read().format("json").load(targetPath).count());
+      assertTrue(dfs.exists(new Path(targetPath + "/_SUCCESS")));
+      assertTrue(dfs.listStatus(new Path(targetPath)).length > 1);
+    }
+
+    @Test
+    public void testExportForUserDefinedPartitioner() throws IOException {
+      cfg.outputPartitioner = UserDefinedPartitioner.class.getName();
+      new HoodieSnapshotExporter().export(SparkSession.builder().config(jsc.getConf()).getOrCreate(), cfg);
+
+      assertEquals(NUM_RECORDS, sqlContext.read().format("json").load(targetPath).count());
+      assertTrue(dfs.exists(new Path(targetPath + "/_SUCCESS")));
+      assertTrue(dfs.exists(new Path(String.format("%s/%s=%s", targetPath, PARTITION_NAME, PARTITION_PATH))));
+    }
+  }
+
+  @RunWith(Parameterized.class)
+  public static class TestHoodieSnapshotExporterInputValidation {
+
+    @Parameters
+    public static Iterable<Object[]> data() {
+      return Arrays.asList(new Object[][] {
+          {"json", true}, {"parquet", true}, {"hudi", true},
+          {"JSON", false}, {"foo", false}, {null, false}, {"", false}
+      });
+    }
+
+    @Parameter
+    public String format;
+    @Parameter(1)
+    public boolean isValid;
+
+    @Test
+    public void testValidateOutputFormat() {
+      Throwable t = null;
+      try {
+        new OutputFormatValidator().validate(null, format);
+      } catch (Exception e) {
+        t = e;
+      }
+      if (isValid) {
+        assertNull(t);
+      } else {
+        assertTrue(t instanceof ParameterException);
+      }
+    }
+  }
 }
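
As the tests exercise, the exporter can be driven programmatically as well as
from the command line. A minimal driver sketch, assuming it is compiled into
the org.apache.hudi.utilities package (the Config fields are package-private);
all paths and values are illustrative:

    package org.apache.hudi.utilities;

    import org.apache.hudi.utilities.HoodieSnapshotExporter.Config;
    import org.apache.spark.sql.SparkSession;

    public class SnapshotExportDriver {

      public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession.builder()
            .appName("hudi-snapshot-export")
            .master("local[2]")
            .getOrCreate();

        Config cfg = new Config();
        cfg.sourceBasePath = "/tmp/hudi/source-table"; // illustrative
        cfg.targetOutputPath = "/tmp/hudi/exported";   // illustrative
        cfg.outputFormat = "json";                     // one of json|parquet|hudi
        cfg.outputPartitionField = "driver";           // optional; outputPartitioner overrides it
        new HoodieSnapshotExporter().export(spark, cfg);
        spark.stop();
      }
    }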
