This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit e11db3f050ca8f633dd359c9a810365dcbb238d4
Author: wuwenchi <wuwenchi...@hotmail.com>
AuthorDate: Thu Apr 4 08:10:49 2024 +0800

[feature](hive) support ExternalTransaction for writing external tables (#32726)

Issue #31442

Add `TransactionManager` and `Transaction`.

```
public interface Transaction {
    void commit() throws UserException;
    void rollback();
}

public interface TransactionManager {
    long begin();
    void commit(long id) throws UserException;
    void rollback(long id);
    Transaction getTransaction(long id);
}
```

`TransactionManager` is used to manage all external transactions. The application layer drives the entire transaction through this `TransactionManager`, for example:

```
long id = transactionManager.begin();
transactionManager.commit(id);
// or, on failure:
transactionManager.rollback(id);
```

`Transaction` is an interface. Each external engine provides its own implementation, such as the `HMSTransaction` implemented here, or an Iceberg transaction that may be added in the future.

---
.../apache/doris/datasource/ExternalCatalog.java | 2 +
.../apache/doris/datasource/hive/HMSCommitter.java | 754 -----------
.../doris/datasource/hive/HMSExternalCatalog.java | 6 +-
.../doris/datasource/hive/HMSTransaction.java | 1322 ++++++++++++++++++++
.../doris/datasource/hive/HiveMetadataOps.java | 24 +-
.../datasource/hive/HivePartitionStatistics.java | 2 +-
.../hive/HivePartitionWithStatistics.java | 6 +-
.../plans/commands/insert/HiveInsertExecutor.java | 34 +-
.../main/java/org/apache/doris/qe/Coordinator.java | 30 +-
.../doris/transaction/HiveTransactionManager.java | 79 ++
.../Transaction.java} | 25 +-
.../TransactionManager.java} | 40 +-
.../TransactionManagerFactory.java} | 25 +-
.../doris/datasource/hive/HmsCommitTest.java | 72 +-
14 files changed, 1535 insertions(+), 886 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java index 737705bd8b5..a3525321edf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java @@ -50,6 +50,7 @@ import org.apache.doris.persist.gson.GsonPostProcessable; import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.MasterCatalogExecutor; +import org.apache.doris.transaction.TransactionManager; import com.google.common.base.Strings; import com.google.common.collect.Lists; @@ -111,6 +112,7 @@ public abstract class ExternalCatalog private boolean objectCreated = false; protected boolean invalidCacheInInit = true; protected ExternalMetadataOps metadataOps; + protected TransactionManager transactionManager; private ExternalSchemaCache schemaCache; private String comment; diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCommitter.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCommitter.java deleted file mode 100644 index af26f36d6b9..00000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCommitter.java +++ /dev/null @@ -1,754 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License.
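As a concrete illustration of the `Transaction` interface introduced in the commit message above, here is a minimal sketch. It is not part of this commit and assumes only the two interfaces quoted there; `DemoTransaction` is a hypothetical name, and the package paths assume the usual Doris FE source layout.

```
// Minimal sketch only. DemoTransaction is hypothetical; the real
// HMSTransaction in this change stages metastore and file-system work
// and publishes it in commit().
import org.apache.doris.common.UserException;
import org.apache.doris.transaction.Transaction;

public class DemoTransaction implements Transaction {
    @Override
    public void commit() throws UserException {
        // Publish all staged changes to the external system here;
        // throw UserException if publishing fails.
        System.out.println("demo transaction committed");
    }

    @Override
    public void rollback() {
        // Undo any partially applied changes here; the interface
        // declares no checked exceptions for rollback.
        System.out.println("demo transaction rolled back");
    }
}
```

An implementation like this would be driven through the catalog's `TransactionManager` (begin, then commit or rollback by id), which is how `HMSTransaction` is managed by the `HiveTransactionManager` added in this change.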
You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. -// This file is copied from -// https://github.com/trinodb/trino/blob/438/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/SemiTransactionalHiveMetastore.java -// and modified by Doris - -package org.apache.doris.datasource.hive; - -import org.apache.doris.backup.Status; -import org.apache.doris.common.Pair; -import org.apache.doris.fs.remote.RemoteFile; -import org.apache.doris.fs.remote.RemoteFileSystem; -import org.apache.doris.thrift.THivePartitionUpdate; -import org.apache.doris.thrift.TUpdateMode; - -import com.google.common.base.Joiner; -import com.google.common.base.Preconditions; -import com.google.common.base.Verify; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import io.airlift.concurrent.MoreFutures; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.metastore.api.Partition; -import org.apache.hadoop.hive.metastore.api.StorageDescriptor; -import org.apache.hadoop.hive.metastore.api.Table; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Optional; -import java.util.Queue; -import java.util.StringJoiner; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.Executor; -import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.stream.Collectors; - -public class HMSCommitter { - private static final Logger LOG = LogManager.getLogger(HMSCommitter.class); - private final HiveMetadataOps hiveOps; - private final RemoteFileSystem fs; - private final Table table; - - // update statistics for unPartitioned table or existed partition - private final List<UpdateStatisticsTask> updateStatisticsTasks = new ArrayList<>(); - Executor updateStatisticsExecutor = Executors.newFixedThreadPool(16); - - // add new partition - private final AddPartitionsTask addPartitionsTask = new AddPartitionsTask(); - private static final int PARTITION_COMMIT_BATCH_SIZE = 20; - - // for file system rename operation - // whether to cancel the file system tasks - private final AtomicBoolean fileSystemTaskCancelled = new AtomicBoolean(false); - // file system tasks that are executed asynchronously, including rename_file, rename_dir - private final List<CompletableFuture<?>> asyncFileSystemTaskFutures = new ArrayList<>(); - // when aborted, we need to delete all files under this path, even the current directory - private final Queue<DirectoryCleanUpTask> directoryCleanUpTasksForAbort = new ConcurrentLinkedQueue<>(); - // when aborted, we need restore directory - private final List<RenameDirectoryTask> renameDirectoryTasksForAbort = new ArrayList<>(); - // when finished, we need clear some directories - private final List<String> clearDirsForFinish = new ArrayList<>(); - Executor fileSystemExecutor = Executors.newFixedThreadPool(16); - - 
public HMSCommitter(HiveMetadataOps hiveOps, RemoteFileSystem fs, Table table) { - this.hiveOps = hiveOps; - this.fs = fs; - this.table = table; - } - - public void commit(List<THivePartitionUpdate> hivePUs) { - try { - prepare(mergePartitions(hivePUs)); - doCommit(); - } catch (Throwable t) { - LOG.warn("Failed to commit for {}.{}, abort it.", table.getDbName(), table.getTableName()); - try { - cancelUnStartedAsyncFileSystemTask(); - undoUpdateStatisticsTasks(); - undoAddPartitionsTask(); - waitForAsyncFileSystemTaskSuppressThrowable(); - runDirectoryClearUpTasksForAbort(); - runRenameDirTasksForAbort(); - } catch (Throwable e) { - t.addSuppressed(new Exception("Failed to roll back after commit failure", e)); - } - throw t; - } finally { - runClearPathsForFinish(); - } - } - - public void prepare(List<THivePartitionUpdate> hivePUs) { - - List<Pair<THivePartitionUpdate, HivePartitionStatistics>> insertExistsPartitions = new ArrayList<>(); - - for (THivePartitionUpdate pu : hivePUs) { - TUpdateMode updateMode = pu.getUpdateMode(); - HivePartitionStatistics hivePartitionStatistics = HivePartitionStatistics.fromCommonStatistics( - pu.getRowCount(), - pu.getFileNamesSize(), - pu.getFileSize()); - if (table.getPartitionKeysSize() == 0) { - Preconditions.checkArgument(hivePUs.size() == 1, - "When updating a non-partitioned table, multiple partitions should not be written"); - switch (updateMode) { - case APPEND: - prepareAppendTable(pu, hivePartitionStatistics); - break; - case OVERWRITE: - prepareOverwriteTable(pu, hivePartitionStatistics); - break; - default: - throw new RuntimeException("Not support mode:[" + updateMode + "] in unPartitioned table"); - } - } else { - switch (updateMode) { - case NEW: - prepareCreateNewPartition(pu, hivePartitionStatistics); - break; - case APPEND: - insertExistsPartitions.add(Pair.of(pu, hivePartitionStatistics)); - break; - case OVERWRITE: - prepareOverwritePartition(pu, hivePartitionStatistics); - break; - default: - throw new RuntimeException("Not support mode:[" + updateMode + "] in unPartitioned table"); - } - } - } - - if (!insertExistsPartitions.isEmpty()) { - prepareInsertExistPartition(insertExistsPartitions); - } - } - - public List<THivePartitionUpdate> mergePartitions(List<THivePartitionUpdate> hivePUs) { - Map<String, THivePartitionUpdate> mm = new HashMap<>(); - for (THivePartitionUpdate pu : hivePUs) { - if (mm.containsKey(pu.getName())) { - THivePartitionUpdate old = mm.get(pu.getName()); - old.setFileSize(old.getFileSize() + pu.getFileSize()); - old.setRowCount(old.getRowCount() + pu.getRowCount()); - old.getFileNames().addAll(pu.getFileNames()); - } else { - mm.put(pu.getName(), pu); - } - } - return new ArrayList<>(mm.values()); - } - - public void doCommit() { - waitForAsyncFileSystemTasks(); - doAddPartitionsTask(); - doUpdateStatisticsTasks(); - } - - public void rollback() { - - } - - public void cancelUnStartedAsyncFileSystemTask() { - fileSystemTaskCancelled.set(true); - } - - private void undoUpdateStatisticsTasks() { - ImmutableList.Builder<CompletableFuture<?>> undoUpdateFutures = ImmutableList.builder(); - for (UpdateStatisticsTask task : updateStatisticsTasks) { - undoUpdateFutures.add(CompletableFuture.runAsync(() -> { - try { - task.undo(hiveOps); - } catch (Throwable throwable) { - LOG.warn("Failed to rollback: {}", task.getDescription(), throwable); - } - }, updateStatisticsExecutor)); - } - - for (CompletableFuture<?> undoUpdateFuture : undoUpdateFutures.build()) { - MoreFutures.getFutureValue(undoUpdateFuture); - } - } - - 
private void undoAddPartitionsTask() { - if (addPartitionsTask.isEmpty()) { - return; - } - - HivePartition firstPartition = addPartitionsTask.getPartitions().get(0).getPartition(); - String dbName = firstPartition.getDbName(); - String tableName = firstPartition.getTblName(); - List<List<String>> rollbackFailedPartitions = addPartitionsTask.rollback(hiveOps); - if (!rollbackFailedPartitions.isEmpty()) { - LOG.warn("Failed to rollback: add_partition for partition values {}.{}.{}", - dbName, tableName, rollbackFailedPartitions); - } - } - - private void waitForAsyncFileSystemTaskSuppressThrowable() { - for (CompletableFuture<?> future : asyncFileSystemTaskFutures) { - try { - future.get(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } catch (Throwable t) { - // ignore - } - } - } - - public void prepareAppendTable(THivePartitionUpdate pu, HivePartitionStatistics ps) { - String targetPath = pu.getLocation().getTargetPath(); - String writePath = pu.getLocation().getWritePath(); - if (!targetPath.equals(writePath)) { - fs.asyncRename( - fileSystemExecutor, - asyncFileSystemTaskFutures, - fileSystemTaskCancelled, - writePath, - targetPath, - pu.getFileNames()); - } - directoryCleanUpTasksForAbort.add(new DirectoryCleanUpTask(targetPath, false)); - updateStatisticsTasks.add( - new UpdateStatisticsTask( - table.getDbName(), - table.getTableName(), - Optional.empty(), - ps, - true - )); - } - - public void prepareOverwriteTable(THivePartitionUpdate pu, HivePartitionStatistics ps) { - String targetPath = pu.getLocation().getTargetPath(); - String writePath = pu.getLocation().getWritePath(); - if (!targetPath.equals(writePath)) { - Path path = new Path(targetPath); - String oldTablePath = new Path(path.getParent(), "_temp_" + path.getName()).toString(); - Status status = fs.renameDir( - targetPath, - oldTablePath, - () -> renameDirectoryTasksForAbort.add(new RenameDirectoryTask(oldTablePath, targetPath))); - if (!status.ok()) { - throw new RuntimeException( - "Error to rename dir from " + targetPath + " to " + oldTablePath + status.getErrMsg()); - } - clearDirsForFinish.add(oldTablePath); - - status = fs.renameDir( - writePath, - targetPath, - () -> directoryCleanUpTasksForAbort.add(new DirectoryCleanUpTask(targetPath, true))); - if (!status.ok()) { - throw new RuntimeException( - "Error to rename dir from " + writePath + " to " + targetPath + ":" + status.getErrMsg()); - } - } - updateStatisticsTasks.add( - new UpdateStatisticsTask( - table.getDbName(), - table.getTableName(), - Optional.empty(), - ps, - false - )); - } - - public void prepareCreateNewPartition(THivePartitionUpdate pu, HivePartitionStatistics ps) { - - String targetPath = pu.getLocation().getTargetPath(); - String writePath = pu.getLocation().getWritePath(); - - if (!targetPath.equals(writePath)) { - fs.asyncRenameDir( - fileSystemExecutor, - asyncFileSystemTaskFutures, - fileSystemTaskCancelled, - writePath, - targetPath, - () -> directoryCleanUpTasksForAbort.add(new DirectoryCleanUpTask(targetPath, true))); - } - - StorageDescriptor sd = table.getSd(); - - HivePartition hivePartition = new HivePartition( - table.getDbName(), - table.getTableName(), - false, - sd.getInputFormat(), - pu.getLocation().getTargetPath(), - HiveUtil.toPartitionValues(pu.getName()), - Maps.newHashMap(), - sd.getOutputFormat(), - sd.getSerdeInfo().getSerializationLib(), - hiveOps.getClient().getSchema(table.getDbName(), table.getTableName()) - ); - HivePartitionWithStatistics partitionWithStats = - new 
HivePartitionWithStatistics(pu.getName(), hivePartition, ps); - addPartitionsTask.addPartition(partitionWithStats); - } - - public void prepareInsertExistPartition(List<Pair<THivePartitionUpdate, HivePartitionStatistics>> partitions) { - for (List<Pair<THivePartitionUpdate, HivePartitionStatistics>> partitionBatch : - Iterables.partition(partitions, 100)) { - List<String> partitionNames = partitionBatch.stream() - .map(pair -> pair.first.getName()) - .collect(Collectors.toList()); - - Map<String, Partition> partitionsByNamesMap = HiveUtil.convertToNamePartitionMap( - partitionNames, - hiveOps.getClient().getPartitions(table.getDbName(), table.getTableName(), partitionNames)); - - for (int i = 0; i < partitionsByNamesMap.size(); i++) { - String partitionName = partitionNames.get(i); - if (partitionsByNamesMap.get(partitionName) == null) { - // Prevent this partition from being deleted by other engines - throw new RuntimeException("Not found partition: " + partitionName); - } - - THivePartitionUpdate pu = partitionBatch.get(i).first; - HivePartitionStatistics updateStats = partitionBatch.get(i).second; - - String writePath = pu.getLocation().getWritePath(); - String targetPath = pu.getLocation().getTargetPath(); - directoryCleanUpTasksForAbort.add(new DirectoryCleanUpTask(targetPath, false)); - - if (!targetPath.equals(writePath)) { - fs.asyncRename( - fileSystemExecutor, - asyncFileSystemTaskFutures, - fileSystemTaskCancelled, - writePath, - targetPath, - pu.getFileNames()); - } - - updateStatisticsTasks.add( - new UpdateStatisticsTask( - table.getDbName(), - table.getTableName(), - Optional.of(pu.getName()), - updateStats, - true)); - } - } - } - - - public void prepareOverwritePartition(THivePartitionUpdate pu, HivePartitionStatistics ps) { - String targetPath = pu.getLocation().getTargetPath(); - String writePath = pu.getLocation().getWritePath(); - if (!targetPath.equals(writePath)) { - Path path = new Path(targetPath); - String oldPartitionPath = new Path(path.getParent(), "_temp_" + path.getName()).toString(); - Status status = fs.renameDir( - targetPath, - oldPartitionPath, - () -> renameDirectoryTasksForAbort.add(new RenameDirectoryTask(oldPartitionPath, targetPath))); - if (!status.ok()) { - throw new RuntimeException( - "Error to rename dir from " + targetPath + " to " + oldPartitionPath + ":" + status.getErrMsg()); - } - clearDirsForFinish.add(oldPartitionPath); - - status = fs.renameDir( - writePath, - targetPath, - () -> directoryCleanUpTasksForAbort.add(new DirectoryCleanUpTask(targetPath, true))); - if (!status.ok()) { - throw new RuntimeException( - "Error to rename dir from " + writePath + " to " + targetPath + ":" + status.getErrMsg()); - } - } - updateStatisticsTasks.add( - new UpdateStatisticsTask( - table.getDbName(), - table.getTableName(), - Optional.of(pu.getName()), - ps, - false - )); - } - - - private void waitForAsyncFileSystemTasks() { - for (CompletableFuture<?> future : asyncFileSystemTaskFutures) { - MoreFutures.getFutureValue(future, RuntimeException.class); - } - } - - private void doAddPartitionsTask() { - if (!addPartitionsTask.isEmpty()) { - addPartitionsTask.run(hiveOps); - } - } - - private void doUpdateStatisticsTasks() { - ImmutableList.Builder<CompletableFuture<?>> updateStatsFutures = ImmutableList.builder(); - List<String> failedTaskDescriptions = new ArrayList<>(); - List<Throwable> suppressedExceptions = new ArrayList<>(); - for (UpdateStatisticsTask task : updateStatisticsTasks) { - updateStatsFutures.add(CompletableFuture.runAsync(() -> { - 
try { - task.run(hiveOps); - } catch (Throwable t) { - synchronized (suppressedExceptions) { - addSuppressedExceptions(suppressedExceptions, t, failedTaskDescriptions, task.getDescription()); - } - } - }, updateStatisticsExecutor)); - } - - for (CompletableFuture<?> executeUpdateFuture : updateStatsFutures.build()) { - MoreFutures.getFutureValue(executeUpdateFuture); - } - if (!suppressedExceptions.isEmpty()) { - StringBuilder message = new StringBuilder(); - message.append("Failed to execute some updating statistics tasks: "); - Joiner.on("; ").appendTo(message, failedTaskDescriptions); - RuntimeException exception = new RuntimeException(message.toString()); - suppressedExceptions.forEach(exception::addSuppressed); - throw exception; - } - } - - private static void addSuppressedExceptions( - List<Throwable> suppressedExceptions, - Throwable t, - List<String> descriptions, - String description) { - descriptions.add(description); - // A limit is needed to avoid having a huge exception object. 5 was chosen arbitrarily. - if (suppressedExceptions.size() < 5) { - suppressedExceptions.add(t); - } - } - - private static class AddPartition { - - } - - private static class UpdateStatisticsTask { - private final String dbName; - private final String tableName; - private final Optional<String> partitionName; - private final HivePartitionStatistics updatePartitionStat; - private final boolean merge; - - private boolean done; - - public UpdateStatisticsTask(String dbName, String tableName, Optional<String> partitionName, - HivePartitionStatistics statistics, boolean merge) { - this.dbName = Objects.requireNonNull(dbName, "dbName is null"); - this.tableName = Objects.requireNonNull(tableName, "tableName is null"); - this.partitionName = Objects.requireNonNull(partitionName, "partitionName is null"); - this.updatePartitionStat = Objects.requireNonNull(statistics, "statistics is null"); - this.merge = merge; - } - - public void run(HiveMetadataOps hiveOps) { - if (partitionName.isPresent()) { - hiveOps.updatePartitionStatistics(dbName, tableName, partitionName.get(), this::updateStatistics); - } else { - hiveOps.updateTableStatistics(dbName, tableName, this::updateStatistics); - } - done = true; - } - - public void undo(HiveMetadataOps hmsOps) { - if (!done) { - return; - } - if (partitionName.isPresent()) { - hmsOps.updatePartitionStatistics(dbName, tableName, partitionName.get(), this::resetStatistics); - } else { - hmsOps.updateTableStatistics(dbName, tableName, this::resetStatistics); - } - } - - public String getDescription() { - if (partitionName.isPresent()) { - return "alter partition parameters " + tableName + " " + partitionName.get(); - } else { - return "alter table parameters " + tableName; - } - } - - private HivePartitionStatistics updateStatistics(HivePartitionStatistics currentStats) { - return merge ? 
HivePartitionStatistics.merge(currentStats, updatePartitionStat) : updatePartitionStat; - } - - private HivePartitionStatistics resetStatistics(HivePartitionStatistics currentStatistics) { - return HivePartitionStatistics - .reduce(currentStatistics, updatePartitionStat, HivePartitionStatistics.ReduceOperator.SUBTRACT); - } - } - - public static class AddPartitionsTask { - private final List<HivePartitionWithStatistics> partitions = new ArrayList<>(); - private final List<List<String>> createdPartitionValues = new ArrayList<>(); - - public boolean isEmpty() { - return partitions.isEmpty(); - } - - public List<HivePartitionWithStatistics> getPartitions() { - return partitions; - } - - public void addPartition(HivePartitionWithStatistics partition) { - partitions.add(partition); - } - - public void run(HiveMetadataOps hiveOps) { - HivePartition firstPartition = partitions.get(0).getPartition(); - String dbName = firstPartition.getDbName(); - String tableName = firstPartition.getTblName(); - List<List<HivePartitionWithStatistics>> batchedPartitions = - Lists.partition(partitions, PARTITION_COMMIT_BATCH_SIZE); - for (List<HivePartitionWithStatistics> batch : batchedPartitions) { - try { - hiveOps.addPartitions(dbName, tableName, batch); - for (HivePartitionWithStatistics partition : batch) { - createdPartitionValues.add(partition.getPartition().getPartitionValues()); - } - } catch (Throwable t) { - LOG.warn("Failed to add partition", t); - throw t; - } - } - partitions.clear(); - } - - public List<List<String>> rollback(HiveMetadataOps hiveOps) { - HivePartition firstPartition = partitions.get(0).getPartition(); - String dbName = firstPartition.getDbName(); - String tableName = firstPartition.getTblName(); - List<List<String>> rollbackFailedPartitions = new ArrayList<>(); - for (List<String> createdPartitionValue : createdPartitionValues) { - try { - hiveOps.dropPartition(dbName, tableName, createdPartitionValue, false); - } catch (Throwable t) { - LOG.warn("Failed to drop partition on {}.{}.{} when rollback", - dbName, tableName, rollbackFailedPartitions); - rollbackFailedPartitions.add(createdPartitionValue); - } - } - return rollbackFailedPartitions; - } - } - - private static class DirectoryCleanUpTask { - private final Path path; - private final boolean deleteEmptyDir; - - public DirectoryCleanUpTask(String path, boolean deleteEmptyDir) { - this.path = new Path(path); - this.deleteEmptyDir = deleteEmptyDir; - } - - public Path getPath() { - return path; - } - - public boolean isDeleteEmptyDir() { - return deleteEmptyDir; - } - - @Override - public String toString() { - return new StringJoiner(", ", DirectoryCleanUpTask.class.getSimpleName() + "[", "]") - .add("path=" + path) - .add("deleteEmptyDir=" + deleteEmptyDir) - .toString(); - } - } - - public static class DeleteRecursivelyResult { - private final boolean dirNoLongerExists; - private final List<String> notDeletedEligibleItems; - - public DeleteRecursivelyResult(boolean dirNoLongerExists, List<String> notDeletedEligibleItems) { - this.dirNoLongerExists = dirNoLongerExists; - this.notDeletedEligibleItems = notDeletedEligibleItems; - } - - public boolean dirNotExists() { - return dirNoLongerExists; - } - - public List<String> getNotDeletedEligibleItems() { - return notDeletedEligibleItems; - } - } - - private void runDirectoryClearUpTasksForAbort() { - for (DirectoryCleanUpTask cleanUpTask : directoryCleanUpTasksForAbort) { - recursiveDeleteItems(cleanUpTask.getPath(), cleanUpTask.isDeleteEmptyDir()); - } - } - - private static class 
RenameDirectoryTask { - private final String renameFrom; - private final String renameTo; - - public RenameDirectoryTask(String renameFrom, String renameTo) { - this.renameFrom = renameFrom; - this.renameTo = renameTo; - } - - public String getRenameFrom() { - return renameFrom; - } - - public String getRenameTo() { - return renameTo; - } - - @Override - public String toString() { - return new StringJoiner(", ", RenameDirectoryTask.class.getSimpleName() + "[", "]") - .add("renameFrom:" + renameFrom) - .add("renameTo:" + renameTo) - .toString(); - } - } - - private void runRenameDirTasksForAbort() { - Status status; - for (RenameDirectoryTask task : renameDirectoryTasksForAbort) { - status = fs.exists(task.getRenameFrom()); - if (status.ok()) { - status = fs.renameDir(task.getRenameFrom(), task.getRenameTo(), () -> {}); - if (!status.ok()) { - LOG.warn("Failed to abort rename dir from {} to {}:{}", - task.getRenameFrom(), task.getRenameTo(), status.getErrMsg()); - } - } - } - } - - private void runClearPathsForFinish() { - Status status; - for (String path : clearDirsForFinish) { - status = fs.delete(path); - if (!status.ok()) { - LOG.warn("Failed to recursively delete path {}:{}", path, status.getErrCode()); - } - } - } - - - private void recursiveDeleteItems(Path directory, boolean deleteEmptyDir) { - DeleteRecursivelyResult deleteResult = recursiveDeleteFiles(directory, deleteEmptyDir); - - if (!deleteResult.getNotDeletedEligibleItems().isEmpty()) { - LOG.warn("Failed to delete directory {}. Some eligible items can't be deleted: {}.", - directory.toString(), deleteResult.getNotDeletedEligibleItems()); - } else if (deleteEmptyDir && !deleteResult.dirNotExists()) { - LOG.warn("Failed to delete directory {} due to dir isn't empty", directory.toString()); - } - } - - public DeleteRecursivelyResult recursiveDeleteFiles(Path directory, boolean deleteEmptyDir) { - try { - if (!fs.exists(directory.getName()).ok()) { - return new DeleteRecursivelyResult(true, ImmutableList.of()); - } - } catch (Exception e) { - ImmutableList.Builder<String> notDeletedEligibleItems = ImmutableList.builder(); - notDeletedEligibleItems.add(directory.toString() + "/*"); - return new DeleteRecursivelyResult(false, notDeletedEligibleItems.build()); - } - - return doRecursiveDeleteFiles(directory, deleteEmptyDir); - } - - private DeleteRecursivelyResult doRecursiveDeleteFiles(Path directory, boolean deleteEmptyDir) { - List<RemoteFile> remoteFiles = new ArrayList<>(); - - Status status = fs.list(directory.getName(), remoteFiles); - if (!status.ok()) { - ImmutableList.Builder<String> notDeletedEligibleItems = ImmutableList.builder(); - notDeletedEligibleItems.add(directory + "/*"); - return new DeleteRecursivelyResult(false, notDeletedEligibleItems.build()); - } - - boolean isEmptyDir = true; - List<String> notDeletedEligibleItems = new ArrayList<>(); - for (RemoteFile file : remoteFiles) { - if (file.isFile()) { - Path filePath = file.getPath(); - isEmptyDir = false; - // TODO Check if this file was created by this query - if (!deleteIfExists(filePath)) { - notDeletedEligibleItems.add(filePath.toString()); - } - } else if (file.isDirectory()) { - DeleteRecursivelyResult subResult = doRecursiveDeleteFiles(file.getPath(), deleteEmptyDir); - if (!subResult.dirNotExists()) { - isEmptyDir = false; - } - if (!subResult.getNotDeletedEligibleItems().isEmpty()) { - notDeletedEligibleItems.addAll(subResult.getNotDeletedEligibleItems()); - } - } else { - isEmptyDir = false; - notDeletedEligibleItems.add(file.getPath().toString()); 
- } - } - - if (isEmptyDir && deleteEmptyDir) { - Verify.verify(notDeletedEligibleItems.isEmpty()); - if (!deleteIfExists(directory)) { - return new DeleteRecursivelyResult(false, ImmutableList.of(directory + "/")); - } - // all items of the location have been deleted. - return new DeleteRecursivelyResult(true, ImmutableList.of()); - } - - return new DeleteRecursivelyResult(false, notDeletedEligibleItems); - } - - public boolean deleteIfExists(Path path) { - Status status = fs.delete(path.getName()); - if (status.ok()) { - return true; - } - return !fs.exists(path.getName()).ok(); - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java index 0f2a7bb2acb..4474e546500 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java @@ -34,6 +34,7 @@ import org.apache.doris.datasource.jdbc.client.JdbcClientConfig; import org.apache.doris.datasource.operations.ExternalMetadataOperations; import org.apache.doris.datasource.property.PropertyConverter; import org.apache.doris.datasource.property.constants.HMSProperties; +import org.apache.doris.transaction.TransactionManagerFactory; import com.google.common.base.Strings; import com.google.common.collect.Lists; @@ -145,7 +146,10 @@ public class HMSExternalCatalog extends ExternalCatalog { AuthenticationConfig.HADOOP_KERBEROS_PRINCIPAL, AuthenticationConfig.HADOOP_KERBEROS_KEYTAB)); } - metadataOps = ExternalMetadataOperations.newHiveMetadataOps(hiveConf, jdbcClientConfig, this); + HiveMetadataOps hiveOps = ExternalMetadataOperations.newHiveMetadataOps(hiveConf, jdbcClientConfig, this); + transactionManager = TransactionManagerFactory.createHiveTransactionManager(hiveOps); + transactionManager.setEditLog(Env.getCurrentEnv().getEditLog()); + metadataOps = hiveOps; } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java new file mode 100644 index 00000000000..c3e8d00c5d1 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java @@ -0,0 +1,1322 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+// This file is copied from +// https://github.com/trinodb/trino/blob/438/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/SemiTransactionalHiveMetastore.java +// https://github.com/trinodb/trino/blob/438/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java +// and modified by Doris + +package org.apache.doris.datasource.hive; + +import org.apache.doris.backup.Status; +import org.apache.doris.common.Pair; +import org.apache.doris.fs.remote.RemoteFile; +import org.apache.doris.fs.remote.RemoteFileSystem; +import org.apache.doris.thrift.THivePartitionUpdate; +import org.apache.doris.thrift.TUpdateMode; +import org.apache.doris.transaction.Transaction; + +import com.google.common.base.Joiner; +import com.google.common.base.MoreObjects; +import com.google.common.base.Preconditions; +import com.google.common.base.Verify; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import io.airlift.concurrent.MoreFutures; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Queue; +import java.util.StringJoiner; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; + +public class HMSTransaction implements Transaction { + private static final Logger LOG = LogManager.getLogger(HMSTransaction.class); + private final HiveMetadataOps hiveOps; + private final RemoteFileSystem fs; + private String dbName; + private String tbName; + + private final Map<DatabaseTableName, Action<TableAndMore>> tableActions = new HashMap<>(); + private final Map<DatabaseTableName, Map<List<String>, Action<PartitionAndMore>>> + partitionActions = new HashMap<>(); + + private HmsCommitter hmsCommitter; + private List<THivePartitionUpdate> hivePartitionUpdates = Lists.newArrayList(); + + public HMSTransaction(HiveMetadataOps hiveOps) { + this.hiveOps = hiveOps; + this.fs = hiveOps.getFs(); + } + + @Override + public void commit() { + doCommit(); + } + + public String getDbName() { + return dbName; + } + + public String getTbName() { + return tbName; + } + + public List<THivePartitionUpdate> mergePartitions(List<THivePartitionUpdate> hivePUs) { + Map<String, THivePartitionUpdate> mm = new HashMap<>(); + for (THivePartitionUpdate pu : hivePUs) { + if (mm.containsKey(pu.getName())) { + THivePartitionUpdate old = mm.get(pu.getName()); + old.setFileSize(old.getFileSize() + pu.getFileSize()); + old.setRowCount(old.getRowCount() + pu.getRowCount()); + old.getFileNames().addAll(pu.getFileNames()); + } else { + mm.put(pu.getName(), pu); + } + } + return new ArrayList<>(mm.values()); + } + + @Override + public void rollback() { + if (hmsCommitter != null) { + hmsCommitter.rollback(); + } + } + + public void finishInsertTable(String dbName, String tbName) { + this.tbName = tbName; + this.dbName = dbName; + List<THivePartitionUpdate> mergedPUs = 
mergePartitions(hivePartitionUpdates); + Table table = getTable(dbName, tbName); + List<Pair<THivePartitionUpdate, HivePartitionStatistics>> insertExistsPartitions = new ArrayList<>(); + for (THivePartitionUpdate pu : mergedPUs) { + TUpdateMode updateMode = pu.getUpdateMode(); + HivePartitionStatistics hivePartitionStatistics = HivePartitionStatistics.fromCommonStatistics( + pu.getRowCount(), + pu.getFileNamesSize(), + pu.getFileSize()); + String writePath = pu.getLocation().getWritePath(); + if (table.getPartitionKeysSize() == 0) { + Preconditions.checkArgument(mergedPUs.size() == 1, + "When updating a non-partitioned table, multiple partitions should not be written"); + switch (updateMode) { + case APPEND: + finishChangingExistingTable( + ActionType.INSERT_EXISTING, + dbName, + tbName, + writePath, + pu.getFileNames(), + hivePartitionStatistics); + break; + case OVERWRITE: + dropTable(dbName, tbName); + createTable(table, writePath, pu.getFileNames(), hivePartitionStatistics); + break; + default: + throw new RuntimeException("Not support mode:[" + updateMode + "] in unPartitioned table"); + } + } else { + switch (updateMode) { + case APPEND: + // insert into existing partition + insertExistsPartitions.add(Pair.of(pu, hivePartitionStatistics)); + break; + case NEW: + case OVERWRITE: + StorageDescriptor sd = table.getSd(); + HivePartition hivePartition = new HivePartition( + dbName, + tbName, + false, + sd.getInputFormat(), + pu.getLocation().getTargetPath(), + HiveUtil.toPartitionValues(pu.getName()), + Maps.newHashMap(), + sd.getOutputFormat(), + sd.getSerdeInfo().getSerializationLib(), + hiveOps.getClient().getSchema(dbName, tbName) + ); + if (updateMode == TUpdateMode.OVERWRITE) { + dropPartition(dbName, tbName, hivePartition.getPartitionValues(), true); + } + addPartition( + dbName, tbName, hivePartition, writePath, + pu.getName(), pu.getFileNames(), hivePartitionStatistics); + break; + default: + throw new RuntimeException("Not support mode:[" + updateMode + "] in partitioned table"); + } + } + } + + if (!insertExistsPartitions.isEmpty()) { + convertToInsertExistingPartitionAction(insertExistsPartitions); + } + } + + public void doCommit() { + hmsCommitter = new HmsCommitter(); + + try { + for (Map.Entry<DatabaseTableName, Action<TableAndMore>> entry : tableActions.entrySet()) { + Action<TableAndMore> action = entry.getValue(); + switch (action.getType()) { + case INSERT_EXISTING: + hmsCommitter.prepareInsertExistingTable(action.getData()); + break; + case ALTER: + hmsCommitter.prepareAlterTable(action.getData()); + break; + default: + throw new UnsupportedOperationException("Unsupported table action type: " + action.getType()); + } + } + + for (Map.Entry<DatabaseTableName, Map<List<String>, Action<PartitionAndMore>>> tableEntry + : partitionActions.entrySet()) { + for (Map.Entry<List<String>, Action<PartitionAndMore>> partitionEntry : + tableEntry.getValue().entrySet()) { + Action<PartitionAndMore> action = partitionEntry.getValue(); + switch (action.getType()) { + case INSERT_EXISTING: + hmsCommitter.prepareInsertExistPartition(action.getData()); + break; + case ADD: + hmsCommitter.prepareAddPartition(action.getData()); + break; + case ALTER: + hmsCommitter.prepareAlterPartition(action.getData()); + break; + default: + throw new UnsupportedOperationException( + "Unsupported partition action type: " + action.getType()); + } + } + } + + hmsCommitter.waitForAsyncFileSystemTasks(); + hmsCommitter.doAddPartitionsTask(); + hmsCommitter.doUpdateStatisticsTasks(); + } catch (Throwable t) { 
+ LOG.warn("Failed to commit for {}.{}, abort it.", dbName, tbName); + hmsCommitter.cancelUnStartedAsyncFileSystemTask(); + hmsCommitter.undoUpdateStatisticsTasks(); + hmsCommitter.undoAddPartitionsTask(); + hmsCommitter.waitForAsyncFileSystemTaskSuppressThrowable(); + hmsCommitter.runDirectoryClearUpTasksForAbort(); + hmsCommitter.runRenameDirTasksForAbort(); + throw t; + } finally { + hmsCommitter.runClearPathsForFinish(); + } + } + + public void updateHivePartitionUpdates(List<THivePartitionUpdate> pus) { + synchronized (this) { + hivePartitionUpdates.addAll(pus); + } + } + + // for test + public void setHivePartitionUpdates(List<THivePartitionUpdate> hivePartitionUpdates) { + this.hivePartitionUpdates = hivePartitionUpdates; + } + + public long getUpdateCnt() { + return hivePartitionUpdates.stream().mapToLong(THivePartitionUpdate::getRowCount).sum(); + } + + private void convertToInsertExistingPartitionAction( + List<Pair<THivePartitionUpdate, HivePartitionStatistics>> partitions) { + DatabaseTableName databaseTableName = new DatabaseTableName(dbName, tbName); + Map<List<String>, Action<PartitionAndMore>> partitionActionsForTable = + partitionActions.computeIfAbsent(databaseTableName, k -> new HashMap<>()); + + for (List<Pair<THivePartitionUpdate, HivePartitionStatistics>> partitionBatch : + Iterables.partition(partitions, 100)) { + + List<String> partitionNames = partitionBatch + .stream() + .map(pair -> pair.first.getName()) + .collect(Collectors.toList()); + + // check in partitionAction + Action<PartitionAndMore> oldPartitionAction = partitionActionsForTable.get(partitionNames); + if (oldPartitionAction != null) { + switch (oldPartitionAction.getType()) { + case DROP: + case DROP_PRESERVE_DATA: + throw new RuntimeException( + "Not found partition from partition actions" + + "for " + databaseTableName + ", partitions: " + partitionNames); + case ADD: + case ALTER: + case INSERT_EXISTING: + case MERGE: + throw new UnsupportedOperationException( + "Inserting into a partition that were added, altered," + + "or inserted into in the same transaction is not supported"); + default: + throw new IllegalStateException("Unknown action type: " + oldPartitionAction.getType()); + } + } + + Map<String, Partition> partitionsByNamesMap = HiveUtil.convertToNamePartitionMap( + partitionNames, + hiveOps.getClient().getPartitions(dbName, tbName, partitionNames)); + + for (int i = 0; i < partitionsByNamesMap.size(); i++) { + String partitionName = partitionNames.get(i); + // check from hms + Partition partition = partitionsByNamesMap.get(partitionName); + if (partition == null) { + // Prevent this partition from being deleted by other engines + throw new RuntimeException( + "Not found partition from hms for " + databaseTableName + + ", partitions: " + partitionNames); + } + THivePartitionUpdate pu = partitionBatch.get(i).first; + HivePartitionStatistics updateStats = partitionBatch.get(i).second; + + StorageDescriptor sd = partition.getSd(); + List<String> partitionValues = HiveUtil.toPartitionValues(pu.getName()); + + HivePartition hivePartition = new HivePartition( + dbName, + tbName, + false, + sd.getInputFormat(), + partition.getSd().getLocation(), + partitionValues, + partition.getParameters(), + sd.getOutputFormat(), + sd.getSerdeInfo().getSerializationLib(), + hiveOps.getClient().getSchema(dbName, tbName) + ); + + partitionActionsForTable.put( + partitionValues, + new Action<>( + ActionType.INSERT_EXISTING, + new PartitionAndMore( + hivePartition, + pu.getLocation().getWritePath(), + pu.getName(), 
+ pu.getFileNames(), + updateStats + )) + ); + } + } + } + + private static void addSuppressedExceptions( + List<Throwable> suppressedExceptions, + Throwable t, + List<String> descriptions, + String description) { + descriptions.add(description); + // A limit is needed to avoid having a huge exception object. 5 was chosen arbitrarily. + if (suppressedExceptions.size() < 5) { + suppressedExceptions.add(t); + } + } + + private static class UpdateStatisticsTask { + private final String dbName; + private final String tableName; + private final Optional<String> partitionName; + private final HivePartitionStatistics updatePartitionStat; + private final boolean merge; + + private boolean done; + + public UpdateStatisticsTask(String dbName, String tableName, Optional<String> partitionName, + HivePartitionStatistics statistics, boolean merge) { + this.dbName = Objects.requireNonNull(dbName, "dbName is null"); + this.tableName = Objects.requireNonNull(tableName, "tableName is null"); + this.partitionName = Objects.requireNonNull(partitionName, "partitionName is null"); + this.updatePartitionStat = Objects.requireNonNull(statistics, "statistics is null"); + this.merge = merge; + } + + public void run(HiveMetadataOps hiveOps) { + if (partitionName.isPresent()) { + hiveOps.updatePartitionStatistics(dbName, tableName, partitionName.get(), this::updateStatistics); + } else { + hiveOps.updateTableStatistics(dbName, tableName, this::updateStatistics); + } + done = true; + } + + public void undo(HiveMetadataOps hmsOps) { + if (!done) { + return; + } + if (partitionName.isPresent()) { + hmsOps.updatePartitionStatistics(dbName, tableName, partitionName.get(), this::resetStatistics); + } else { + hmsOps.updateTableStatistics(dbName, tableName, this::resetStatistics); + } + } + + public String getDescription() { + if (partitionName.isPresent()) { + return "alter partition parameters " + tableName + " " + partitionName.get(); + } else { + return "alter table parameters " + tableName; + } + } + + private HivePartitionStatistics updateStatistics(HivePartitionStatistics currentStats) { + return merge ? 
HivePartitionStatistics.merge(currentStats, updatePartitionStat) : updatePartitionStat; + } + + private HivePartitionStatistics resetStatistics(HivePartitionStatistics currentStatistics) { + return HivePartitionStatistics + .reduce(currentStatistics, updatePartitionStat, HivePartitionStatistics.ReduceOperator.SUBTRACT); + } + } + + public static class AddPartitionsTask { + private final List<HivePartitionWithStatistics> partitions = new ArrayList<>(); + private final List<List<String>> createdPartitionValues = new ArrayList<>(); + + public boolean isEmpty() { + return partitions.isEmpty(); + } + + public List<HivePartitionWithStatistics> getPartitions() { + return partitions; + } + + public void addPartition(HivePartitionWithStatistics partition) { + partitions.add(partition); + } + + public void run(HiveMetadataOps hiveOps) { + HivePartition firstPartition = partitions.get(0).getPartition(); + String dbName = firstPartition.getDbName(); + String tableName = firstPartition.getTblName(); + List<List<HivePartitionWithStatistics>> batchedPartitions = Lists.partition(partitions, 20); + for (List<HivePartitionWithStatistics> batch : batchedPartitions) { + try { + hiveOps.addPartitions(dbName, tableName, batch); + for (HivePartitionWithStatistics partition : batch) { + createdPartitionValues.add(partition.getPartition().getPartitionValues()); + } + } catch (Throwable t) { + LOG.warn("Failed to add partition", t); + throw t; + } + } + partitions.clear(); + } + + public List<List<String>> rollback(HiveMetadataOps hiveOps) { + HivePartition firstPartition = partitions.get(0).getPartition(); + String dbName = firstPartition.getDbName(); + String tableName = firstPartition.getTblName(); + List<List<String>> rollbackFailedPartitions = new ArrayList<>(); + for (List<String> createdPartitionValue : createdPartitionValues) { + try { + hiveOps.dropPartition(dbName, tableName, createdPartitionValue, false); + } catch (Throwable t) { + LOG.warn("Failed to drop partition on {}.{}.{} when rollback", + dbName, tableName, rollbackFailedPartitions); + rollbackFailedPartitions.add(createdPartitionValue); + } + } + return rollbackFailedPartitions; + } + } + + private static class DirectoryCleanUpTask { + private final Path path; + private final boolean deleteEmptyDir; + + public DirectoryCleanUpTask(String path, boolean deleteEmptyDir) { + this.path = new Path(path); + this.deleteEmptyDir = deleteEmptyDir; + } + + public Path getPath() { + return path; + } + + public boolean isDeleteEmptyDir() { + return deleteEmptyDir; + } + + @Override + public String toString() { + return new StringJoiner(", ", DirectoryCleanUpTask.class.getSimpleName() + "[", "]") + .add("path=" + path) + .add("deleteEmptyDir=" + deleteEmptyDir) + .toString(); + } + } + + private static class DeleteRecursivelyResult { + private final boolean dirNoLongerExists; + private final List<String> notDeletedEligibleItems; + + public DeleteRecursivelyResult(boolean dirNoLongerExists, List<String> notDeletedEligibleItems) { + this.dirNoLongerExists = dirNoLongerExists; + this.notDeletedEligibleItems = notDeletedEligibleItems; + } + + public boolean dirNotExists() { + return dirNoLongerExists; + } + + public List<String> getNotDeletedEligibleItems() { + return notDeletedEligibleItems; + } + } + + private static class RenameDirectoryTask { + private final String renameFrom; + private final String renameTo; + + public RenameDirectoryTask(String renameFrom, String renameTo) { + this.renameFrom = renameFrom; + this.renameTo = renameTo; + } + + public String 
getRenameFrom() { + return renameFrom; + } + + public String getRenameTo() { + return renameTo; + } + + @Override + public String toString() { + return new StringJoiner(", ", RenameDirectoryTask.class.getSimpleName() + "[", "]") + .add("renameFrom:" + renameFrom) + .add("renameTo:" + renameTo) + .toString(); + } + } + + + + private void recursiveDeleteItems(Path directory, boolean deleteEmptyDir) { + DeleteRecursivelyResult deleteResult = recursiveDeleteFiles(directory, deleteEmptyDir); + + if (!deleteResult.getNotDeletedEligibleItems().isEmpty()) { + LOG.warn("Failed to delete directory {}. Some eligible items can't be deleted: {}.", + directory.toString(), deleteResult.getNotDeletedEligibleItems()); + } else if (deleteEmptyDir && !deleteResult.dirNotExists()) { + LOG.warn("Failed to delete directory {} due to dir isn't empty", directory.toString()); + } + } + + private DeleteRecursivelyResult recursiveDeleteFiles(Path directory, boolean deleteEmptyDir) { + try { + if (!fs.exists(directory.getName()).ok()) { + return new DeleteRecursivelyResult(true, ImmutableList.of()); + } + } catch (Exception e) { + ImmutableList.Builder<String> notDeletedEligibleItems = ImmutableList.builder(); + notDeletedEligibleItems.add(directory.toString() + "/*"); + return new DeleteRecursivelyResult(false, notDeletedEligibleItems.build()); + } + + return doRecursiveDeleteFiles(directory, deleteEmptyDir); + } + + private DeleteRecursivelyResult doRecursiveDeleteFiles(Path directory, boolean deleteEmptyDir) { + List<RemoteFile> remoteFiles = new ArrayList<>(); + + Status status = fs.list(directory.getName(), remoteFiles); + if (!status.ok()) { + ImmutableList.Builder<String> notDeletedEligibleItems = ImmutableList.builder(); + notDeletedEligibleItems.add(directory + "/*"); + return new DeleteRecursivelyResult(false, notDeletedEligibleItems.build()); + } + + boolean isEmptyDir = true; + List<String> notDeletedEligibleItems = new ArrayList<>(); + for (RemoteFile file : remoteFiles) { + if (file.isFile()) { + Path filePath = file.getPath(); + isEmptyDir = false; + // TODO Check if this file was created by this query + if (!deleteIfExists(filePath)) { + notDeletedEligibleItems.add(filePath.toString()); + } + } else if (file.isDirectory()) { + DeleteRecursivelyResult subResult = doRecursiveDeleteFiles(file.getPath(), deleteEmptyDir); + if (!subResult.dirNotExists()) { + isEmptyDir = false; + } + if (!subResult.getNotDeletedEligibleItems().isEmpty()) { + notDeletedEligibleItems.addAll(subResult.getNotDeletedEligibleItems()); + } + } else { + isEmptyDir = false; + notDeletedEligibleItems.add(file.getPath().toString()); + } + } + + if (isEmptyDir && deleteEmptyDir) { + Verify.verify(notDeletedEligibleItems.isEmpty()); + if (!deleteIfExists(directory)) { + return new DeleteRecursivelyResult(false, ImmutableList.of(directory + "/")); + } + // all items of the location have been deleted. 
+ return new DeleteRecursivelyResult(true, ImmutableList.of()); + } + + return new DeleteRecursivelyResult(false, notDeletedEligibleItems); + } + + public boolean deleteIfExists(Path path) { + Status status = fs.delete(path.getName()); + if (status.ok()) { + return true; + } + return !fs.exists(path.getName()).ok(); + } + + public static class DatabaseTableName { + private final String dbName; + private final String tbName; + + public DatabaseTableName(String dbName, String tbName) { + this.dbName = dbName; + this.tbName = tbName; + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + + if (other == null || getClass() != other.getClass()) { + return false; + } + + DatabaseTableName that = (DatabaseTableName) other; + return Objects.equals(dbName, that.dbName) && Objects.equals(tbName, that.tbName); + } + + @Override + public String toString() { + return dbName + "." + tbName; + } + + @Override + public int hashCode() { + return Objects.hash(dbName, tbName); + } + + public String getTbName() { + return tbName; + } + + public String getDbName() { + return dbName; + } + } + + private static class TableAndMore { + private final Table table; + private final String currentLocation; + private final List<String> fileNames; + private final HivePartitionStatistics statisticsUpdate; + + public TableAndMore( + Table table, + String currentLocation, + List<String> fileNames, + HivePartitionStatistics statisticsUpdate) { + this.table = Objects.requireNonNull(table, "table is null"); + this.currentLocation = Objects.requireNonNull(currentLocation); + this.fileNames = Objects.requireNonNull(fileNames); + this.statisticsUpdate = Objects.requireNonNull(statisticsUpdate, "statisticsUpdate is null"); + } + + public Table getTable() { + return table; + } + + public String getCurrentLocation() { + return currentLocation; + } + + public List<String> getFileNames() { + return fileNames; + } + + public HivePartitionStatistics getStatisticsUpdate() { + return statisticsUpdate; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("table", table) + .add("statisticsUpdate", statisticsUpdate) + .toString(); + } + } + + private static class PartitionAndMore { + private final HivePartition partition; + private final String currentLocation; + private final String partitionName; + private final List<String> fileNames; + private final HivePartitionStatistics statisticsUpdate; + + public PartitionAndMore( + HivePartition partition, + String currentLocation, + String partitionName, + List<String> fileNames, + HivePartitionStatistics statisticsUpdate) { + this.partition = Objects.requireNonNull(partition, "partition is null"); + this.currentLocation = Objects.requireNonNull(currentLocation, "currentLocation is null"); + this.partitionName = Objects.requireNonNull(partitionName, "partition is null"); + this.fileNames = Objects.requireNonNull(fileNames, "fileNames is null"); + this.statisticsUpdate = Objects.requireNonNull(statisticsUpdate, "statisticsUpdate is null"); + } + + public HivePartition getPartition() { + return partition; + } + + public String getCurrentLocation() { + return currentLocation; + } + + public String getPartitionName() { + return partitionName; + } + + public List<String> getFileNames() { + return fileNames; + } + + public HivePartitionStatistics getStatisticsUpdate() { + return statisticsUpdate; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("partition", partition) + 
.add("currentLocation", currentLocation) + .add("fileNames", fileNames) + .toString(); + } + } + + private enum ActionType { + // drop a table/partition + DROP, + // drop a table/partition but will preserve data + DROP_PRESERVE_DATA, + // add a table/partition + ADD, + // drop then add a table/partition, like overwrite + ALTER, + // insert into an existing table/partition + INSERT_EXISTING, + // merger into an existing table/partition + MERGE, + } + + public static class Action<T> { + private final ActionType type; + private final T data; + + public Action(ActionType type, T data) { + this.type = Objects.requireNonNull(type, "type is null"); + if (type == ActionType.DROP || type == ActionType.DROP_PRESERVE_DATA) { + Preconditions.checkArgument(data == null, "data is not null"); + } else { + Objects.requireNonNull(data, "data is null"); + } + this.data = data; + } + + public ActionType getType() { + return type; + } + + public T getData() { + Preconditions.checkState(type != ActionType.DROP); + return data; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("type", type) + .add("data", data) + .toString(); + } + } + + public synchronized Table getTable(String databaseName, String tableName) { + Action<TableAndMore> tableAction = tableActions.get(new DatabaseTableName(databaseName, tableName)); + if (tableAction == null) { + return hiveOps.getClient().getTable(databaseName, tableName); + } + switch (tableAction.getType()) { + case ADD: + case ALTER: + case INSERT_EXISTING: + case MERGE: + return tableAction.getData().getTable(); + case DROP: + case DROP_PRESERVE_DATA: + break; + default: + throw new IllegalStateException("Unknown action type: " + tableAction.getType()); + } + throw new RuntimeException("Not Found table: " + databaseName + "." + tableName); + } + + public synchronized void finishChangingExistingTable( + ActionType actionType, + String databaseName, + String tableName, + String location, + List<String> fileNames, + HivePartitionStatistics statisticsUpdate) { + DatabaseTableName databaseTableName = new DatabaseTableName(databaseName, tableName); + Action<TableAndMore> oldTableAction = tableActions.get(databaseTableName); + if (oldTableAction == null) { + Table table = hiveOps.getClient().getTable(databaseTableName.getDbName(), databaseTableName.getTbName()); + tableActions.put( + databaseTableName, + new Action<>( + actionType, + new TableAndMore( + table, + location, + fileNames, + statisticsUpdate))); + return; + } + + switch (oldTableAction.getType()) { + case DROP: + throw new RuntimeException("Not found table: " + databaseTableName); + case ADD: + case ALTER: + case INSERT_EXISTING: + case MERGE: + throw new UnsupportedOperationException( + "Inserting into an unpartitioned table that were added, altered," + + "or inserted into in the same transaction is not supported"); + case DROP_PRESERVE_DATA: + break; + default: + throw new IllegalStateException("Unknown action type: " + oldTableAction.getType()); + } + } + + public synchronized void createTable( + Table table, String location, List<String> fileNames, HivePartitionStatistics statistics) { + // When creating a table, it should never have partition actions. This is just a sanity check. 
+ checkNoPartitionAction(dbName, tbName); + DatabaseTableName databaseTableName = new DatabaseTableName(dbName, tbName); + Action<TableAndMore> oldTableAction = tableActions.get(databaseTableName); + TableAndMore tableAndMore = new TableAndMore(table, location, fileNames, statistics); + if (oldTableAction == null) { + tableActions.put(databaseTableName, new Action<>(ActionType.ADD, tableAndMore)); + return; + } + switch (oldTableAction.getType()) { + case DROP: + tableActions.put(databaseTableName, new Action<>(ActionType.ALTER, tableAndMore)); + return; + + case ADD: + case ALTER: + case INSERT_EXISTING: + case MERGE: + throw new RuntimeException("Table already exists: " + databaseTableName); + case DROP_PRESERVE_DATA: + break; + default: + throw new IllegalStateException("Unknown action type: " + oldTableAction.getType()); + } + } + + + public synchronized void dropTable(String databaseName, String tableName) { + // Dropping table with partition actions requires cleaning up staging data, which is not implemented yet. + checkNoPartitionAction(databaseName, tableName); + DatabaseTableName databaseTableName = new DatabaseTableName(databaseName, tableName); + Action<TableAndMore> oldTableAction = tableActions.get(databaseTableName); + if (oldTableAction == null || oldTableAction.getType() == ActionType.ALTER) { + tableActions.put(databaseTableName, new Action<>(ActionType.DROP, null)); + return; + } + switch (oldTableAction.getType()) { + case DROP: + throw new RuntimeException("Not found table: " + databaseTableName); + case ADD: + case ALTER: + case INSERT_EXISTING: + case MERGE: + throw new RuntimeException("Dropping a table added/modified in the same transaction is not supported"); + case DROP_PRESERVE_DATA: + break; + default: + throw new IllegalStateException("Unknown action type: " + oldTableAction.getType()); + } + } + + + private void checkNoPartitionAction(String databaseName, String tableName) { + Map<List<String>, Action<PartitionAndMore>> partitionActionsForTable = + partitionActions.get(new DatabaseTableName(databaseName, tableName)); + if (partitionActionsForTable != null && !partitionActionsForTable.isEmpty()) { + throw new RuntimeException( + "Cannot make schema changes to a table with modified partitions in the same transaction"); + } + } + + public synchronized void addPartition( + String databaseName, + String tableName, + HivePartition partition, + String currentLocation, + String partitionName, + List<String> files, + HivePartitionStatistics statistics) { + Map<List<String>, Action<PartitionAndMore>> partitionActionsForTable = + partitionActions.computeIfAbsent(new DatabaseTableName(databaseName, tableName), k -> new HashMap<>()); + Action<PartitionAndMore> oldPartitionAction = partitionActionsForTable.get(partition.getPartitionValues()); + if (oldPartitionAction == null) { + partitionActionsForTable.put( + partition.getPartitionValues(), + new Action<>( + ActionType.ADD, + new PartitionAndMore(partition, currentLocation, partitionName, files, statistics)) + ); + return; + } + switch (oldPartitionAction.getType()) { + case DROP: + case DROP_PRESERVE_DATA: + partitionActionsForTable.put( + partition.getPartitionValues(), + new Action<>( + ActionType.ALTER, + new PartitionAndMore(partition, currentLocation, partitionName, files, statistics)) + ); + return; + case ADD: + case ALTER: + case INSERT_EXISTING: + case MERGE: + throw new RuntimeException( + "Partition already exists for table: " + + databaseName + "." 
+ tableName + ", partition values: " + partition.getPartitionValues()); + default: + throw new IllegalStateException("Unknown action type: " + oldPartitionAction.getType()); + } + } + + public synchronized void dropPartition( + String databaseName, + String tableName, + List<String> partitionValues, + boolean deleteData) { + DatabaseTableName databaseTableName = new DatabaseTableName(databaseName, tableName); + Map<List<String>, Action<PartitionAndMore>> partitionActionsForTable = + partitionActions.computeIfAbsent(databaseTableName, k -> new HashMap<>()); + Action<PartitionAndMore> oldPartitionAction = partitionActionsForTable.get(partitionValues); + if (oldPartitionAction == null) { + if (deleteData) { + partitionActionsForTable.put(partitionValues, new Action<>(ActionType.DROP, null)); + } else { + partitionActionsForTable.put(partitionValues, new Action<>(ActionType.DROP_PRESERVE_DATA, null)); + } + return; + } + switch (oldPartitionAction.getType()) { + case DROP: + case DROP_PRESERVE_DATA: + throw new RuntimeException( + "Not found partition from partition actions for " + databaseTableName + + ", partitions: " + partitionValues); + case ADD: + case ALTER: + case INSERT_EXISTING: + case MERGE: + throw new RuntimeException( + "Dropping a partition added in the same transaction is not supported: " + + databaseTableName + ", partition values: " + partitionValues); + default: + throw new IllegalStateException("Unknown action type: " + oldPartitionAction.getType()); + } + } + + class HmsCommitter { + + // update statistics for unPartitioned table or existed partition + private final List<UpdateStatisticsTask> updateStatisticsTasks = new ArrayList<>(); + Executor updateStatisticsExecutor = Executors.newFixedThreadPool(16); + + // add new partition + private final AddPartitionsTask addPartitionsTask = new AddPartitionsTask(); + + // for file system rename operation + // whether to cancel the file system tasks + private final AtomicBoolean fileSystemTaskCancelled = new AtomicBoolean(false); + // file system tasks that are executed asynchronously, including rename_file, rename_dir + private final List<CompletableFuture<?>> asyncFileSystemTaskFutures = new ArrayList<>(); + // when aborted, we need to delete all files under this path, even the current directory + private final Queue<DirectoryCleanUpTask> directoryCleanUpTasksForAbort = new ConcurrentLinkedQueue<>(); + // when aborted, we need restore directory + private final List<RenameDirectoryTask> renameDirectoryTasksForAbort = new ArrayList<>(); + // when finished, we need clear some directories + private final List<String> clearDirsForFinish = new ArrayList<>(); + Executor fileSystemExecutor = Executors.newFixedThreadPool(16); + + public void cancelUnStartedAsyncFileSystemTask() { + fileSystemTaskCancelled.set(true); + } + + private void undoUpdateStatisticsTasks() { + ImmutableList.Builder<CompletableFuture<?>> undoUpdateFutures = ImmutableList.builder(); + for (UpdateStatisticsTask task : updateStatisticsTasks) { + undoUpdateFutures.add(CompletableFuture.runAsync(() -> { + try { + task.undo(hiveOps); + } catch (Throwable throwable) { + LOG.warn("Failed to rollback: {}", task.getDescription(), throwable); + } + }, updateStatisticsExecutor)); + } + + for (CompletableFuture<?> undoUpdateFuture : undoUpdateFutures.build()) { + MoreFutures.getFutureValue(undoUpdateFuture); + } + } + + private void undoAddPartitionsTask() { + if (addPartitionsTask.isEmpty()) { + return; + } + + HivePartition firstPartition = 
addPartitionsTask.getPartitions().get(0).getPartition(); + String dbName = firstPartition.getDbName(); + String tableName = firstPartition.getTblName(); + List<List<String>> rollbackFailedPartitions = addPartitionsTask.rollback(hiveOps); + if (!rollbackFailedPartitions.isEmpty()) { + LOG.warn("Failed to rollback: add_partition for {}.{}, partition values: {}", + dbName, tableName, rollbackFailedPartitions); + } + } + + private void waitForAsyncFileSystemTaskSuppressThrowable() { + for (CompletableFuture<?> future : asyncFileSystemTaskFutures) { + try { + future.get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (Throwable t) { + // ignore + } + } + } + + public void prepareInsertExistingTable(TableAndMore tableAndMore) { + Table table = tableAndMore.getTable(); + String targetPath = table.getSd().getLocation(); + String writePath = tableAndMore.getCurrentLocation(); + if (!targetPath.equals(writePath)) { + fs.asyncRename( + fileSystemExecutor, + asyncFileSystemTaskFutures, + fileSystemTaskCancelled, + writePath, + targetPath, + tableAndMore.getFileNames()); + } + directoryCleanUpTasksForAbort.add(new DirectoryCleanUpTask(targetPath, false)); + updateStatisticsTasks.add( + new UpdateStatisticsTask( + dbName, + tbName, + Optional.empty(), + tableAndMore.getStatisticsUpdate(), + true + )); + } + + public void prepareAlterTable(TableAndMore tableAndMore) { + Table table = tableAndMore.getTable(); + String targetPath = table.getSd().getLocation(); + String writePath = tableAndMore.getCurrentLocation(); + if (!targetPath.equals(writePath)) { + Path path = new Path(targetPath); + String oldTablePath = new Path(path.getParent(), "_temp_" + path.getName()).toString(); + Status status = fs.renameDir( + targetPath, + oldTablePath, + () -> renameDirectoryTasksForAbort.add(new RenameDirectoryTask(oldTablePath, targetPath))); + if (!status.ok()) { + throw new RuntimeException( + "Error to rename dir from " + targetPath + " to " + oldTablePath + ":" + status.getErrMsg()); + } + clearDirsForFinish.add(oldTablePath); + + status = fs.renameDir( + writePath, + targetPath, + () -> directoryCleanUpTasksForAbort.add( + new DirectoryCleanUpTask(targetPath, true))); + if (!status.ok()) { + throw new RuntimeException( + "Error to rename dir from " + writePath + " to " + targetPath + ":" + status.getErrMsg()); + } + } + updateStatisticsTasks.add( + new UpdateStatisticsTask( + dbName, + tbName, + Optional.empty(), + tableAndMore.getStatisticsUpdate(), + false + )); + } + + public void prepareAddPartition(PartitionAndMore partitionAndMore) { + + HivePartition partition = partitionAndMore.getPartition(); + String targetPath = partition.getPath(); + String writePath = partitionAndMore.getCurrentLocation(); + + if (!targetPath.equals(writePath)) { + fs.asyncRenameDir( + fileSystemExecutor, + asyncFileSystemTaskFutures, + fileSystemTaskCancelled, + writePath, + targetPath, + () -> directoryCleanUpTasksForAbort.add(new DirectoryCleanUpTask(targetPath, true))); + } + + StorageDescriptor sd = getTable(dbName, tbName).getSd(); + + HivePartition hivePartition = new HivePartition( + dbName, + tbName, + false, + sd.getInputFormat(), + targetPath, + partition.getPartitionValues(), + Maps.newHashMap(), + sd.getOutputFormat(), + sd.getSerdeInfo().getSerializationLib(), + hiveOps.getClient().getSchema(dbName, tbName) + ); + + HivePartitionWithStatistics partitionWithStats = + new HivePartitionWithStatistics( + partitionAndMore.getPartitionName(), + hivePartition, + 
partitionAndMore.getStatisticsUpdate()); + addPartitionsTask.addPartition(partitionWithStats); + } + + public void prepareInsertExistPartition(PartitionAndMore partitionAndMore) { + + HivePartition partition = partitionAndMore.getPartition(); + String targetPath = partition.getPath(); + String writePath = partitionAndMore.getCurrentLocation(); + directoryCleanUpTasksForAbort.add(new DirectoryCleanUpTask(targetPath, false)); + + if (!targetPath.equals(writePath)) { + fs.asyncRename( + fileSystemExecutor, + asyncFileSystemTaskFutures, + fileSystemTaskCancelled, + writePath, + targetPath, + partitionAndMore.getFileNames()); + } + + updateStatisticsTasks.add( + new UpdateStatisticsTask( + dbName, + tbName, + Optional.of(partitionAndMore.getPartitionName()), + partitionAndMore.getStatisticsUpdate(), + true)); + } + + private void runDirectoryClearUpTasksForAbort() { + for (DirectoryCleanUpTask cleanUpTask : directoryCleanUpTasksForAbort) { + recursiveDeleteItems(cleanUpTask.getPath(), cleanUpTask.isDeleteEmptyDir()); + } + } + + private void runRenameDirTasksForAbort() { + Status status; + for (RenameDirectoryTask task : renameDirectoryTasksForAbort) { + status = fs.exists(task.getRenameFrom()); + if (status.ok()) { + status = fs.renameDir(task.getRenameFrom(), task.getRenameTo(), () -> {}); + if (!status.ok()) { + LOG.warn("Failed to abort rename dir from {} to {}:{}", + task.getRenameFrom(), task.getRenameTo(), status.getErrMsg()); + } + } + } + } + + private void runClearPathsForFinish() { + Status status; + for (String path : clearDirsForFinish) { + status = fs.delete(path); + if (!status.ok()) { + LOG.warn("Failed to recursively delete path {}:{}", path, status.getErrCode()); + } + } + } + + public void prepareAlterPartition(PartitionAndMore partitionAndMore) { + HivePartition partition = partitionAndMore.getPartition(); + String targetPath = partition.getPath(); + String writePath = partitionAndMore.getCurrentLocation(); + + if (!targetPath.equals(writePath)) { + Path path = new Path(targetPath); + String oldPartitionPath = new Path(path.getParent(), "_temp_" + path.getName()).toString(); + Status status = fs.renameDir( + targetPath, + oldPartitionPath, + () -> renameDirectoryTasksForAbort.add(new RenameDirectoryTask(oldPartitionPath, targetPath))); + if (!status.ok()) { + throw new RuntimeException( + "Error to rename dir " + + "from " + targetPath + + " to " + oldPartitionPath + ":" + status.getErrMsg()); + } + clearDirsForFinish.add(oldPartitionPath); + + status = fs.renameDir( + writePath, + targetPath, + () -> directoryCleanUpTasksForAbort.add(new DirectoryCleanUpTask(targetPath, true))); + if (!status.ok()) { + throw new RuntimeException( + "Error to rename dir from " + writePath + " to " + targetPath + ":" + status.getErrMsg()); + } + } + + updateStatisticsTasks.add( + new UpdateStatisticsTask( + dbName, + tbName, + Optional.of(partitionAndMore.getPartitionName()), + partitionAndMore.getStatisticsUpdate(), + false + )); + } + + + private void waitForAsyncFileSystemTasks() { + for (CompletableFuture<?> future : asyncFileSystemTaskFutures) { + MoreFutures.getFutureValue(future, RuntimeException.class); + } + } + + private void doAddPartitionsTask() { + if (!addPartitionsTask.isEmpty()) { + addPartitionsTask.run(hiveOps); + } + } + + private void doUpdateStatisticsTasks() { + ImmutableList.Builder<CompletableFuture<?>> updateStatsFutures = ImmutableList.builder(); + List<String> failedTaskDescriptions = new ArrayList<>(); + List<Throwable> suppressedExceptions = new ArrayList<>(); + 
for (UpdateStatisticsTask task : updateStatisticsTasks) { + updateStatsFutures.add(CompletableFuture.runAsync(() -> { + try { + task.run(hiveOps); + } catch (Throwable t) { + synchronized (suppressedExceptions) { + addSuppressedExceptions( + suppressedExceptions, t, failedTaskDescriptions, task.getDescription()); + } + } + }, updateStatisticsExecutor)); + } + + for (CompletableFuture<?> executeUpdateFuture : updateStatsFutures.build()) { + MoreFutures.getFutureValue(executeUpdateFuture); + } + if (!suppressedExceptions.isEmpty()) { + StringBuilder message = new StringBuilder(); + message.append("Failed to execute some updating statistics tasks: "); + Joiner.on("; ").appendTo(message, failedTaskDescriptions); + RuntimeException exception = new RuntimeException(message.toString()); + suppressedExceptions.forEach(exception::addSuppressed); + throw exception; + } + } + + public void doCommit() { + waitForAsyncFileSystemTasks(); + doAddPartitionsTask(); + doUpdateStatisticsTasks(); + } + + public void rollback() { + cancelUnStartedAsyncFileSystemTask(); + undoUpdateStatisticsTasks(); + undoAddPartitionsTask(); + waitForAsyncFileSystemTaskSuppressThrowable(); + runDirectoryClearUpTasksForAbort(); + runRenameDirTasksForAbort(); + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java index beeff694ae4..f3556d13a57 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java @@ -36,13 +36,11 @@ import org.apache.doris.datasource.jdbc.client.JdbcClientConfig; import org.apache.doris.datasource.operations.ExternalMetadataOps; import org.apache.doris.fs.remote.RemoteFileSystem; import org.apache.doris.fs.remote.dfs.DFSFileSystem; -import org.apache.doris.thrift.THivePartitionUpdate; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableSet; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.api.Table; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -73,6 +71,7 @@ public class HiveMetadataOps implements ExternalMetadataOps { public HiveMetadataOps(HMSExternalCatalog catalog, HMSCachedClient client) { this.catalog = catalog; this.client = client; + // TODO Currently only supports DFSFileSystem, more types will be supported in the future this.fs = new DFSFileSystem(catalog.getProperties()); } @@ -80,6 +79,10 @@ public class HiveMetadataOps implements ExternalMetadataOps { return client; } + public RemoteFileSystem getFs() { + return fs; + } + public static HMSCachedClient createCachedClient(HiveConf hiveConf, int thriftClientPoolSize, JdbcClientConfig jdbcClientConfig) { if (hiveConf != null) { @@ -253,23 +256,6 @@ public class HiveMetadataOps implements ExternalMetadataOps { return client.getAllDatabases(); } - public void commit(String dbName, - String tableName, - List<THivePartitionUpdate> hivePUs) { - Table table = client.getTable(dbName, tableName); - HMSCommitter hmsCommitter = new HMSCommitter(this, fs, table); - hmsCommitter.commit(hivePUs); - try { - Env.getCurrentEnv().getCatalogMgr().refreshExternalTable( - dbName, - tableName, - catalog.getName(), - true); - } catch (DdlException e) { - LOG.warn("Failed to refresh table {}.{} : {}", dbName, tableName, e.getMessage()); - } - } - public void 
updateTableStatistics( String dbName, String tableName, diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HivePartitionStatistics.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HivePartitionStatistics.java index 49b14504750..df13e6737b5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HivePartitionStatistics.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HivePartitionStatistics.java @@ -25,7 +25,7 @@ import com.google.common.collect.ImmutableMap; import java.util.Map; public class HivePartitionStatistics { - private static final HivePartitionStatistics EMPTY = + public static final HivePartitionStatistics EMPTY = new HivePartitionStatistics(HiveCommonStatistics.EMPTY, ImmutableMap.of()); private final HiveCommonStatistics commonStatistics; diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HivePartitionWithStatistics.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HivePartitionWithStatistics.java index b7c28b68ff0..e72374aa5f2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HivePartitionWithStatistics.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HivePartitionWithStatistics.java @@ -18,9 +18,9 @@ package org.apache.doris.datasource.hive; public class HivePartitionWithStatistics { - private String name; - private HivePartition partition; - private HivePartitionStatistics statistics; + private final String name; + private final HivePartition partition; + private final HivePartitionStatistics statistics; public HivePartitionWithStatistics(String name, HivePartition partition, HivePartitionStatistics statistics) { this.name = name; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java index 76976165526..66dfe763e46 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java @@ -17,13 +17,12 @@ package org.apache.doris.nereids.trees.plans.commands.insert; +import org.apache.doris.catalog.Env; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.UserException; import org.apache.doris.common.util.DebugUtil; -import org.apache.doris.datasource.ExternalCatalog; import org.apache.doris.datasource.hive.HMSExternalTable; -import org.apache.doris.datasource.hive.HiveMetadataOps; -import org.apache.doris.datasource.operations.ExternalMetadataOps; +import org.apache.doris.datasource.hive.HMSTransaction; import org.apache.doris.nereids.NereidsPlanner; import org.apache.doris.nereids.exceptions.AnalysisException; import org.apache.doris.nereids.trees.plans.Plan; @@ -35,14 +34,13 @@ import org.apache.doris.planner.PlanFragment; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.QueryState; import org.apache.doris.qe.StmtExecutor; -import org.apache.doris.thrift.THivePartitionUpdate; +import org.apache.doris.transaction.TransactionManager; import org.apache.doris.transaction.TransactionStatus; import com.google.common.base.Strings; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.util.List; import java.util.Optional; /** @@ -53,6 +51,8 @@ public class HiveInsertExecutor extends AbstractInsertExecutor { private static final long 
INVALID_TXN_ID = -1L; private long txnId = INVALID_TXN_ID; private TransactionStatus txnStatus = TransactionStatus.ABORTED; + private final TransactionManager transactionManager; + private final String catalogName; /** * constructor @@ -61,6 +61,8 @@ public class HiveInsertExecutor extends AbstractInsertExecutor { String labelName, NereidsPlanner planner, Optional<InsertCommandContext> insertCtx) { super(ctx, table, labelName, planner, insertCtx); + catalogName = table.getCatalog().getName(); + transactionManager = table.getCatalog().getTransactionManager(); } public long getTxnId() { @@ -69,7 +71,9 @@ public class HiveInsertExecutor extends AbstractInsertExecutor { @Override public void beginTransaction() { - // TODO: use hive txn rather than internal txn + txnId = transactionManager.begin(); + HMSTransaction transaction = (HMSTransaction) transactionManager.getTransaction(txnId); + coordinator.setHivePartitionUpdateFunc(transaction::updateHivePartitionUpdates); } @Override @@ -93,13 +97,18 @@ public class HiveInsertExecutor extends AbstractInsertExecutor { if (ctx.getState().getStateType() == QueryState.MysqlStateType.ERR) { LOG.warn("errors when abort txn. {}", ctx.getQueryIdentifier()); } else { - // TODO use transaction - List<THivePartitionUpdate> ups = coordinator.getHivePartitionUpdates(); - loadedRows = ups.stream().mapToLong(THivePartitionUpdate::getRowCount).sum(); - ExternalCatalog catalog = ((HMSExternalTable) table).getCatalog(); - ExternalMetadataOps metadataOps = catalog.getMetadataOps(); - ((HiveMetadataOps) metadataOps).commit(((HMSExternalTable) table).getDbName(), table.getName(), ups); + HMSTransaction transaction = (HMSTransaction) transactionManager.getTransaction(txnId); + loadedRows = transaction.getUpdateCnt(); + String dbName = ((HMSExternalTable) table).getDbName(); + String tbName = table.getName(); + transaction.finishInsertTable(dbName, tbName); + transactionManager.commit(txnId); txnStatus = TransactionStatus.COMMITTED; + Env.getCurrentEnv().getCatalogMgr().refreshExternalTable( + dbName, + tbName, + catalogName, + true); } } @@ -117,6 +126,7 @@ public class HiveInsertExecutor extends AbstractInsertExecutor { } } ctx.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, t.getMessage()); + transactionManager.rollback(txnId); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 9ac96c27c87..b00854a84ea 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -152,6 +152,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Consumer; import java.util.stream.Collectors; public class Coordinator implements CoordInterface { @@ -235,8 +236,8 @@ public class Coordinator implements CoordInterface { private final List<TTabletCommitInfo> commitInfos = Lists.newArrayList(); private final List<TErrorTabletInfo> errorTabletInfos = Lists.newArrayList(); - // TODO moved to ExternalTransactionManager - private final List<THivePartitionUpdate> hivePartitionUpdates = Lists.newArrayList(); + // Collect all hivePartitionUpdates obtained from be + Consumer<List<THivePartitionUpdate>> hivePartitionUpdateFunc; // Input parameter private long jobId = -1; // job which this task belongs to @@ -503,10 +504,6 @@ public class Coordinator implements 
CoordInterface { return errorTabletInfos; } - public List<THivePartitionUpdate> getHivePartitionUpdates() { - return hivePartitionUpdates; - } - public Map<String, Integer> getBeToInstancesNum() { Map<String, Integer> result = Maps.newTreeMap(); if (enablePipelineEngine) { @@ -2456,13 +2453,8 @@ public class Coordinator implements CoordInterface { // TODO: more ranges? } - private void updateHivePartitionUpdates(List<THivePartitionUpdate> hivePartitionUpdates) { - lock.lock(); - try { - this.hivePartitionUpdates.addAll(hivePartitionUpdates); - } finally { - lock.unlock(); - } + public void setHivePartitionUpdateFunc(Consumer<List<THivePartitionUpdate>> hivePartitionUpdateFunc) { + this.hivePartitionUpdateFunc = hivePartitionUpdateFunc; } // update job progress from BE @@ -2512,8 +2504,8 @@ public class Coordinator implements CoordInterface { if (params.isSetErrorTabletInfos()) { updateErrorTabletInfos(params.getErrorTabletInfos()); } - if (params.isSetHivePartitionUpdates()) { - updateHivePartitionUpdates(params.getHivePartitionUpdates()); + if (params.isSetHivePartitionUpdates() && hivePartitionUpdateFunc != null) { + hivePartitionUpdateFunc.accept(params.getHivePartitionUpdates()); } Preconditions.checkArgument(params.isSetDetailedReport()); @@ -2577,8 +2569,8 @@ public class Coordinator implements CoordInterface { if (params.isSetErrorTabletInfos()) { updateErrorTabletInfos(params.getErrorTabletInfos()); } - if (params.isSetHivePartitionUpdates()) { - updateHivePartitionUpdates(params.getHivePartitionUpdates()); + if (params.isSetHivePartitionUpdates() && hivePartitionUpdateFunc != null) { + hivePartitionUpdateFunc.accept(params.getHivePartitionUpdates()); } if (LOG.isDebugEnabled()) { LOG.debug("Query {} instance {} is marked done", @@ -2649,8 +2641,8 @@ public class Coordinator implements CoordInterface { if (params.isSetErrorTabletInfos()) { updateErrorTabletInfos(params.getErrorTabletInfos()); } - if (params.isSetHivePartitionUpdates()) { - updateHivePartitionUpdates(params.getHivePartitionUpdates()); + if (params.isSetHivePartitionUpdates() && hivePartitionUpdateFunc != null) { + hivePartitionUpdateFunc.accept(params.getHivePartitionUpdates()); } instancesDoneLatch.markedCountDown(params.getFragmentInstanceId(), -1L); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/HiveTransactionManager.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/HiveTransactionManager.java new file mode 100644 index 00000000000..07304fb23ab --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/HiveTransactionManager.java @@ -0,0 +1,79 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.transaction; + +import org.apache.doris.common.UserException; +import org.apache.doris.datasource.hive.HMSTransaction; +import org.apache.doris.datasource.hive.HiveMetadataOps; +import org.apache.doris.persist.EditLog; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public class HiveTransactionManager implements TransactionManager { + + private final Map<Long, HMSTransaction> transactions = new ConcurrentHashMap<>(); + private final TransactionIdGenerator idGenerator = new TransactionIdGenerator(); + private final HiveMetadataOps ops; + + public HiveTransactionManager(HiveMetadataOps ops) { + this.ops = ops; + } + + public Long getNextTransactionId() { + return idGenerator.getNextTransactionId(); + } + + @Override + public void setEditLog(EditLog editLog) { + this.idGenerator.setEditLog(editLog); + } + + @Override + public long begin() { + long id = idGenerator.getNextTransactionId(); + HMSTransaction hiveTransaction = new HMSTransaction(ops); + transactions.put(id, hiveTransaction); + return id; + } + + @Override + public void commit(long id) throws UserException { + getTransactionWithException(id).commit(); + transactions.remove(id); + } + + @Override + public void rollback(long id) { + getTransactionWithException(id).rollback(); + transactions.remove(id); + } + + @Override + public HMSTransaction getTransaction(long id) { + return getTransactionWithException(id); + } + + public HMSTransaction getTransactionWithException(long id) { + HMSTransaction hiveTransaction = transactions.get(id); + if (hiveTransaction == null) { + throw new RuntimeException("Can't find transaction for " + id); + } + return hiveTransaction; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HivePartitionWithStatistics.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/Transaction.java similarity index 56% copy from fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HivePartitionWithStatistics.java copy to fe/fe-core/src/main/java/org/apache/doris/transaction/Transaction.java index b7c28b68ff0..b319fb78983 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HivePartitionWithStatistics.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/Transaction.java @@ -15,28 +15,13 @@ // specific language governing permissions and limitations // under the License. 
-package org.apache.doris.datasource.hive; +package org.apache.doris.transaction; -public class HivePartitionWithStatistics { - private String name; - private HivePartition partition; - private HivePartitionStatistics statistics; +import org.apache.doris.common.UserException; - public HivePartitionWithStatistics(String name, HivePartition partition, HivePartitionStatistics statistics) { - this.name = name; - this.partition = partition; - this.statistics = statistics; - } +public interface Transaction { - public String getName() { - return name; - } + void commit() throws UserException; - public HivePartition getPartition() { - return partition; - } - - public HivePartitionStatistics getStatistics() { - return statistics; - } + void rollback(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HivePartitionWithStatistics.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionManager.java similarity index 56% copy from fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HivePartitionWithStatistics.java copy to fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionManager.java index b7c28b68ff0..daacdecf152 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HivePartitionWithStatistics.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionManager.java @@ -15,28 +15,20 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.datasource.hive; - -public class HivePartitionWithStatistics { - private String name; - private HivePartition partition; - private HivePartitionStatistics statistics; - - public HivePartitionWithStatistics(String name, HivePartition partition, HivePartitionStatistics statistics) { - this.name = name; - this.partition = partition; - this.statistics = statistics; - } - - public String getName() { - return name; - } - - public HivePartition getPartition() { - return partition; - } - - public HivePartitionStatistics getStatistics() { - return statistics; - } +package org.apache.doris.transaction; + +import org.apache.doris.common.UserException; +import org.apache.doris.persist.EditLog; + +public interface TransactionManager { + + void setEditLog(EditLog editLog); + + long begin(); + + void commit(long id) throws UserException; + + void rollback(long id); + + Transaction getTransaction(long id); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HivePartitionWithStatistics.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionManagerFactory.java similarity index 56% copy from fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HivePartitionWithStatistics.java copy to fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionManagerFactory.java index b7c28b68ff0..334258a3f12 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HivePartitionWithStatistics.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionManagerFactory.java @@ -15,28 +15,13 @@ // specific language governing permissions and limitations // under the License. 
-package org.apache.doris.datasource.hive; +package org.apache.doris.transaction; -public class HivePartitionWithStatistics { - private String name; - private HivePartition partition; - private HivePartitionStatistics statistics; +import org.apache.doris.datasource.hive.HiveMetadataOps; - public HivePartitionWithStatistics(String name, HivePartition partition, HivePartitionStatistics statistics) { - this.name = name; - this.partition = partition; - this.statistics = statistics; - } - - public String getName() { - return name; - } - - public HivePartition getPartition() { - return partition; - } +public class TransactionManagerFactory { - public HivePartitionStatistics getStatistics() { - return statistics; + public static TransactionManager createHiveTransactionManager(HiveMetadataOps ops) { + return new HiveTransactionManager(ops); } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java index 3098d65e952..fc939625ea9 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java @@ -17,13 +17,17 @@ package org.apache.doris.datasource.hive; +import org.apache.doris.backup.Status; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.PrimitiveType; +import org.apache.doris.fs.remote.dfs.DFSFileSystem; import org.apache.doris.thrift.THiveLocationParams; import org.apache.doris.thrift.THivePartitionUpdate; import org.apache.doris.thrift.TUpdateMode; import com.google.common.collect.Lists; +import mockit.Mock; +import mockit.MockUp; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.Table; import org.junit.After; @@ -41,6 +45,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicBoolean; @Ignore public class HmsCommitTest { @@ -61,6 +68,7 @@ public class HmsCommitTest { dbLocation = "file://" + warehousePath.toAbsolutePath() + "/"; createTestHiveCatalog(); createTestHiveDatabase(); + mockFs(); } @AfterClass @@ -90,22 +98,55 @@ public class HmsCommitTest { hmsClient.createDatabase(dbMetadata); } + public static void mockFs() { + + new MockUp<DFSFileSystem>(DFSFileSystem.class) { + @Mock + public void asyncRenameDir(Executor executor, + List<CompletableFuture<?>> renameFileFutures, + AtomicBoolean cancelled, + String origFilePath, + String destFilePath, + Runnable runWhenPathNotExist) { + } + + @Mock + public void asyncRename(Executor executor, + List<CompletableFuture<?>> renameFileFutures, + AtomicBoolean cancelled, + String origFilePath, + String destFilePath, + List<String> fileNames) { + } + + @Mock + public Status renameDir(String origFilePath, + String destFilePath, + Runnable runWhenPathNotExist) { + return Status.OK; + } + }; + } + @Before public void before() { // create table List<Column> columns = new ArrayList<>(); columns.add(new Column("c1", PrimitiveType.INT, true)); columns.add(new Column("c2", PrimitiveType.STRING, true)); + columns.add(new Column("c3", PrimitiveType.STRING, false)); List<String> partitionKeys = new ArrayList<>(); partitionKeys.add("c3"); HiveTableMetadata tableMetadata = new HiveTableMetadata( dbName, tbWithPartition, columns, partitionKeys, new HashMap<>(), fileFormat); 
hmsClient.createTable(tableMetadata, true); + HiveTableMetadata tableMetadata2 = new HiveTableMetadata( dbName, tbWithoutPartition, columns, new ArrayList<>(), new HashMap<>(), fileFormat); hmsClient.createTable(tableMetadata2, true); + } @After @@ -118,11 +159,7 @@ public class HmsCommitTest { public void testNewPartitionForUnPartitionedTable() { List<THivePartitionUpdate> pus = new ArrayList<>(); pus.add(createRandomNew("a")); - try { - hmsOps.commit(dbName, tbWithoutPartition, pus); - } catch (Exception e) { - Assert.assertEquals("Not support mode:[NEW] in unPartitioned table", e.getMessage()); - } + Assert.assertThrows(Exception.class, () -> commit(dbName, tbWithoutPartition, pus)); } @Test @@ -131,7 +168,7 @@ public class HmsCommitTest { pus.add(createRandomAppend("")); pus.add(createRandomAppend("")); pus.add(createRandomAppend("")); - hmsOps.commit(dbName, tbWithoutPartition, pus); + commit(dbName, tbWithoutPartition, pus); Table table = hmsClient.getTable(dbName, tbWithoutPartition); assertNumRows(3, table); @@ -139,7 +176,7 @@ public class HmsCommitTest { pus2.add(createRandomAppend("")); pus2.add(createRandomAppend("")); pus2.add(createRandomAppend("")); - hmsOps.commit(dbName, tbWithoutPartition, pus2); + commit(dbName, tbWithoutPartition, pus2); table = hmsClient.getTable(dbName, tbWithoutPartition); assertNumRows(6, table); } @@ -151,7 +188,7 @@ public class HmsCommitTest { pus.add(createRandomOverwrite("")); pus.add(createRandomOverwrite("")); pus.add(createRandomOverwrite("")); - hmsOps.commit(dbName, tbWithoutPartition, pus); + commit(dbName, tbWithoutPartition, pus); Table table = hmsClient.getTable(dbName, tbWithoutPartition); assertNumRows(3, table); } @@ -165,7 +202,7 @@ public class HmsCommitTest { pus.add(createRandomNew("b")); pus.add(createRandomNew("b")); pus.add(createRandomNew("c")); - hmsOps.commit(dbName, tbWithPartition, pus); + commit(dbName, tbWithPartition, pus); Partition pa = hmsClient.getPartition(dbName, tbWithPartition, Lists.newArrayList("a")); assertNumRows(3, pa); @@ -186,7 +223,7 @@ public class HmsCommitTest { pus.add(createRandomAppend("b")); pus.add(createRandomAppend("b")); pus.add(createRandomAppend("c")); - hmsOps.commit(dbName, tbWithPartition, pus); + commit(dbName, tbWithPartition, pus); Partition pa = hmsClient.getPartition(dbName, tbWithPartition, Lists.newArrayList("a")); assertNumRows(6, pa); @@ -203,7 +240,7 @@ public class HmsCommitTest { pus.add(createRandomOverwrite("a")); pus.add(createRandomOverwrite("b")); pus.add(createRandomOverwrite("c")); - hmsOps.commit(dbName, tbWithPartition, pus); + commit(dbName, tbWithPartition, pus); Partition pa = hmsClient.getPartition(dbName, tbWithPartition, Lists.newArrayList("a")); assertNumRows(1, pa); @@ -221,14 +258,14 @@ public class HmsCommitTest { pus.add(createRandomNew("" + i)); } - hmsOps.commit(dbName, tbWithPartition, pus); + commit(dbName, tbWithPartition, pus); for (int i = 0; i < nums; i++) { Partition p = hmsClient.getPartition(dbName, tbWithPartition, Lists.newArrayList("" + i)); assertNumRows(1, p); } try { - hmsOps.commit(dbName, tbWithPartition, pus); + commit(dbName, tbWithPartition, pus); } catch (Exception e) { Assert.assertTrue(e.getMessage().contains("failed to add partitions")); } @@ -277,4 +314,13 @@ public class HmsCommitTest { public THivePartitionUpdate createRandomOverwrite(String partition) { return genOnePartitionUpdate("c3=" + partition, TUpdateMode.OVERWRITE); } + + public void commit(String dbName, + String tableName, + List<THivePartitionUpdate> hivePUs) { + 
HMSTransaction hmsTransaction = new HMSTransaction(hmsOps); + hmsTransaction.setHivePartitionUpdates(hivePUs); + hmsTransaction.finishInsertTable(dbName, tableName); + hmsTransaction.commit(); + } }
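
For readers following the new flow end to end, here is a minimal usage sketch, not part of the patch, of how a caller is expected to drive the transaction API, pieced together from `HiveInsertExecutor` and the `Coordinator` wiring above. The class and parameter names (`HiveWriteFlowSketch`, `coordinator`, `dbName`, `tbName`) are illustrative stand-ins for the surrounding executor state:

```
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.hive.HMSTransaction;
import org.apache.doris.qe.Coordinator;
import org.apache.doris.transaction.TransactionManager;

// Hypothetical caller (not from this commit), collapsing HiveInsertExecutor's
// beginTransaction()/onComplete()/onFail() steps into one method for clarity.
class HiveWriteFlowSketch {
    void runInsert(TransactionManager transactionManager, Coordinator coordinator,
                   String dbName, String tbName) throws UserException {
        // Open a transaction; HiveTransactionManager hands back an HMSTransaction.
        long txnId = transactionManager.begin();
        HMSTransaction txn = (HMSTransaction) transactionManager.getTransaction(txnId);
        // BE reports now flow straight into the transaction instead of being
        // cached on the Coordinator.
        coordinator.setHivePartitionUpdateFunc(txn::updateHivePartitionUpdates);
        try {
            // ... execute the insert fragments via the coordinator ...
            // Turn the collected THivePartitionUpdate list into staged
            // ADD/ALTER/INSERT_EXISTING actions, then commit: rename staged
            // directories, add partitions, and update statistics in HMS.
            txn.finishInsertTable(dbName, tbName);
            transactionManager.commit(txnId);
        } catch (UserException e) {
            // On failure, undo statistics/partition changes and clean staged dirs.
            transactionManager.rollback(txnId);
            throw e;
        }
    }
}
```

Note that `HiveTransactionManager` removes the transaction from its map on both `commit(long)` and `rollback(long)`, so a given transaction id can only be finished once.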