This is an automated email from the ASF dual-hosted git repository.

dkuzmenko pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push: new 86f6bd4 HIVE-24445: Non blocking DROP table implementation (Denys Kuzmenko, reviewed by Karen Coppage, Peter Vary) 86f6bd4 is described below commit 86f6bd46dcf4f84c6a393575d37884b4548c38f5 Author: Denys Kuzmenko <dkuzme...@cloudera.com> AuthorDate: Wed Jan 12 11:47:41 2022 +0200 HIVE-24445: Non blocking DROP table implementation (Denys Kuzmenko, reviewed by Karen Coppage, Peter Vary) Closes #2772 --- .../java/org/apache/hadoop/hive/conf/HiveConf.java | 5 +- .../hive/ql/ddl/table/drop/DropTableAnalyzer.java | 13 +- .../hive/ql/ddl/table/drop/DropTableOperation.java | 2 +- .../org/apache/hadoop/hive/ql/io/AcidUtils.java | 7 +- .../org/apache/hadoop/hive/ql/metadata/Hive.java | 37 +++++- .../hadoop/hive/ql/txn/compactor/Cleaner.java | 66 +++++++--- .../hive/ql/txn/compactor/CompactorThread.java | 2 +- .../org/apache/hadoop/hive/ql/TestTxnCommands.java | 88 ++++++++++++++ .../ql/lockmgr/DbTxnManagerEndToEndTestBase.java | 1 + .../hadoop/hive/ql/lockmgr/TestDbTxnManager2.java | 133 ++++++++++++++++++++- .../hive/ql/metadata/TestHiveMetaStoreChecker.java | 3 +- .../hive/ql/txn/compactor/TestInitiator.java | 2 +- .../gen/thrift/gen-cpp/hive_metastore_types.cpp | 22 ++++ .../src/gen/thrift/gen-cpp/hive_metastore_types.h | 12 +- .../apache/hadoop/hive/metastore/api/Table.java | 106 +++++++++++++++- .../src/gen/thrift/gen-php/metastore/Table.php | 24 ++++ .../src/gen/thrift/gen-py/hive_metastore/ttypes.py | 14 ++- .../src/gen/thrift/gen-rb/hive_metastore_types.rb | 4 +- .../hadoop/hive/metastore/HiveMetaStoreClient.java | 17 ++- .../hadoop/hive/metastore/IMetaStoreClient.java | 12 +- .../apache/hadoop/hive/metastore/Warehouse.java | 13 +- .../src/main/thrift/hive_metastore.thrift | 5 +- .../hadoop/hive/metastore/AcidEventListener.java | 28 ++++- .../apache/hadoop/hive/metastore/HMSHandler.java | 14 ++- .../hive/metastore/txn/CompactionTxnHandler.java | 12 +- 
.../hadoop/hive/metastore/txn/TxnHandler.java | 75 ++++++------ .../apache/hadoop/hive/metastore/txn/TxnStore.java | 12 +- .../apache/hadoop/hive/metastore/txn/TxnUtils.java | 8 +- .../metastore/HiveMetaStoreClientPreCatalog.java | 5 + .../hive/metastore/txn/ThrowingTxnHandler.java | 4 +- .../apache/hadoop/hive/common/AcidConstants.java | 4 + 31 files changed, 642 insertions(+), 108 deletions(-) diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index c384583..a2a498b 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -77,7 +77,6 @@ import java.util.concurrent.TimeUnit; import java.util.regex.Matcher; import java.util.regex.Pattern; - /** * Hive Configuration. */ @@ -3066,6 +3065,10 @@ public class HiveConf extends Configuration { HIVE_ACID_LOCKLESS_READS_ENABLED("hive.acid.lockless.reads.enabled", false, "Enables lockless reads"), + + HIVE_ACID_CREATE_TABLE_USE_SUFFIX("hive.acid.createtable.softdelete", false, + "Enables non-blocking DROP TABLE operation.\n" + + "If enabled, every table directory would be suffixed with the corresponding table creation txnId."), HIVE_ACID_TRUNCATE_USE_BASE("hive.acid.truncate.usebase", false, "If enabled, truncate for transactional tables will not delete the data directories,\n" + diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/drop/DropTableAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/drop/DropTableAnalyzer.java index 9ad3b6d..1507ca7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/drop/DropTableAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/drop/DropTableAnalyzer.java @@ -26,6 +26,8 @@ import org.apache.hadoop.hive.ql.ddl.DDLSemanticAnalyzerFactory.DDLType; import org.apache.hadoop.hive.ql.exec.TaskFactory; import org.apache.hadoop.hive.ql.hooks.ReadEntity; import 
org.apache.hadoop.hive.ql.hooks.WriteEntity; +import org.apache.hadoop.hive.ql.hooks.WriteEntity.WriteType; +import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.parse.ASTNode; import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer; @@ -33,6 +35,8 @@ import org.apache.hadoop.hive.ql.parse.HiveParser; import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import org.apache.hadoop.hive.ql.parse.SemanticException; +import static org.apache.hadoop.hive.common.AcidConstants.SOFT_DELETE_TABLE; + /** * Analyzer for table dropping commands. */ @@ -51,7 +55,14 @@ public class DropTableAnalyzer extends BaseSemanticAnalyzer { Table table = getTable(tableName, throwException); if (table != null) { inputs.add(new ReadEntity(table)); - outputs.add(new WriteEntity(table, WriteEntity.WriteType.DDL_EXCLUSIVE)); + + boolean tableWithSuffix = (HiveConf.getBoolVar(conf, ConfVars.HIVE_ACID_CREATE_TABLE_USE_SUFFIX) + || HiveConf.getBoolVar(conf, ConfVars.HIVE_ACID_LOCKLESS_READS_ENABLED)) + && AcidUtils.isTransactionalTable(table) + && Boolean.parseBoolean(table.getProperty(SOFT_DELETE_TABLE)); + + outputs.add(new WriteEntity(table, + tableWithSuffix ? 
WriteType.DDL_EXCL_WRITE : WriteType.DDL_EXCLUSIVE)); } boolean purge = (root.getFirstChildWithType(HiveParser.KW_PURGE) != null); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/drop/DropTableOperation.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/drop/DropTableOperation.java index b365965..a5fbcd0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/drop/DropTableOperation.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/drop/DropTableOperation.java @@ -109,7 +109,7 @@ public class DropTableOperation extends DDLOperation<DropTableDesc> { } // TODO: API w/catalog name - context.getDb().dropTable(desc.getTableName(), desc.isPurge()); + context.getDb().dropTable(table, desc.isPurge()); DDLUtils.addIfAbsentByName(new WriteEntity(table, WriteEntity.WriteType.DDL_NO_LOCK), context); if (LlapHiveUtils.isLlapMode(context.getConf())) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java index 519c8f0..71931d9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java @@ -3110,6 +3110,11 @@ public class AcidUtils { return TxnType.COMPACTION; } // check if soft delete + if (tree.getToken().getType() == HiveParser.TOK_DROPTABLE + && (HiveConf.getBoolVar(conf, ConfVars.HIVE_ACID_CREATE_TABLE_USE_SUFFIX) + || HiveConf.getBoolVar(conf, ConfVars.HIVE_ACID_LOCKLESS_READS_ENABLED))){ + return TxnType.SOFT_DELETE; + } if (tree.getToken().getType() == HiveParser.TOK_ALTERTABLE_DROPPARTS && (HiveConf.getBoolVar(conf, ConfVars.HIVE_ACID_DROP_PARTITION_USE_BASE) || HiveConf.getBoolVar(conf, ConfVars.HIVE_ACID_LOCKLESS_READS_ENABLED))) { @@ -3118,7 +3123,7 @@ public class AcidUtils { return TxnType.DEFAULT; } - public static boolean isReadOnlyTxn(ASTNode tree) { + private static boolean isReadOnlyTxn(ASTNode tree) { final ASTSearcher astSearcher = new ASTSearcher(); return 
READ_TXN_TOKENS.contains(tree.getToken().getType()) || (tree.getToken().getType() == HiveParser.TOK_QUERY && Stream.of( diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index e3b0997..80541fe 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -28,6 +28,8 @@ import com.google.common.collect.Sets; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import static org.apache.hadoop.hive.common.AcidConstants.SOFT_DELETE_TABLE; + import static org.apache.hadoop.hive.conf.Constants.MATERIALIZED_VIEW_REWRITING_TIME_WINDOW; import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_WRITE_NOTIFICATION_MAX_BATCH_SIZE; import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_STORAGE; @@ -104,7 +106,6 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.api.GetPartitionsByNamesRequest; import org.apache.hadoop.hive.metastore.api.GetTableRequest; -import org.apache.hadoop.hive.metastore.api.SourceTable; import org.apache.hadoop.hive.metastore.api.UpdateTransactionalStatsRequest; import org.apache.hadoop.hive.ql.io.HdfsUtils; import org.apache.hadoop.hive.metastore.HiveMetaException; @@ -128,7 +129,6 @@ import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; import org.apache.hadoop.hive.metastore.api.CompactionResponse; import org.apache.hadoop.hive.metastore.api.CompactionType; -import org.apache.hadoop.hive.metastore.api.CreationMetadata; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.DataConnector; import org.apache.hadoop.hive.metastore.api.DefaultConstraintsRequest; @@ -1273,6 +1273,15 @@ public 
class Hive { principalPrivs.setRolePrivileges(grants.getRoleGrants()); tTbl.setPrivileges(principalPrivs); } + if (AcidUtils.isTransactionalTable(tbl)) { + boolean createTableUseSuffix = HiveConf.getBoolVar(conf, ConfVars.HIVE_ACID_CREATE_TABLE_USE_SUFFIX) + || HiveConf.getBoolVar(conf, ConfVars.HIVE_ACID_LOCKLESS_READS_ENABLED); + + if (createTableUseSuffix) { + tbl.setProperty(SOFT_DELETE_TABLE, Boolean.TRUE.toString()); + } + tTbl.setTxnId(ss.getTxnMgr().getCurrentTxnId()); + } } // Set table snapshot to api.Table to make it persistent. A transactional table being // replicated may have a valid write Id copied from the source. Use that instead of @@ -1280,7 +1289,7 @@ public class Hive { if (tTbl.getWriteId() <= 0) { TableSnapshot tableSnapshot = AcidUtils.getTableSnapshot(conf, tbl, true); if (tableSnapshot != null) { - tbl.getTTable().setWriteId(tableSnapshot.getWriteId()); + tTbl.setWriteId(tableSnapshot.getWriteId()); } } @@ -1334,6 +1343,19 @@ public class Hive { dropTable(names[0], names[1], true, true, ifPurge); } + public void dropTable(Table table, boolean ifPurge) throws HiveException { + boolean tableWithSuffix = (HiveConf.getBoolVar(conf, ConfVars.HIVE_ACID_CREATE_TABLE_USE_SUFFIX) + || HiveConf.getBoolVar(conf, ConfVars.HIVE_ACID_LOCKLESS_READS_ENABLED)) + && AcidUtils.isTransactionalTable(table) + && Boolean.parseBoolean(table.getProperty(SOFT_DELETE_TABLE)); + + long txnId = Optional.ofNullable(SessionState.get()) + .map(ss -> ss.getTxnMgr().getCurrentTxnId()).orElse(0L); + table.getTTable().setTxnId(txnId); + + dropTable(table.getTTable(), !tableWithSuffix, true, ifPurge); + } + /** * Drops table along with the data in it. 
If the table doesn't exist then it * is a no-op @@ -1404,7 +1426,14 @@ public class Hive { } } - + public void dropTable(org.apache.hadoop.hive.metastore.api.Table table, + boolean deleteData, boolean ignoreUnknownTab, boolean ifPurge) throws HiveException { + try { + getMSC().dropTable(table, deleteData, ignoreUnknownTab, ifPurge); + } catch (Exception e) { + throw new HiveException(e); + } + } /** * Truncates the table/partition as per specifications. Just trash the data files diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java index 221e99c..bca9a29 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java @@ -18,11 +18,12 @@ package org.apache.hadoop.hive.ql.txn.compactor; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.common.FileUtils; -import org.apache.hadoop.hive.common.StringableMap; import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.metastore.ReplChangeManager; -import org.apache.hadoop.hive.metastore.api.*; +import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsRequest; +import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsResponse; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.metastore.api.NoSuchTxnException; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.metrics.Metrics; import org.apache.hadoop.hive.metastore.metrics.MetricsConstants; @@ -30,6 +31,7 @@ import org.apache.hadoop.hive.metastore.metrics.PerfLogger; import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils; import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.apache.hadoop.hive.metastore.txn.TxnUtils; +import org.apache.hadoop.hive.metastore.utils.FileUtils; import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; 
import org.apache.hadoop.hive.ql.io.AcidDirectory; import org.apache.hadoop.hive.ql.txn.compactor.CompactorUtil.ThrowingRunnable; @@ -39,7 +41,22 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.common.ValidReaderWriteIdList; +import org.apache.hadoop.hive.common.StringableMap; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.DataOperationType; +import org.apache.hadoop.hive.metastore.api.LockRequest; +import org.apache.hadoop.hive.metastore.api.LockResponse; +import org.apache.hadoop.hive.metastore.api.LockState; +import org.apache.hadoop.hive.metastore.api.LockType; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.NoSuchLockException; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.api.TxnAbortedException; +import org.apache.hadoop.hive.metastore.api.TxnOpenException; +import org.apache.hadoop.hive.metastore.api.UnlockRequest; import org.apache.hadoop.hive.metastore.txn.CompactionInfo; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.security.UserGroupInformation; @@ -180,14 +197,15 @@ public class Cleaner extends MetaStoreCompactorThread { if (metricsEnabled) { perfLogger.perfLogBegin(CLASS_NAME, cleanerMetric); } - String location = Optional.ofNullable(ci.properties).map(StringableMap::new) - .map(config -> config.get("location")).orElse(null); + Optional<String> location = Optional.ofNullable(ci.properties).map(StringableMap::new) + .map(config -> config.get("location")); Callable<Boolean> cleanUpTask; - Table t = resolveTable(ci); + Table t = null; Partition p = resolvePartition(ci); - if (location == null) { + if 
(!location.isPresent()) { + t = resolveTable(ci); if (t == null) { // The table was dropped before we got around to cleaning it. LOG.info("Unable to find table " + ci.getFullTableName() + ", assuming it was dropped." + @@ -218,11 +236,15 @@ public class Cleaner extends MetaStoreCompactorThread { } } txnHandler.markCleanerStart(ci); - - StorageDescriptor sd = resolveStorageDescriptor(t, p); - cleanUpTask = () -> removeFiles(Optional.ofNullable(location).orElse(sd.getLocation()), - minOpenTxnGLB, ci, ci.partName != null && p == null); + if (t != null) { + StorageDescriptor sd = resolveStorageDescriptor(t, p); + cleanUpTask = () -> removeFiles(location.orElse(sd.getLocation()), minOpenTxnGLB, ci, + ci.partName != null && p == null); + } else { + cleanUpTask = () -> removeFiles(location.get(), ci); + } + Ref<Boolean> removedFiles = Ref.from(false); if (runJobAsSelf(ci.runAs)) { removedFiles.value = cleanUpTask.call(); @@ -301,15 +323,9 @@ public class Cleaner extends MetaStoreCompactorThread { try { res = txnHandler.lock(lockRequest); if (res.getState() == LockState.ACQUIRED) { + //check if partition wasn't recreated if (resolvePartition(ci) == null) { - Path path = new Path(location); - StringBuilder extraDebugInfo = new StringBuilder("[").append(path.getName()).append(","); - - boolean ifPurge = Optional.ofNullable(ci.properties).map(StringableMap::new) - .map(config -> config.get("ifPurge")).map(Boolean::valueOf).orElse(true); - - return remove(location, ci, Collections.singletonList(path), ifPurge, - path.getFileSystem(conf), extraDebugInfo); + return removeFiles(location, ci); } } } catch (NoSuchTxnException | TxnAbortedException e) { @@ -399,6 +415,18 @@ public class Cleaner extends MetaStoreCompactorThread { return remove(location, ci, obsoleteDirs, true, fs, extraDebugInfo); } + private boolean removeFiles(String location, CompactionInfo ci) + throws NoSuchObjectException, IOException, MetaException { + Path path = new Path(location); + StringBuilder 
extraDebugInfo = new StringBuilder("[").append(path.getName()).append(","); + + boolean ifPurge = Optional.ofNullable(ci.properties).map(StringableMap::new) + .map(config -> config.get("ifPurge")).map(Boolean::valueOf).orElse(true); + + return remove(location, ci, Collections.singletonList(path), ifPurge, + path.getFileSystem(conf), extraDebugInfo); + } + private boolean remove(String location, CompactionInfo ci, List<Path> filesToDelete, boolean ifPurge, FileSystem fs, StringBuilder extraDebugInfo) throws NoSuchObjectException, MetaException, IOException { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java index 9b5affa..473c186 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java @@ -146,7 +146,7 @@ public abstract class CompactorThread extends Thread implements Configurable { protected StorageDescriptor resolveStorageDescriptor(Table t, Partition p) { return (p == null) ? t.getSd() : p.getSd(); } - + /** * Determine whether to run this job as the current user or whether we need a doAs to switch * users. 
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java index ddc7530..1279f15 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java @@ -34,6 +34,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; @@ -81,6 +82,8 @@ import org.slf4j.LoggerFactory; import com.google.common.collect.Lists; +import static org.apache.hadoop.hive.common.AcidConstants.SOFT_DELETE_TABLE_PATTERN; + /** * The LockManager is not ready, but for no-concurrency straight-line path we can * test AC=true, and AC=false with commit/rollback/exception and test resulting data. @@ -108,6 +111,7 @@ public class TestTxnCommands extends TxnCommandsBaseForTests { //of these tests. 
HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, false); HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVE_ACID_DROP_PARTITION_USE_BASE, false); + HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVE_ACID_CREATE_TABLE_USE_SUFFIX, false); HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVE_ACID_TRUNCATE_USE_BASE, false); } @@ -1680,4 +1684,88 @@ public class TestTxnCommands extends TxnCommandsBaseForTests { } } } + + @Test + public void testDropTableWithSuffix() throws Exception { + String tableName = "tab_acid"; + runStatementOnDriver("drop table if exists " + tableName); + HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVE_ACID_CREATE_TABLE_USE_SUFFIX, true); + + runStatementOnDriver("create table " + tableName + "(a int, b int) stored as orc TBLPROPERTIES ('transactional'='true')"); + runStatementOnDriver("insert into " + tableName + " values(1,2),(3,4)"); + runStatementOnDriver("drop table " + tableName); + + int count = TestTxnDbUtil.countQueryAgent(hiveConf, + "select count(*) from TXN_TO_WRITE_ID where T2W_TABLE = '" + tableName + "'"); + Assert.assertEquals(1, count); + + FileSystem fs = FileSystem.get(hiveConf); + FileStatus[] stat = fs.listStatus(new Path(getWarehouseDir()), + t -> t.getName().matches(tableName + SOFT_DELETE_TABLE_PATTERN)); + if (1 != stat.length) { + Assert.fail("Table data was removed from FS"); + } + MetastoreTaskThread houseKeeperService = new AcidHouseKeeperService(); + houseKeeperService.setConf(hiveConf); + + houseKeeperService.run(); + count = TestTxnDbUtil.countQueryAgent(hiveConf, + "select count(*) from TXN_TO_WRITE_ID where T2W_TABLE = '" + tableName + "'"); + Assert.assertEquals(0, count); + + try { + runStatementOnDriver("select * from " + tableName); + } catch (Exception ex) { + Assert.assertTrue(ex.getMessage().contains( + ErrorMsg.INVALID_TABLE.getMsg(StringUtils.wrap(tableName, "'")))); + } + // Check status of compaction job + TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf); + 
ShowCompactResponse resp = txnHandler.showCompact(new ShowCompactRequest()); + + Assert.assertEquals("Unexpected number of compactions in history", 1, resp.getCompactsSize()); + Assert.assertEquals("Unexpected 0 compaction state", + TxnStore.CLEANING_RESPONSE, resp.getCompacts().get(0).getState()); + + runCleaner(hiveConf); + + FileStatus[] status = fs.listStatus(new Path(getWarehouseDir()), + t -> t.getName().matches(tableName + SOFT_DELETE_TABLE_PATTERN)); + Assert.assertEquals(0, status.length); + } + + @Test + public void testDropTableWithoutSuffix() throws Exception { + String tableName = "tab_acid"; + runStatementOnDriver("drop table if exists " + tableName); + + for (boolean enabled : Arrays.asList(false, true)) { + HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVE_ACID_CREATE_TABLE_USE_SUFFIX, enabled); + runStatementOnDriver("create table " + tableName + "(a int, b int) stored as orc TBLPROPERTIES ('transactional'='true')"); + runStatementOnDriver("insert into " + tableName + " values(1,2),(3,4)"); + + HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVE_ACID_CREATE_TABLE_USE_SUFFIX, !enabled); + runStatementOnDriver("drop table " + tableName); + + int count = TestTxnDbUtil.countQueryAgent(hiveConf, + "select count(*) from TXN_TO_WRITE_ID where T2W_TABLE = '" + tableName + "'"); + Assert.assertEquals(0, count); + + FileSystem fs = FileSystem.get(hiveConf); + FileStatus[] stat = fs.listStatus(new Path(getWarehouseDir()), + t -> t.getName().equals(tableName)); + Assert.assertEquals(0, stat.length); + + try { + runStatementOnDriver("select * from " + tableName); + } catch (Exception ex) { + Assert.assertTrue(ex.getMessage().contains( + ErrorMsg.INVALID_TABLE.getMsg(StringUtils.wrap(tableName, "'")))); + } + // Check status of compaction job + TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf); + ShowCompactResponse resp = txnHandler.showCompact(new ShowCompactRequest()); + Assert.assertEquals("Unexpected number of compactions in history", 0, 
resp.getCompactsSize()); + } + } } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/DbTxnManagerEndToEndTestBase.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/DbTxnManagerEndToEndTestBase.java index 9eb9e1b..a78ae89 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/DbTxnManagerEndToEndTestBase.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/DbTxnManagerEndToEndTestBase.java @@ -61,6 +61,7 @@ public abstract class DbTxnManagerEndToEndTestBase { MetastoreConf.setVar(conf, MetastoreConf.ConfVars.WAREHOUSE, getWarehouseDir()); TestTxnDbUtil.setConfValues(conf); } + @BeforeClass public static void setUpDB() throws Exception{ TestTxnDbUtil.prepDb(conf); diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java index c3db57d..b78a93f 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java @@ -61,6 +61,8 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import static org.apache.hadoop.hive.common.AcidConstants.SOFT_DELETE_TABLE_PATTERN; + import static org.apache.hadoop.hive.ql.TxnCommandsBaseForTests.runWorker; import static org.apache.hadoop.hive.ql.TxnCommandsBaseForTests.runCleaner; @@ -3380,7 +3382,6 @@ public class TestDbTxnManager2 extends DbTxnManagerEndToEndTestBase{ dropTable(new String[] {"tab_acid"}); FileSystem fs = FileSystem.get(conf); - HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_ACID_LOCKLESS_READS_ENABLED, !blocking); HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_LOCKS_PARTITION_THRESHOLD, 1); driver = Mockito.spy(driver); @@ -3461,4 +3462,134 @@ public class TestDbTxnManager2 extends DbTxnManagerEndToEndTestBase{ Assert.assertEquals("Expecting 1 rows and found " + res.size(), 1, res.size()); } + @Test + public void testDropTableNonBlocking() throws Exception { + testDropTable(false); + } + 
@Test + public void testDropTableBlocking() throws Exception { + testDropTable(true); + } + + private void testDropTable(boolean blocking) throws Exception { + dropTable(new String[] {"tab_acid"}); + FileSystem fs = FileSystem.get(conf); + + HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_ACID_LOCKLESS_READS_ENABLED, !blocking); + HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_LOCKS_PARTITION_THRESHOLD, 1); + driver = Mockito.spy(driver); + + HiveConf.setBoolVar(driver2.getConf(), HiveConf.ConfVars.HIVE_ACID_CREATE_TABLE_USE_SUFFIX, !blocking); + driver2 = Mockito.spy(driver2); + + driver.run("create table if not exists tab_acid (a int, b int) partitioned by (p string) " + + "stored as orc TBLPROPERTIES ('transactional'='true')"); + driver.run("insert into tab_acid partition(p) (a,b,p) values(1,2,'foo'),(3,4,'bar')"); + + driver.compileAndRespond("select * from tab_acid"); + List<String> res = new ArrayList<>(); + + driver.lockAndRespond(); + List<ShowLocksResponseElement> locks = getLocks(); + Assert.assertEquals("Unexpected lock count", 1, locks.size()); + + checkLock(LockType.SHARED_READ, + LockState.ACQUIRED, "default", "tab_acid", null, locks); + + DbTxnManager txnMgr2 = (DbTxnManager) TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); + swapTxnManager(txnMgr2); + driver2.compileAndRespond("drop table if exists tab_acid"); + + if (blocking) { + txnMgr2.acquireLocks(driver2.getPlan(), ctx, null, false); + locks = getLocks(); + + ShowLocksResponseElement checkLock = checkLock(LockType.EXCLUSIVE, + LockState.WAITING, "default", "tab_acid", null, locks); + + swapTxnManager(txnMgr); + Mockito.doNothing().when(driver).lockAndRespond(); + driver.run(); + + driver.getFetchTask().fetch(res); + swapTxnManager(txnMgr2); + + FieldSetter.setField(txnMgr2, txnMgr2.getClass().getDeclaredField("numStatements"), 0); + txnMgr2.getMS().unlock(checkLock.getLockid()); + } + driver2.lockAndRespond(); + locks = getLocks(); + Assert.assertEquals("Unexpected lock count", 
blocking ? 1 : 2, locks.size()); + + checkLock(blocking ? LockType.EXCLUSIVE : LockType.EXCL_WRITE, + LockState.ACQUIRED, "default", "tab_acid", null, locks); + + Mockito.doNothing().when(driver2).lockAndRespond(); + driver2.run(); + + if (!blocking) { + swapTxnManager(txnMgr); + Mockito.doNothing().when(driver).lockAndRespond(); + driver.run(); + } + Mockito.reset(driver, driver2); + + FileStatus[] stat = fs.listStatus(new Path(getWarehouseDir()), + t -> t.getName().matches("tab_acid" + (blocking ? "" : SOFT_DELETE_TABLE_PATTERN))); + if ((blocking ? 0 : 1) != stat.length) { + Assert.fail("Table data was " + (blocking ? "not" : "") + "removed from FS"); + } + driver.getFetchTask().fetch(res); + Assert.assertEquals("Expecting 2 rows and found " + res.size(), 2, res.size()); + + try { + driver.run("select * from tab_acid"); + } catch (CommandProcessorException ex) { + Assert.assertEquals(ErrorMsg.INVALID_TABLE.getErrorCode(), ex.getResponseCode()); + } + + //re-create table with the same name + driver.run("create table if not exists tab_acid (a int, b int) partitioned by (p string) " + + "stored as orc TBLPROPERTIES ('transactional'='true')"); + driver.run("insert into tab_acid partition(p) (a,b,p) values(1,2,'foo'),(3,4,'bar')"); + + driver.run("select * from tab_acid "); + res = new ArrayList<>(); + driver.getFetchTask().fetch(res); + Assert.assertEquals("Expecting 2 rows and found " + res.size(), 2, res.size()); + } + + @Test + public void testDropTableNonBlocking2() throws Exception { + dropTable(new String[] {"tab_acid"}); + + HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_ACID_LOCKLESS_READS_ENABLED, true); + HiveConf.setBoolVar(driver2.getConf(), HiveConf.ConfVars.HIVE_ACID_CREATE_TABLE_USE_SUFFIX, true); + + driver.run("create table if not exists tab_acid (a int, b int) partitioned by (p string) " + + "stored as orc TBLPROPERTIES ('transactional'='true')"); + driver.run("insert into tab_acid partition(p) (a,b,p) values(1,2,'foo'),(3,4,'bar')"); + + 
driver.compileAndRespond("select * from tab_acid"); + + DbTxnManager txnMgr2 = (DbTxnManager) TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); + swapTxnManager(txnMgr2); + // when running this, valid writeid list is not yet fetched by the `select` operation, + // so we should keep TXN_TO_WRITE_ID entries until the Cleaner runs. + driver2.run("drop table if exists tab_acid"); + + swapTxnManager(txnMgr); + driver.run(); + + FileSystem fs = FileSystem.get(conf); + FileStatus[] stat = fs.listStatus(new Path(getWarehouseDir()), + t -> t.getName().matches("tab_acid" + SOFT_DELETE_TABLE_PATTERN)); + if (1 != stat.length) { + Assert.fail("Table data was removed from FS"); + } + + List<String> res = new ArrayList<>(); + driver.getFetchTask().fetch(res); + Assert.assertEquals("No records found", 2, res.size()); + } } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHiveMetaStoreChecker.java b/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHiveMetaStoreChecker.java index 4ad4a74..af29201 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHiveMetaStoreChecker.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHiveMetaStoreChecker.java @@ -88,7 +88,8 @@ public class TestHiveMetaStoreChecker { hive.getConf().setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER, "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory"); HiveConf.setBoolVar(hive.getConf(), HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false); - SessionState.start(hive.getConf()); + SessionState ss = SessionState.start(hive.getConf()); + ss.initTxnMgr(hive.getConf()); partCols = new ArrayList<>(); partCols.add(new FieldSchema(partDateName, serdeConstants.STRING_TYPE_NAME, "")); diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java index 33871fa..e3ff82e 100644 --- 
a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java @@ -900,7 +900,7 @@ public class TestInitiator extends CompactorTest { } /** - * Tests org.apache.hadoop.hive.metastore.txn.#findUserToRunAs(java.lang.String, org.apache.hadoop + * Tests org.apache.hadoop.hive.metastore.txn.TxnUtils#findUserToRunAs(java.lang.String, org.apache.hadoop * .hive.metastore.api.Table). * Used by Worker and Initiator. * Initiator caches this via Initiator#resolveUserToRunAs. diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.cpp b/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.cpp index 3d13a9e..85ad737 100644 --- a/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.cpp +++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.cpp @@ -11408,6 +11408,11 @@ void Table::__set_dictionary(const ObjectDictionary& val) { this->dictionary = val; __isset.dictionary = true; } + +void Table::__set_txnId(const int64_t val) { + this->txnId = val; +__isset.txnId = true; +} std::ostream& operator<<(std::ostream& out, const Table& obj) { obj.printTo(out); @@ -11705,6 +11710,14 @@ uint32_t Table::read(::apache::thrift::protocol::TProtocol* iprot) { xfer += iprot->skip(ftype); } break; + case 28: + if (ftype == ::apache::thrift::protocol::T_I64) { + xfer += iprot->readI64(this->txnId); + this->__isset.txnId = true; + } else { + xfer += iprot->skip(ftype); + } + break; default: xfer += iprot->skip(ftype); break; @@ -11878,6 +11891,11 @@ uint32_t Table::write(::apache::thrift::protocol::TProtocol* oprot) const { xfer += this->dictionary.write(oprot); xfer += oprot->writeFieldEnd(); } + if (this->__isset.txnId) { + xfer += oprot->writeFieldBegin("txnId", ::apache::thrift::protocol::T_I64, 28); + xfer += oprot->writeI64(this->txnId); + xfer += oprot->writeFieldEnd(); + } xfer 
+= oprot->writeFieldStop(); xfer += oprot->writeStructEnd(); return xfer; @@ -11912,6 +11930,7 @@ void swap(Table &a, Table &b) { swap(a.id, b.id); swap(a.fileMetadata, b.fileMetadata); swap(a.dictionary, b.dictionary); + swap(a.txnId, b.txnId); swap(a.__isset, b.__isset); } @@ -11943,6 +11962,7 @@ Table::Table(const Table& other400) { id = other400.id; fileMetadata = other400.fileMetadata; dictionary = other400.dictionary; + txnId = other400.txnId; __isset = other400.__isset; } Table& Table::operator=(const Table& other401) { @@ -11973,6 +11993,7 @@ Table& Table::operator=(const Table& other401) { id = other401.id; fileMetadata = other401.fileMetadata; dictionary = other401.dictionary; + txnId = other401.txnId; __isset = other401.__isset; return *this; } @@ -12006,6 +12027,7 @@ void Table::printTo(std::ostream& out) const { out << ", " << "id="; (__isset.id ? (out << to_string(id)) : (out << "<null>")); out << ", " << "fileMetadata="; (__isset.fileMetadata ? (out << to_string(fileMetadata)) : (out << "<null>")); out << ", " << "dictionary="; (__isset.dictionary ? (out << to_string(dictionary)) : (out << "<null>")); + out << ", " << "txnId="; (__isset.txnId ? 
(out << to_string(txnId)) : (out << "<null>")); out << ")"; } diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.h b/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.h index 6f3d42d..d383d56 100644 --- a/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.h +++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.h @@ -4782,7 +4782,7 @@ void swap(ObjectDictionary &a, ObjectDictionary &b); std::ostream& operator<<(std::ostream& out, const ObjectDictionary& obj); typedef struct _Table__isset { - _Table__isset() : tableName(false), dbName(false), owner(false), createTime(false), lastAccessTime(false), retention(false), sd(false), partitionKeys(false), parameters(false), viewOriginalText(false), viewExpandedText(false), tableType(false), privileges(false), temporary(true), rewriteEnabled(false), creationMetadata(false), catName(false), ownerType(true), writeId(true), isStatsCompliant(false), colStats(false), accessType(false), requiredReadCapabilities(false), requiredWriteCapabi [...] + _Table__isset() : tableName(false), dbName(false), owner(false), createTime(false), lastAccessTime(false), retention(false), sd(false), partitionKeys(false), parameters(false), viewOriginalText(false), viewExpandedText(false), tableType(false), privileges(false), temporary(true), rewriteEnabled(false), creationMetadata(false), catName(false), ownerType(true), writeId(true), isStatsCompliant(false), colStats(false), accessType(false), requiredReadCapabilities(false), requiredWriteCapabi [...] 
bool tableName :1; bool dbName :1; bool owner :1; @@ -4810,6 +4810,7 @@ typedef struct _Table__isset { bool id :1; bool fileMetadata :1; bool dictionary :1; + bool txnId :1; } _Table__isset; class Table : public virtual ::apache::thrift::TBase { @@ -4817,7 +4818,7 @@ class Table : public virtual ::apache::thrift::TBase { Table(const Table&); Table& operator=(const Table&); - Table() : tableName(), dbName(), owner(), createTime(0), lastAccessTime(0), retention(0), viewOriginalText(), viewExpandedText(), tableType(), temporary(false), rewriteEnabled(0), catName(), ownerType((PrincipalType::type)1), writeId(-1LL), isStatsCompliant(0), accessType(0), id(0) { + Table() : tableName(), dbName(), owner(), createTime(0), lastAccessTime(0), retention(0), viewOriginalText(), viewExpandedText(), tableType(), temporary(false), rewriteEnabled(0), catName(), ownerType((PrincipalType::type)1), writeId(-1LL), isStatsCompliant(0), accessType(0), id(0), txnId(0) { ownerType = (PrincipalType::type)1; } @@ -4854,6 +4855,7 @@ class Table : public virtual ::apache::thrift::TBase { int64_t id; FileMetadata fileMetadata; ObjectDictionary dictionary; + int64_t txnId; _Table__isset __isset; @@ -4911,6 +4913,8 @@ class Table : public virtual ::apache::thrift::TBase { void __set_dictionary(const ObjectDictionary& val); + void __set_txnId(const int64_t val); + bool operator == (const Table & rhs) const { if (!(tableName == rhs.tableName)) @@ -4997,6 +5001,10 @@ class Table : public virtual ::apache::thrift::TBase { return false; else if (__isset.dictionary && !(dictionary == rhs.dictionary)) return false; + if (__isset.txnId != rhs.__isset.txnId) + return false; + else if (__isset.txnId && !(txnId == rhs.txnId)) + return false; return true; } bool operator != (const Table &rhs) const { diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/Table.java 
b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/Table.java index 2296441..c91f557 100644 --- a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/Table.java +++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/Table.java @@ -38,6 +38,7 @@ package org.apache.hadoop.hive.metastore.api; private static final org.apache.thrift.protocol.TField ID_FIELD_DESC = new org.apache.thrift.protocol.TField("id", org.apache.thrift.protocol.TType.I64, (short)25); private static final org.apache.thrift.protocol.TField FILE_METADATA_FIELD_DESC = new org.apache.thrift.protocol.TField("fileMetadata", org.apache.thrift.protocol.TType.STRUCT, (short)26); private static final org.apache.thrift.protocol.TField DICTIONARY_FIELD_DESC = new org.apache.thrift.protocol.TField("dictionary", org.apache.thrift.protocol.TType.STRUCT, (short)27); + private static final org.apache.thrift.protocol.TField TXN_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("txnId", org.apache.thrift.protocol.TType.I64, (short)28); private static final org.apache.thrift.scheme.SchemeFactory STANDARD_SCHEME_FACTORY = new TableStandardSchemeFactory(); private static final org.apache.thrift.scheme.SchemeFactory TUPLE_SCHEME_FACTORY = new TableTupleSchemeFactory(); @@ -69,6 +70,7 @@ package org.apache.hadoop.hive.metastore.api; private long id; // optional private @org.apache.thrift.annotation.Nullable FileMetadata fileMetadata; // optional private @org.apache.thrift.annotation.Nullable ObjectDictionary dictionary; // optional + private long txnId; // optional /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. 
*/ public enum _Fields implements org.apache.thrift.TFieldIdEnum { @@ -102,7 +104,8 @@ package org.apache.hadoop.hive.metastore.api; REQUIRED_WRITE_CAPABILITIES((short)24, "requiredWriteCapabilities"), ID((short)25, "id"), FILE_METADATA((short)26, "fileMetadata"), - DICTIONARY((short)27, "dictionary"); + DICTIONARY((short)27, "dictionary"), + TXN_ID((short)28, "txnId"); private static final java.util.Map<java.lang.String, _Fields> byName = new java.util.HashMap<java.lang.String, _Fields>(); @@ -172,6 +175,8 @@ package org.apache.hadoop.hive.metastore.api; return FILE_METADATA; case 27: // DICTIONARY return DICTIONARY; + case 28: // TXN_ID + return TXN_ID; default: return null; } @@ -222,8 +227,9 @@ package org.apache.hadoop.hive.metastore.api; private static final int __ISSTATSCOMPLIANT_ISSET_ID = 6; private static final int __ACCESSTYPE_ISSET_ID = 7; private static final int __ID_ISSET_ID = 8; + private static final int __TXNID_ISSET_ID = 9; private short __isset_bitfield = 0; - private static final _Fields optionals[] = {_Fields.PRIVILEGES,_Fields.TEMPORARY,_Fields.REWRITE_ENABLED,_Fields.CREATION_METADATA,_Fields.CAT_NAME,_Fields.OWNER_TYPE,_Fields.WRITE_ID,_Fields.IS_STATS_COMPLIANT,_Fields.COL_STATS,_Fields.ACCESS_TYPE,_Fields.REQUIRED_READ_CAPABILITIES,_Fields.REQUIRED_WRITE_CAPABILITIES,_Fields.ID,_Fields.FILE_METADATA,_Fields.DICTIONARY}; + private static final _Fields optionals[] = {_Fields.PRIVILEGES,_Fields.TEMPORARY,_Fields.REWRITE_ENABLED,_Fields.CREATION_METADATA,_Fields.CAT_NAME,_Fields.OWNER_TYPE,_Fields.WRITE_ID,_Fields.IS_STATS_COMPLIANT,_Fields.COL_STATS,_Fields.ACCESS_TYPE,_Fields.REQUIRED_READ_CAPABILITIES,_Fields.REQUIRED_WRITE_CAPABILITIES,_Fields.ID,_Fields.FILE_METADATA,_Fields.DICTIONARY,_Fields.TXN_ID}; public static final java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; static { java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new java.util.EnumMap<_Fields, 
org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); @@ -286,6 +292,8 @@ package org.apache.hadoop.hive.metastore.api; new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, FileMetadata.class))); tmpMap.put(_Fields.DICTIONARY, new org.apache.thrift.meta_data.FieldMetaData("dictionary", org.apache.thrift.TFieldRequirementType.OPTIONAL, new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, ObjectDictionary.class))); + tmpMap.put(_Fields.TXN_ID, new org.apache.thrift.meta_data.FieldMetaData("txnId", org.apache.thrift.TFieldRequirementType.OPTIONAL, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64))); metaDataMap = java.util.Collections.unmodifiableMap(tmpMap); org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(Table.class, metaDataMap); } @@ -406,6 +414,7 @@ package org.apache.hadoop.hive.metastore.api; if (other.isSetDictionary()) { this.dictionary = new ObjectDictionary(other.dictionary); } + this.txnId = other.txnId; } public Table deepCopy() { @@ -451,6 +460,8 @@ package org.apache.hadoop.hive.metastore.api; this.id = 0; this.fileMetadata = null; this.dictionary = null; + setTxnIdIsSet(false); + this.txnId = 0; } @org.apache.thrift.annotation.Nullable @@ -1150,6 +1161,28 @@ package org.apache.hadoop.hive.metastore.api; } } + public long getTxnId() { + return this.txnId; + } + + public void setTxnId(long txnId) { + this.txnId = txnId; + setTxnIdIsSet(true); + } + + public void unsetTxnId() { + __isset_bitfield = org.apache.thrift.EncodingUtils.clearBit(__isset_bitfield, __TXNID_ISSET_ID); + } + + /** Returns true if field txnId is set (has been assigned a value) and false otherwise */ + public boolean isSetTxnId() { + return org.apache.thrift.EncodingUtils.testBit(__isset_bitfield, __TXNID_ISSET_ID); + } + + public void setTxnIdIsSet(boolean value) { + __isset_bitfield = org.apache.thrift.EncodingUtils.setBit(__isset_bitfield, 
__TXNID_ISSET_ID, value); + } + public void setFieldValue(_Fields field, @org.apache.thrift.annotation.Nullable java.lang.Object value) { switch (field) { case TABLE_NAME: @@ -1368,6 +1401,14 @@ package org.apache.hadoop.hive.metastore.api; } break; + case TXN_ID: + if (value == null) { + unsetTxnId(); + } else { + setTxnId((java.lang.Long)value); + } + break; + } } @@ -1455,6 +1496,9 @@ package org.apache.hadoop.hive.metastore.api; case DICTIONARY: return getDictionary(); + case TXN_ID: + return getTxnId(); + } throw new java.lang.IllegalStateException(); } @@ -1520,6 +1564,8 @@ package org.apache.hadoop.hive.metastore.api; return isSetFileMetadata(); case DICTIONARY: return isSetDictionary(); + case TXN_ID: + return isSetTxnId(); } throw new java.lang.IllegalStateException(); } @@ -1780,6 +1826,15 @@ package org.apache.hadoop.hive.metastore.api; return false; } + boolean this_present_txnId = true && this.isSetTxnId(); + boolean that_present_txnId = true && that.isSetTxnId(); + if (this_present_txnId || that_present_txnId) { + if (!(this_present_txnId && that_present_txnId)) + return false; + if (this.txnId != that.txnId) + return false; + } + return true; } @@ -1889,6 +1944,10 @@ package org.apache.hadoop.hive.metastore.api; if (isSetDictionary()) hashCode = hashCode * 8191 + dictionary.hashCode(); + hashCode = hashCode * 8191 + ((isSetTxnId()) ? 
131071 : 524287); + if (isSetTxnId()) + hashCode = hashCode * 8191 + org.apache.thrift.TBaseHelper.hashCode(txnId); + return hashCode; } @@ -2170,6 +2229,16 @@ package org.apache.hadoop.hive.metastore.api; return lastComparison; } } + lastComparison = java.lang.Boolean.compare(isSetTxnId(), other.isSetTxnId()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetTxnId()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.txnId, other.txnId); + if (lastComparison != 0) { + return lastComparison; + } + } return 0; } @@ -2400,6 +2469,12 @@ package org.apache.hadoop.hive.metastore.api; } first = false; } + if (isSetTxnId()) { + if (!first) sb.append(", "); + sb.append("txnId:"); + sb.append(this.txnId); + first = false; + } sb.append(")"); return sb.toString(); } @@ -2728,6 +2803,14 @@ package org.apache.hadoop.hive.metastore.api; org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; + case 28: // TXN_ID + if (schemeField.type == org.apache.thrift.protocol.TType.I64) { + struct.txnId = iprot.readI64(); + struct.setTxnIdIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; default: org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } @@ -2917,6 +3000,11 @@ package org.apache.hadoop.hive.metastore.api; oprot.writeFieldEnd(); } } + if (struct.isSetTxnId()) { + oprot.writeFieldBegin(TXN_ID_FIELD_DESC); + oprot.writeI64(struct.txnId); + oprot.writeFieldEnd(); + } oprot.writeFieldStop(); oprot.writeStructEnd(); } @@ -3016,7 +3104,10 @@ package org.apache.hadoop.hive.metastore.api; if (struct.isSetDictionary()) { optionals.set(26); } - oprot.writeBitSet(optionals, 27); + if (struct.isSetTxnId()) { + optionals.set(27); + } + oprot.writeBitSet(optionals, 28); if (struct.isSetTableName()) { oprot.writeString(struct.tableName); } @@ -3123,12 +3214,15 @@ package org.apache.hadoop.hive.metastore.api; if (struct.isSetDictionary()) { 
struct.dictionary.write(oprot); } + if (struct.isSetTxnId()) { + oprot.writeI64(struct.txnId); + } } @Override public void read(org.apache.thrift.protocol.TProtocol prot, Table struct) throws org.apache.thrift.TException { org.apache.thrift.protocol.TTupleProtocol iprot = (org.apache.thrift.protocol.TTupleProtocol) prot; - java.util.BitSet incoming = iprot.readBitSet(27); + java.util.BitSet incoming = iprot.readBitSet(28); if (incoming.get(0)) { struct.tableName = iprot.readString(); struct.setTableNameIsSet(true); @@ -3282,6 +3376,10 @@ package org.apache.hadoop.hive.metastore.api; struct.dictionary.read(iprot); struct.setDictionaryIsSet(true); } + if (incoming.get(27)) { + struct.txnId = iprot.readI64(); + struct.setTxnIdIsSet(true); + } } } diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/Table.php b/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/Table.php index c066623..177e79e 100644 --- a/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/Table.php +++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/Table.php @@ -184,6 +184,11 @@ class Table 'type' => TType::STRUCT, 'class' => '\metastore\ObjectDictionary', ), + 28 => array( + 'var' => 'txnId', + 'isRequired' => false, + 'type' => TType::I64, + ), ); /** @@ -294,6 +299,10 @@ class Table * @var \metastore\ObjectDictionary */ public $dictionary = null; + /** + * @var int + */ + public $txnId = null; public function __construct($vals = null) { @@ -379,6 +388,9 @@ class Table if (isset($vals['dictionary'])) { $this->dictionary = $vals['dictionary']; } + if (isset($vals['txnId'])) { + $this->txnId = $vals['txnId']; + } } } @@ -636,6 +648,13 @@ class Table $xfer += $input->skip($ftype); } break; + case 28: + if ($ftype == TType::I64) { + $xfer += $input->readI64($this->txnId); + } else { + $xfer += $input->skip($ftype); + } + break; default: $xfer += $input->skip($ftype); break; @@ -832,6 +851,11 @@ class Table 
$xfer += $this->dictionary->write($output); $xfer += $output->writeFieldEnd(); } + if ($this->txnId !== null) { + $xfer += $output->writeFieldBegin('txnId', TType::I64, 28); + $xfer += $output->writeI64($this->txnId); + $xfer += $output->writeFieldEnd(); + } $xfer += $output->writeFieldStop(); $xfer += $output->writeStructEnd(); return $xfer; diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py b/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py index 5c15eb1..298b6ed 100644 --- a/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py +++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py @@ -6525,11 +6525,12 @@ class Table(object): - id - fileMetadata - dictionary + - txnId """ - def __init__(self, tableName=None, dbName=None, owner=None, createTime=None, lastAccessTime=None, retention=None, sd=None, partitionKeys=None, parameters=None, viewOriginalText=None, viewExpandedText=None, tableType=None, privileges=None, temporary=False, rewriteEnabled=None, creationMetadata=None, catName=None, ownerType=1, writeId=-1, isStatsCompliant=None, colStats=None, accessType=None, requiredReadCapabilities=None, requiredWriteCapabilities=None, id=None, fileMetadata=None, dic [...] + def __init__(self, tableName=None, dbName=None, owner=None, createTime=None, lastAccessTime=None, retention=None, sd=None, partitionKeys=None, parameters=None, viewOriginalText=None, viewExpandedText=None, tableType=None, privileges=None, temporary=False, rewriteEnabled=None, creationMetadata=None, catName=None, ownerType=1, writeId=-1, isStatsCompliant=None, colStats=None, accessType=None, requiredReadCapabilities=None, requiredWriteCapabilities=None, id=None, fileMetadata=None, dic [...] 
self.tableName = tableName self.dbName = dbName self.owner = owner @@ -6557,6 +6558,7 @@ class Table(object): self.id = id self.fileMetadata = fileMetadata self.dictionary = dictionary + self.txnId = txnId def read(self, iprot): if iprot._fast_decode is not None and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None: @@ -6730,6 +6732,11 @@ class Table(object): self.dictionary.read(iprot) else: iprot.skip(ftype) + elif fid == 28: + if ftype == TType.I64: + self.txnId = iprot.readI64() + else: + iprot.skip(ftype) else: iprot.skip(ftype) iprot.readFieldEnd() @@ -6861,6 +6868,10 @@ class Table(object): oprot.writeFieldBegin('dictionary', TType.STRUCT, 27) self.dictionary.write(oprot) oprot.writeFieldEnd() + if self.txnId is not None: + oprot.writeFieldBegin('txnId', TType.I64, 28) + oprot.writeI64(self.txnId) + oprot.writeFieldEnd() oprot.writeFieldStop() oprot.writeStructEnd() @@ -29727,6 +29738,7 @@ Table.thrift_spec = ( (25, TType.I64, 'id', None, None, ), # 25 (26, TType.STRUCT, 'fileMetadata', [FileMetadata, None], None, ), # 26 (27, TType.STRUCT, 'dictionary', [ObjectDictionary, None], None, ), # 27 + (28, TType.I64, 'txnId', None, None, ), # 28 ) all_structs.append(Partition) Partition.thrift_spec = ( diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb b/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb index 195de51..5c78f49 100644 --- a/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb +++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb @@ -2342,6 +2342,7 @@ class Table ID = 25 FILEMETADATA = 26 DICTIONARY = 27 + TXNID = 28 FIELDS = { TABLENAME => {:type => ::Thrift::Types::STRING, :name => 'tableName'}, @@ -2370,7 +2371,8 @@ class Table REQUIREDWRITECAPABILITIES => {:type => ::Thrift::Types::LIST, :name => 'requiredWriteCapabilities', :element => {:type => 
::Thrift::Types::STRING}, :optional => true}, ID => {:type => ::Thrift::Types::I64, :name => 'id', :optional => true}, FILEMETADATA => {:type => ::Thrift::Types::STRUCT, :name => 'fileMetadata', :class => ::FileMetadata, :optional => true}, - DICTIONARY => {:type => ::Thrift::Types::STRUCT, :name => 'dictionary', :class => ::ObjectDictionary, :optional => true} + DICTIONARY => {:type => ::Thrift::Types::STRUCT, :name => 'dictionary', :class => ::ObjectDictionary, :optional => true}, + TXNID => {:type => ::Thrift::Types::I64, :name => 'txnId', :optional => true} } def struct_fields; FIELDS; end diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java index 8e373e4..9fc0987 100644 --- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java @@ -1793,6 +1793,22 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable { } @Override + public void dropTable(Table tbl, boolean deleteData, boolean ignoreUnknownTbl, boolean ifPurge) throws TException { + EnvironmentContext context = null; + if (ifPurge) { + context = getEnvironmentContextWithIfPurgeSet(); + } + if (tbl.isSetTxnId()) { + context = Optional.ofNullable(context).orElse(new EnvironmentContext()); + context.putToProperties("txnId", String.valueOf(tbl.getTxnId())); + } + String catName = Optional.ofNullable(tbl.getCatName()).orElse(getDefaultCatalog(conf)); + + dropTable(catName, tbl.getDbName(), tbl.getTableName(), deleteData, + ignoreUnknownTbl, context); + } + + @Override public void dropTable(String dbname, String name) throws TException { dropTable(getDefaultCatalog(conf), dbname, name, true, true, null); } @@ -1809,7 +1825,6 @@ public class 
HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable { envContext = new EnvironmentContext(warehouseOptions); } dropTable(catName, dbName, tableName, deleteData, ignoreUnknownTable, envContext); - } /** diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java index 53f3c02..1f41d0e 100644 --- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java @@ -443,9 +443,8 @@ public interface IMetaStoreClient { * A thrift communication error occurred * */ - void dropTable(String dbname, String tableName, boolean deleteData, - boolean ignoreUnknownTab) throws MetaException, TException, - NoSuchObjectException; + void dropTable(String dbname, String tableName, boolean deleteData, boolean ignoreUnknownTab) + throws MetaException, TException, NoSuchObjectException; /** * Drop the table. @@ -469,8 +468,11 @@ public interface IMetaStoreClient { */ @Deprecated // TODO: deprecate all methods without a catalog here; a single layer (e.g. Hive.java) should handle current-catalog void dropTable(String dbname, String tableName, boolean deleteData, - boolean ignoreUnknownTab, boolean ifPurge) throws MetaException, TException, - NoSuchObjectException; + boolean ignoreUnknownTab, boolean ifPurge) + throws MetaException, TException, NoSuchObjectException; + + void dropTable(Table table, boolean deleteData, boolean ignoreUnknownTab, boolean ifPurge) + throws TException; /** * Drop the table. 
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/Warehouse.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/Warehouse.java index 46593aa..1711ef6 100755 --- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/Warehouse.java +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/Warehouse.java @@ -57,6 +57,9 @@ import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.util.ReflectionUtils; +import static org.apache.hadoop.hive.common.AcidConstants.SOFT_DELETE_TABLE_PATTERN; +import static org.apache.hadoop.hive.common.AcidConstants.SOFT_DELETE_PATH_SUFFIX; + /** * This class represents a warehouse where data of Hive tables is stored */ @@ -363,8 +366,14 @@ public class Warehouse { } else { dbPath = getDatabaseManagedPath(db); } - return getDnsPath( - new Path(dbPath, MetaStoreUtils.encodeTableName(tableName.toLowerCase()))); + if (!isExternal && tableName.matches("(.*)" + SOFT_DELETE_TABLE_PATTERN)) { + String[] groups = tableName.split(SOFT_DELETE_PATH_SUFFIX); + tableName = String.join(SOFT_DELETE_PATH_SUFFIX, + MetaStoreUtils.encodeTableName(groups[0].toLowerCase()), groups[1]); + } else { + tableName = MetaStoreUtils.encodeTableName(tableName.toLowerCase()); + } + return getDnsPath(new Path(dbPath, tableName)); } public Path getDefaultManagedTablePath(Database db, String tableName) throws MetaException { diff --git a/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift b/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift index 737b7b5..20259cc 100644 --- a/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift +++ b/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift @@ -639,7 +639,8 @@ struct Table { // read purposes 26: optional 
FileMetadata fileMetadata, // optional serialized file-metadata for this table // for certain execution engines - 27: optional ObjectDictionary dictionary + 27: optional ObjectDictionary dictionary, + 28: optional i64 txnId, // txnId associated with the table creation } struct Partition { @@ -998,7 +999,7 @@ enum TxnType { READ_ONLY = 2, COMPACTION = 3, MATER_VIEW_REBUILD = 4, - SOFT_DELETE = 5 + SOFT_DELETE = 5 } // specifies which info to return with GetTablesExtRequest diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/AcidEventListener.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/AcidEventListener.java index 2034d85..6950f7f 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/AcidEventListener.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/AcidEventListener.java @@ -71,10 +71,32 @@ public class AcidEventListener extends TransactionalMetaStoreEventListener { } @Override - public void onDropTable(DropTableEvent tableEvent) throws MetaException { - if (TxnUtils.isTransactionalTable(tableEvent.getTable())) { + public void onDropTable(DropTableEvent tableEvent) throws MetaException { + Table table = tableEvent.getTable(); + + if (TxnUtils.isTransactionalTable(table)) { txnHandler = getTxnHandler(); - txnHandler.cleanupRecords(HiveObjectType.TABLE, null, tableEvent.getTable(), null); + txnHandler.cleanupRecords(HiveObjectType.TABLE, null, table, null, !tableEvent.getDeleteData()); + + if (!tableEvent.getDeleteData()) { + long currentTxn = Optional.ofNullable(tableEvent.getEnvironmentContext()) + .map(EnvironmentContext::getProperties) + .map(prop -> prop.get("txnId")) + .map(Long::parseLong) + .orElse(0L); + + try { + if (currentTxn > 0) { + CompactionRequest rqst = new CompactionRequest(table.getDbName(), table.getTableName(), CompactionType.MAJOR); + 
rqst.setRunas(TxnUtils.findUserToRunAs(table.getSd().getLocation(), table, conf)); + rqst.putToProperties("location", table.getSd().getLocation()); + rqst.putToProperties("ifPurge", Boolean.toString(isMustPurge(tableEvent.getEnvironmentContext(), table))); + txnHandler.submitForCleanup(rqst, table.getWriteId(), currentTxn); + } + } catch (InterruptedException | IOException e) { + throwMetaException(e); + } + } } } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java index ff79412..49ea028 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java @@ -105,6 +105,9 @@ import java.util.regex.Pattern; import static org.apache.commons.lang3.StringUtils.isBlank; import static org.apache.commons.lang3.StringUtils.join; +import static org.apache.hadoop.hive.common.AcidConstants.SOFT_DELETE_PATH_SUFFIX; +import static org.apache.hadoop.hive.common.AcidConstants.SOFT_DELETE_TABLE; +import static org.apache.hadoop.hive.common.AcidConstants.DELTA_DIGITS; import static org.apache.hadoop.hive.metastore.HiveMetaStoreClient.TRUNCATE_SKIP_DATA_DELETION; import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.TABLE_IS_CTAS; @@ -2410,7 +2413,7 @@ public class HMSHandler extends FacebookBase implements IHMSHandler { if (!TableType.VIRTUAL_VIEW.toString().equals(tbl.getTableType())) { if (tbl.getSd().getLocation() == null || tbl.getSd().getLocation().isEmpty()) { - tblPath = wh.getDefaultTablePath(db, tbl); + tblPath = wh.getDefaultTablePath(db, getTableName(tbl), isExternal(tbl)); } else { if (!isExternal(tbl) && !MetaStoreUtils.isNonNativeTable(tbl)) { LOG.warn("Location: " + tbl.getSd().getLocation() @@ -2562,6 +2565,12 @@ public class 
HMSHandler extends FacebookBase implements IHMSHandler { } } + private String getTableName(Table tbl) { + return tbl.getTableName() + (tbl.isSetTxnId() && + tbl.getParameters() != null && Boolean.parseBoolean(tbl.getParameters().get(SOFT_DELETE_TABLE)) ? + SOFT_DELETE_PATH_SUFFIX + String.format(DELTA_DIGITS, tbl.getTxnId()) : ""); + } + @Override public void create_table(final Table tbl) throws AlreadyExistsException, MetaException, InvalidObjectException, InvalidInputException { @@ -2952,8 +2961,7 @@ public class HMSHandler extends FacebookBase implements IHMSHandler { private boolean drop_table_core(final RawStore ms, final String catName, final String dbname, final String name, final boolean deleteData, final EnvironmentContext envContext, final String indexName, boolean dropPartitions) - throws NoSuchObjectException, MetaException, IOException, InvalidObjectException, - InvalidInputException { + throws TException, IOException { boolean success = false; boolean tableDataShouldBeDeleted = false; Path tblPath = null; diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java index 3a159f7..06c1908 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java @@ -1501,13 +1501,13 @@ class CompactionTxnHandler extends TxnHandler { } @Override - protected void updateWSCommitIdAndCleanUpMetadata(Statement stmt, long txnid, TxnType txnType, Long commitId, - long tempId) throws SQLException, MetaException { + protected void updateWSCommitIdAndCleanUpMetadata(Statement stmt, long txnid, TxnType txnType, + Long commitId, long tempId) throws SQLException, MetaException { 
super.updateWSCommitIdAndCleanUpMetadata(stmt, txnid, txnType, commitId, tempId); + if (txnType == TxnType.SOFT_DELETE || txnType == TxnType.COMPACTION) { - stmt.executeUpdate( - "UPDATE \"COMPACTION_QUEUE\" SET \"CQ_NEXT_TXN_ID\" = " + commitId + ", \"CQ_COMMIT_TIME\" = " + - getEpochFn(dbProduct) + " WHERE \"CQ_TXN_ID\" = " + txnid); + stmt.executeUpdate("UPDATE \"COMPACTION_QUEUE\" SET \"CQ_NEXT_TXN_ID\" = " + commitId + ", \"CQ_COMMIT_TIME\" = " + + getEpochFn(dbProduct) + " WHERE \"CQ_TXN_ID\" = " + txnid); } } @@ -1545,7 +1545,7 @@ class CompactionTxnHandler extends TxnHandler { } @Override - protected void createCommitNotificationEvent(Connection dbConn, long txnid, Optional<TxnType> txnType) + protected void createCommitNotificationEvent(Connection dbConn, long txnid, TxnType txnType) throws MetaException, SQLException { super.createCommitNotificationEvent(dbConn, txnid, txnType); if (transactionalListeners != null) { diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index 2a9d6bb..58060f2 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -175,7 +175,6 @@ import static org.apache.hadoop.hive.metastore.txn.TxnUtils.executeQueriesInBatc import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCatalog; import static org.apache.hadoop.hive.metastore.utils.StringUtils.normalizeIdentifier; - import com.google.common.annotations.VisibleForTesting; /** @@ -1055,8 +1054,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { txnid = targetTxnIds.get(0); } - Optional<TxnType> txnType = getOpenTxnTypeAndLock(stmt, txnid); - if (!txnType.isPresent()) { + TxnType txnType = 
getOpenTxnTypeAndLock(stmt, txnid); + if (txnType == null) { TxnStatus status = findTxnState(txnid, stmt); if (status == TxnStatus.ABORTED) { if (isReplayedReplTxn) { @@ -1078,7 +1077,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { if (transactionalListeners != null && !isHiveReplTxn) { MetaStoreListenerNotifier.notifyEventWithDirectSql(transactionalListeners, - EventMessage.EventType.ABORT_TXN, new AbortTxnEvent(txnid, txnType.get()), dbConn, sqlGenerator); + EventMessage.EventType.ABORT_TXN, new AbortTxnEvent(txnid, txnType), dbConn, sqlGenerator); } LOG.debug("Going to commit"); @@ -1432,8 +1431,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { * should not normally run concurrently (for same txn) but could due to bugs in the client * which could then corrupt internal transaction manager state. Also competes with abortTxn(). */ - Optional<TxnType> txnType = getOpenTxnTypeAndLock(stmt, txnid); - if (!txnType.isPresent()) { + TxnType txnType = getOpenTxnTypeAndLock(stmt, txnid); + if (txnType == null) { //if here, txn was not found (in expected state) TxnStatus actualTxnStatus = findTxnState(txnid, stmt); if (actualTxnStatus == TxnStatus.COMMITTED) { @@ -1455,11 +1454,11 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { OperationType.UPDATE + "," + OperationType.DELETE + ")"; long tempCommitId = generateTemporaryId(); - if (txnType.get() == TxnType.COMPACTION) { + if (txnType == TxnType.SOFT_DELETE || txnType == TxnType.COMPACTION) { acquireTxnLock(stmt, false); commitId = getHighWaterMark(stmt); - } else if (txnType.get() != TxnType.READ_ONLY && !isReplayedReplTxn) { + } else if (txnType != TxnType.READ_ONLY && !isReplayedReplTxn) { String writeSetInsertSql = "INSERT INTO \"WRITE_SET\" (\"WS_DATABASE\", \"WS_TABLE\", \"WS_PARTITION\"," + " \"WS_TXNID\", \"WS_COMMIT_ID\", \"WS_OPERATION_TYPE\")" + " SELECT DISTINCT \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\", \"TC_TXNID\", " + tempCommitId + ", 
\"TC_OPERATION_TYPE\" "; @@ -1545,7 +1544,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { assert true; } - if (txnType.get() != TxnType.READ_ONLY && !isReplayedReplTxn) { + if (txnType != TxnType.READ_ONLY && !isReplayedReplTxn) { moveTxnComponentsToCompleted(stmt, txnid, isUpdateDelete); } else if (isReplayedReplTxn) { if (rqst.isSetWriteEventInfos()) { @@ -1573,7 +1572,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { } deleteReplTxnMapEntry(dbConn, sourceTxnId, rqst.getReplPolicy()); } - updateWSCommitIdAndCleanUpMetadata(stmt, txnid, txnType.get(), commitId, tempCommitId); + updateWSCommitIdAndCleanUpMetadata(stmt, txnid, txnType, commitId, tempCommitId); removeTxnsFromMinHistoryLevel(dbConn, ImmutableList.of(txnid)); if (rqst.isSetKeyValue()) { updateKeyValueAssociatedWithTxn(rqst, stmt); @@ -1611,11 +1610,11 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { * @param txnType transaction type * @throws MetaException ex */ - protected void createCommitNotificationEvent(Connection dbConn, long txnid, Optional<TxnType> txnType) + protected void createCommitNotificationEvent(Connection dbConn, long txnid, TxnType txnType) throws MetaException, SQLException { if (transactionalListeners != null) { MetaStoreListenerNotifier.notifyEventWithDirectSql(transactionalListeners, - EventMessage.EventType.COMMIT_TXN, new CommitTxnEvent(txnid, txnType.get()), dbConn, sqlGenerator); + EventMessage.EventType.COMMIT_TXN, new CommitTxnEvent(txnid, txnType), dbConn, sqlGenerator); } } @@ -2959,12 +2958,11 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { * @throws SQLException * @throws MetaException */ - private Optional<TxnType> getOpenTxnTypeAndLock(Statement stmt, long txnId) throws SQLException, MetaException { + private TxnType getOpenTxnTypeAndLock(Statement stmt, long txnId) throws SQLException, MetaException { String query = "SELECT \"TXN_TYPE\" FROM \"TXNS\" WHERE \"TXN_ID\" = " + txnId + 
" AND \"TXN_STATE\" = " + TxnStatus.OPEN; try (ResultSet rs = stmt.executeQuery(sqlGenerator.addForUpdateClause(query))) { - return rs.next() ? Optional.ofNullable( - TxnType.findByValue(rs.getInt(1))) : Optional.empty(); + return rs.next() ? TxnType.findByValue(rs.getInt(1)) : null; } } @@ -2990,8 +2988,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { stmt = dbConn.createStatement(); if (isValidTxn(txnid)) { //this also ensures that txn is still there in expected state - Optional<TxnType> txnType = getOpenTxnTypeAndLock(stmt, txnid); - if (!txnType.isPresent()) { + TxnType txnType = getOpenTxnTypeAndLock(stmt, txnid); + if (txnType == null) { ensureValidTxn(dbConn, txnid, stmt); shouldNeverHappen(txnid); } @@ -3817,7 +3815,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { } @Override - @RetrySemantics.Idempotent + @RetrySemantics.SafeToRetry public boolean submitForCleanup(CompactionRequest rqst, long highestWriteId, long txnId) throws MetaException { // Put a compaction request in the queue. 
try { @@ -3835,7 +3833,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { cqId = generateCompactionQueueId(stmt); } StringBuilder buf = new StringBuilder( - "INSERT INTO \"COMPACTION_QUEUE\" (\"CQ_ID\", \"CQ_HIGHEST_WRITE_ID\", \"CQ_TXN_ID\", \"CQ_ENQUEUE_TIME\", \"CQ_DATABASE\", \"CQ_TABLE\", "); + "INSERT INTO \"COMPACTION_QUEUE\" (\"CQ_ID\", \"CQ_HIGHEST_WRITE_ID\", \"CQ_TXN_ID\", \"CQ_ENQUEUE_TIME\", \"CQ_DATABASE\", \"CQ_TABLE\", "); String partName = rqst.getPartitionname(); if (partName != null) { buf.append("\"CQ_PARTITION\", "); @@ -4136,8 +4134,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { lockInternal(); dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); - Optional<TxnType> txnType = getOpenTxnTypeAndLock(stmt, rqst.getTxnid()); - if (!txnType.isPresent()) { + TxnType txnType = getOpenTxnTypeAndLock(stmt, rqst.getTxnid()); + if (txnType == null) { //ensures txn is still there and in expected state ensureValidTxn(dbConn, rqst.getTxnid(), stmt); shouldNeverHappen(rqst.getTxnid()); @@ -4205,7 +4203,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { @Override @RetrySemantics.Idempotent public void cleanupRecords(HiveObjectType type, Database db, Table table, - Iterator<Partition> partitionIterator) throws MetaException { + Iterator<Partition> partitionIterator, boolean keepTxnToWriteIdMetaData) throws MetaException { // cleanup should be done only for objects belonging to default catalog final String defaultCatalog = getDefaultCatalog(conf); @@ -4308,22 +4306,23 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { buff.append("'"); queries.add(buff.toString()); - buff.setLength(0); - buff.append("DELETE FROM \"TXN_TO_WRITE_ID\" WHERE \"T2W_DATABASE\"='"); - buff.append(dbName.toLowerCase()); - buff.append("' AND \"T2W_TABLE\"='"); - buff.append(tblName.toLowerCase()); - buff.append("'"); - queries.add(buff.toString()); - - 
buff.setLength(0); - buff.append("DELETE FROM \"NEXT_WRITE_ID\" WHERE \"NWI_DATABASE\"='"); - buff.append(dbName.toLowerCase()); - buff.append("' AND \"NWI_TABLE\"='"); - buff.append(tblName.toLowerCase()); - buff.append("'"); - queries.add(buff.toString()); + if (!keepTxnToWriteIdMetaData) { + buff.setLength(0); + buff.append("DELETE FROM \"TXN_TO_WRITE_ID\" WHERE \"T2W_DATABASE\"='"); + buff.append(dbName.toLowerCase()); + buff.append("' AND \"T2W_TABLE\"='"); + buff.append(tblName.toLowerCase()); + buff.append("'"); + queries.add(buff.toString()); + buff.setLength(0); + buff.append("DELETE FROM \"NEXT_WRITE_ID\" WHERE \"NWI_DATABASE\"='"); + buff.append(dbName.toLowerCase()); + buff.append("' AND \"NWI_TABLE\"='"); + buff.append(tblName.toLowerCase()); + buff.append("'"); + queries.add(buff.toString()); + } break; } case PARTITION: { @@ -4413,7 +4412,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { closeDbConn(dbConn); } } catch (RetryException e) { - cleanupRecords(type, db, table, partitionIterator); + cleanupRecords(type, db, table, partitionIterator, keepTxnToWriteIdMetaData); } } /** diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java index 5837727..1558dd2 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java @@ -346,7 +346,7 @@ public interface TxnStore extends Configurable { @RetrySemantics.Idempotent CompactionResponse compact(CompactionRequest rqst) throws MetaException; - @RetrySemantics.Idempotent + @RetrySemantics.SafeToRetry boolean submitForCleanup(CompactionRequest rqst, long highestWriteId, long txnId) throws MetaException; /** @@ -393,8 +393,14 @@ public interface TxnStore extends 
Configurable { * @throws MetaException */ @RetrySemantics.Idempotent - void cleanupRecords(HiveObjectType type, Database db, Table table, - Iterator<Partition> partitionIterator) throws MetaException; + default void cleanupRecords(HiveObjectType type, Database db, Table table, + Iterator<Partition> partitionIterator) throws MetaException { + cleanupRecords(type, db, table, partitionIterator, false); + } + + @RetrySemantics.Idempotent + void cleanupRecords(HiveObjectType type, Database db, Table table, + Iterator<Partition> partitionIterator, boolean keepTxnToWriteIdMetaData) throws MetaException; @RetrySemantics.Idempotent void onRename(String oldCatName, String oldDbName, String oldTabName, String oldPartName, diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java index eca6caa..88a0eae 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java @@ -514,8 +514,8 @@ public class TxnUtils { * @throws java.io.IOException if neither the hive metastore user nor the table owner can stat * the location. 
*/ - public static String findUserToRunAs(String location, Table t, Configuration conf) - throws IOException, InterruptedException { + public static String findUserToRunAs(String location, Table t, Configuration conf) + throws IOException, InterruptedException { LOG.debug("Determining who to run the job as."); // check if a specific user is set in config @@ -558,8 +558,8 @@ public class TxnUtils { return wrapper.get(0); } } - LOG.error("Unable to stat file " + p + " as either current user(" + - UserGroupInformation.getLoginUser() + ") or table owner(" + t.getOwner() + "), giving up"); + LOG.error("Unable to stat file " + p + " as either current user(" + + UserGroupInformation.getLoginUser() + ") or table owner(" + t.getOwner() + "), giving up"); throw new IOException("Unable to stat file: " + p); } } diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientPreCatalog.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientPreCatalog.java index 7bdfa51..221b2e6 100644 --- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientPreCatalog.java +++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientPreCatalog.java @@ -1148,6 +1148,11 @@ public class HiveMetaStoreClientPreCatalog implements IMetaStoreClient, AutoClos } dropTable(dbname, name, deleteData, ignoreUnknownTab, envContext); } + + @Override + public void dropTable(Table table, boolean deleteData, boolean ignoreUnknownTab, boolean ifPurge) throws TException { + dropTable(table.getDbName(), table.getTableName(), deleteData, ignoreUnknownTab, ifPurge); + } /** * @see #dropTable(String, String, boolean, boolean, EnvironmentContext) diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/txn/ThrowingTxnHandler.java 
b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/txn/ThrowingTxnHandler.java index 897eb5b..460727d 100644 --- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/txn/ThrowingTxnHandler.java +++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/txn/ThrowingTxnHandler.java @@ -34,11 +34,11 @@ public class ThrowingTxnHandler extends CompactionTxnHandler { @Override public void cleanupRecords(HiveObjectType type, Database db, Table table, - Iterator<Partition> partitionIterator) throws MetaException { + Iterator<Partition> partitionIterator, boolean keepTxnToWriteIdMetaData) throws MetaException { if (doThrow) { throw new RuntimeException("during transactional cleanup"); } - super.cleanupRecords(type, db, table, partitionIterator); + super.cleanupRecords(type, db, table, partitionIterator, keepTxnToWriteIdMetaData); } @Override diff --git a/storage-api/src/java/org/apache/hadoop/hive/common/AcidConstants.java b/storage-api/src/java/org/apache/hadoop/hive/common/AcidConstants.java index 1afe351..2896ef9 100644 --- a/storage-api/src/java/org/apache/hadoop/hive/common/AcidConstants.java +++ b/storage-api/src/java/org/apache/hadoop/hive/common/AcidConstants.java @@ -40,6 +40,10 @@ public class AcidConstants { public static final String VISIBILITY_PREFIX = "_v"; public static final Pattern VISIBILITY_PATTERN = Pattern.compile(VISIBILITY_PREFIX + "\\d+"); + public static final String SOFT_DELETE_PATH_SUFFIX = ".v"; + public static final String SOFT_DELETE_TABLE_PATTERN = "\\" + SOFT_DELETE_PATH_SUFFIX + "\\d+"; + public static final String SOFT_DELETE_TABLE = "soft_delete"; + public static String baseDir(long writeId) { return BASE_PREFIX + String.format(DELTA_DIGITS, writeId); }