This is an automated email from the ASF dual-hosted git repository.

leesf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new dd96129  [HUDI-2990] Sync to HMS when deleting partitions (#4291)
dd96129 is described below

commit dd961291914df656418c9a1cacd39eaf8a806809
Author: ForwardXu <forwardxu...@gmail.com>
AuthorDate: Mon Dec 13 20:40:06 2021 +0800

    [HUDI-2990] Sync to HMS when deleting partitions (#4291)
---
 .../hudi/common/table/TableSchemaResolver.java     | 19 +++++++
 .../AlterHoodieTableDropPartitionCommand.scala     | 24 ++++++--
 .../java/org/apache/hudi/dla/HoodieDLAClient.java  |  5 ++
 .../java/org/apache/hudi/hive/HiveSyncTool.java    | 64 ++++++++++++++--------
 .../org/apache/hudi/hive/HoodieHiveClient.java     | 33 +++++++++--
 .../java/org/apache/hudi/hive/ddl/DDLExecutor.java |  8 +++
 .../org/apache/hudi/hive/ddl/HMSDDLExecutor.java   | 19 +++++++
 .../apache/hudi/hive/ddl/HiveQueryDDLExecutor.java | 19 +++++++
 .../org/apache/hudi/hive/ddl/JDBCExecutor.java     |  8 +++
 .../org/apache/hudi/hive/TestHiveSyncTool.java     | 50 +++++++++++++++++
 .../hudi/sync/common/AbstractSyncHoodieClient.java | 31 ++++++++++-
 11 files changed, 244 insertions(+), 36 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
index 51e3e27..d1a4d96 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
@@ -414,6 +414,25 @@ public class TableSchemaResolver {
     return latestSchema;
   }
 
+
+  /**
+   * Get the latest commit's metadata.
+   */
+  public Option<HoodieCommitMetadata> getLatestCommitMetadata() {
+    try {
+      HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
+      if (timeline.lastInstant().isPresent()) {
+        HoodieInstant instant = timeline.lastInstant().get();
+        byte[] data = timeline.getInstantDetails(instant).get();
+        return Option.of(HoodieCommitMetadata.fromBytes(data, HoodieCommitMetadata.class));
+      } else {
+        return Option.empty();
+      }
+    } catch (Exception e) {
+      throw new HoodieException("Failed to get commit metadata", e);
+    }
+  }
+
   /**
    * Read the parquet schema from a parquet File.
    */
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableDropPartitionCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableDropPartitionCommand.scala
index a3dfdd6..1c295fb 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableDropPartitionCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableDropPartitionCommand.scala
@@ -17,18 +17,19 @@
 
 package org.apache.spark.sql.hudi.command
 
-import org.apache.hudi.{DataSourceWriteOptions, HoodieSparkSqlWriter}
 import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.common.util.PartitionPathEncodeUtils
 import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
-
-import org.apache.spark.sql.{AnalysisException, Row, SaveMode, SparkSession}
+import org.apache.hudi.hive.MultiPartKeysValueExtractor
+import org.apache.hudi.hive.ddl.HiveSyncMode
+import org.apache.hudi.{DataSourceWriteOptions, HoodieSparkSqlWriter}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.Resolver
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
 import org.apache.spark.sql.execution.command.{DDLUtils, RunnableCommand}
 import org.apache.spark.sql.hudi.HoodieSqlUtils._
+import org.apache.spark.sql.{AnalysisException, Row, SaveMode, SparkSession}
 
 case class AlterHoodieTableDropPartitionCommand(
     tableIdentifier: TableIdentifier,
@@ -67,7 +68,8 @@ extends RunnableCommand {
     val allPartitionPaths = hoodieCatalogTable.getAllPartitionPaths
     val enableHiveStylePartitioning = isHiveStyledPartitioning(allPartitionPaths, table)
     val enableEncodeUrl = isUrlEncodeEnabled(allPartitionPaths, table)
-    val partitionsToDelete = normalizedSpecs.map { spec =>
+    val partitionFields = hoodieCatalogTable.partitionFields.mkString(",")
+    val partitionsToDrop = normalizedSpecs.map { spec =>
       hoodieCatalogTable.partitionFields.map{ partitionColumn =>
         val encodedPartitionValue = if (enableEncodeUrl) {
           PartitionPathEncodeUtils.escapePathName(spec(partitionColumn))
@@ -82,16 +84,26 @@ extends RunnableCommand {
       }.mkString("/")
     }.mkString(",")
 
+    val enableHive = isEnableHive(sparkSession)
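+    // Pass Hive sync options below so the dropped partitions are also removed from the Hive metastore (synced via HMS rather than JDBC)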
     withSparkConf(sparkSession, Map.empty) {
       Map(
         "path" -> hoodieCatalogTable.tableLocation,
         TBL_NAME.key -> hoodieCatalogTable.tableName,
         TABLE_TYPE.key -> hoodieCatalogTable.tableTypeName,
         OPERATION.key -> DataSourceWriteOptions.DELETE_PARTITION_OPERATION_OPT_VAL,
-        PARTITIONS_TO_DELETE.key -> partitionsToDelete,
+        PARTITIONS_TO_DELETE.key -> partitionsToDrop,
         RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","),
         PRECOMBINE_FIELD.key -> hoodieCatalogTable.preCombineKey.getOrElse(""),
-        PARTITIONPATH_FIELD.key -> hoodieCatalogTable.partitionFields.mkString(",")
+        PARTITIONPATH_FIELD.key -> partitionFields,
+        HIVE_SYNC_ENABLED.key -> enableHive.toString,
+        META_SYNC_ENABLED.key -> enableHive.toString,
+        HIVE_SYNC_MODE.key -> HiveSyncMode.HMS.name(),
+        HIVE_USE_JDBC.key -> "false",
+        HIVE_DATABASE.key -> hoodieCatalogTable.table.identifier.database.getOrElse("default"),
+        HIVE_TABLE.key -> hoodieCatalogTable.table.identifier.table,
+        HIVE_SUPPORT_TIMESTAMP_TYPE.key -> "true",
+        HIVE_PARTITION_FIELDS.key -> partitionFields,
+        HIVE_PARTITION_EXTRACTOR_CLASS.key -> classOf[MultiPartKeysValueExtractor].getCanonicalName
       )
     }
   }
diff --git a/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/HoodieDLAClient.java b/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/HoodieDLAClient.java
index 20f94f0..77d7362 100644
--- a/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/HoodieDLAClient.java
+++ b/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/HoodieDLAClient.java
@@ -287,6 +287,11 @@ public class HoodieDLAClient extends AbstractSyncHoodieClient {
     }
   }
 
+  @Override
+  public void dropPartitionsToTable(String tableName, List<String> partitionsToDrop) {
+    throw new UnsupportedOperationException("dropPartitionsToTable is not supported yet.");
+  }
+
   public Map<List<String>, String> scanTablePartitions(String tableName) {
     String sql = constructShowPartitionSQL(tableName);
     Statement stmt = null;
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
index 3bbaee1..f07ab88 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
@@ -28,7 +28,6 @@ import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
 import org.apache.hudi.hive.util.ConfigUtils;
 import org.apache.hudi.hive.util.HiveSchemaUtil;
 import org.apache.hudi.hive.util.Parquet2SparkSchemaUtils;
-
 import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent;
 import 
org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent.PartitionEventType;
 import org.apache.hudi.sync.common.AbstractSyncTool;
@@ -166,20 +165,28 @@ public class HiveSyncTool extends AbstractSyncTool {
     // Check if the necessary table exists
     boolean tableExists = hoodieHiveClient.doesTableExist(tableName);
 
-    // Get the parquet schema for this table looking at the latest commit
-    MessageType schema = hoodieHiveClient.getDataSchema();
-
-    // Currently HoodieBootstrapRelation does support reading bootstrap MOR rt table,
-    // so we disable the syncAsSparkDataSourceTable here to avoid read such kind table
-    // by the data source way (which will use the HoodieBootstrapRelation).
-    // TODO after we support bootstrap MOR rt table in HoodieBootstrapRelation[HUDI-2071], we can remove this logical.
-    if (hoodieHiveClient.isBootstrap()
-            && hoodieHiveClient.getTableType() == HoodieTableType.MERGE_ON_READ
-            && !readAsOptimized) {
-      cfg.syncAsSparkDataSourceTable = false;
+    // Check whether the latest commit dropped partitions
+    boolean isDropPartition = hoodieHiveClient.isDropPartition();
+
+    // Tracks whether the schema changed during this sync
+    boolean schemaChanged = false;
+
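+    // When the latest commit only dropped partitions, skip the schema sync and let the partition
+    // sync below generate DROP events for the removed partitions.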
+    if (!isDropPartition) {
+      // Get the parquet schema for this table looking at the latest commit
+      MessageType schema = hoodieHiveClient.getDataSchema();
+
+      // Currently HoodieBootstrapRelation does not support reading bootstrap MOR rt tables,
+      // so we disable syncAsSparkDataSourceTable here to avoid reading such tables
+      // through the data source path (which would use HoodieBootstrapRelation).
+      // TODO: once bootstrap MOR rt tables are supported in HoodieBootstrapRelation [HUDI-2071], this logic can be removed.
+      if (hoodieHiveClient.isBootstrap()
+          && hoodieHiveClient.getTableType() == HoodieTableType.MERGE_ON_READ
+          && !readAsOptimized) {
+        cfg.syncAsSparkDataSourceTable = false;
+      }
+      // Sync schema if needed
+      schemaChanged = syncSchema(tableName, tableExists, useRealtimeInputFormat, readAsOptimized, schema);
     }
-    // Sync schema if needed
-    boolean schemaChanged = syncSchema(tableName, tableExists, useRealtimeInputFormat, readAsOptimized, schema);
 
     LOG.info("Schema sync complete. Syncing partitions for " + tableName);
     // Get the last time we successfully synced partitions
@@ -192,7 +199,7 @@ public class HiveSyncTool extends AbstractSyncTool {
     LOG.info("Storage partitions scan complete. Found " + 
writtenPartitionsSince.size());
 
     // Sync the partitions if needed
-    boolean partitionsChanged = syncPartitions(tableName, writtenPartitionsSince);
+    boolean partitionsChanged = syncPartitions(tableName, writtenPartitionsSince, isDropPartition);
     boolean meetSyncConditions = schemaChanged || partitionsChanged;
     if (!cfg.isConditionalSync || meetSyncConditions) {
       hoodieHiveClient.updateLastCommitTimeSynced(tableName);
@@ -331,19 +338,32 @@ public class HiveSyncTool extends AbstractSyncTool {
    * Syncs the list of storage partitions passed in (checks if the partition is in hive, if not adds it or if the
    * partition path does not match, it updates the partition path).
    */
-  private boolean syncPartitions(String tableName, List<String> writtenPartitionsSince) {
+  private boolean syncPartitions(String tableName, List<String> writtenPartitionsSince, boolean isDropPartition) {
     boolean partitionsChanged;
     try {
       List<Partition> hivePartitions = hoodieHiveClient.scanTablePartitions(tableName);
       List<PartitionEvent> partitionEvents =
-          hoodieHiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince);
+          hoodieHiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince, isDropPartition);
+
       List<String> newPartitions = filterPartitions(partitionEvents, PartitionEventType.ADD);
-      LOG.info("New Partitions " + newPartitions);
-      hoodieHiveClient.addPartitionsToTable(tableName, newPartitions);
+      if (!newPartitions.isEmpty()) {
+        LOG.info("New Partitions " + newPartitions);
+        hoodieHiveClient.addPartitionsToTable(tableName, newPartitions);
+      }
+
       List<String> updatePartitions = filterPartitions(partitionEvents, PartitionEventType.UPDATE);
-      LOG.info("Changed Partitions " + updatePartitions);
-      hoodieHiveClient.updatePartitionsToTable(tableName, updatePartitions);
-      partitionsChanged = !updatePartitions.isEmpty() || !newPartitions.isEmpty();
+      if (!updatePartitions.isEmpty()) {
+        LOG.info("Changed Partitions " + updatePartitions);
+        hoodieHiveClient.updatePartitionsToTable(tableName, updatePartitions);
+      }
+
+      List<String> dropPartitions = filterPartitions(partitionEvents, PartitionEventType.DROP);
+      if (!dropPartitions.isEmpty()) {
+        LOG.info("Drop Partitions " + dropPartitions);
+        hoodieHiveClient.dropPartitionsToTable(tableName, dropPartitions);
+      }
+
+      partitionsChanged = !updatePartitions.isEmpty() || !newPartitions.isEmpty() || !dropPartitions.isEmpty();
     } catch (Exception e) {
       throw new HoodieHiveSyncException("Failed to sync partitions for table " 
+ tableName, e);
     }
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
index 265ab75..287de57 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
@@ -123,6 +123,14 @@ public class HoodieHiveClient extends AbstractSyncHoodieClient {
   }
 
   /**
+   * Drop the given partitions from the table.
+   */
+  @Override
+  public void dropPartitionsToTable(String tableName, List<String> partitionsToDrop) {
+    ddlExecutor.dropPartitionsToTable(tableName, partitionsToDrop);
+  }
+
+  /**
    * Update the table properties to the table.
    */
   @Override
@@ -147,6 +155,14 @@ public class HoodieHiveClient extends AbstractSyncHoodieClient {
    * Generate a list of PartitionEvent based on the changes required.
    */
   List<PartitionEvent> getPartitionEvents(List<Partition> tablePartitions, List<String> partitionStoragePartitions) {
+    return getPartitionEvents(tablePartitions, partitionStoragePartitions, false);
+  }
+
+  /**
+   * Iterate over the storage partitions and find if there are any new partitions that need to be added or updated.
+   * Generate a list of PartitionEvent based on the changes required.
+   */
+  List<PartitionEvent> getPartitionEvents(List<Partition> tablePartitions, List<String> partitionStoragePartitions, boolean isDropPartition) {
     Map<String, String> paths = new HashMap<>();
     for (Partition tablePartition : tablePartitions) {
       List<String> hivePartitionValues = tablePartition.getValues();
@@ -161,12 +177,17 @@ public class HoodieHiveClient extends AbstractSyncHoodieClient {
       String fullStoragePartitionPath = Path.getPathWithoutSchemeAndAuthority(storagePartitionPath).toUri().getPath();
       // Check if the partition values or if hdfs path is the same
       List<String> storagePartitionValues = partitionValueExtractor.extractPartitionValuesInPath(storagePartition);
-      if (!storagePartitionValues.isEmpty()) {
-        String storageValue = String.join(", ", storagePartitionValues);
-        if (!paths.containsKey(storageValue)) {
-          events.add(PartitionEvent.newPartitionAddEvent(storagePartition));
-        } else if (!paths.get(storageValue).equals(fullStoragePartitionPath)) {
-          events.add(PartitionEvent.newPartitionUpdateEvent(storagePartition));
+
+      if (isDropPartition) {
+        events.add(PartitionEvent.newPartitionDropEvent(storagePartition));
+      } else {
+        if (!storagePartitionValues.isEmpty()) {
+          String storageValue = String.join(", ", storagePartitionValues);
+          if (!paths.containsKey(storageValue)) {
+            events.add(PartitionEvent.newPartitionAddEvent(storagePartition));
+          } else if (!paths.get(storageValue).equals(fullStoragePartitionPath)) {
+            events.add(PartitionEvent.newPartitionUpdateEvent(storagePartition));
+          }
         }
       }
     }
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/DDLExecutor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/DDLExecutor.java
index 0e1e223..dc37d92 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/DDLExecutor.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/DDLExecutor.java
@@ -81,5 +81,13 @@ public interface DDLExecutor {
    */
   public void updatePartitionsToTable(String tableName, List<String> changedPartitions);
 
+  /**
+   * Drop partitions for a given table.
+   *
+   * @param tableName name of the table from which partitions should be dropped
+   * @param partitionsToDrop list of partitions to drop
+   */
+  public void dropPartitionsToTable(String tableName, List<String> partitionsToDrop);
+
   public void close();
 }
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java
index 37aa54a..d3efebe 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java
@@ -227,6 +227,25 @@ public class HMSDDLExecutor implements DDLExecutor {
   }
 
   @Override
+  public void dropPartitionsToTable(String tableName, List<String> partitionsToDrop) {
+    if (partitionsToDrop.isEmpty()) {
+      LOG.info("No partitions to drop for " + tableName);
+      return;
+    }
+
+    LOG.info("Drop partitions " + partitionsToDrop.size() + " on " + 
tableName);
+    try {
+      for (String dropPartition : partitionsToDrop) {
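+        // deleteData = false: drop the partition from the metastore only, keeping the underlying data files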
+        client.dropPartition(syncConfig.databaseName, tableName, dropPartition, false);
+        LOG.info("Drop partition " + dropPartition + " on " + tableName);
+      }
+    } catch (TException e) {
+      LOG.error(syncConfig.databaseName + "." + tableName + " drop partition failed", e);
+      throw new HoodieHiveSyncException(syncConfig.databaseName + "." + tableName + " drop partition failed", e);
+    }
+  }
+
+  @Override
   public void close() {
     if (client != null) {
       Hive.closeCurrent();
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java
index e2635ee..7161194 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java
@@ -127,6 +127,25 @@ public class HiveQueryDDLExecutor extends QueryBasedDDLExecutor {
   }
 
   @Override
+  public void dropPartitionsToTable(String tableName, List<String> partitionsToDrop) {
+    if (partitionsToDrop.isEmpty()) {
+      LOG.info("No partitions to drop for " + tableName);
+      return;
+    }
+
+    LOG.info("Drop partitions " + partitionsToDrop.size() + " on " + 
tableName);
+    try {
+      for (String dropPartition : partitionsToDrop) {
+        metaStoreClient.dropPartition(config.databaseName, tableName, dropPartition, false);
+        LOG.info("Drop partition " + dropPartition + " on " + tableName);
+      }
+    } catch (Exception e) {
+      LOG.error(config.databaseName + "." + tableName + " drop partition failed", e);
+      throw new HoodieHiveSyncException(config.databaseName + "." + tableName + " drop partition failed", e);
+    }
+  }
+
+  @Override
   public void close() {
     if (metaStoreClient != null) {
       Hive.closeCurrent();
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/JDBCExecutor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/JDBCExecutor.java
index 1603191..493d4ee 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/JDBCExecutor.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/JDBCExecutor.java
@@ -32,6 +32,7 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 
@@ -142,6 +143,13 @@ public class JDBCExecutor extends QueryBasedDDLExecutor {
   }
 
   @Override
+  public void dropPartitionsToTable(String tableName, List<String> partitionsToDrop) {
+    partitionsToDrop.stream()
+        .map(partition -> String.format("ALTER TABLE `%s` DROP PARTITION 
(%s)", tableName, partition))
+        .forEach(this::runSQL);
+  }
+
+  @Override
   public void close() {
     try {
       if (connection != null) {
diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
index d36727a..ef98641 100644
--- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
+++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
@@ -739,6 +739,56 @@ public class TestHiveSyncTool {
 
   @ParameterizedTest
   @MethodSource("syncMode")
+  public void testDropPartitionKeySync(String syncMode) throws Exception {
+    hiveSyncConfig.syncMode = syncMode;
+    HiveTestUtil.hiveSyncConfig.batchSyncNum = 3;
+    String instantTime = "100";
+    HiveTestUtil.createCOWTable(instantTime, 1, true);
+    HoodieHiveClient hiveClient =
+        new HoodieHiveClient(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem);
+    assertFalse(hiveClient.doesTableExist(hiveSyncConfig.tableName),
+        "Table " + hiveSyncConfig.tableName + " should not exist initially");
+    // Let's do the sync
+    HiveSyncTool tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem);
+    tool.syncHoodieTable();
+    // We need to renew the hiveClient after tool.syncHoodieTable(), because it closes the hive
+    // session, which leads to a connection retry; an exception can be seen in the log.
+    hiveClient =
+        new HoodieHiveClient(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
+    assertTrue(hiveClient.doesTableExist(HiveTestUtil.hiveSyncConfig.tableName),
+        "Table " + HiveTestUtil.hiveSyncConfig.tableName + " should exist after sync completes");
+    assertEquals(hiveClient.getTableSchema(HiveTestUtil.hiveSyncConfig.tableName).size(),
+        hiveClient.getDataSchema().getColumns().size() + 1,
+        "Hive Schema should match the table schema + partition field");
+    assertEquals(1, hiveClient.scanTablePartitions(hiveSyncConfig.tableName).size(),
+        "Table partitions should match the number of partitions we wrote");
+    assertEquals(instantTime, hiveClient.getLastCommitTimeSynced(hiveSyncConfig.tableName).get(),
+        "The last commit that was synced should be updated in the TBLPROPERTIES");
+
+    // Add new partitions
+    List<String> newPartition = Arrays.asList("2050/01/01");
+    hiveClient.addPartitionsToTable(hiveSyncConfig.tableName, Arrays.asList());
+    assertEquals(1, hiveClient.scanTablePartitions(hiveSyncConfig.tableName).size(),
+        "No new partition should be added");
+    hiveClient.addPartitionsToTable(hiveSyncConfig.tableName, newPartition);
+    assertEquals(2, hiveClient.scanTablePartitions(hiveSyncConfig.tableName).size(),
+        "New partition should be added");
+
+    tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem);
+    tool.syncHoodieTable();
+
+    // Drop 1 partition.
+    ddlExecutor.runSQL("ALTER TABLE `" + hiveSyncConfig.tableName
+        + "` DROP PARTITION (`datestr`='2050-01-01')");
+
+    hiveClient = new HoodieHiveClient(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem);
+    List<Partition> hivePartitions = hiveClient.scanTablePartitions(hiveSyncConfig.tableName);
+    assertEquals(1, hivePartitions.size(),
+        "Table should have 1 partition because of the drop 1 partition");
+  }
+
+  @ParameterizedTest
+  @MethodSource("syncMode")
   public void testNonPartitionedSync(String syncMode) throws Exception {
 
     hiveSyncConfig.syncMode = syncMode;
diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java
index ce4720a..0a277be 100644
--- a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java
+++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java
@@ -20,16 +20,18 @@ package org.apache.hudi.sync.common;
 
 import org.apache.hudi.common.engine.HoodieLocalEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.TimelineUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.parquet.schema.MessageType;
@@ -98,6 +100,8 @@ public abstract class AbstractSyncHoodieClient {
 
   public abstract void updatePartitionsToTable(String tableName, List<String> changedPartitions);
 
+  public abstract void dropPartitionsToTable(String tableName, List<String> partitionsToDrop);
+
   public  void updateTableProperties(String tableName, Map<String, String> tableProperties) {}
 
   public abstract Map<String, String> getTableSchema(String tableName);
@@ -155,6 +159,25 @@ public abstract class AbstractSyncHoodieClient {
     }
   }
 
+  public boolean isDropPartition() {
+    try {
+      Option<HoodieCommitMetadata> hoodieCommitMetadata;
+      if (withOperationField) {
+        hoodieCommitMetadata = new TableSchemaResolver(metaClient, true).getLatestCommitMetadata();
+      } else {
+        hoodieCommitMetadata = new TableSchemaResolver(metaClient).getLatestCommitMetadata();
+      }
+
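+      // A latest commit written by a DELETE_PARTITION operation indicates that partitions were
+      // dropped and should be removed from the metastore as well.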
+      if (hoodieCommitMetadata.isPresent()
+          && hoodieCommitMetadata.get().getOperationType().equals(WriteOperationType.DELETE_PARTITION)) {
+        return true;
+      }
+    } catch (Exception e) {
+      throw new HoodieSyncException("Failed to get the latest commit metadata", e);
+    }
+    return false;
+  }
+
   @SuppressWarnings("OptionalUsedAsFieldOrParameterType")
   public List<String> getPartitionsWrittenToSince(Option<String> lastCommitTimeSynced) {
     if (!lastCommitTimeSynced.isPresent()) {
@@ -224,7 +247,7 @@ public abstract class AbstractSyncHoodieClient {
   public static class PartitionEvent {
 
     public enum PartitionEventType {
-      ADD, UPDATE
+      ADD, UPDATE, DROP
     }
 
     public PartitionEventType eventType;
@@ -242,5 +265,9 @@ public abstract class AbstractSyncHoodieClient {
     public static PartitionEvent newPartitionUpdateEvent(String storagePartition) {
       return new PartitionEvent(PartitionEventType.UPDATE, storagePartition);
     }
+
+    public static PartitionEvent newPartitionDropEvent(String storagePartition) {
+      return new PartitionEvent(PartitionEventType.DROP, storagePartition);
+    }
   }
 }
