This is an automated email from the ASF dual-hosted git repository.

satish pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new e177466  [HUDI-1350] Support Partition level delete API in HUDI (#2254)
e177466 is described below

commit e177466fd266ebf3a8d371ce1bf2ecf3bdfe90ed
Author: lw0090 <lw309637...@gmail.com>
AuthorDate: Tue Dec 29 07:01:06 2020 +0800

    [HUDI-1350] Support Partition level delete API in HUDI (#2254)
    
    * [HUDI-1350] Support Partition level delete API in HUDI
    
    * [HUDI-1350] Support Partition level delete API in HUDI base InsertOverwriteCommitAction
    
    * [HUDI-1350] Support Partition level delete API in HUDI base InsertOverwriteCommitAction
---
 .../java/org/apache/hudi/table/HoodieTable.java    |   9 ++
 .../hudi/table/HoodieFlinkCopyOnWriteTable.java    |   5 +
 .../hudi/table/HoodieJavaCopyOnWriteTable.java     |   5 +
 .../apache/hudi/client/SparkRDDWriteClient.java    |   7 ++
 .../hudi/table/HoodieSparkCopyOnWriteTable.java    |   6 ++
 .../SparkDeletePartitionCommitActionExecutor.java  |  68 +++++++++++++
 .../SparkInsertOverwriteCommitActionExecutor.java  |   2 +-
 ...rkInsertOverwriteTableCommitActionExecutor.java |   6 --
 .../TestHoodieClientOnCopyOnWriteStorage.java      | 105 ++++++++++++++++++++-
 .../hudi/testutils/HoodieClientTestUtils.java      |  27 ++++--
 .../hudi/common/model/WriteOperationType.java      |   4 +
 11 files changed, 228 insertions(+), 16 deletions(-)
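
For orientation, a brief usage sketch of the new client API (not part of the
commit itself): it assumes an already-initialized SparkRDDWriteClient named
"client" for an existing table, and the partition paths shown are purely
illustrative.

    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;

    import org.apache.hudi.client.HoodieWriteResult;
    import org.apache.hudi.common.table.timeline.HoodieTimeline;

    // Partition-level deletes are recorded as a replacecommit, so the commit is
    // started with the REPLACE_COMMIT_ACTION (mirroring the new test below).
    List<String> partitionsToDrop = Arrays.asList("2020/12/01", "2020/12/02");
    String instantTime = "006"; // any unused instant time
    client.startCommitWithTime(instantTime, HoodieTimeline.REPLACE_COMMIT_ACTION);
    HoodieWriteResult result = client.deletePartitions(partitionsToDrop, instantTime);

    // File groups in the dropped partitions are marked as replaced; no new data
    // is written for them.
    Map<String, List<String>> replacedFileIds = result.getPartitionToReplaceFileIds();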

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index 36cd89a..6b7a7d2 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -159,6 +159,15 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
   public abstract HoodieWriteMetadata<O> delete(HoodieEngineContext context, String instantTime, K keys);
 
   /**
+   * Deletes all data in the given partitions.
+   * @param context     HoodieEngineContext
+   * @param instantTime Instant time for the action
+   * @param partitions  {@link List} of partitions to be deleted
+   * @return HoodieWriteMetadata
+   */
+  public abstract HoodieWriteMetadata deletePartitions(HoodieEngineContext context, String instantTime, List<String> partitions);
+
+  /**
    * Upserts the given prepared records into the Hoodie table, at the supplied instantTime.
    * <p>
    * This implementation requires that the input records are already tagged, and de-duped if needed.
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
index 3c4d7fb..d0cb8de 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
@@ -85,6 +85,11 @@ public class HoodieFlinkCopyOnWriteTable<T extends HoodieRecordPayload> extends
   }
 
   @Override
+  public HoodieWriteMetadata deletePartitions(HoodieEngineContext context, String instantTime, List<String> partitions) {
+    throw new HoodieNotSupportedException("DeletePartitions is not supported yet");
+  }
+
+  @Override
   public HoodieWriteMetadata<List<WriteStatus>> upsertPrepped(HoodieEngineContext context, String instantTime, List<HoodieRecord<T>> preppedRecords) {
     return new FlinkUpsertPreppedCommitActionExecutor<>(context, config, this, instantTime, preppedRecords).execute();
   }
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
index 7f65889..ddc995a 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
@@ -85,6 +85,11 @@ public class HoodieJavaCopyOnWriteTable<T extends HoodieRecordPayload> extends H
   }
 
   @Override
+  public HoodieWriteMetadata deletePartitions(HoodieEngineContext context, String instantTime, List<String> partitions) {
+    throw new HoodieNotSupportedException("Delete partitions is not supported yet");
+  }
+
+  @Override
   public HoodieWriteMetadata<List<WriteStatus>> upsertPrepped(HoodieEngineContext context,
                                                               String instantTime,
                                                               List<HoodieRecord<T>> preppedRecords) {
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
index 18f5309..f7e7690 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
@@ -245,6 +245,13 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
     return postWrite(result, instantTime, table);
   }
 
+  public HoodieWriteResult deletePartitions(List<String> partitions, String instantTime) {
+    HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table = getTableAndInitCtx(WriteOperationType.DELETE_PARTITION, instantTime);
+    setOperationType(WriteOperationType.DELETE_PARTITION);
+    HoodieWriteMetadata<JavaRDD<WriteStatus>> result = table.deletePartitions(context, instantTime, partitions);
+    return new HoodieWriteResult(postWrite(result, instantTime, table), result.getPartitionToReplaceFileIds());
+  }
+
   @Override
  protected JavaRDD<WriteStatus> postWrite(HoodieWriteMetadata<JavaRDD<WriteStatus>> result,
                                            String instantTime,
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
index 71085a2..357b5ce 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
@@ -50,6 +50,7 @@ import org.apache.hudi.table.action.cluster.SparkClusteringPlanActionExecutor;
 import org.apache.hudi.table.action.commit.SparkBulkInsertCommitActionExecutor;
 import org.apache.hudi.table.action.commit.SparkBulkInsertPreppedCommitActionExecutor;
 import org.apache.hudi.table.action.commit.SparkDeleteCommitActionExecutor;
+import org.apache.hudi.table.action.commit.SparkDeletePartitionCommitActionExecutor;
 import org.apache.hudi.table.action.commit.SparkInsertCommitActionExecutor;
 import org.apache.hudi.table.action.commit.SparkInsertOverwriteCommitActionExecutor;
 import org.apache.hudi.table.action.commit.SparkInsertOverwriteTableCommitActionExecutor;
@@ -109,6 +110,11 @@ public class HoodieSparkCopyOnWriteTable<T extends HoodieRecordPayload> extends
   }
 
   @Override
+  public HoodieWriteMetadata deletePartitions(HoodieEngineContext context, String instantTime, List<String> partitions) {
+    return new SparkDeletePartitionCommitActionExecutor(context, config, this, instantTime, partitions).execute();
+  }
+
+  @Override
   public HoodieWriteMetadata<JavaRDD<WriteStatus>> upsertPrepped(HoodieEngineContext context, String instantTime,
       JavaRDD<HoodieRecord<T>> preppedRecords) {
     return new SparkUpsertPreppedCommitActionExecutor<>((HoodieSparkEngineContext) context, config, this, instantTime, preppedRecords).execute();
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkDeletePartitionCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkDeletePartitionCommitActionExecutor.java
new file mode 100644
index 0000000..ea1ef51
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkDeletePartitionCommitActionExecutor.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieEngineContext;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.WorkloadProfile;
+import org.apache.hudi.table.WorkloadStat;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import scala.Tuple2;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class SparkDeletePartitionCommitActionExecutor<T extends HoodieRecordPayload<T>>
+    extends SparkInsertOverwriteCommitActionExecutor<T> {
+
+  private List<String> partitions;
+  public SparkDeletePartitionCommitActionExecutor(HoodieEngineContext context,
+                                                  HoodieWriteConfig config, HoodieTable table,
+                                                  String instantTime, List<String> partitions) {
+    super(context, config, table, instantTime, null, WriteOperationType.DELETE_PARTITION);
+    this.partitions = partitions;
+  }
+
+  @Override
+  public HoodieWriteMetadata<JavaRDD<WriteStatus>> execute() {
+    JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context);
+    HoodieTimer timer = new HoodieTimer().startTimer();
+    Map<String, List<String>> partitionToReplaceFileIds = jsc.parallelize(partitions, partitions.size()).distinct()
+        .mapToPair(partitionPath -> new Tuple2<>(partitionPath, getAllExistingFileIds(partitionPath))).collectAsMap();
+    HoodieWriteMetadata result = new HoodieWriteMetadata();
+    result.setPartitionToReplaceFileIds(partitionToReplaceFileIds);
+    result.setIndexUpdateDuration(Duration.ofMillis(timer.endTimer()));
+
+    result.setWriteStatuses(jsc.emptyRDD());
+    this.saveWorkloadProfileMetadataToInflight(new WorkloadProfile(Pair.of(new HashMap<>(), new WorkloadStat())), instantTime);
+    this.commitOnAutoCommit(result);
+    return result;
+  }
+}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java
index 1e38220..c5d3c76 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java
@@ -77,7 +77,7 @@ public class SparkInsertOverwriteCommitActionExecutor<T extends HoodieRecordPayl
         new Tuple2<>(partitionPath, getAllExistingFileIds(partitionPath))).collectAsMap();
   }
 
-  private List<String> getAllExistingFileIds(String partitionPath) {
+  protected List<String> getAllExistingFileIds(String partitionPath) {
     // because new commit is not complete. it is safe to mark all existing file Ids as old files
     return table.getSliceView().getLatestFileSlices(partitionPath).map(fg -> fg.getFileId()).distinct().collect(Collectors.toList());
   }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteTableCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteTableCommitActionExecutor.java
index e349657..c014515 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteTableCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteTableCommitActionExecutor.java
@@ -36,7 +36,6 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 
 public class SparkInsertOverwriteTableCommitActionExecutor<T extends HoodieRecordPayload<T>>
     extends SparkInsertOverwriteCommitActionExecutor<T> {
@@ -47,11 +46,6 @@ public class SparkInsertOverwriteTableCommitActionExecutor<T extends HoodieRecor
     super(context, config, table, instantTime, inputRecordsRDD, WriteOperationType.INSERT_OVERWRITE_TABLE);
   }
 
-  protected List<String> getAllExistingFileIds(String partitionPath) {
-    return table.getSliceView().getLatestFileSlices(partitionPath)
-        .map(fg -> fg.getFileId()).distinct().collect(Collectors.toList());
-  }
-
   @Override
   protected Map<String, List<String>> getPartitionToReplacedFileIds(JavaRDD<WriteStatus> writeStatuses) {
     Map<String, List<String>> partitionToExistingFileIds = new HashMap<>();
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
index c201efd..e86cb2d 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
@@ -1103,7 +1103,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
   * Test scenario of writing a similar number of file groups in each partition.
    */
   @Test
-  public void testInsertOverwritePartitionHandlinWithSimilarNumberOfRecords() throws Exception {
+  public void testInsertOverwritePartitionHandlingWithSimilarNumberOfRecords() throws Exception {
     verifyInsertOverwritePartitionHandling(3000, 3000);
   }
 
@@ -1144,6 +1144,109 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
   }
 
   /**
+   * Test scenario of writing fewer file groups for the first partition than the second and third partitions.
+   */
+  @Test
+  public void verifyDeletePartitionsHandlingWithFewerRecordsFirstPartition() throws Exception {
+    verifyDeletePartitionsHandling(1000, 3000, 3000);
+  }
+
+  /**
+   * Test scenario of writing a similar number of file groups in each partition.
+   */
+  @Test
+  public void verifyDeletePartitionsHandlingWithSimilarNumberOfRecords() throws Exception {
+    verifyDeletePartitionsHandling(3000, 3000, 3000);
+  }
+
+  /**
+   * Test scenario of writing more file groups for the first partition than the second and third partitions.
+   */
+  @Test
+  public void verifyDeletePartitionsHandlingHandlingWithFewerRecordsSecondThirdPartition() throws Exception {
+    verifyDeletePartitionsHandling(3000, 1000, 1000);
+  }
+
+  private Set<String> insertPartitionRecordsWithCommit(SparkRDDWriteClient client, int recordsCount, String commitTime1, String partitionPath) {
+    client.startCommitWithTime(commitTime1);
+    List<HoodieRecord> inserts1 = dataGen.generateInsertsForPartition(commitTime1, recordsCount, partitionPath);
+    JavaRDD<HoodieRecord> insertRecordsRDD1 = jsc.parallelize(inserts1, 2);
+    List<WriteStatus> statuses = client.upsert(insertRecordsRDD1, commitTime1).collect();
+    assertNoWriteErrors(statuses);
+    Set<String> batchBuckets = statuses.stream().map(s -> s.getFileId()).collect(Collectors.toSet());
+    verifyRecordsWritten(commitTime1, inserts1, statuses);
+    return batchBuckets;
+  }
+
+  private Set<String> deletePartitionWithCommit(SparkRDDWriteClient client, String commitTime, List<String> deletePartitionPath) {
+    client.startCommitWithTime(commitTime, HoodieTimeline.REPLACE_COMMIT_ACTION);
+    HoodieWriteResult writeResult = client.deletePartitions(deletePartitionPath, commitTime);
+    Set<String> deletePartitionReplaceFileIds =
+        writeResult.getPartitionToReplaceFileIds().entrySet()
+            .stream().flatMap(entry -> entry.getValue().stream()).collect(Collectors.toSet());
+    return deletePartitionReplaceFileIds;
+  }
+
+  /**
+   *  1) Do write1 (upsert) with 'batch1RecordsCount' records for the first partition.
+   *  2) Do write2 (upsert) with 'batch2RecordsCount' records for the second partition.
+   *  3) Do write3 (upsert) with 'batch3RecordsCount' records for the third partition.
+   *  4) Delete the first partition and check the result.
+   *  5) Delete the second and third partitions and check the result.
+   *
+   */
+  private void verifyDeletePartitionsHandling(int batch1RecordsCount, int batch2RecordsCount, int batch3RecordsCount) throws Exception {
+    HoodieWriteConfig config = getSmallInsertWriteConfig(2000, false);
+    SparkRDDWriteClient client = getHoodieWriteClient(config, false);
+    dataGen = new HoodieTestDataGenerator();
+
+    // Do Inserts for DEFAULT_FIRST_PARTITION_PATH
+    String commitTime1 = "001";
+    Set<String> batch1Buckets =
+        this.insertPartitionRecordsWithCommit(client, batch1RecordsCount, commitTime1, DEFAULT_FIRST_PARTITION_PATH);
+
+    // Do Inserts for DEFAULT_SECOND_PARTITION_PATH
+    String commitTime2 = "002";
+    Set<String> batch2Buckets =
+        this.insertPartitionRecordsWithCommit(client, batch2RecordsCount, commitTime2, DEFAULT_SECOND_PARTITION_PATH);
+
+    // Do Inserts for DEFAULT_THIRD_PARTITION_PATH
+    String commitTime3 = "003";
+    Set<String> batch3Buckets =
+        this.insertPartitionRecordsWithCommit(client, batch3RecordsCount, commitTime3, DEFAULT_THIRD_PARTITION_PATH);
+
+    // delete DEFAULT_FIRST_PARTITION_PATH
+    String commitTime4 = "004";
+    Set<String> deletePartitionReplaceFileIds1 =
+        deletePartitionWithCommit(client, commitTime4, Arrays.asList(DEFAULT_FIRST_PARTITION_PATH));
+    assertEquals(batch1Buckets, deletePartitionReplaceFileIds1);
+    List<HoodieBaseFile> baseFiles = HoodieClientTestUtils.getLatestBaseFiles(basePath, fs,
+        String.format("%s/%s/*", basePath, DEFAULT_FIRST_PARTITION_PATH));
+    assertEquals(0, baseFiles.size());
+    baseFiles = HoodieClientTestUtils.getLatestBaseFiles(basePath, fs,
+        String.format("%s/%s/*", basePath, DEFAULT_SECOND_PARTITION_PATH));
+    assertTrue(baseFiles.size() > 0);
+    baseFiles = HoodieClientTestUtils.getLatestBaseFiles(basePath, fs,
+        String.format("%s/%s/*", basePath, DEFAULT_THIRD_PARTITION_PATH));
+    assertTrue(baseFiles.size() > 0);
+
+    // delete DEFAULT_SECOND_PARTITION_PATH, DEFAULT_THIRD_PARTITION_PATH
+    String commitTime5 = "005";
+    Set<String> deletePartitionReplaceFileIds2 =
+        deletePartitionWithCommit(client, commitTime5, Arrays.asList(DEFAULT_SECOND_PARTITION_PATH, DEFAULT_THIRD_PARTITION_PATH));
+    Set<String> expectedFileId = new HashSet<>();
+    expectedFileId.addAll(batch2Buckets);
+    expectedFileId.addAll(batch3Buckets);
+    assertEquals(expectedFileId, deletePartitionReplaceFileIds2);
+
+    baseFiles = HoodieClientTestUtils.getLatestBaseFiles(basePath, fs,
+        String.format("%s/%s/*", basePath, DEFAULT_FIRST_PARTITION_PATH),
+        String.format("%s/%s/*", basePath, DEFAULT_SECOND_PARTITION_PATH),
+        String.format("%s/%s/*", basePath, DEFAULT_THIRD_PARTITION_PATH));
+    assertEquals(0, baseFiles.size());
+  }
+
+  /**
    * Verify data in parquet files matches expected records and commit time.
    */
   private void verifyRecordsWritten(String commitTime, List<HoodieRecord> expectedRecords, List<WriteStatus> allStatus) {
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
index 307e068..c91b51b 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
@@ -147,6 +147,22 @@ public class HoodieClientTestUtils {
     }
   }
 
+  public static List<HoodieBaseFile> getLatestBaseFiles(String basePath, FileSystem fs,
+                                                String... paths) {
+    List<HoodieBaseFile> latestFiles = new ArrayList<>();
+    try {
+      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs.getConf(), basePath, true);
+      for (String path : paths) {
+        BaseFileOnlyView fileSystemView = new HoodieTableFileSystemView(metaClient,
+            metaClient.getCommitsTimeline().filterCompletedInstants(), fs.globStatus(new Path(path)));
+        latestFiles.addAll(fileSystemView.getLatestBaseFiles().collect(Collectors.toList()));
+      }
+    } catch (Exception e) {
+      throw new HoodieException("Error reading hoodie table as a dataframe", e);
+    }
+    return latestFiles;
+  }
+
   /**
    * Reads the paths under a hoodie table out as a DataFrame.
    */
@@ -154,14 +170,9 @@ public class HoodieClientTestUtils {
                                   String... paths) {
     List<String> filteredPaths = new ArrayList<>();
     try {
-      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs.getConf(), basePath, true);
-      for (String path : paths) {
-        BaseFileOnlyView fileSystemView = new HoodieTableFileSystemView(metaClient,
-            metaClient.getCommitsTimeline().filterCompletedInstants(), fs.globStatus(new Path(path)));
-        List<HoodieBaseFile> latestFiles = fileSystemView.getLatestBaseFiles().collect(Collectors.toList());
-        for (HoodieBaseFile file : latestFiles) {
-          filteredPaths.add(file.getPath());
-        }
+      List<HoodieBaseFile> latestFiles = getLatestBaseFiles(basePath, fs, paths);
+      for (HoodieBaseFile file : latestFiles) {
+        filteredPaths.add(file.getPath());
       }
       return sqlContext.read().parquet(filteredPaths.toArray(new String[filteredPaths.size()]));
     } catch (Exception e) {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java b/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java
index 39f0f62..f237156 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java
@@ -42,6 +42,8 @@ public enum WriteOperationType {
   INSERT_OVERWRITE("insert_overwrite"),
   // cluster
   CLUSTER("cluster"),
+  // delete partition
+  DELETE_PARTITION("delete_partition"),
   // insert overwrite with dynamic partitioning
   INSERT_OVERWRITE_TABLE("insert_overwrite_table"),
   // used for old version
@@ -74,6 +76,8 @@ public enum WriteOperationType {
         return DELETE;
       case "insert_overwrite":
         return INSERT_OVERWRITE;
+      case "delete_partition":
+        return DELETE_PARTITION;
       case "insert_overwrite_table":
         return INSERT_OVERWRITE_TABLE;
       case "cluster":
