This is an automated email from the ASF dual-hosted git repository. anishek pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push: new 78d42f0 HIVE-24197:Check for write transactions for the db under replication at a frequent interval (Aasha Medhi, reviewed by Pravin Kumar Sinha) 78d42f0 is described below commit 78d42f0321c846ee74794a58b94a92f65797430d Author: Anishek Agarwal <anis...@gmail.com> AuthorDate: Mon Oct 12 09:34:11 2020 +0530 HIVE-24197:Check for write transactions for the db under replication at a frequent interval (Aasha Medhi, reviewed by Pravin Kumar Sinha) --- .../parse/TestReplicationScenariosAcidTables.java | 108 +++++++++++++++++++-- .../hadoop/hive/ql/exec/repl/ReplDumpTask.java | 78 ++++++++++----- .../gen/thrift/gen-cpp/hive_metastore_types.cpp | 22 +++++ .../src/gen/thrift/gen-cpp/hive_metastore_types.h | 12 ++- .../hive/metastore/api/ShowLocksRequest.java | 106 +++++++++++++++++++- .../thrift/gen-php/metastore/ShowLocksRequest.php | 24 +++++ .../src/gen/thrift/gen-py/hive_metastore/ttypes.py | 14 ++- .../src/gen/thrift/gen-rb/hive_metastore_types.rb | 4 +- .../src/main/thrift/hive_metastore.thrift | 1 + .../hadoop/hive/metastore/txn/TxnHandler.java | 8 ++ 10 files changed, 339 insertions(+), 38 deletions(-) diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java index 70151ee..575eeab 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java @@ -65,9 +65,9 @@ import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLI import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.DUMP_ACKNOWLEDGEMENT; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; - +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.fail; /** * TestReplicationScenariosAcidTables - test replication for ACID tables. */ @@ -337,7 +337,6 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios TxnStore txnHandler = TxnUtils.getTxnStore(primary.getConf()); // Open 5 txns List<Long> txns = openTxns(numTxns, txnHandler, primaryConf); - // Create 2 tables, one partitioned and other not. Also, have both types of full ACID and MM tables. primary.run("use " + primaryDbName) .run("create table t1 (id int) clustered by(id) into 3 buckets stored as orc " + @@ -356,13 +355,105 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios List<Long> lockIds = allocateWriteIdsForTablesAndAquireLocks(primaryDbName + "_extra", tablesInSecDb, txnHandler, txns, primaryConf); - // Bootstrap dump with open txn timeout as 1s. + // Bootstrap dump with open txn timeout as 300s. + //Since transactions belong to different db it won't wait. List<String> withConfigs = Arrays.asList( - "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_DUMP_OPEN_TXN_TIMEOUT + "'='1s'"); - WarehouseInstance.Tuple bootstrapDump = primary + "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_DUMP_OPEN_TXN_TIMEOUT + "'='300s'"); + long timeStarted = System.currentTimeMillis(); + WarehouseInstance.Tuple bootstrapDump = null; + try { + bootstrapDump = primary + .run("use " + primaryDbName) + .dump(primaryDbName, withConfigs); + } finally { + //Dump shouldn't wait for 300s. It should check in the 30 secs itself that those txns belong to different db + Assert.assertTrue(System.currentTimeMillis() - timeStarted < 300000); + } + + // After bootstrap dump, all the opened txns should not be aborted as itr belongs to a diff db. Verify it. + verifyAllOpenTxnsNotAborted(txns, primaryConf); + Map<String, Long> tablesInPrimary = new HashMap<>(); + tablesInPrimary.put("t1", 1L); + tablesInPrimary.put("t2", 2L); + verifyNextId(tablesInPrimary, primaryDbName, primaryConf); + + // Bootstrap load which should not replicate the write ids on both tables as they are on different db. + HiveConf replicaConf = replica.getConf(); + replica.load(replicatedDbName, primaryDbName) + .run("use " + replicatedDbName) + .run("show tables") + .verifyResults(new String[]{"t1", "t2"}) + .run("repl status " + replicatedDbName) + .verifyResult(bootstrapDump.lastReplicationId) + .run("select id from t1") + .verifyResults(new String[]{"1"}) + .run("select rank from t2 order by rank") + .verifyResults(new String[]{"10", "11"}); + + // Verify if HWM is properly set after REPL LOAD + verifyNextId(tablesInPrimary, replicatedDbName, replicaConf); + + // Verify if none of the write ids are not replicated to the replicated DB as they belong to diff db + for (Map.Entry<String, Long> entry : tablesInPrimary.entrySet()) { + entry.setValue((long) 0); + } + verifyWriteIdsForTables(tablesInPrimary, replicaConf, replicatedDbName); + //Abort the txns + txnHandler.abortTxns(new AbortTxnsRequest(txns)); + verifyAllOpenTxnsAborted(txns, primaryConf); + //Release the locks + releaseLocks(txnHandler, lockIds); + } + + @Test + public void testAcidTablesBootstrapWithOpenTxnsWaitingForLock() throws Throwable { + int numTxns = 5; + HiveConf primaryConf = primary.getConf(); + TxnStore txnHandler = TxnUtils.getTxnStore(primary.getConf()); + // Open 5 txns + List<Long> txns = openTxns(numTxns, txnHandler, primaryConf); + + // Create 2 tables, one partitioned and other not. Also, have both types of full ACID and MM tables. + primary.run("use " + primaryDbName) + .run("create table t1 (id int) clustered by(id) into 3 buckets stored as orc " + + "tblproperties (\"transactional\"=\"true\")") + .run("insert into t1 values(1)") + .run("create table t2 (rank int) partitioned by (name string) tblproperties(\"transactional\"=\"true\", " + + "\"transactional_properties\"=\"insert_only\")") + .run("insert into t2 partition(name='Bob') values(11)") + .run("insert into t2 partition(name='Carl') values(10)"); + + // Bootstrap dump with open txn timeout as 80s. Dump should fail as there will be open txns and + // lock is not acquired by them and we have set abort to false + List<String> withConfigs = Arrays.asList( + "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_DUMP_OPEN_TXN_TIMEOUT + "'='80s'", + "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_DUMP_ABORT_WRITE_TXN_AFTER_TIMEOUT + "'='false'"); + try { + primary + .run("use " + primaryDbName) + .dump(primaryDbName, withConfigs); + fail(); + } catch (Exception e) { + Assert.assertEquals(IllegalStateException.class, e.getClass()); + Assert.assertEquals("REPL DUMP cannot proceed. Force abort all the open txns is disabled. " + + "Enable hive.repl.bootstrap.dump.abort.write.txn.after.timeout to proceed.", e.getMessage()); + } + + withConfigs = Arrays.asList( + "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_DUMP_OPEN_TXN_TIMEOUT + "'='1s'", + "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_DUMP_ABORT_WRITE_TXN_AFTER_TIMEOUT + "'='true'"); + // Acquire locks + // Allocate write ids for both tables of secondary db for all txns + // t1=5 and t2=5 + Map<String, Long> tablesInSecDb = new HashMap<>(); + tablesInSecDb.put("t1", (long) numTxns); + tablesInSecDb.put("t2", (long) numTxns); + List<Long> lockIds = allocateWriteIdsForTablesAndAquireLocks(primaryDbName + "_extra", + tablesInSecDb, txnHandler, txns, primaryConf); + + WarehouseInstance.Tuple bootstrapDump = primary .run("use " + primaryDbName) .dump(primaryDbName, withConfigs); - // After bootstrap dump, all the opened txns should not be aborted as itr belongs to a diff db. Verify it. verifyAllOpenTxnsNotAborted(txns, primaryConf); Map<String, Long> tablesInPrimary = new HashMap<>(); @@ -393,6 +484,7 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios verifyWriteIdsForTables(tablesInPrimary, replicaConf, replicatedDbName); //Abort the txns txnHandler.abortTxns(new AbortTxnsRequest(txns)); + verifyAllOpenTxnsAborted(txns, primaryConf); //Release the locks releaseLocks(txnHandler, lockIds); } @@ -469,6 +561,7 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios verifyWriteIdsForTables(tablesInPrimDb, replicaConf, replicatedDbName); //Abort the txns for secondary db txnHandler.abortTxns(new AbortTxnsRequest(txns)); + verifyAllOpenTxnsAborted(txns, primaryConf); //Release the locks releaseLocks(txnHandler, lockIds); } @@ -515,6 +608,7 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios verifyAllOpenTxnsNotAborted(txns, primaryConf); //Abort the txns txnHandler.abortTxns(new AbortTxnsRequest(txns)); + verifyAllOpenTxnsAborted(txns, primaryConf); //Release the locks releaseLocks(txnHandler, lockIds); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java index 5d3a004..4630f95 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java @@ -95,7 +95,6 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStreamReader; import java.io.Serializable; -import java.io.UnsupportedEncodingException; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.util.Set; @@ -104,7 +103,6 @@ import java.util.List; import java.util.Arrays; import java.util.Collections; import java.util.Comparator; -import java.util.Base64; import java.util.UUID; import java.util.ArrayList; import java.util.Map; @@ -121,7 +119,8 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable { private static final long serialVersionUID = 1L; private static final String dumpSchema = "dump_dir,last_repl_id#string,string"; private static final String FUNCTION_METADATA_FILE_NAME = EximUtil.METADATA_NAME; - private static final long SLEEP_TIME = 60000; + private static final long SLEEP_TIME = 5 * 60000; + private static final long SLEEP_TIME_FOR_TESTS = 30000; private Set<String> tablesForBootstrap = new HashSet<>(); public enum ConstraintFileType {COMMON("common", "c_"), FOREIGNKEY("fk", "f_"); @@ -1017,14 +1016,6 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable { replLogger.tableLog(tblName, tableSpec.tableHandle.getTableType()); } - private boolean dataCopyRequired(TableSpec tableSpec) { - if (tableSpec.tableHandle.getTableType().equals(TableType.EXTERNAL_TABLE) - || Utils.shouldDumpMetaDataOnly(conf)) { - return false; - } - return true; - } - private String getValidWriteIdList(String dbName, String tblName, String validTxnString) throws LockException { if ((validTxnString == null) || validTxnString.isEmpty()) { return null; @@ -1036,15 +1027,28 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable { return ((validWriteIds != null) ? validWriteIds.toString() : null); } - private List<Long> getOpenTxns(ValidTxnList validTxnList) { - long[] invalidTxns = validTxnList.getInvalidTransactions(); - List<Long> openTxns = new ArrayList<>(); - for (long invalidTxn : invalidTxns) { - if (!validTxnList.isTxnAborted(invalidTxn)) { - openTxns.add(invalidTxn); + private List<Long> getTxnsNotPresentInHiveLocksTable(List<Long> openTxnList) throws LockException { + List<Long> txnsNotPresentInHiveLocks = new ArrayList<>(); + for (long openTxnId : openTxnList) { + if (!isTxnPresentInHiveLocks(openTxnId)) { + txnsNotPresentInHiveLocks.add(openTxnId); } } - return openTxns; + return txnsNotPresentInHiveLocks; + } + + /** + * Get if there is an entry for the txn id in the hive locks table. It can be in waiting state or acquired state. + * @param txnId + * @return true if the entry for the txn id is present in hive locks. + * @throws LockException + */ + private boolean isTxnPresentInHiveLocks(long txnId) throws LockException { + ShowLocksRequest request = new ShowLocksRequest(); + request.setTxnid(txnId); + HiveLockManager lockManager = getTxnMgr().getLockManager(); + ShowLocksResponse showLocksResponse = ((DbLockManager) lockManager).getLocks(request); + return !showLocksResponse.getLocks().isEmpty(); } List<Long> getOpenTxns(ValidTxnList validTxnList, String dbName) throws LockException { @@ -1089,14 +1093,26 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable { List<TxnType> excludedTxns = Arrays.asList(TxnType.READ_ONLY, TxnType.REPL_CREATED); ValidTxnList validTxnList = getTxnMgr().getValidTxns(excludedTxns); while (System.currentTimeMillis() < waitUntilTime) { - // If there are no txns which are open for the given ValidTxnList snapshot, then just return it. - if (getOpenTxns(validTxnList).isEmpty()) { + //check if no open txns at all + List<Long> openTxnListForAllDbs = getOpenTxns(validTxnList); + if (openTxnListForAllDbs.isEmpty()) { return validTxnList.toString(); } - - // Wait for 1 minute and check again. + //check if all transactions that are open are inserted into the hive locks table. If not wait and check again. + //Transactions table don't contain the db information. DB information is present only in the hive locks table. + //Transactions are inserted into the hive locks table after compilation. We need to make sure all transactions + //that are open have a entry in hive locks which can give us the db information and then we only wait for open + //transactions for the db under replication and not for all open transactions. + if (getTxnsNotPresentInHiveLocksTable(openTxnListForAllDbs).isEmpty()) { + //If all open txns have been inserted in the hive locks table, we just need to check for the db under replication + // If there are no txns which are open for the given db under replication, then just return it. + if (getOpenTxns(validTxnList, work.dbNameOrPattern).isEmpty()) { + return validTxnList.toString(); + } + } + // Wait for 5 minutes and check again. try { - Thread.sleep(SLEEP_TIME); + Thread.sleep(getSleepTime()); } catch (InterruptedException e) { LOG.info("REPL DUMP thread sleep interrupted", e); } @@ -1125,6 +1141,22 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable { return validTxnList.toString(); } + private long getSleepTime() { + return (conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST) + || conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST_REPL)) ? SLEEP_TIME_FOR_TESTS : SLEEP_TIME; + } + + private List<Long> getOpenTxns(ValidTxnList validTxnList) { + long[] invalidTxns = validTxnList.getInvalidTransactions(); + List<Long> openTxns = new ArrayList<>(); + for (long invalidTxn : invalidTxns) { + if (!validTxnList.isTxnAborted(invalidTxn)) { + openTxns.add(invalidTxn); + } + } + return openTxns; + } + private ReplicationSpec getNewReplicationSpec(String evState, String objState, boolean isMetadataOnly) { return new ReplicationSpec(true, isMetadataOnly, evState, objState, false, true); diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.cpp b/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.cpp index 76e4b397..196abc9 100644 --- a/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.cpp +++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.cpp @@ -23872,6 +23872,11 @@ void ShowLocksRequest::__set_isExtended(const bool val) { this->isExtended = val; __isset.isExtended = true; } + +void ShowLocksRequest::__set_txnid(const int64_t val) { + this->txnid = val; +__isset.txnid = true; +} std::ostream& operator<<(std::ostream& out, const ShowLocksRequest& obj) { obj.printTo(out); @@ -23932,6 +23937,14 @@ uint32_t ShowLocksRequest::read(::apache::thrift::protocol::TProtocol* iprot) { xfer += iprot->skip(ftype); } break; + case 5: + if (ftype == ::apache::thrift::protocol::T_I64) { + xfer += iprot->readI64(this->txnid); + this->__isset.txnid = true; + } else { + xfer += iprot->skip(ftype); + } + break; default: xfer += iprot->skip(ftype); break; @@ -23969,6 +23982,11 @@ uint32_t ShowLocksRequest::write(::apache::thrift::protocol::TProtocol* oprot) c xfer += oprot->writeBool(this->isExtended); xfer += oprot->writeFieldEnd(); } + if (this->__isset.txnid) { + xfer += oprot->writeFieldBegin("txnid", ::apache::thrift::protocol::T_I64, 5); + xfer += oprot->writeI64(this->txnid); + xfer += oprot->writeFieldEnd(); + } xfer += oprot->writeFieldStop(); xfer += oprot->writeStructEnd(); return xfer; @@ -23980,6 +23998,7 @@ void swap(ShowLocksRequest &a, ShowLocksRequest &b) { swap(a.tablename, b.tablename); swap(a.partname, b.partname); swap(a.isExtended, b.isExtended); + swap(a.txnid, b.txnid); swap(a.__isset, b.__isset); } @@ -23988,6 +24007,7 @@ ShowLocksRequest::ShowLocksRequest(const ShowLocksRequest& other887) { tablename = other887.tablename; partname = other887.partname; isExtended = other887.isExtended; + txnid = other887.txnid; __isset = other887.__isset; } ShowLocksRequest& ShowLocksRequest::operator=(const ShowLocksRequest& other888) { @@ -23995,6 +24015,7 @@ ShowLocksRequest& ShowLocksRequest::operator=(const ShowLocksRequest& other888) tablename = other888.tablename; partname = other888.partname; isExtended = other888.isExtended; + txnid = other888.txnid; __isset = other888.__isset; return *this; } @@ -24005,6 +24026,7 @@ void ShowLocksRequest::printTo(std::ostream& out) const { out << ", " << "tablename="; (__isset.tablename ? (out << to_string(tablename)) : (out << "<null>")); out << ", " << "partname="; (__isset.partname ? (out << to_string(partname)) : (out << "<null>")); out << ", " << "isExtended="; (__isset.isExtended ? (out << to_string(isExtended)) : (out << "<null>")); + out << ", " << "txnid="; (__isset.txnid ? (out << to_string(txnid)) : (out << "<null>")); out << ")"; } diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.h b/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.h index a243d9a..3cd3620 100644 --- a/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.h +++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.h @@ -8953,11 +8953,12 @@ void swap(UnlockRequest &a, UnlockRequest &b); std::ostream& operator<<(std::ostream& out, const UnlockRequest& obj); typedef struct _ShowLocksRequest__isset { - _ShowLocksRequest__isset() : dbname(false), tablename(false), partname(false), isExtended(true) {} + _ShowLocksRequest__isset() : dbname(false), tablename(false), partname(false), isExtended(true), txnid(false) {} bool dbname :1; bool tablename :1; bool partname :1; bool isExtended :1; + bool txnid :1; } _ShowLocksRequest__isset; class ShowLocksRequest : public virtual ::apache::thrift::TBase { @@ -8965,7 +8966,7 @@ class ShowLocksRequest : public virtual ::apache::thrift::TBase { ShowLocksRequest(const ShowLocksRequest&); ShowLocksRequest& operator=(const ShowLocksRequest&); - ShowLocksRequest() : dbname(), tablename(), partname(), isExtended(false) { + ShowLocksRequest() : dbname(), tablename(), partname(), isExtended(false), txnid(0) { } virtual ~ShowLocksRequest() noexcept; @@ -8973,6 +8974,7 @@ class ShowLocksRequest : public virtual ::apache::thrift::TBase { std::string tablename; std::string partname; bool isExtended; + int64_t txnid; _ShowLocksRequest__isset __isset; @@ -8984,6 +8986,8 @@ class ShowLocksRequest : public virtual ::apache::thrift::TBase { void __set_isExtended(const bool val); + void __set_txnid(const int64_t val); + bool operator == (const ShowLocksRequest & rhs) const { if (__isset.dbname != rhs.__isset.dbname) @@ -9002,6 +9006,10 @@ class ShowLocksRequest : public virtual ::apache::thrift::TBase { return false; else if (__isset.isExtended && !(isExtended == rhs.isExtended)) return false; + if (__isset.txnid != rhs.__isset.txnid) + return false; + else if (__isset.txnid && !(txnid == rhs.txnid)) + return false; return true; } bool operator != (const ShowLocksRequest &rhs) const { diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/ShowLocksRequest.java b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/ShowLocksRequest.java index edfc2db..b09621f 100644 --- a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/ShowLocksRequest.java +++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/ShowLocksRequest.java @@ -15,6 +15,7 @@ package org.apache.hadoop.hive.metastore.api; private static final org.apache.thrift.protocol.TField TABLENAME_FIELD_DESC = new org.apache.thrift.protocol.TField("tablename", org.apache.thrift.protocol.TType.STRING, (short)2); private static final org.apache.thrift.protocol.TField PARTNAME_FIELD_DESC = new org.apache.thrift.protocol.TField("partname", org.apache.thrift.protocol.TType.STRING, (short)3); private static final org.apache.thrift.protocol.TField IS_EXTENDED_FIELD_DESC = new org.apache.thrift.protocol.TField("isExtended", org.apache.thrift.protocol.TType.BOOL, (short)4); + private static final org.apache.thrift.protocol.TField TXNID_FIELD_DESC = new org.apache.thrift.protocol.TField("txnid", org.apache.thrift.protocol.TType.I64, (short)5); private static final org.apache.thrift.scheme.SchemeFactory STANDARD_SCHEME_FACTORY = new ShowLocksRequestStandardSchemeFactory(); private static final org.apache.thrift.scheme.SchemeFactory TUPLE_SCHEME_FACTORY = new ShowLocksRequestTupleSchemeFactory(); @@ -23,13 +24,15 @@ package org.apache.hadoop.hive.metastore.api; private @org.apache.thrift.annotation.Nullable java.lang.String tablename; // optional private @org.apache.thrift.annotation.Nullable java.lang.String partname; // optional private boolean isExtended; // optional + private long txnid; // optional /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ public enum _Fields implements org.apache.thrift.TFieldIdEnum { DBNAME((short)1, "dbname"), TABLENAME((short)2, "tablename"), PARTNAME((short)3, "partname"), - IS_EXTENDED((short)4, "isExtended"); + IS_EXTENDED((short)4, "isExtended"), + TXNID((short)5, "txnid"); private static final java.util.Map<java.lang.String, _Fields> byName = new java.util.HashMap<java.lang.String, _Fields>(); @@ -53,6 +56,8 @@ package org.apache.hadoop.hive.metastore.api; return PARTNAME; case 4: // IS_EXTENDED return IS_EXTENDED; + case 5: // TXNID + return TXNID; default: return null; } @@ -95,8 +100,9 @@ package org.apache.hadoop.hive.metastore.api; // isset id assignments private static final int __ISEXTENDED_ISSET_ID = 0; + private static final int __TXNID_ISSET_ID = 1; private byte __isset_bitfield = 0; - private static final _Fields optionals[] = {_Fields.DBNAME,_Fields.TABLENAME,_Fields.PARTNAME,_Fields.IS_EXTENDED}; + private static final _Fields optionals[] = {_Fields.DBNAME,_Fields.TABLENAME,_Fields.PARTNAME,_Fields.IS_EXTENDED,_Fields.TXNID}; public static final java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; static { java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new java.util.EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); @@ -108,6 +114,8 @@ package org.apache.hadoop.hive.metastore.api; new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); tmpMap.put(_Fields.IS_EXTENDED, new org.apache.thrift.meta_data.FieldMetaData("isExtended", org.apache.thrift.TFieldRequirementType.OPTIONAL, new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.BOOL))); + tmpMap.put(_Fields.TXNID, new org.apache.thrift.meta_data.FieldMetaData("txnid", org.apache.thrift.TFieldRequirementType.OPTIONAL, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64))); metaDataMap = java.util.Collections.unmodifiableMap(tmpMap); org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(ShowLocksRequest.class, metaDataMap); } @@ -132,6 +140,7 @@ package org.apache.hadoop.hive.metastore.api; this.partname = other.partname; } this.isExtended = other.isExtended; + this.txnid = other.txnid; } public ShowLocksRequest deepCopy() { @@ -145,6 +154,8 @@ package org.apache.hadoop.hive.metastore.api; this.partname = null; this.isExtended = false; + setTxnidIsSet(false); + this.txnid = 0; } @org.apache.thrift.annotation.Nullable @@ -241,6 +252,28 @@ package org.apache.hadoop.hive.metastore.api; __isset_bitfield = org.apache.thrift.EncodingUtils.setBit(__isset_bitfield, __ISEXTENDED_ISSET_ID, value); } + public long getTxnid() { + return this.txnid; + } + + public void setTxnid(long txnid) { + this.txnid = txnid; + setTxnidIsSet(true); + } + + public void unsetTxnid() { + __isset_bitfield = org.apache.thrift.EncodingUtils.clearBit(__isset_bitfield, __TXNID_ISSET_ID); + } + + /** Returns true if field txnid is set (has been assigned a value) and false otherwise */ + public boolean isSetTxnid() { + return org.apache.thrift.EncodingUtils.testBit(__isset_bitfield, __TXNID_ISSET_ID); + } + + public void setTxnidIsSet(boolean value) { + __isset_bitfield = org.apache.thrift.EncodingUtils.setBit(__isset_bitfield, __TXNID_ISSET_ID, value); + } + public void setFieldValue(_Fields field, @org.apache.thrift.annotation.Nullable java.lang.Object value) { switch (field) { case DBNAME: @@ -275,6 +308,14 @@ package org.apache.hadoop.hive.metastore.api; } break; + case TXNID: + if (value == null) { + unsetTxnid(); + } else { + setTxnid((java.lang.Long)value); + } + break; + } } @@ -293,6 +334,9 @@ package org.apache.hadoop.hive.metastore.api; case IS_EXTENDED: return isIsExtended(); + case TXNID: + return getTxnid(); + } throw new java.lang.IllegalStateException(); } @@ -312,6 +356,8 @@ package org.apache.hadoop.hive.metastore.api; return isSetPartname(); case IS_EXTENDED: return isSetIsExtended(); + case TXNID: + return isSetTxnid(); } throw new java.lang.IllegalStateException(); } @@ -367,6 +413,15 @@ package org.apache.hadoop.hive.metastore.api; return false; } + boolean this_present_txnid = true && this.isSetTxnid(); + boolean that_present_txnid = true && that.isSetTxnid(); + if (this_present_txnid || that_present_txnid) { + if (!(this_present_txnid && that_present_txnid)) + return false; + if (this.txnid != that.txnid) + return false; + } + return true; } @@ -390,6 +445,10 @@ package org.apache.hadoop.hive.metastore.api; if (isSetIsExtended()) hashCode = hashCode * 8191 + ((isExtended) ? 131071 : 524287); + hashCode = hashCode * 8191 + ((isSetTxnid()) ? 131071 : 524287); + if (isSetTxnid()) + hashCode = hashCode * 8191 + org.apache.thrift.TBaseHelper.hashCode(txnid); + return hashCode; } @@ -441,6 +500,16 @@ package org.apache.hadoop.hive.metastore.api; return lastComparison; } } + lastComparison = java.lang.Boolean.valueOf(isSetTxnid()).compareTo(other.isSetTxnid()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetTxnid()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.txnid, other.txnid); + if (lastComparison != 0) { + return lastComparison; + } + } return 0; } @@ -497,6 +566,12 @@ package org.apache.hadoop.hive.metastore.api; sb.append(this.isExtended); first = false; } + if (isSetTxnid()) { + if (!first) sb.append(", "); + sb.append("txnid:"); + sb.append(this.txnid); + first = false; + } sb.append(")"); return sb.toString(); } @@ -574,6 +649,14 @@ package org.apache.hadoop.hive.metastore.api; org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; + case 5: // TXNID + if (schemeField.type == org.apache.thrift.protocol.TType.I64) { + struct.txnid = iprot.readI64(); + struct.setTxnidIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; default: org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } @@ -613,6 +696,11 @@ package org.apache.hadoop.hive.metastore.api; oprot.writeBool(struct.isExtended); oprot.writeFieldEnd(); } + if (struct.isSetTxnid()) { + oprot.writeFieldBegin(TXNID_FIELD_DESC); + oprot.writeI64(struct.txnid); + oprot.writeFieldEnd(); + } oprot.writeFieldStop(); oprot.writeStructEnd(); } @@ -643,7 +731,10 @@ package org.apache.hadoop.hive.metastore.api; if (struct.isSetIsExtended()) { optionals.set(3); } - oprot.writeBitSet(optionals, 4); + if (struct.isSetTxnid()) { + optionals.set(4); + } + oprot.writeBitSet(optionals, 5); if (struct.isSetDbname()) { oprot.writeString(struct.dbname); } @@ -656,12 +747,15 @@ package org.apache.hadoop.hive.metastore.api; if (struct.isSetIsExtended()) { oprot.writeBool(struct.isExtended); } + if (struct.isSetTxnid()) { + oprot.writeI64(struct.txnid); + } } @Override public void read(org.apache.thrift.protocol.TProtocol prot, ShowLocksRequest struct) throws org.apache.thrift.TException { org.apache.thrift.protocol.TTupleProtocol iprot = (org.apache.thrift.protocol.TTupleProtocol) prot; - java.util.BitSet incoming = iprot.readBitSet(4); + java.util.BitSet incoming = iprot.readBitSet(5); if (incoming.get(0)) { struct.dbname = iprot.readString(); struct.setDbnameIsSet(true); @@ -678,6 +772,10 @@ package org.apache.hadoop.hive.metastore.api; struct.isExtended = iprot.readBool(); struct.setIsExtendedIsSet(true); } + if (incoming.get(4)) { + struct.txnid = iprot.readI64(); + struct.setTxnidIsSet(true); + } } } diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/ShowLocksRequest.php b/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/ShowLocksRequest.php index 4582224..16336d2 100644 --- a/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/ShowLocksRequest.php +++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/ShowLocksRequest.php @@ -41,6 +41,11 @@ class ShowLocksRequest 'isRequired' => false, 'type' => TType::BOOL, ), + 5 => array( + 'var' => 'txnid', + 'isRequired' => false, + 'type' => TType::I64, + ), ); /** @@ -59,6 +64,10 @@ class ShowLocksRequest * @var bool */ public $isExtended = false; + /** + * @var int + */ + public $txnid = null; public function __construct($vals = null) { @@ -75,6 +84,9 @@ class ShowLocksRequest if (isset($vals['isExtended'])) { $this->isExtended = $vals['isExtended']; } + if (isset($vals['txnid'])) { + $this->txnid = $vals['txnid']; + } } } @@ -125,6 +137,13 @@ class ShowLocksRequest $xfer += $input->skip($ftype); } break; + case 5: + if ($ftype == TType::I64) { + $xfer += $input->readI64($this->txnid); + } else { + $xfer += $input->skip($ftype); + } + break; default: $xfer += $input->skip($ftype); break; @@ -159,6 +178,11 @@ class ShowLocksRequest $xfer += $output->writeBool($this->isExtended); $xfer += $output->writeFieldEnd(); } + if ($this->txnid !== null) { + $xfer += $output->writeFieldBegin('txnid', TType::I64, 5); + $xfer += $output->writeI64($this->txnid); + $xfer += $output->writeFieldEnd(); + } $xfer += $output->writeFieldStop(); $xfer += $output->writeStructEnd(); return $xfer; diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py b/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py index fa62cac..39dc9b5 100644 --- a/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py +++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py @@ -13736,15 +13736,17 @@ class ShowLocksRequest(object): - tablename - partname - isExtended + - txnid """ - def __init__(self, dbname=None, tablename=None, partname=None, isExtended=False,): + def __init__(self, dbname=None, tablename=None, partname=None, isExtended=False, txnid=None,): self.dbname = dbname self.tablename = tablename self.partname = partname self.isExtended = isExtended + self.txnid = txnid def read(self, iprot): if iprot._fast_decode is not None and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None: @@ -13775,6 +13777,11 @@ class ShowLocksRequest(object): self.isExtended = iprot.readBool() else: iprot.skip(ftype) + elif fid == 5: + if ftype == TType.I64: + self.txnid = iprot.readI64() + else: + iprot.skip(ftype) else: iprot.skip(ftype) iprot.readFieldEnd() @@ -13801,6 +13808,10 @@ class ShowLocksRequest(object): oprot.writeFieldBegin('isExtended', TType.BOOL, 4) oprot.writeBool(self.isExtended) oprot.writeFieldEnd() + if self.txnid is not None: + oprot.writeFieldBegin('txnid', TType.I64, 5) + oprot.writeI64(self.txnid) + oprot.writeFieldEnd() oprot.writeFieldStop() oprot.writeStructEnd() @@ -27638,6 +27649,7 @@ ShowLocksRequest.thrift_spec = ( (2, TType.STRING, 'tablename', 'UTF8', None, ), # 2 (3, TType.STRING, 'partname', 'UTF8', None, ), # 3 (4, TType.BOOL, 'isExtended', None, False, ), # 4 + (5, TType.I64, 'txnid', None, None, ), # 5 ) all_structs.append(ShowLocksResponseElement) ShowLocksResponseElement.thrift_spec = ( diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb b/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb index b817dc8..d691ea8 100644 --- a/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb +++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb @@ -4030,12 +4030,14 @@ class ShowLocksRequest TABLENAME = 2 PARTNAME = 3 ISEXTENDED = 4 + TXNID = 5 FIELDS = { DBNAME => {:type => ::Thrift::Types::STRING, :name => 'dbname', :optional => true}, TABLENAME => {:type => ::Thrift::Types::STRING, :name => 'tablename', :optional => true}, PARTNAME => {:type => ::Thrift::Types::STRING, :name => 'partname', :optional => true}, - ISEXTENDED => {:type => ::Thrift::Types::BOOL, :name => 'isExtended', :default => false, :optional => true} + ISEXTENDED => {:type => ::Thrift::Types::BOOL, :name => 'isExtended', :default => false, :optional => true}, + TXNID => {:type => ::Thrift::Types::I64, :name => 'txnid', :optional => true} } def struct_fields; FIELDS; end diff --git a/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift b/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift index cdd6e5e..99731ff 100644 --- a/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift +++ b/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift @@ -1144,6 +1144,7 @@ struct ShowLocksRequest { 2: optional string tablename, 3: optional string partname, 4: optional bool isExtended=false, + 5: optional i64 txnid, } struct ShowLocksResponseElement { diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index 896a5543..2503876 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -3039,6 +3039,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { String dbName = rqst.getDbname(); String tableName = rqst.getTablename(); String partName = rqst.getPartname(); + String txnId = rqst.isSetTxnid() ? String.valueOf(rqst.getTxnid()) : null; List<String> params = new ArrayList<>(); StringBuilder filter = new StringBuilder(); @@ -3060,6 +3061,13 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { filter.append("\"HL_PARTITION\"=?"); params.add(partName); } + if (txnId != null && !txnId.isEmpty()) { + if (filter.length() > 0) { + filter.append(" and "); + } + filter.append("\"HL_TXNID\"=?"); + params.add(txnId); + } String whereClause = filter.toString(); if (!whereClause.isEmpty()) {