This is an automated email from the ASF dual-hosted git repository.

leesf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new dd96129  [HUDI-2990] Sync to HMS when deleting partitions (#4291)
dd96129 is described below

commit dd961291914df656418c9a1cacd39eaf8a806809
Author: ForwardXu <forwardxu...@gmail.com>
AuthorDate: Mon Dec 13 20:40:06 2021 +0800

    [HUDI-2990] Sync to HMS when deleting partitions (#4291)
---
 .../hudi/common/table/TableSchemaResolver.java     | 19 +++++++
 .../AlterHoodieTableDropPartitionCommand.scala     | 24 ++++++--
 .../java/org/apache/hudi/dla/HoodieDLAClient.java  |  5 ++
 .../java/org/apache/hudi/hive/HiveSyncTool.java    | 64 ++++++++++++++--------
 .../org/apache/hudi/hive/HoodieHiveClient.java     | 33 +++++++++--
 .../java/org/apache/hudi/hive/ddl/DDLExecutor.java |  8 +++
 .../org/apache/hudi/hive/ddl/HMSDDLExecutor.java   | 19 +++++++
 .../apache/hudi/hive/ddl/HiveQueryDDLExecutor.java | 19 +++++++
 .../org/apache/hudi/hive/ddl/JDBCExecutor.java     |  8 +++
 .../org/apache/hudi/hive/TestHiveSyncTool.java     | 50 +++++++++++++++++
 .../hudi/sync/common/AbstractSyncHoodieClient.java | 31 ++++++++++-
 11 files changed, 244 insertions(+), 36 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
index 51e3e27..d1a4d96 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
@@ -414,6 +414,25 @@ public class TableSchemaResolver {
     return latestSchema;
   }
+
+  /**
+   * Get the latest commit's metadata.
+   */
+  public Option<HoodieCommitMetadata> getLatestCommitMetadata() {
+    try {
+      HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
+      if (timeline.lastInstant().isPresent()) {
+        HoodieInstant instant = timeline.lastInstant().get();
+        byte[] data = timeline.getInstantDetails(instant).get();
+        return Option.of(HoodieCommitMetadata.fromBytes(data, HoodieCommitMetadata.class));
+      } else {
+        return Option.empty();
+      }
+    } catch (Exception e) {
+      throw new HoodieException("Failed to get commit metadata", e);
+    }
+  }
+
   /**
    * Read the parquet schema from a parquet File.
   */
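For context: the new helper above is what lets the sync layer detect a delete_partition commit without reading the data schema. A minimal caller sketch — every class and method below appears in this patch, but the wrapper class is hypothetical:

    import org.apache.hudi.common.model.HoodieCommitMetadata;
    import org.apache.hudi.common.model.WriteOperationType;
    import org.apache.hudi.common.table.HoodieTableMetaClient;
    import org.apache.hudi.common.table.TableSchemaResolver;
    import org.apache.hudi.common.util.Option;

    class LatestCommitMetadataExample {
      // True when the latest completed commit on the timeline was a DELETE_PARTITION
      // write; this mirrors what AbstractSyncHoodieClient.isDropPartition() does
      // further down in this patch.
      static boolean latestCommitDroppedPartitions(HoodieTableMetaClient metaClient) {
        Option<HoodieCommitMetadata> metadata =
            new TableSchemaResolver(metaClient).getLatestCommitMetadata();
        return metadata.isPresent()
            && WriteOperationType.DELETE_PARTITION.equals(metadata.get().getOperationType());
      }
    }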
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableDropPartitionCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableDropPartitionCommand.scala
index a3dfdd6..1c295fb 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableDropPartitionCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableDropPartitionCommand.scala
@@ -17,18 +17,19 @@
 package org.apache.spark.sql.hudi.command
 
-import org.apache.hudi.{DataSourceWriteOptions, HoodieSparkSqlWriter}
 import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.common.util.PartitionPathEncodeUtils
 import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
-
-import org.apache.spark.sql.{AnalysisException, Row, SaveMode, SparkSession}
+import org.apache.hudi.hive.MultiPartKeysValueExtractor
+import org.apache.hudi.hive.ddl.HiveSyncMode
+import org.apache.hudi.{DataSourceWriteOptions, HoodieSparkSqlWriter}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.Resolver
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
 import org.apache.spark.sql.execution.command.{DDLUtils, RunnableCommand}
 import org.apache.spark.sql.hudi.HoodieSqlUtils._
+import org.apache.spark.sql.{AnalysisException, Row, SaveMode, SparkSession}
 
 case class AlterHoodieTableDropPartitionCommand(
   tableIdentifier: TableIdentifier,
@@ -67,7 +68,8 @@ extends RunnableCommand {
     val allPartitionPaths = hoodieCatalogTable.getAllPartitionPaths
     val enableHiveStylePartitioning = isHiveStyledPartitioning(allPartitionPaths, table)
     val enableEncodeUrl = isUrlEncodeEnabled(allPartitionPaths, table)
-    val partitionsToDelete = normalizedSpecs.map { spec =>
+    val partitionFields = hoodieCatalogTable.partitionFields.mkString(",")
+    val partitionsToDrop = normalizedSpecs.map { spec =>
       hoodieCatalogTable.partitionFields.map { partitionColumn =>
         val encodedPartitionValue = if (enableEncodeUrl) {
           PartitionPathEncodeUtils.escapePathName(spec(partitionColumn))
@@ -82,16 +84,26 @@ extends RunnableCommand {
       }.mkString("/")
     }.mkString(",")
 
+    val enableHive = isEnableHive(sparkSession)
     withSparkConf(sparkSession, Map.empty) {
       Map(
         "path" -> hoodieCatalogTable.tableLocation,
         TBL_NAME.key -> hoodieCatalogTable.tableName,
         TABLE_TYPE.key -> hoodieCatalogTable.tableTypeName,
         OPERATION.key -> DataSourceWriteOptions.DELETE_PARTITION_OPERATION_OPT_VAL,
-        PARTITIONS_TO_DELETE.key -> partitionsToDelete,
+        PARTITIONS_TO_DELETE.key -> partitionsToDrop,
         RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","),
         PRECOMBINE_FIELD.key -> hoodieCatalogTable.preCombineKey.getOrElse(""),
-        PARTITIONPATH_FIELD.key -> hoodieCatalogTable.partitionFields.mkString(",")
+        PARTITIONPATH_FIELD.key -> partitionFields,
+        HIVE_SYNC_ENABLED.key -> enableHive.toString,
+        META_SYNC_ENABLED.key -> enableHive.toString,
+        HIVE_SYNC_MODE.key -> HiveSyncMode.HMS.name(),
+        HIVE_USE_JDBC.key -> "false",
+        HIVE_DATABASE.key -> hoodieCatalogTable.table.identifier.database.getOrElse("default"),
+        HIVE_TABLE.key -> hoodieCatalogTable.table.identifier.table,
+        HIVE_SUPPORT_TIMESTAMP_TYPE.key -> "true",
+        HIVE_PARTITION_FIELDS.key -> partitionFields,
+        HIVE_PARTITION_EXTRACTOR_CLASS.key -> classOf[MultiPartKeysValueExtractor].getCanonicalName
       )
     }
   }
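For context: the rewritten command above backs Spark SQL's ALTER TABLE ... DROP PARTITION, and with Hive enabled it now also configures HMS-mode sync so the dropped partitions are removed from the metastore. A minimal trigger sketch — the table name h0 and partition column dt are made up:

    import org.apache.spark.sql.SparkSession;

    class DropPartitionViaSparkSql {
      static void dropPartition(SparkSession spark) {
        // Routed through AlterHoodieTableDropPartitionCommand, which issues a
        // delete_partition write and then syncs the drop to the Hive metastore.
        spark.sql("ALTER TABLE h0 DROP PARTITION (dt='2021-12-09')");
      }
    }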
diff --git a/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/HoodieDLAClient.java b/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/HoodieDLAClient.java
index 20f94f0..77d7362 100644
--- a/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/HoodieDLAClient.java
+++ b/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/HoodieDLAClient.java
@@ -287,6 +287,11 @@ public class HoodieDLAClient extends AbstractSyncHoodieClient {
     }
   }
 
+  @Override
+  public void dropPartitionsToTable(String tableName, List<String> partitionsToDrop) {
+    throw new UnsupportedOperationException("dropPartitionsToTable is not supported yet.");
+  }
+
   public Map<List<String>, String> scanTablePartitions(String tableName) {
     String sql = constructShowPartitionSQL(tableName);
     Statement stmt = null;
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
index 3bbaee1..f07ab88 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
@@ -28,7 +28,6 @@ import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
 import org.apache.hudi.hive.util.ConfigUtils;
 import org.apache.hudi.hive.util.HiveSchemaUtil;
 import org.apache.hudi.hive.util.Parquet2SparkSchemaUtils;
-
 import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent;
 import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent.PartitionEventType;
 import org.apache.hudi.sync.common.AbstractSyncTool;
@@ -166,20 +165,28 @@ public class HiveSyncTool extends AbstractSyncTool {
     // Check if the necessary table exists
     boolean tableExists = hoodieHiveClient.doesTableExist(tableName);
 
-    // Get the parquet schema for this table looking at the latest commit
-    MessageType schema = hoodieHiveClient.getDataSchema();
-
-    // Currently HoodieBootstrapRelation does support reading bootstrap MOR rt table,
-    // so we disable the syncAsSparkDataSourceTable here to avoid read such kind table
-    // by the data source way (which will use the HoodieBootstrapRelation).
-    // TODO after we support bootstrap MOR rt table in HoodieBootstrapRelation[HUDI-2071], we can remove this logical.
-    if (hoodieHiveClient.isBootstrap()
-        && hoodieHiveClient.getTableType() == HoodieTableType.MERGE_ON_READ
-        && !readAsOptimized) {
-      cfg.syncAsSparkDataSourceTable = false;
+    // Check whether the latest commit dropped partitions
+    boolean isDropPartition = hoodieHiveClient.isDropPartition();
+
+    // Track whether the schema changed
+    boolean schemaChanged = false;
+
+    if (!isDropPartition) {
+      // Get the parquet schema for this table by looking at the latest commit
+      MessageType schema = hoodieHiveClient.getDataSchema();
+
+      // Currently HoodieBootstrapRelation does not support reading a bootstrap MOR rt table,
+      // so we disable syncAsSparkDataSourceTable here to avoid reading such a table
+      // through the datasource path (which would use HoodieBootstrapRelation).
+      // TODO: once HoodieBootstrapRelation supports bootstrap MOR rt tables [HUDI-2071], remove this logic.
+      if (hoodieHiveClient.isBootstrap()
+          && hoodieHiveClient.getTableType() == HoodieTableType.MERGE_ON_READ
+          && !readAsOptimized) {
+        cfg.syncAsSparkDataSourceTable = false;
+      }
+      // Sync schema if needed
+      schemaChanged = syncSchema(tableName, tableExists, useRealtimeInputFormat, readAsOptimized, schema);
     }
-    // Sync schema if needed
-    boolean schemaChanged = syncSchema(tableName, tableExists, useRealtimeInputFormat, readAsOptimized, schema);
 
     LOG.info("Schema sync complete. Syncing partitions for " + tableName);
     // Get the last time we successfully synced partitions
@@ -192,7 +199,7 @@ public class HiveSyncTool extends AbstractSyncTool {
     LOG.info("Storage partitions scan complete. Found " + writtenPartitionsSince.size());
 
     // Sync the partitions if needed
-    boolean partitionsChanged = syncPartitions(tableName, writtenPartitionsSince);
+    boolean partitionsChanged = syncPartitions(tableName, writtenPartitionsSince, isDropPartition);
     boolean meetSyncConditions = schemaChanged || partitionsChanged;
     if (!cfg.isConditionalSync || meetSyncConditions) {
       hoodieHiveClient.updateLastCommitTimeSynced(tableName);
@@ -331,19 +338,32 @@ public class HiveSyncTool extends AbstractSyncTool {
    * Syncs the list of storage partitions passed in (checks if each partition is in hive; adds it if missing, or updates its path if it does not match).
    */
-  private boolean syncPartitions(String tableName, List<String> writtenPartitionsSince) {
+  private boolean syncPartitions(String tableName, List<String> writtenPartitionsSince, boolean isDropPartition) {
     boolean partitionsChanged;
     try {
       List<Partition> hivePartitions = hoodieHiveClient.scanTablePartitions(tableName);
       List<PartitionEvent> partitionEvents =
-          hoodieHiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince);
+          hoodieHiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince, isDropPartition);
+
       List<String> newPartitions = filterPartitions(partitionEvents, PartitionEventType.ADD);
-      LOG.info("New Partitions " + newPartitions);
-      hoodieHiveClient.addPartitionsToTable(tableName, newPartitions);
+      if (!newPartitions.isEmpty()) {
+        LOG.info("New Partitions " + newPartitions);
+        hoodieHiveClient.addPartitionsToTable(tableName, newPartitions);
+      }
+
       List<String> updatePartitions = filterPartitions(partitionEvents, PartitionEventType.UPDATE);
-      LOG.info("Changed Partitions " + updatePartitions);
-      hoodieHiveClient.updatePartitionsToTable(tableName, updatePartitions);
-      partitionsChanged = !updatePartitions.isEmpty() || !newPartitions.isEmpty();
+      if (!updatePartitions.isEmpty()) {
+        LOG.info("Changed Partitions " + updatePartitions);
+        hoodieHiveClient.updatePartitionsToTable(tableName, updatePartitions);
+      }
+
+      List<String> dropPartitions = filterPartitions(partitionEvents, PartitionEventType.DROP);
+      if (!dropPartitions.isEmpty()) {
+        LOG.info("Drop Partitions " + dropPartitions);
+        hoodieHiveClient.dropPartitionsToTable(tableName, dropPartitions);
+      }
+
+      partitionsChanged = !updatePartitions.isEmpty() || !newPartitions.isEmpty() || !dropPartitions.isEmpty();
     } catch (Exception e) {
       throw new HoodieHiveSyncException("Failed to sync partitions for table " + tableName, e);
     }
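As a usage note (not part of the patch): with these changes a plain re-sync after a delete_partition write is enough to propagate the drop — syncHoodieTable() detects it via isDropPartition(), skips schema sync, and emits DROP events. A minimal sketch, with the config, Hive conf, and file system assumed to be set up the same way the tests below do:

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hudi.hive.HiveSyncConfig;
    import org.apache.hudi.hive.HiveSyncTool;

    class ResyncAfterDropExample {
      static void resync(HiveSyncConfig cfg, HiveConf hiveConf, FileSystem fs) {
        // Same entry point as before this patch; the drop handling is internal.
        new HiveSyncTool(cfg, hiveConf, fs).syncHoodieTable();
      }
    }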
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
index 265ab75..287de57 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
@@ -123,6 +123,14 @@ public class HoodieHiveClient extends AbstractSyncHoodieClient {
   }
 
   /**
+   * Drop the given partitions from the table.
+   */
+  @Override
+  public void dropPartitionsToTable(String tableName, List<String> partitionsToDrop) {
+    ddlExecutor.dropPartitionsToTable(tableName, partitionsToDrop);
+  }
+
+  /**
    * Update the table properties to the table.
    */
   @Override
@@ -147,6 +155,14 @@ public class HoodieHiveClient extends AbstractSyncHoodieClient {
    * Generate a list of PartitionEvent based on the changes required.
    */
   List<PartitionEvent> getPartitionEvents(List<Partition> tablePartitions, List<String> partitionStoragePartitions) {
+    return getPartitionEvents(tablePartitions, partitionStoragePartitions, false);
+  }
+
+  /**
+   * Iterate over the storage partitions and generate a PartitionEvent for each change required:
+   * partitions to be added, updated, or dropped.
+   */
+  List<PartitionEvent> getPartitionEvents(List<Partition> tablePartitions, List<String> partitionStoragePartitions, boolean isDropPartition) {
     Map<String, String> paths = new HashMap<>();
     for (Partition tablePartition : tablePartitions) {
       List<String> hivePartitionValues = tablePartition.getValues();
@@ -161,12 +177,17 @@ public class HoodieHiveClient extends AbstractSyncHoodieClient {
       String fullStoragePartitionPath = Path.getPathWithoutSchemeAndAuthority(storagePartitionPath).toUri().getPath();
       // Check if the partition values or if hdfs path is the same
       List<String> storagePartitionValues = partitionValueExtractor.extractPartitionValuesInPath(storagePartition);
-      if (!storagePartitionValues.isEmpty()) {
-        String storageValue = String.join(", ", storagePartitionValues);
-        if (!paths.containsKey(storageValue)) {
-          events.add(PartitionEvent.newPartitionAddEvent(storagePartition));
-        } else if (!paths.get(storageValue).equals(fullStoragePartitionPath)) {
-          events.add(PartitionEvent.newPartitionUpdateEvent(storagePartition));
+
+      if (isDropPartition) {
+        events.add(PartitionEvent.newPartitionDropEvent(storagePartition));
+      } else {
+        if (!storagePartitionValues.isEmpty()) {
+          String storageValue = String.join(", ", storagePartitionValues);
+          if (!paths.containsKey(storageValue)) {
+            events.add(PartitionEvent.newPartitionAddEvent(storagePartition));
+          } else if (!paths.get(storageValue).equals(fullStoragePartitionPath)) {
+            events.add(PartitionEvent.newPartitionUpdateEvent(storagePartition));
+          }
         }
       }
     }
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/DDLExecutor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/DDLExecutor.java
index 0e1e223..dc37d92 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/DDLExecutor.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/DDLExecutor.java
@@ -81,5 +81,13 @@ public interface DDLExecutor {
    */
   public void updatePartitionsToTable(String tableName, List<String> changedPartitions);
 
+  /**
+   * Drop partitions for a given table.
+   *
+   * @param tableName        name of the table
+   * @param partitionsToDrop list of partitions to drop
+   */
+  public void dropPartitionsToTable(String tableName, List<String> partitionsToDrop);
+
   public void close();
 }
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java
index 37aa54a..d3efebe 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java
@@ -227,6 +227,25 @@ public class HMSDDLExecutor implements DDLExecutor {
   }
 
   @Override
+  public void dropPartitionsToTable(String tableName, List<String> partitionsToDrop) {
+    if (partitionsToDrop.isEmpty()) {
+      LOG.info("No partitions to drop for " + tableName);
+      return;
+    }
+
+    LOG.info("Dropping " + partitionsToDrop.size() + " partition(s) on " + tableName);
+    try {
+      for (String dropPartition : partitionsToDrop) {
+        client.dropPartition(syncConfig.databaseName, tableName, dropPartition, false);
+        LOG.info("Dropped partition " + dropPartition + " on " + tableName);
+      }
+    } catch (TException e) {
+      LOG.error(syncConfig.databaseName + "." + tableName + " drop partition failed", e);
+      throw new HoodieHiveSyncException(syncConfig.databaseName + "." + tableName + " drop partition failed", e);
+    }
+  }
+
+  @Override
   public void close() {
     if (client != null) {
       Hive.closeCurrent();
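A note on the HMS call above (my reading of the metastore API, not stated in the patch): the final boolean of dropPartition is deleteData, so passing false removes only the metastore entry and leaves the storage files to Hudi's own delete_partition handling. A standalone sketch against the metastore client interface:

    import org.apache.hadoop.hive.metastore.IMetaStoreClient;
    import org.apache.thrift.TException;

    class HmsDropPartitionSketch {
      // partitionPath follows the storage layout (e.g. "2050/01/01" for a
      // yyyy/mm/dd-partitioned table); deleteData=false keeps the files in place.
      static void drop(IMetaStoreClient client, String db, String table, String partitionPath)
          throws TException {
        client.dropPartition(db, table, partitionPath, false);
      }
    }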
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java
index e2635ee..7161194 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java
@@ -127,6 +127,25 @@ public class HiveQueryDDLExecutor extends QueryBasedDDLExecutor {
   }
 
   @Override
+  public void dropPartitionsToTable(String tableName, List<String> partitionsToDrop) {
+    if (partitionsToDrop.isEmpty()) {
+      LOG.info("No partitions to drop for " + tableName);
+      return;
+    }
+
+    LOG.info("Dropping " + partitionsToDrop.size() + " partition(s) on " + tableName);
+    try {
+      for (String dropPartition : partitionsToDrop) {
+        metaStoreClient.dropPartition(config.databaseName, tableName, dropPartition, false);
+        LOG.info("Dropped partition " + dropPartition + " on " + tableName);
+      }
+    } catch (Exception e) {
+      LOG.error(config.databaseName + "." + tableName + " drop partition failed", e);
+      throw new HoodieHiveSyncException(config.databaseName + "." + tableName + " drop partition failed", e);
+    }
+  }
+
+  @Override
   public void close() {
     if (metaStoreClient != null) {
       Hive.closeCurrent();
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/JDBCExecutor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/JDBCExecutor.java
index 1603191..493d4ee 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/JDBCExecutor.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/JDBCExecutor.java
@@ -32,6 +32,7 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 
@@ -142,6 +143,13 @@ public class JDBCExecutor extends QueryBasedDDLExecutor {
   }
 
   @Override
+  public void dropPartitionsToTable(String tableName, List<String> partitionsToDrop) {
+    partitionsToDrop.stream()
+        .map(partition -> String.format("ALTER TABLE `%s` DROP PARTITION (%s)", tableName, partition))
+        .forEach(this::runSQL);
+  }
+
+  @Override
   public void close() {
     try {
       if (connection != null) {
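For illustration (values are hypothetical): the JDBC path splices the partition argument into the statement verbatim, so callers must pass a Hive partition spec in key='value' form rather than a storage path — exactly what the test below does:

    class JdbcDropSqlExample {
      public static void main(String[] args) {
        String tableName = "h0";
        String partition = "`datestr`='2050-01-01'"; // Hive partition spec, not "2050/01/01"
        System.out.println(String.format("ALTER TABLE `%s` DROP PARTITION (%s)", tableName, partition));
        // Prints: ALTER TABLE `h0` DROP PARTITION (`datestr`='2050-01-01')
      }
    }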
diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
index d36727a..ef98641 100644
--- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
+++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
@@ -739,6 +739,56 @@ public class TestHiveSyncTool {
 
   @ParameterizedTest
   @MethodSource("syncMode")
+  public void testDropPartitionKeySync(String syncMode) throws Exception {
+    hiveSyncConfig.syncMode = syncMode;
+    HiveTestUtil.hiveSyncConfig.batchSyncNum = 3;
+    String instantTime = "100";
+    HiveTestUtil.createCOWTable(instantTime, 1, true);
+
+    HoodieHiveClient hiveClient =
+        new HoodieHiveClient(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem);
+    assertFalse(hiveClient.doesTableExist(hiveSyncConfig.tableName),
+        "Table " + hiveSyncConfig.tableName + " should not exist initially");
+    // Let's do the sync
+    HiveSyncTool tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem);
+    tool.syncHoodieTable();
+
+    // We need to renew the hiveClient after tool.syncHoodieTable(), because syncing closes the
+    // Hive session; otherwise the next call triggers a connection retry and an exception shows up in the log.
+    hiveClient =
+        new HoodieHiveClient(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
+    assertTrue(hiveClient.doesTableExist(HiveTestUtil.hiveSyncConfig.tableName),
+        "Table " + HiveTestUtil.hiveSyncConfig.tableName + " should exist after sync completes");
+    assertEquals(hiveClient.getDataSchema().getColumns().size() + 1,
+        hiveClient.getTableSchema(HiveTestUtil.hiveSyncConfig.tableName).size(),
+        "Hive Schema should match the table schema + partition field");
+    assertEquals(1, hiveClient.scanTablePartitions(hiveSyncConfig.tableName).size(),
+        "Table partitions should match the number of partitions we wrote");
+    assertEquals(instantTime, hiveClient.getLastCommitTimeSynced(hiveSyncConfig.tableName).get(),
+        "The last commit that was synced should be updated in the TBLPROPERTIES");
+
+    // Add new partitions
+    List<String> newPartition = Arrays.asList("2050/01/01");
+    hiveClient.addPartitionsToTable(hiveSyncConfig.tableName, Arrays.asList());
+    assertEquals(1, hiveClient.scanTablePartitions(hiveSyncConfig.tableName).size(),
+        "No new partition should be added");
+    hiveClient.addPartitionsToTable(hiveSyncConfig.tableName, newPartition);
+    assertEquals(2, hiveClient.scanTablePartitions(hiveSyncConfig.tableName).size(),
+        "New partition should be added");
+
+    tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem);
+    tool.syncHoodieTable();
+
+    // Drop one partition.
+    ddlExecutor.runSQL("ALTER TABLE `" + hiveSyncConfig.tableName
+        + "` DROP PARTITION (`datestr`='2050-01-01')");
+
+    hiveClient = new HoodieHiveClient(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem);
+    List<Partition> hivePartitions = hiveClient.scanTablePartitions(hiveSyncConfig.tableName);
+    assertEquals(1, hivePartitions.size(),
+        "Table should have 1 partition after dropping one partition");
+  }
+
+  @ParameterizedTest
+  @MethodSource("syncMode")
   public void testNonPartitionedSync(String syncMode) throws Exception {
     hiveSyncConfig.syncMode = syncMode;
diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java
index ce4720a..0a277be 100644
--- a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java
+++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java
@@ -20,16 +20,18 @@ package org.apache.hudi.sync.common;
 
 import org.apache.hudi.common.engine.HoodieLocalEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.TimelineUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.parquet.schema.MessageType;
@@ -98,6 +100,8 @@ public abstract class AbstractSyncHoodieClient {
 
   public abstract void updatePartitionsToTable(String tableName, List<String> changedPartitions);
 
+  public abstract void dropPartitionsToTable(String tableName, List<String> partitionsToDrop);
+
   public void updateTableProperties(String tableName, Map<String, String> tableProperties) {}
 
   public abstract Map<String, String> getTableSchema(String tableName);
@@ -155,6 +159,25 @@ public abstract class AbstractSyncHoodieClient {
     }
   }
 
+  public boolean isDropPartition() {
+    try {
+      Option<HoodieCommitMetadata> hoodieCommitMetadata;
+      if (withOperationField) {
+        hoodieCommitMetadata = new TableSchemaResolver(metaClient, true).getLatestCommitMetadata();
+      } else {
+        hoodieCommitMetadata = new TableSchemaResolver(metaClient).getLatestCommitMetadata();
+      }
+
+      if (hoodieCommitMetadata.isPresent()
+          && hoodieCommitMetadata.get().getOperationType().equals(WriteOperationType.DELETE_PARTITION)) {
+        return true;
+      }
+    } catch (Exception e) {
+      throw new HoodieSyncException("Failed to get the latest commit metadata", e);
+    }
+    return false;
+  }
+
   @SuppressWarnings("OptionalUsedAsFieldOrParameterType")
   public List<String> getPartitionsWrittenToSince(Option<String> lastCommitTimeSynced) {
     if (!lastCommitTimeSynced.isPresent()) {
@@ -224,7 +247,7 @@ public abstract class AbstractSyncHoodieClient {
   public static class PartitionEvent {
 
     public enum PartitionEventType {
-      ADD, UPDATE
+      ADD, UPDATE, DROP
     }
 
     public PartitionEventType eventType;
@@ -242,5 +265,9 @@ public abstract class AbstractSyncHoodieClient {
     public static PartitionEvent newPartitionUpdateEvent(String storagePartition) {
       return new PartitionEvent(PartitionEventType.UPDATE, storagePartition);
     }
+
+    public static PartitionEvent newPartitionDropEvent(String storagePartition) {
+      return new PartitionEvent(PartitionEventType.DROP, storagePartition);
+    }
   }
 }