This is an automated email from the ASF dual-hosted git repository.

leesf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new bc82e2b  [HUDI-711] Refactor exporter main logic (#1436)
bc82e2b is described below

commit bc82e2be6cf080ab99092758368e91f509a2004c
Author: Raymond Xu <2701446+xushi...@users.noreply.github.com>
AuthorDate: Wed Mar 25 03:02:24 2020 -0700

    [HUDI-711] Refactor exporter main logic (#1436)
    
    * Refactor exporter main logic
    * Break the main method into multiple readable methods
    * Fix a bug where the wrong file list was passed
    * Avoid deleting the output path when it already exists
    * Throw exceptions to abort early in multiple cases
    * Use JavaSparkContext instead of SparkSession
    * Improve unit tests for expected exceptions
---
 .../hudi/utilities/HoodieSnapshotExporter.java     | 161 +++++++++++----------
 .../exception/HoodieSnapshotExporterException.java |  10 ++
 .../hudi/utilities/TestHoodieSnapshotExporter.java |  71 +++++++--
 3 files changed, 154 insertions(+), 88 deletions(-)
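For context, here is a minimal sketch of how the refactored exporter might be driven programmatically, mirroring the new main() and the updated tests in the diff below. The example class, its package placement, and the literal paths are assumptions for illustration only; the Config fields, OutputFormatValidator.HUDI, export(jsc, cfg), and HoodieSnapshotExporterException all come from this patch.

    package org.apache.hudi.utilities;  // hypothetical example class, colocated with the exporter like the tests

    import org.apache.hudi.utilities.HoodieSnapshotExporter.Config;
    import org.apache.hudi.utilities.HoodieSnapshotExporter.OutputFormatValidator;
    import org.apache.hudi.utilities.exception.HoodieSnapshotExporterException;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;

    public class SnapshotExportExample {
      public static void main(String[] args) throws Exception {
        // As in the refactored main(): a plain JavaSparkContext, no SparkSession required.
        SparkConf sparkConf = new SparkConf().setAppName("snapshot-export-example");
        sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);

        Config cfg = new Config();
        cfg.sourceBasePath = "hdfs:///tmp/hudi/source-table";  // placeholder source path
        cfg.targetOutputPath = "hdfs:///tmp/hudi/export";      // placeholder target; must not already exist
        cfg.outputFormat = OutputFormatValidator.HUDI;

        try {
          // Throws HoodieSnapshotExporterException to abort early when the target path
          // exists, the table has no commits, or there is no partition to export.
          new HoodieSnapshotExporter().export(jsc, cfg);
        } catch (HoodieSnapshotExporterException e) {
          // expected early-abort conditions surface as this exception type
          System.err.println("Export aborted: " + e.getMessage());
        } finally {
          jsc.stop();
        }
      }
    }

This is the same call the updated tests exercise; invalid preconditions now surface as exceptions instead of a silent return or a deleted output path.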

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java
index dfe3d68..7df630a 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java
@@ -19,18 +19,20 @@
 package org.apache.hudi.utilities;
 
 import org.apache.hudi.common.SerializableConfiguration;
+import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodiePartitionMetadata;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.HoodieTimeline;
-import org.apache.hudi.common.table.TableFileSystemView;
+import org.apache.hudi.common.table.TableFileSystemView.BaseFileOnlyView;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.util.FSUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.utilities.exception.HoodieSnapshotExporterException;
 
 import com.beust.jcommander.IValueValidator;
 import com.beust.jcommander.JCommander;
@@ -43,19 +45,21 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
+import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Column;
 import org.apache.spark.sql.DataFrameWriter;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SQLContext;
 import org.apache.spark.sql.SaveMode;
-import org.apache.spark.sql.SparkSession;
 
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
-import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import scala.Tuple2;
 import scala.collection.JavaConversions;
@@ -109,46 +113,56 @@ public class HoodieSnapshotExporter {
     String outputPartitioner = null;
   }
 
-  public void export(SparkSession spark, Config cfg) throws IOException {
-    JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
+  public void export(JavaSparkContext jsc, Config cfg) throws IOException {
     FileSystem fs = FSUtils.getFs(cfg.sourceBasePath, jsc.hadoopConfiguration());
 
-    final SerializableConfiguration serConf = new SerializableConfiguration(jsc.hadoopConfiguration());
-    final HoodieTableMetaClient tableMetadata = new HoodieTableMetaClient(fs.getConf(), cfg.sourceBasePath);
-    final TableFileSystemView.BaseFileOnlyView fsView = new HoodieTableFileSystemView(tableMetadata,
-        tableMetadata.getActiveTimeline().getCommitsTimeline().filterCompletedInstants());
-    // Get the latest commit
-    Option<HoodieInstant> latestCommit =
-        tableMetadata.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant();
-    if (!latestCommit.isPresent()) {
-      LOG.error("No commits present. Nothing to snapshot");
-      return;
+    if (outputPathExists(fs, cfg)) {
+      throw new HoodieSnapshotExporterException("The target output path already exists.");
     }
-    final String latestCommitTimestamp = latestCommit.get().getTimestamp();
+
+    final String latestCommitTimestamp = getLatestCommitTimestamp(fs, cfg).<HoodieSnapshotExporterException>orElseThrow(() -> {
+      throw new HoodieSnapshotExporterException("No commits present. Nothing to snapshot.");
+    });
     LOG.info(String.format("Starting to snapshot latest version files which are also no-late-than %s.",
         latestCommitTimestamp));
 
-    List<String> partitions = FSUtils.getAllPartitionPaths(fs, cfg.sourceBasePath, false);
-    if (partitions.size() > 0) {
-      List<String> dataFiles = new ArrayList<>();
-
-      for (String partition : partitions) {
-        dataFiles.addAll(fsView.getLatestBaseFilesBeforeOrOn(partition, latestCommitTimestamp).map(f -> f.getPath()).collect(Collectors.toList()));
-      }
+    final List<String> partitions = getPartitions(fs, cfg);
+    if (partitions.isEmpty()) {
+      throw new HoodieSnapshotExporterException("The source dataset has 0 partition to snapshot.");
+    }
+    LOG.info(String.format("The job needs to export %d partitions.", partitions.size()));
 
-      if (!cfg.outputFormat.equals(OutputFormatValidator.HUDI)) {
-        exportAsNonHudi(spark, cfg, dataFiles);
-      } else {
-        // No transformation is needed for output format "HUDI", just copy the original files.
-        copySnapshot(jsc, fs, cfg, partitions, dataFiles, latestCommitTimestamp, serConf);
-      }
-      createSuccessTag(fs, cfg.targetOutputPath);
+    if (cfg.outputFormat.equals(OutputFormatValidator.HUDI)) {
+      exportAsHudi(jsc, cfg, partitions, latestCommitTimestamp);
     } else {
-      LOG.info("The job has 0 partition to copy.");
+      exportAsNonHudi(jsc, cfg, partitions, latestCommitTimestamp);
+    }
+    createSuccessTag(fs, cfg);
+  }
+
+  private boolean outputPathExists(FileSystem fs, Config cfg) throws IOException {
+    return fs.exists(new Path(cfg.targetOutputPath));
+  }
+
+  private Option<String> getLatestCommitTimestamp(FileSystem fs, Config cfg) {
+    final HoodieTableMetaClient tableMetadata = new HoodieTableMetaClient(fs.getConf(), cfg.sourceBasePath);
+    Option<HoodieInstant> latestCommit = tableMetadata.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant();
+    return latestCommit.isPresent() ? Option.of(latestCommit.get().getTimestamp()) : Option.empty();
+  }
+
+  private List<String> getPartitions(FileSystem fs, Config cfg) throws IOException {
+    return FSUtils.getAllPartitionPaths(fs, cfg.sourceBasePath, false);
+  }
+
+  private void createSuccessTag(FileSystem fs, Config cfg) throws IOException {
+    Path successTagPath = new Path(cfg.targetOutputPath + "/_SUCCESS");
+    if (!fs.exists(successTagPath)) {
+      LOG.info(String.format("Creating _SUCCESS under target output path: %s", 
cfg.targetOutputPath));
+      fs.createNewFile(successTagPath);
     }
   }
 
-  private void exportAsNonHudi(SparkSession spark, Config cfg, List<String> dataFiles) {
+  private void exportAsNonHudi(JavaSparkContext jsc, Config cfg, List<String> partitions, String latestCommitTimestamp) {
     Partitioner defaultPartitioner = dataset -> {
       Dataset<Row> hoodieDroppedDataset = dataset.drop(JavaConversions.asScalaIterator(HoodieRecord.HOODIE_META_COLUMNS.iterator()).toSeq());
       return StringUtils.isNullOrEmpty(cfg.outputPartitionField)
@@ -160,37 +174,35 @@ public class HoodieSnapshotExporter {
         ? defaultPartitioner
         : ReflectionUtils.loadClass(cfg.outputPartitioner);
 
-    Dataset<Row> sourceDataset = spark.read().parquet(JavaConversions.asScalaIterator(dataFiles.iterator()).toSeq());
+    final BaseFileOnlyView fsView = getBaseFileOnlyView(jsc, cfg);
+    Iterator<String> exportingFilePaths = jsc
+        .parallelize(partitions, partitions.size())
+        .flatMap(partition -> fsView
+            .getLatestBaseFilesBeforeOrOn(partition, latestCommitTimestamp)
+            .map(HoodieBaseFile::getPath).iterator())
+        .toLocalIterator();
+
+    Dataset<Row> sourceDataset = new SQLContext(jsc).read().parquet(JavaConversions.asScalaIterator(exportingFilePaths).toSeq());
     partitioner.partition(sourceDataset)
         .format(cfg.outputFormat)
         .mode(SaveMode.Overwrite)
         .save(cfg.targetOutputPath);
   }
 
-  private void copySnapshot(JavaSparkContext jsc,
-      FileSystem fs,
-      Config cfg,
-      List<String> partitions,
-      List<String> dataFiles,
-      String latestCommitTimestamp,
-      SerializableConfiguration serConf) throws IOException {
-    // Make sure the output directory is empty
-    Path outputPath = new Path(cfg.targetOutputPath);
-    if (fs.exists(outputPath)) {
-      LOG.warn(String.format("The output path %s targetBasePath already exists, deleting", outputPath));
-      fs.delete(new Path(cfg.targetOutputPath), true);
-    }
-
+  private void exportAsHudi(JavaSparkContext jsc, Config cfg, List<String> partitions, String latestCommitTimestamp) throws IOException {
+    final BaseFileOnlyView fsView = getBaseFileOnlyView(jsc, cfg);
+    final SerializableConfiguration serConf = new SerializableConfiguration(jsc.hadoopConfiguration());
     jsc.parallelize(partitions, partitions.size()).flatMap(partition -> {
       // Only take latest version files <= latestCommit.
-      FileSystem fs1 = FSUtils.getFs(cfg.sourceBasePath, serConf.newCopy());
       List<Tuple2<String, String>> filePaths = new ArrayList<>();
-      dataFiles.forEach(hoodieDataFile -> filePaths.add(new Tuple2<>(partition, hoodieDataFile)));
+      Stream<HoodieBaseFile> dataFiles = fsView.getLatestBaseFilesBeforeOrOn(partition, latestCommitTimestamp);
+      dataFiles.forEach(hoodieDataFile -> filePaths.add(new Tuple2<>(partition, hoodieDataFile.getPath())));
 
       // also need to copy over partition metadata
       Path partitionMetaFile =
           new Path(new Path(cfg.sourceBasePath, partition), HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE);
-      if (fs1.exists(partitionMetaFile)) {
+      FileSystem fs = FSUtils.getFs(cfg.sourceBasePath, serConf.newCopy());
+      if (fs.exists(partitionMetaFile)) {
         filePaths.add(new Tuple2<>(partition, partitionMetaFile.toString()));
       }
 
@@ -199,19 +211,20 @@ public class HoodieSnapshotExporter {
       String partition = tuple._1();
       Path sourceFilePath = new Path(tuple._2());
       Path toPartitionPath = new Path(cfg.targetOutputPath, partition);
-      FileSystem ifs = FSUtils.getFs(cfg.targetOutputPath, serConf.newCopy());
+      FileSystem fs = FSUtils.getFs(cfg.targetOutputPath, serConf.newCopy());
 
-      if (!ifs.exists(toPartitionPath)) {
-        ifs.mkdirs(toPartitionPath);
+      if (!fs.exists(toPartitionPath)) {
+        fs.mkdirs(toPartitionPath);
       }
-      FileUtil.copy(ifs, sourceFilePath, ifs, new Path(toPartitionPath, sourceFilePath.getName()), false,
-          ifs.getConf());
+      FileUtil.copy(fs, sourceFilePath, fs, new Path(toPartitionPath, sourceFilePath.getName()), false,
+          fs.getConf());
     });
 
     // Also copy the .commit files
     LOG.info(String.format("Copying .commit files which are no-late-than %s.", 
latestCommitTimestamp));
+    final FileSystem fileSystem = FSUtils.getFs(cfg.sourceBasePath, 
jsc.hadoopConfiguration());
     FileStatus[] commitFilesToCopy =
-        fs.listStatus(new Path(cfg.sourceBasePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME), (commitFilePath) -> {
+        fileSystem.listStatus(new Path(cfg.sourceBasePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME), (commitFilePath) -> {
           if (commitFilePath.getName().equals(HoodieTableConfig.HOODIE_PROPERTIES_FILE)) {
             return true;
           } else {
@@ -223,39 +236,37 @@ public class HoodieSnapshotExporter {
     for (FileStatus commitStatus : commitFilesToCopy) {
       Path targetFilePath =
           new Path(cfg.targetOutputPath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + commitStatus.getPath().getName());
-      if (!fs.exists(targetFilePath.getParent())) {
-        fs.mkdirs(targetFilePath.getParent());
+      if (!fileSystem.exists(targetFilePath.getParent())) {
+        fileSystem.mkdirs(targetFilePath.getParent());
       }
-      if (fs.exists(targetFilePath)) {
+      if (fileSystem.exists(targetFilePath)) {
         LOG.error(
             String.format("The target output commit file (%s targetBasePath) 
already exists.", targetFilePath));
       }
-      FileUtil.copy(fs, commitStatus.getPath(), fs, targetFilePath, false, 
fs.getConf());
+      FileUtil.copy(fileSystem, commitStatus.getPath(), fileSystem, 
targetFilePath, false, fileSystem.getConf());
     }
   }
 
-  private void createSuccessTag(FileSystem fs, String targetOutputPath) throws IOException {
-    Path successTagPath = new Path(targetOutputPath + "/_SUCCESS");
-    if (!fs.exists(successTagPath)) {
-      LOG.info(String.format("Creating _SUCCESS under target output path: %s", targetOutputPath));
-      fs.createNewFile(successTagPath);
-    }
+  private BaseFileOnlyView getBaseFileOnlyView(JavaSparkContext jsc, Config cfg) {
+    FileSystem fs = FSUtils.getFs(cfg.sourceBasePath, jsc.hadoopConfiguration());
+    HoodieTableMetaClient tableMetadata = new HoodieTableMetaClient(fs.getConf(), cfg.sourceBasePath);
+    return new HoodieTableFileSystemView(tableMetadata, tableMetadata
+        .getActiveTimeline().getCommitsTimeline().filterCompletedInstants());
   }
 
   public static void main(String[] args) throws IOException {
-    // Take input configs
     final Config cfg = new Config();
     new JCommander(cfg, null, args);
 
-    // Create a spark job to do the snapshot export
-    SparkSession spark = SparkSession.builder().appName("Hoodie-snapshot-exporter")
-        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").getOrCreate();
+    SparkConf sparkConf = new SparkConf().setAppName("Hoodie-snapshot-exporter");
+    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
+    JavaSparkContext jsc = new JavaSparkContext(sparkConf);
     LOG.info("Initializing spark job.");
 
-    HoodieSnapshotExporter hoodieSnapshotExporter = new HoodieSnapshotExporter();
-    hoodieSnapshotExporter.export(spark, cfg);
-
-    // Stop the job
-    spark.stop();
+    try {
+      new HoodieSnapshotExporter().export(jsc, cfg);
+    } finally {
+      jsc.stop();
+    }
   }
 }
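For non-Hudi output formats, the Partitioner hook used in exportAsNonHudi above remains the extension point, loaded reflectively from cfg.outputPartitioner. Below is a hypothetical custom partitioner sketch: the interface shape (a single partition(Dataset<Row>) method returning a DataFrameWriter<Row>) is inferred from how exportAsNonHudi chains partitioner.partition(sourceDataset).format(...).mode(...).save(...), and the `driver` column is simply the field the tests happen to use.

    package org.apache.hudi.utilities;  // hypothetical example, colocated with the exporter

    import org.apache.hudi.common.model.HoodieRecord;
    import org.apache.hudi.utilities.HoodieSnapshotExporter.Partitioner;

    import org.apache.spark.sql.DataFrameWriter;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;

    import scala.collection.JavaConversions;

    public class DriverFieldPartitioner implements Partitioner {

      @Override
      public DataFrameWriter<Row> partition(Dataset<Row> source) {
        // Drop the Hudi metadata columns, as the default partitioner does,
        // then lay the output out by the `driver` field.
        Dataset<Row> cleaned = source.drop(
            JavaConversions.asScalaIterator(HoodieRecord.HOODIE_META_COLUMNS.iterator()).toSeq());
        return cleaned
            .repartition(cleaned.col("driver"))
            .write()
            .partitionBy("driver");
      }
    }

A class like this would be wired in via cfg.outputPartitioner = DriverFieldPartitioner.class.getName() together with a non-Hudi cfg.outputFormat such as "json" or "parquet", which is the path the testExportForUserDefinedPartitioner test below exercises.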
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/exception/HoodieSnapshotExporterException.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/exception/HoodieSnapshotExporterException.java
new file mode 100644
index 0000000..6fcb9df
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/exception/HoodieSnapshotExporterException.java
@@ -0,0 +1,10 @@
+package org.apache.hudi.utilities.exception;
+
+import org.apache.hudi.exception.HoodieException;
+
+public class HoodieSnapshotExporterException extends HoodieException {
+
+  public HoodieSnapshotExporterException(String msg) {
+    super(msg);
+  }
+}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieSnapshotExporter.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieSnapshotExporter.java
index 6eb15a2..730b9ec 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieSnapshotExporter.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieSnapshotExporter.java
@@ -31,8 +31,10 @@ import org.apache.hudi.index.HoodieIndex.IndexType;
 import org.apache.hudi.utilities.HoodieSnapshotExporter.Config;
 import org.apache.hudi.utilities.HoodieSnapshotExporter.OutputFormatValidator;
 import org.apache.hudi.utilities.HoodieSnapshotExporter.Partitioner;
+import org.apache.hudi.utilities.exception.HoodieSnapshotExporterException;
 
 import com.beust.jcommander.ParameterException;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
@@ -43,11 +45,12 @@ import org.apache.spark.sql.Column;
 import org.apache.spark.sql.DataFrameWriter;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
-import org.apache.spark.sql.SparkSession;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.runners.Enclosed;
+import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameter;
@@ -56,9 +59,9 @@ import org.junit.runners.Parameterized.Parameters;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
+import java.util.stream.Collectors;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
@@ -85,7 +88,6 @@ public class TestHoodieSnapshotExporter {
       sourcePath = dfsBasePath + "/source/";
       targetPath = dfsBasePath + "/target/";
       dfs.mkdirs(new Path(sourcePath));
-      dfs.mkdirs(new Path(targetPath));
       HoodieTableMetaClient
           .initTableType(jsc.hadoopConfiguration(), sourcePath, HoodieTableType.COPY_ON_WRITE, TABLE_NAME,
               HoodieAvroPayload.class.getName());
@@ -140,7 +142,7 @@ public class TestHoodieSnapshotExporter {
 
     @Test
     public void testExportAsHudi() throws IOException {
-      new HoodieSnapshotExporter().export(SparkSession.builder().config(jsc.getConf()).getOrCreate(), cfg);
+      new HoodieSnapshotExporter().export(jsc, cfg);
 
       // Check results
       assertTrue(dfs.exists(new Path(targetPath + "/.hoodie/" + COMMIT_TIME + ".clean")));
@@ -159,18 +161,61 @@ public class TestHoodieSnapshotExporter {
       assertTrue(dfs.exists(new Path(partition + "/.hoodie_partition_metadata")));
       assertTrue(dfs.exists(new Path(targetPath + "/_SUCCESS")));
     }
+  }
+
+  public static class TestHoodieSnapshotExporterForEarlyAbort extends ExporterTestHarness {
+
+    private HoodieSnapshotExporter.Config cfg;
+
+    @Rule
+    public ExpectedException exceptionRule = ExpectedException.none();
+
+    @Before
+    public void setUp() throws Exception {
+      super.setUp();
+      cfg = new Config();
+      cfg.sourceBasePath = sourcePath;
+      cfg.targetOutputPath = targetPath;
+      cfg.outputFormat = OutputFormatValidator.HUDI;
+    }
 
     @Test
-    public void testExportEmptyDataset() throws IOException {
+    public void testExportWhenTargetPathExists() throws IOException {
+      // make target output path present
+      dfs.mkdirs(new Path(targetPath));
+
+      // export
+      exceptionRule.expect(HoodieSnapshotExporterException.class);
+      exceptionRule.expectMessage("The target output path already exists.");
+      new HoodieSnapshotExporter().export(jsc, cfg);
+    }
+
+    @Test
+    public void testExportDatasetWithNoCommit() throws IOException {
+      // delete commit files
+      List<Path> commitFiles = Arrays.stream(dfs.listStatus(new Path(sourcePath + "/.hoodie")))
+          .map(FileStatus::getPath)
+          .filter(filePath -> filePath.getName().endsWith(".commit"))
+          .collect(Collectors.toList());
+      for (Path p : commitFiles) {
+        dfs.delete(p, false);
+      }
+
+      // export
+      exceptionRule.expect(HoodieSnapshotExporterException.class);
+      exceptionRule.expectMessage("No commits present. Nothing to snapshot.");
+      new HoodieSnapshotExporter().export(jsc, cfg);
+    }
+
+    @Test
+    public void testExportDatasetWithNoPartition() throws IOException {
       // delete all source data
       dfs.delete(new Path(sourcePath + "/" + PARTITION_PATH), true);
 
       // export
-      new HoodieSnapshotExporter().export(SparkSession.builder().config(jsc.getConf()).getOrCreate(), cfg);
-
-      // Check results
-      assertEquals("Target path should be empty.", 0, dfs.listStatus(new 
Path(targetPath)).length);
-      assertFalse(dfs.exists(new Path(targetPath + "/_SUCCESS")));
+      exceptionRule.expect(HoodieSnapshotExporterException.class);
+      exceptionRule.expectMessage("The source dataset has 0 partition to 
snapshot.");
+      new HoodieSnapshotExporter().export(jsc, cfg);
     }
   }
 
@@ -191,7 +236,7 @@ public class TestHoodieSnapshotExporter {
       cfg.sourceBasePath = sourcePath;
       cfg.targetOutputPath = targetPath;
       cfg.outputFormat = format;
-      new HoodieSnapshotExporter().export(SparkSession.builder().config(jsc.getConf()).getOrCreate(), cfg);
+      new HoodieSnapshotExporter().export(jsc, cfg);
       assertEquals(NUM_RECORDS, sqlContext.read().format(format).load(targetPath).count());
       assertTrue(dfs.exists(new Path(targetPath + "/_SUCCESS")));
     }
@@ -228,7 +273,7 @@ public class TestHoodieSnapshotExporter {
     public void testExportWithPartitionField() throws IOException {
       // `driver` field is set in HoodieTestDataGenerator
       cfg.outputPartitionField = "driver";
-      new HoodieSnapshotExporter().export(SparkSession.builder().config(jsc.getConf()).getOrCreate(), cfg);
+      new HoodieSnapshotExporter().export(jsc, cfg);
 
       assertEquals(NUM_RECORDS, sqlContext.read().format("json").load(targetPath).count());
       assertTrue(dfs.exists(new Path(targetPath + "/_SUCCESS")));
@@ -238,7 +283,7 @@ public class TestHoodieSnapshotExporter {
     @Test
     public void testExportForUserDefinedPartitioner() throws IOException {
       cfg.outputPartitioner = UserDefinedPartitioner.class.getName();
-      new HoodieSnapshotExporter().export(SparkSession.builder().config(jsc.getConf()).getOrCreate(), cfg);
+      new HoodieSnapshotExporter().export(jsc, cfg);
 
       assertEquals(NUM_RECORDS, sqlContext.read().format("json").load(targetPath).count());
       assertTrue(dfs.exists(new Path(targetPath + "/_SUCCESS")));
