This is an automated email from the ASF dual-hosted git repository. dkuzmenko pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push: new 5bd9a14 HIVE-25346: cleanTxnToWriteIdTable breaks SNAPSHOT isolation (Denys Kuzmenko, reviewed by Peter Vary) 5bd9a14 is described below commit 5bd9a14e20e1c218c5292b8c37d5d5acf721541d Author: Denys Kuzmenko <dkuzme...@cloudera.com> AuthorDate: Wed Oct 20 12:09:53 2021 +0200 HIVE-25346: cleanTxnToWriteIdTable breaks SNAPSHOT isolation (Denys Kuzmenko, reviewed by Peter Vary) Closes #2716 --- .../hadoop/hive/metastore/txn/TestTxnHandler.java | 5 +- .../apache/hadoop/hive/ql/TestTxnCommands3.java | 4 - .../ql/lockmgr/DbTxnManagerEndToEndTestBase.java | 4 +- .../hadoop/hive/ql/lockmgr/TestDbTxnManager2.java | 72 +++++- .../ql/txn/compactor/TestCompactionMetrics.java | 4 +- .../hive/metastore/txn/CompactionTxnHandler.java | 59 +++-- .../hadoop/hive/metastore/txn/TxnHandler.java | 272 +++++++++++---------- 7 files changed, 246 insertions(+), 174 deletions(-) diff --git a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java index ba5d131..bbd5af0 100644 --- a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java +++ b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java @@ -21,7 +21,6 @@ import org.apache.hadoop.hive.common.JavaUtils; import org.apache.hadoop.hive.common.TableName; import org.apache.hadoop.hive.common.ValidReadTxnList; import org.apache.hadoop.hive.common.ValidReaderWriteIdList; -import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.common.ValidTxnWriteIdList; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.AbortTxnRequest; @@ -1499,7 +1498,7 @@ public class TestTxnHandler { LOG.debug("no exception, no deadlock"); } catch (SQLException e) { try { - tHndlr.checkRetryable(conn1, e, "thread t1"); + tHndlr.checkRetryable(e, "thread t1"); LOG.debug("Got an exception, but not a deadlock, SQLState is " + e.getSQLState() + " class of exception is " + e.getClass().getName() + " msg is <" + e.getMessage() + ">"); @@ -1529,7 +1528,7 @@ public class TestTxnHandler { LOG.debug("no exception, no deadlock"); } catch (SQLException e) { try { - tHndlr.checkRetryable(conn2, e, "thread t2"); + tHndlr.checkRetryable(e, "thread t2"); LOG.debug("Got an exception, but not a deadlock, SQLState is " + e.getSQLState() + " class of exception is " + e.getClass().getName() + " msg is <" + e.getMessage() + ">"); diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java index eea2897..dda1c0c 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java @@ -618,10 +618,6 @@ public class TestTxnCommands3 extends TxnCommandsBaseForTests { Assert.assertEquals(TestTxnDbUtil.queryToString(hiveConf, "select * from " + table), 0, TestTxnDbUtil.countQueryAgent(hiveConf, "select count(*) from " + table)); } - private void assertOneTxn() throws Exception { - Assert.assertEquals(TestTxnDbUtil.queryToString(hiveConf, "select * from TXNS"), 1, - TestTxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXNS")); - } @Test public void testWritesToDisabledCompactionTableCtas() throws Exception { diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/DbTxnManagerEndToEndTestBase.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/DbTxnManagerEndToEndTestBase.java index 44f5307..12e2348 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/DbTxnManagerEndToEndTestBase.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/DbTxnManagerEndToEndTestBase.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.lockmgr; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.HiveMetaStoreClientWithLocalCache; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil; import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.apache.hadoop.hive.metastore.txn.TxnUtils; @@ -64,7 +65,8 @@ public abstract class DbTxnManagerEndToEndTestBase { ctx = new Context(conf); driver = new Driver(new QueryState.Builder().withHiveConf(conf).nonIsolated().build()); driver2 = new Driver(new QueryState.Builder().withHiveConf(conf).build()); - conf.setBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK, false); + HiveConf.setBoolVar(conf, HiveConf.ConfVars.TXN_WRITE_X_LOCK, false); + MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.TXN_USE_MIN_HISTORY_LEVEL, true); TestTxnDbUtil.cleanDb(conf); SessionState ss = SessionState.get(); ss.initTxnMgr(conf); diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java index acd9786..43a78e0 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java @@ -2287,7 +2287,7 @@ public class TestDbTxnManager2 extends DbTxnManagerEndToEndTestBase{ swapTxnManager(txnMgr); driver.run("select * from target"); - List res = new ArrayList(); + List<String> res = new ArrayList<>(); driver.getFetchTask().fetch(res); Assert.assertEquals("Duplicate records " + (extectedDuplicates ? "" : "not") + "found", extectedDuplicates ? 5 : 4, res.size()); @@ -2333,7 +2333,7 @@ public class TestDbTxnManager2 extends DbTxnManagerEndToEndTestBase{ swapTxnManager(txnMgr); driver.run("select * from target"); - List res = new ArrayList(); + List<String> res = new ArrayList<>(); driver.getFetchTask().fetch(res); Assert.assertEquals(2, res.size()); Assert.assertEquals("Lost Update", "5\t8", res.get(1)); @@ -2380,7 +2380,7 @@ public class TestDbTxnManager2 extends DbTxnManagerEndToEndTestBase{ swapTxnManager(txnMgr); driver.run("select * from target where age=10"); - List res = new ArrayList(); + List<String> res = new ArrayList<>(); driver.getFetchTask().fetch(res); Assert.assertEquals(2, res.size()); Assert.assertEquals("Lost Update", "[earl\t10, amy\t10]", res.toString()); @@ -2418,7 +2418,7 @@ public class TestDbTxnManager2 extends DbTxnManagerEndToEndTestBase{ swapTxnManager(txnMgr); driver.run("select * from target"); - List res = new ArrayList(); + List<String> res = new ArrayList<>(); driver.getFetchTask().fetch(res); Assert.assertEquals(conflict ? 3 : 4, res.size()); } @@ -2451,7 +2451,7 @@ public class TestDbTxnManager2 extends DbTxnManagerEndToEndTestBase{ driver.run("merge into target t using source s on t.a = s.a " + "when not matched then insert values (s.a, s.b, s.c)"); driver.run("select * from target"); - List res = new ArrayList(); + List<String> res = new ArrayList<>(); driver.getFetchTask().fetch(res); // The merge should see all three partition and not create duplicates Assert.assertEquals("Duplicate records found", 6, res.size()); @@ -2522,7 +2522,7 @@ public class TestDbTxnManager2 extends DbTxnManagerEndToEndTestBase{ swapTxnManager(txnMgr); driver.run("select * from target"); - List res = new ArrayList(); + List<String> res = new ArrayList<>(); driver.getFetchTask().fetch(res); // The merge should see all three partition and not create duplicates Assert.assertEquals("Duplicate records found", 6, res.size()); @@ -3299,4 +3299,64 @@ public class TestDbTxnManager2 extends DbTxnManagerEndToEndTestBase{ Assert.assertEquals("Unexpected lock count", 0, locks.size()); } + @Test + public void testInsertSnapshotIsolationMinHistoryDisabled() throws Exception { + MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.TXN_USE_MIN_HISTORY_LEVEL, false); + testInsertSnapshotIsolation(); + } + + @Test + public void testInsertSnapshotIsolation() throws Exception { + dropTable(new String[] {"tab_acid"}); + + driver.run("create table if not exists tab_acid (a int, b int) " + + "stored as orc TBLPROPERTIES ('transactional'='true')"); + driver.compileAndRespond("insert into tab_acid values(1,2)"); + + DbTxnManager txnMgr2 = (DbTxnManager) TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); + swapTxnManager(txnMgr2); + driver2.compileAndRespond("select * from tab_acid"); + swapTxnManager(txnMgr); + + driver.run(); + txnHandler.cleanTxnToWriteIdTable(); + swapTxnManager(txnMgr2); + + driver2.run(); + List<String> res = new ArrayList<>(); + driver2.getFetchTask().fetch(res); + Assert.assertEquals(0, res.size()); + } + + @Test + public void testUpdateSnapshotIsolationMinHistoryDisabled() throws Exception { + MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.TXN_USE_MIN_HISTORY_LEVEL, false); + testUpdateSnapshotIsolation(); + } + + @Test + public void testUpdateSnapshotIsolation() throws Exception { + dropTable(new String[] {"tab_acid"}); + + driver.run("create table if not exists tab_acid (a int, b int) " + + "stored as orc TBLPROPERTIES ('transactional'='true')"); + driver.run("insert into tab_acid values(1,2)"); + driver.compileAndRespond("update tab_acid set a=2"); + + DbTxnManager txnMgr2 = (DbTxnManager) TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); + swapTxnManager(txnMgr2); + driver2.compileAndRespond("select * from tab_acid"); + swapTxnManager(txnMgr); + + driver.run(); + txnHandler.cleanTxnToWriteIdTable(); + swapTxnManager(txnMgr2); + + driver2.run(); + List<String> res = new ArrayList<>(); + driver2.getFetchTask().fetch(res); + Assert.assertEquals(1, res.size()); + Assert.assertEquals("1\t2", res.get(0)); + } + } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionMetrics.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionMetrics.java index 280d673..51fe394 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionMetrics.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionMetrics.java @@ -79,6 +79,7 @@ public class TestCompactionMetrics extends CompactorTest { @Before public void setUp() throws Exception { MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.METRICS_ENABLED, true); + MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.TXN_USE_MIN_HISTORY_LEVEL, true); // re-initialize metrics Metrics.shutdown(); Metrics.initialize(conf); @@ -542,6 +543,7 @@ public class TestCompactionMetrics extends CompactorTest { String tblName = "dcamc"; Table t = newTable(dbName, tblName, false); + MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.TXN_USE_MIN_HISTORY_LEVEL, false); long start = System.currentTimeMillis(); burnThroughTransactions(t.getDbName(), t.getTableName(), 24, new HashSet<>(Arrays.asList(22L, 23L, 24L)), null); openTxn(TxnType.REPL_CREATED); @@ -591,7 +593,7 @@ public class TestCompactionMetrics extends CompactorTest { txnHandler.cleanTxnToWriteIdTable(); runAcidMetricService(); - Assert.assertEquals(2, + Assert.assertEquals(3, Metrics.getOrCreateGauge(MetricsConstants.NUM_TXN_TO_WRITEID).intValue()); start = System.currentTimeMillis(); diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java index 3ed3fc0..d37e6ad 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java @@ -150,7 +150,7 @@ class CompactionTxnHandler extends TxnHandler { } } catch (SQLException e) { LOG.error("Unable to connect to transaction database " + e.getMessage()); - checkRetryable(dbConn, e, + checkRetryable(e, "findPotentialCompactions(maxAborted:" + abortedThreshold + ", abortedTimeThreshold:" + abortedTimeThreshold + ")"); } finally { @@ -244,7 +244,7 @@ class CompactionTxnHandler extends TxnHandler { LOG.error("Unable to select next element for compaction, " + e.getMessage()); LOG.debug("Going to rollback"); rollbackDBConn(dbConn); - checkRetryable(dbConn, e, "findNextToCompact(rqst:" + rqst + ")"); + checkRetryable(e, "findNextToCompact(rqst:" + rqst + ")"); throw new MetaException("Unable to connect to transaction database " + StringUtils.stringifyException(e)); } finally { @@ -286,7 +286,7 @@ class CompactionTxnHandler extends TxnHandler { LOG.error("Unable to update compaction queue " + e.getMessage()); LOG.debug("Going to rollback"); rollbackDBConn(dbConn); - checkRetryable(dbConn, e, "markCompacted(" + info + ")"); + checkRetryable(e, "markCompacted(" + info + ")"); throw new MetaException("Unable to connect to transaction database " + StringUtils.stringifyException(e)); } finally { @@ -353,7 +353,7 @@ class CompactionTxnHandler extends TxnHandler { LOG.error("Unable to select next element for cleaning, " + e.getMessage()); LOG.debug("Going to rollback"); rollbackDBConn(dbConn); - checkRetryable(dbConn, e, "findReadyToClean"); + checkRetryable(e, "findReadyToClean"); throw new MetaException("Unable to connect to transaction database " + StringUtils.stringifyException(e)); } finally { @@ -496,7 +496,7 @@ class CompactionTxnHandler extends TxnHandler { LOG.error("Unable to delete from compaction queue " + e.getMessage()); LOG.debug("Going to rollback"); rollbackDBConn(dbConn); - checkRetryable(dbConn, e, "markCleaned(" + info + ")"); + checkRetryable(e, "markCleaned(" + info + ")"); throw new MetaException("Unable to connect to transaction database " + StringUtils.stringifyException(e)); } finally { @@ -520,6 +520,8 @@ class CompactionTxnHandler extends TxnHandler { ResultSet rs = null; try { + long minTxnIdSeenOpen = findMinTxnIdSeenOpen(); + // We query for minimum values in all the queries and they can only increase by any concurrent // operations. So, READ COMMITTED is sufficient. dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); @@ -529,19 +531,22 @@ class CompactionTxnHandler extends TxnHandler { // If there are no txns which are currently open or aborted in the system, then current value of // max(TXNS.txn_id) could be min_uncommitted_txnid. String s = "SELECT MIN(\"RES\".\"ID\") AS \"ID\" FROM (" + - "SELECT MAX(\"TXN_ID\") + 1 AS \"ID\" FROM \"TXNS\" " + - "UNION " + - "SELECT MIN(\"WS_COMMIT_ID\") AS \"ID\" FROM \"WRITE_SET\" " + - "UNION " + - "SELECT MIN(\"TXN_ID\") AS \"ID\" FROM \"TXNS\" WHERE \"TXN_STATE\" = " + TxnStatus.ABORTED + - " OR \"TXN_STATE\" = " + TxnStatus.OPEN + - ") \"RES\""; + " SELECT MAX(\"TXN_ID\") + 1 AS \"ID\" FROM \"TXNS\"" + + (useMinHistoryLevel ? "" : + " UNION" + + " SELECT MIN(\"WS_TXNID\") AS \"ID\" FROM \"WRITE_SET\"") + + " UNION" + + " SELECT MIN(\"TXN_ID\") AS \"ID\" FROM \"TXNS\" WHERE \"TXN_STATE\" = " + TxnStatus.ABORTED + + (useMinHistoryLevel ? "" : + " OR \"TXN_STATE\" = " + TxnStatus.OPEN) + + " ) \"RES\""; + LOG.debug("Going to execute query <" + s + ">"); rs = stmt.executeQuery(s); if (!rs.next()) { throw new MetaException("Transaction tables not properly initialized, no record found in TXNS"); } - long minUncommitedTxnid = rs.getLong(1); + long minUncommitedTxnid = minTxnIdSeenOpen < 0 ? rs.getLong(1) : Math.min(rs.getLong(1), minTxnIdSeenOpen); // As all txns below min_uncommitted_txnid are either committed or empty_aborted, we are allowed // to cleanup the entries less than min_uncommitted_txnid from the TXN_TO_WRITE_ID table. @@ -556,7 +561,7 @@ class CompactionTxnHandler extends TxnHandler { LOG.error("Unable to delete from TXN_TO_WRITE_ID table " + e.getMessage()); LOG.debug("Going to rollback"); rollbackDBConn(dbConn); - checkRetryable(dbConn, e, "cleanTxnToWriteIdTable"); + checkRetryable(e, "cleanTxnToWriteIdTable"); throw new MetaException("Unable to connect to transaction database " + StringUtils.stringifyException(e)); } finally { @@ -646,7 +651,7 @@ class CompactionTxnHandler extends TxnHandler { LOG.error("Unable to delete from COMPLETED_TXN_COMPONENTS table " + e.getMessage()); LOG.debug("Going to rollback"); rollbackDBConn(dbConn); - checkRetryable(dbConn, e, "removeDuplicateCompletedTxnComponents"); + checkRetryable(e, "removeDuplicateCompletedTxnComponents"); throw new MetaException("Unable to connect to transaction database " + StringUtils.stringifyException(e)); } finally { @@ -719,7 +724,7 @@ class CompactionTxnHandler extends TxnHandler { LOG.error("Unable to delete from txns table " + e.getMessage()); LOG.debug("Going to rollback"); rollbackDBConn(dbConn); - checkRetryable(dbConn, e, "cleanEmptyAbortedTxns"); + checkRetryable(e, "cleanEmptyAbortedTxns"); throw new MetaException("Unable to connect to transaction database " + StringUtils.stringifyException(e)); } finally { @@ -764,7 +769,7 @@ class CompactionTxnHandler extends TxnHandler { e.getMessage()); LOG.debug("Going to rollback"); rollbackDBConn(dbConn); - checkRetryable(dbConn, e, "revokeFromLocalWorkers(hostname:" + hostname +")"); + checkRetryable(e, "revokeFromLocalWorkers(hostname:" + hostname +")"); throw new MetaException("Unable to connect to transaction database " + StringUtils.stringifyException(e)); } finally { @@ -811,7 +816,7 @@ class CompactionTxnHandler extends TxnHandler { e.getMessage()); LOG.debug("Going to rollback"); rollbackDBConn(dbConn); - checkRetryable(dbConn, e, "revokeTimedoutWorkers(timeout:" + timeout + ")"); + checkRetryable(e, "revokeTimedoutWorkers(timeout:" + timeout + ")"); throw new MetaException("Unable to connect to transaction database " + StringUtils.stringifyException(e)); } finally { @@ -878,7 +883,7 @@ class CompactionTxnHandler extends TxnHandler { return columns; } catch (SQLException e) { rollbackDBConn(dbConn); - checkRetryable(dbConn, e, "findColumnsWithStats(" + ci.tableName + + checkRetryable(e, "findColumnsWithStats(" + ci.tableName + (ci.partName == null ? "" : "/" + ci.partName) + ")"); throw new MetaException("Unable to connect to transaction database " + StringUtils.stringifyException(e)); @@ -940,7 +945,7 @@ class CompactionTxnHandler extends TxnHandler { dbConn.commit(); } catch (SQLException e) { rollbackDBConn(dbConn); - checkRetryable(dbConn, e, "updateCompactorState(" + ci + "," + compactionTxnId +")"); + checkRetryable(e, "updateCompactorState(" + ci + "," + compactionTxnId +")"); throw new MetaException("Unable to connect to transaction database " + StringUtils.stringifyException(e)); } finally { @@ -1088,7 +1093,7 @@ class CompactionTxnHandler extends TxnHandler { dbConn.commit(); } catch (SQLException e) { rollbackDBConn(dbConn); - checkRetryable(dbConn, e, "purgeCompactionHistory()"); + checkRetryable(e, "purgeCompactionHistory()"); throw new MetaException("Unable to connect to transaction database " + StringUtils.stringifyException(e)); } finally { @@ -1172,7 +1177,7 @@ class CompactionTxnHandler extends TxnHandler { LOG.error("Unable to check for failed compactions", e); LOG.debug("Going to rollback"); rollbackDBConn(dbConn); - checkRetryable(dbConn, e, "checkFailedCompactions(" + ci + ")"); + checkRetryable(e, "checkFailedCompactions(" + ci + ")"); LOG.error("Unable to connect to transaction database", e); return false;//weren't able to check } finally { @@ -1266,7 +1271,7 @@ class CompactionTxnHandler extends TxnHandler { LOG.warn("markFailed(" + ci.id + "):" + e.getMessage()); LOG.debug("Going to rollback"); rollbackDBConn(dbConn); - checkRetryable(dbConn, e, "markFailed(" + ci + ")"); + checkRetryable(e, "markFailed(" + ci + ")"); LOG.error("markFailed(" + ci + ") failed: " + e.getMessage(), e); } finally { close(rs, stmt, null); @@ -1296,7 +1301,7 @@ class CompactionTxnHandler extends TxnHandler { LOG.warn("setHadoopJobId(" + hadoopJobId + "," + id + "):" + e.getMessage()); LOG.debug("Going to rollback"); rollbackDBConn(dbConn); - checkRetryable(dbConn, e, "setHadoopJobId(" + hadoopJobId + "," + id + ")"); + checkRetryable(e, "setHadoopJobId(" + hadoopJobId + "," + id + ")"); LOG.error("setHadoopJobId(" + hadoopJobId + "," + id + ") failed: " + e.getMessage(), e); } finally { close(null, stmt, dbConn); @@ -1317,7 +1322,7 @@ class CompactionTxnHandler extends TxnHandler { } catch (SQLException e) { LOG.error("Unable to getMinOpenTxnIdForCleaner", e); rollbackDBConn(dbConn); - checkRetryable(dbConn, e, "getMinOpenTxnForCleaner"); + checkRetryable(e, "getMinOpenTxnForCleaner"); throw new MetaException("Unable to execute getMinOpenTxnIfForCleaner() " + StringUtils.stringifyException(e)); } finally { @@ -1366,7 +1371,7 @@ class CompactionTxnHandler extends TxnHandler { } else { LOG.error("Unable to execute findMinTxnIdSeenOpen", e); rollbackDBConn(dbConn); - checkRetryable(dbConn, e, "findMinTxnIdSeenOpen"); + checkRetryable(e, "findMinTxnIdSeenOpen"); throw new MetaException("Unable to execute findMinTxnIdSeenOpen() " + StringUtils.stringifyException(e)); } } finally { @@ -1411,7 +1416,7 @@ class CompactionTxnHandler extends TxnHandler { } catch (SQLException e) { LOG.error("Unable to getCompactionByTxnId", e); rollbackDBConn(dbConn); - checkRetryable(dbConn, e, "getCompactionByTxnId"); + checkRetryable(e, "getCompactionByTxnId"); throw new MetaException("Unable to execute getCompactionByTxnId() " + StringUtils.stringifyException(e)); } finally { closeDbConn(dbConn); diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index 639ed48..9abe615 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -571,7 +571,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { LOG.debug("Got OpenTxnList with hwm: {} and openTxnList size {}.", hwm, txnInfos.size()); return new OpenTxnList(hwm, txnInfos); } catch (SQLException e) { - checkRetryable(dbConn, e, "getOpenTxnsList"); + checkRetryable(e, "getOpenTxnsList"); throw new MetaException( "Unable to select from transaction database: " + getMessage(e) + StringUtils.stringifyException(e)); } finally { @@ -682,7 +682,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { } catch (SQLException e) { LOG.debug("Going to rollback: ", e); rollbackDBConn(dbConn); - checkRetryable(dbConn, e, "openTxns(" + rqst + ")"); + checkRetryable(e, "openTxns(" + rqst + ")"); throw new MetaException("Unable to select from transaction database " + StringUtils.stringifyException(e)); } finally { close(null, stmt, dbConn); @@ -854,7 +854,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { } catch (SQLException e) { LOG.debug("Going to rollback: ", e); rollbackDBConn(dbConn); - checkRetryable(dbConn, e, "deleteInvalidOpenTransactions(" + txnIds + ")"); + checkRetryable(e, "deleteInvalidOpenTransactions(" + txnIds + ")"); throw new MetaException("Unable to select from transaction database " + StringUtils.stringifyException(e)); } finally { closeStmt(stmt); @@ -964,7 +964,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { assert (targetTxnIds.size() == 1); return targetTxnIds.get(0); } catch (SQLException e) { - checkRetryable(dbConn, e, "getTargetTxnId(" + replPolicy + sourceTxnId + ")"); + checkRetryable(e, "getTargetTxnId(" + replPolicy + sourceTxnId + ")"); throw new MetaException("Unable to get target transaction id " + StringUtils.stringifyException(e)); } finally { @@ -1077,7 +1077,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { } catch (SQLException e) { LOG.debug("Going to rollback: ", e); rollbackDBConn(dbConn); - checkRetryable(dbConn, e, "abortTxn(" + rqst + ")"); + checkRetryable(e, "abortTxn(" + rqst + ")"); throw new MetaException("Unable to update transaction database " + StringUtils.stringifyException(e)); } finally { @@ -1139,7 +1139,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { } catch (SQLException e) { LOG.debug("Going to rollback: ", e); rollbackDBConn(dbConn); - checkRetryable(dbConn, e, "abortTxns(" + rqst + ")"); + checkRetryable(e, "abortTxns(" + rqst + ")"); throw new MetaException("Unable to update transaction database " + StringUtils.stringifyException(e)); } finally { @@ -1381,11 +1381,11 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { */ @Override @RetrySemantics.Idempotent("No-op if already committed") - public void commitTxn(CommitTxnRequest rqst) - throws NoSuchTxnException, TxnAbortedException, MetaException { + public void commitTxn(CommitTxnRequest rqst) throws NoSuchTxnException, TxnAbortedException, MetaException { char isUpdateDelete = 'N'; long txnid = rqst.getTxnid(); long sourceTxnId = -1; + boolean isReplayedReplTxn = TxnType.REPL_CREATED.equals(rqst.getTxn_type()); boolean isHiveReplTxn = rqst.isSetReplPolicy() && TxnType.DEFAULT.equals(rqst.getTxn_type()); try { @@ -1444,76 +1444,84 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { String conflictSQLSuffix = "FROM \"TXN_COMPONENTS\" WHERE \"TC_TXNID\"=" + txnid + " AND \"TC_OPERATION_TYPE\" IN (" + OperationType.UPDATE + "," + OperationType.DELETE + ")"; - long tempCommitId = generateTemporaryId(); - if (txnType.get() != TxnType.READ_ONLY - && !isReplayedReplTxn - && isUpdateOrDelete(stmt, conflictSQLSuffix)) { - - isUpdateDelete = 'Y'; - //if here it means currently committing txn performed update/delete and we should check WW conflict - /** - * "select distinct" is used below because - * 1. once we get to multi-statement txns, we only care to record that something was updated once - * 2. if {@link #addDynamicPartitions(AddDynamicPartitions)} is retried by caller it may create - * duplicate entries in TXN_COMPONENTS - * but we want to add a PK on WRITE_SET which won't have unique rows w/o this distinct - * even if it includes all of its columns - * - * First insert into write_set using a temporary commitID, which will be updated in a separate call, - * see: {@link #updateWSCommitIdAndCleanUpMetadata(Statement, long, TxnType, Long, long)}}. - * This should decrease the scope of the S4U lock on the next_txn_id table. - */ - Savepoint undoWriteSetForCurrentTxn = dbConn.setSavepoint(); - stmt.executeUpdate("INSERT INTO \"WRITE_SET\" (\"WS_DATABASE\", \"WS_TABLE\", \"WS_PARTITION\", \"WS_TXNID\", \"WS_COMMIT_ID\", \"WS_OPERATION_TYPE\")" + - " SELECT DISTINCT \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\", \"TC_TXNID\", " + tempCommitId + ", \"TC_OPERATION_TYPE\" " + conflictSQLSuffix); - /** - * This S4U will mutex with other commitTxn() and openTxns(). - * -1 below makes txn intervals look like [3,3] [4,4] if all txns are serial - * Note: it's possible to have several txns have the same commit id. Suppose 3 txns start - * at the same time and no new txns start until all 3 commit. - * We could've incremented the sequence for commitId as well but it doesn't add anything functionally. - */ + if (txnType.get() == TxnType.COMPACTION) { acquireTxnLock(stmt, false); commitId = getHighWaterMark(stmt); - if (!rqst.isExclWriteEnabled()) { + } else if (txnType.get() != TxnType.READ_ONLY && !isReplayedReplTxn) { + String writeSetInsertSql = "INSERT INTO \"WRITE_SET\" (\"WS_DATABASE\", \"WS_TABLE\", \"WS_PARTITION\"," + + " \"WS_TXNID\", \"WS_COMMIT_ID\", \"WS_OPERATION_TYPE\")" + + " SELECT DISTINCT \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\", \"TC_TXNID\", " + tempCommitId + ", \"TC_OPERATION_TYPE\" "; + + if (isUpdateOrDelete(stmt, conflictSQLSuffix)) { + isUpdateDelete = 'Y'; + //if here it means currently committing txn performed update/delete and we should check WW conflict /** - * see if there are any overlapping txns that wrote the same element, i.e. have a conflict - * Since entire commit operation is mutexed wrt other start/commit ops, - * committed.ws_commit_id <= current.ws_commit_id for all txns - * thus if committed.ws_commit_id < current.ws_txnid, transactions do NOT overlap - * For example, [17,20] is committed, [6,80] is being committed right now - these overlap - * [17,20] committed and [21,21] committing now - these do not overlap. - * [17,18] committed and [18,19] committing now - these overlap (here 18 started while 17 was still running) + * "select distinct" is used below because + * 1. once we get to multi-statement txns, we only care to record that something was updated once + * 2. if {@link #addDynamicPartitions(AddDynamicPartitions)} is retried by caller it may create + * duplicate entries in TXN_COMPONENTS + * but we want to add a PK on WRITE_SET which won't have unique rows w/o this distinct + * even if it includes all of its columns + * + * First insert into write_set using a temporary commitID, which will be updated in a separate call, + * see: {@link #updateWSCommitIdAndCleanUpMetadata(Statement, long, TxnType, Long, long)}}. + * This should decrease the scope of the S4U lock on the next_txn_id table. */ - try (ResultSet rs = checkForWriteConflict(stmt, txnid)) { - if (rs.next()) { - //found a conflict, so let's abort the txn - String committedTxn = "[" + JavaUtils.txnIdToString(rs.getLong(1)) + "," + rs.getLong(2) + "]"; - StringBuilder resource = new StringBuilder(rs.getString(3)).append("/").append(rs.getString(4)); - String partitionName = rs.getString(5); - if (partitionName != null) { - resource.append('/').append(partitionName); - } - String msg = "Aborting [" + JavaUtils.txnIdToString(txnid) + "," + commitId + "]" + " due to a write conflict on " + resource + - " committed by " + committedTxn + " " + rs.getString(7) + "/" + rs.getString(8); - //remove WRITE_SET info for current txn since it's about to abort - dbConn.rollback(undoWriteSetForCurrentTxn); - LOG.info(msg); - //todo: should make abortTxns() write something into TXNS.TXN_META_INFO about this - if (abortTxns(dbConn, Collections.singletonList(txnid), false, isReplayedReplTxn) != 1) { - throw new IllegalStateException(msg + " FAILED!"); + Savepoint undoWriteSetForCurrentTxn = dbConn.setSavepoint(); + stmt.executeUpdate(writeSetInsertSql + (useMinHistoryLevel ? conflictSQLSuffix : + "FROM \"TXN_COMPONENTS\" WHERE \"TC_TXNID\"=" + txnid + " AND \"TC_OPERATION_TYPE\" <> " + OperationType.COMPACT)); + + /** + * This S4U will mutex with other commitTxn() and openTxns(). + * -1 below makes txn intervals look like [3,3] [4,4] if all txns are serial + * Note: it's possible to have several txns have the same commit id. Suppose 3 txns start + * at the same time and no new txns start until all 3 commit. + * We could've incremented the sequence for commitId as well but it doesn't add anything functionally. + */ + acquireTxnLock(stmt, false); + commitId = getHighWaterMark(stmt); + + if (!rqst.isExclWriteEnabled()) { + /** + * see if there are any overlapping txns that wrote the same element, i.e. have a conflict + * Since entire commit operation is mutexed wrt other start/commit ops, + * committed.ws_commit_id <= current.ws_commit_id for all txns + * thus if committed.ws_commit_id < current.ws_txnid, transactions do NOT overlap + * For example, [17,20] is committed, [6,80] is being committed right now - these overlap + * [17,20] committed and [21,21] committing now - these do not overlap. + * [17,18] committed and [18,19] committing now - these overlap (here 18 started while 17 was still running) + */ + try (ResultSet rs = checkForWriteConflict(stmt, txnid)) { + if (rs.next()) { + //found a conflict, so let's abort the txn + String committedTxn = "[" + JavaUtils.txnIdToString(rs.getLong(1)) + "," + rs.getLong(2) + "]"; + StringBuilder resource = new StringBuilder(rs.getString(3)).append("/").append(rs.getString(4)); + String partitionName = rs.getString(5); + if (partitionName != null) { + resource.append('/').append(partitionName); + } + String msg = "Aborting [" + JavaUtils.txnIdToString(txnid) + "," + commitId + "]" + " due to a write conflict on " + resource + + " committed by " + committedTxn + " " + rs.getString(7) + "/" + rs.getString(8); + //remove WRITE_SET info for current txn since it's about to abort + dbConn.rollback(undoWriteSetForCurrentTxn); + LOG.info(msg); + //todo: should make abortTxns() write something into TXNS.TXN_META_INFO about this + if (abortTxns(dbConn, Collections.singletonList(txnid), false, isReplayedReplTxn) != 1) { + throw new IllegalStateException(msg + " FAILED!"); + } + dbConn.commit(); + throw new TxnAbortedException(msg); } - dbConn.commit(); - throw new TxnAbortedException(msg); } } + } else if (!useMinHistoryLevel) { + stmt.executeUpdate(writeSetInsertSql + "FROM \"TXN_COMPONENTS\" WHERE \"TC_TXNID\"=" + txnid + + " AND \"TC_OPERATION_TYPE\" <> " + OperationType.COMPACT); + commitId = getHighWaterMark(stmt); } - } else if (txnType.get() == TxnType.COMPACTION) { - acquireTxnLock(stmt, false); - commitId = getHighWaterMark(stmt); } else { /* * current txn didn't update/delete anything (may have inserted), so just proceed with commit @@ -1575,7 +1583,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { } catch (SQLException e) { LOG.debug("Going to rollback: ", e); rollbackDBConn(dbConn); - checkRetryable(dbConn, e, "commitTxn(" + rqst + ")"); + checkRetryable(e, "commitTxn(" + rqst + ")"); throw new MetaException("Unable to update transaction database " + StringUtils.stringifyException(e)); } finally { @@ -1617,40 +1625,42 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { * @return max Id for the conflicting transaction, if any, otherwise -1 * @throws MetaException */ + @RetrySemantics.ReadOnly public long getLatestTxnIdInConflict(long txnid) throws MetaException { - Connection dbConn = null; - Statement stmt = null; - try { - dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); - stmt = dbConn.createStatement(); - - String writeConflictQuery = "SELECT MAX(\"COMMITTED\".\"WS_TXNID\")" + - " FROM \"WRITE_SET\" \"COMMITTED\" " + - " INNER JOIN (" + - "SELECT DISTINCT \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\", \"TC_TXNID\" " + - " FROM \"TXN_COMPONENTS\" " + - " WHERE \"TC_TXNID\" = " + txnid + - " AND \"TC_OPERATION_TYPE\" IN (" + OperationType.UPDATE + "," + OperationType.DELETE + ")) \"CUR\" " + - " ON \"COMMITTED\".\"WS_DATABASE\" = \"CUR\".\"TC_DATABASE\" " + - " AND \"COMMITTED\".\"WS_TABLE\" = \"CUR\".\"TC_TABLE\" " + - //For partitioned table we always track writes at partition level (never at table) - //and for non partitioned - always at table level, thus the same table should never - //have entries with partition key and w/o - " AND (\"COMMITTED\".\"WS_PARTITION\" = \"CUR\".\"TC_PARTITION\" OR " + - " \"CUR\".\"TC_PARTITION\" IS NULL) " + - " WHERE \"CUR\".\"TC_TXNID\" <= \"COMMITTED\".\"WS_COMMIT_ID\""; //txns overlap - - LOG.debug("Going to execute query: <" + writeConflictQuery + ">"); - ResultSet rs = stmt.executeQuery(writeConflictQuery); - return rs.next() ? rs.getLong(1) : -1; - - } catch (Exception e) { - throw new MetaException(StringUtils.stringifyException(e)); - - } finally { - closeStmt(stmt); - closeDbConn(dbConn); + try (Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + Statement stmt = dbConn.createStatement()) { + + String writeConflictQuery = "SELECT MAX(\"COMMITTED\".\"WS_TXNID\")" + + " FROM \"WRITE_SET\" \"COMMITTED\"" + + " INNER JOIN (" + + " SELECT DISTINCT \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\", \"TC_TXNID\"" + + " FROM \"TXN_COMPONENTS\"" + + " WHERE \"TC_TXNID\" = " + txnid + + " AND \"TC_OPERATION_TYPE\" IN (" + OperationType.UPDATE + "," + OperationType.DELETE + ")" + + " ) \"CUR\"" + + " ON \"COMMITTED\".\"WS_DATABASE\" = \"CUR\".\"TC_DATABASE\"" + + " AND \"COMMITTED\".\"WS_TABLE\" = \"CUR\".\"TC_TABLE\"" + + (useMinHistoryLevel ? "" : + " AND \"COMMITTED\".\"WS_OPERATION_TYPE\" != " + OperationType.INSERT) + + // For partitioned table we always track writes at partition level (never at table) + // and for non partitioned - always at table level, thus the same table should never + // have entries with partition key and w/o + " AND (\"COMMITTED\".\"WS_PARTITION\" = \"CUR\".\"TC_PARTITION\" OR" + + " \"CUR\".\"TC_PARTITION\" IS NULL) " + + // txns overlap + " WHERE \"CUR\".\"TC_TXNID\" <= \"COMMITTED\".\"WS_COMMIT_ID\""; + + LOG.debug("Going to execute query: <" + writeConflictQuery + ">"); + try (ResultSet rs = stmt.executeQuery(writeConflictQuery)) { + return rs.next() ? rs.getLong(1) : -1; + } + } catch (SQLException e) { + checkRetryable(e, "getLatestTxnIdInConflict"); + throw new MetaException(StringUtils.stringifyException(e)); + } + } catch (RetryException e) { + return getLatestTxnIdInConflict(txnid); } } @@ -1852,7 +1862,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { } catch (SQLException e) { LOG.debug("Going to rollback: ", e); rollbackDBConn(dbConn); - checkRetryable(dbConn, e, "replTableWriteIdState(" + rqst + ")", true); + checkRetryable(e, "replTableWriteIdState(" + rqst + ")", true); throw new MetaException("Unable to update transaction database " + StringUtils.stringifyException(e)); } finally { @@ -1951,7 +1961,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { GetValidWriteIdsResponse owr = new GetValidWriteIdsResponse(tblValidWriteIdsList); return owr; } catch (SQLException e) { - checkRetryable(dbConn, e, "getValidWriteIds"); + checkRetryable(e, "getValidWriteIds"); throw new MetaException("Unable to select from transaction database, " + StringUtils.stringifyException(e)); } finally { @@ -2263,7 +2273,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { } catch (SQLException e) { LOG.error("Exception during write ids allocation for request={}. Will retry if possible.", rqst, e); rollbackDBConn(dbConn); - checkRetryable(dbConn, e, "allocateTableWriteIds(" + rqst + ")", true); + checkRetryable(e, "allocateTableWriteIds(" + rqst + ")", true); throw new MetaException("Unable to update transaction database " + StringUtils.stringifyException(e)); } finally { @@ -2301,7 +2311,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { LOG.error( "Exception during reading the max allocated writeId for dbName={}, tableName={}. Will retry if possible.", dbName, tableName, e); - checkRetryable(dbConn, e, "getMaxAllocatedTableWrited(" + rqst + ")"); + checkRetryable(e, "getMaxAllocatedTableWrited(" + rqst + ")"); throw new MetaException("Unable to update transaction database " + StringUtils.stringifyException(e)); } finally { close(rs, pStmt, dbConn); @@ -2336,7 +2346,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { dbConn.commit(); } catch (SQLException e) { rollbackDBConn(dbConn); - checkRetryable(dbConn, e, "seedWriteId(" + rqst + ")"); + checkRetryable(e, "seedWriteId(" + rqst + ")"); throw new MetaException("Unable to update transaction database " + StringUtils.stringifyException(e)); } finally { close(null, pst, dbConn); @@ -2370,7 +2380,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { } catch (SQLException e) { rollbackDBConn(dbConn); - checkRetryable(dbConn, e, "seedTxnId(" + rqst + ")"); + checkRetryable(e, "seedTxnId(" + rqst + ")"); throw new MetaException("Unable to update transaction database " + StringUtils.stringifyException(e)); } finally { close(null, stmt, dbConn); @@ -2408,7 +2418,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { retryNum = 0; throw new MetaException(e.getMessage()); } - checkRetryable(dbConn, e, "addWriteNotificationLog(" + acidWriteEvent + ")"); + checkRetryable(e, "addWriteNotificationLog(" + acidWriteEvent + ")"); throw new MetaException("Unable to add write notification event " + StringUtils.stringifyException(e)); } finally{ closeDbConn(dbConn); @@ -2700,7 +2710,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { } catch (SQLException e) { LOG.debug("Going to rollback: ", e); rollbackDBConn(dbConn); - checkRetryable(dbConn, e, + checkRetryable(e, "heartbeatLockMaterializationRebuild(" + TableName.getDbTable(dbName, tableName) + ", " + txnId + ")"); throw new MetaException("Unable to heartbeat rebuild lock due to " + StringUtils.stringifyException(e)); @@ -2756,7 +2766,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { } catch (SQLException e) { LOG.debug("Going to rollback: ", e); rollbackDBConn(dbConn); - checkRetryable(dbConn, e, "cleanupMaterializationRebuildLocks"); + checkRetryable(e, "cleanupMaterializationRebuildLocks"); throw new MetaException("Unable to clean rebuild locks due to " + StringUtils.stringifyException(e)); } finally { @@ -2870,7 +2880,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { } catch (SQLException e) { LOG.error("enqueueLock failed for request: {}. Exception msg: {}", rqst, getMessage(e)); rollbackDBConn(dbConn); - checkRetryable(dbConn, e, "enqueueLockWithRetry(" + rqst + ")"); + checkRetryable(e, "enqueueLockWithRetry(" + rqst + ")"); throw new MetaException("Unable to update transaction database " + StringUtils.stringifyException(e)); } finally { @@ -3116,7 +3126,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { } catch (SQLException e) { LOG.error("checkLock failed for extLockId={}/txnId={}. Exception msg: {}", extLockId, txnId, getMessage(e)); rollbackDBConn(dbConn); - checkRetryable(dbConn, e, "checkLockWithRetry(" + extLockId + "," + txnId + ")"); + checkRetryable(e, "checkLockWithRetry(" + extLockId + "," + txnId + ")"); throw new MetaException("Unable to update transaction database " + StringUtils.stringifyException(e)); } finally { @@ -3176,7 +3186,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { } catch (SQLException e) { LOG.error("checkLock failed for request={}. Exception msg: {}", rqst, getMessage(e)); rollbackDBConn(dbConn); - checkRetryable(dbConn, e, "checkLock(" + rqst + " )"); + checkRetryable(e, "checkLock(" + rqst + " )"); throw new MetaException("Unable to update transaction database " + JavaUtils.lockIdToString(extLockId) + " " + StringUtils.stringifyException(e)); } finally { @@ -3257,7 +3267,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { } catch (SQLException e) { LOG.error("Unlock failed for request={}. Exception msg: {}", rqst, getMessage(e)); rollbackDBConn(dbConn); - checkRetryable(dbConn, e, "unlock(" + rqst + ")"); + checkRetryable(e, "unlock(" + rqst + ")"); throw new MetaException("Unable to update transaction database " + JavaUtils.lockIdToString(extLockId) + " " + StringUtils.stringifyException(e)); } finally { @@ -3372,7 +3382,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { sortedList.add(new LockInfoExt(e)); } } catch (SQLException e) { - checkRetryable(dbConn, e, "showLocks(" + rqst + ")"); + checkRetryable(e, "showLocks(" + rqst + ")"); throw new MetaException("Unable to select from transaction database " + StringUtils.stringifyException(e)); } finally { @@ -3409,7 +3419,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { } catch (SQLException e) { LOG.debug("Going to rollback: ", e); rollbackDBConn(dbConn); - checkRetryable(dbConn, e, "heartbeat(" + ids + ")"); + checkRetryable(e, "heartbeat(" + ids + ")"); throw new MetaException("Unable to select from transaction database " + StringUtils.stringifyException(e)); } finally { @@ -3476,7 +3486,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { } catch (SQLException e) { LOG.debug("Going to rollback: ", e); rollbackDBConn(dbConn); - checkRetryable(dbConn, e, "heartbeatTxnRange(" + rqst + ")"); + checkRetryable(e, "heartbeatTxnRange(" + rqst + ")"); throw new MetaException("Unable to select from transaction database " + StringUtils.stringifyException(e)); } finally { @@ -3529,7 +3539,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { } return txnId; } catch (SQLException e) { - checkRetryable(dbConn, e, "getTxnIdForWriteId"); + checkRetryable(e, "getTxnIdForWriteId"); throw new MetaException("Unable to select from transaction database, " + StringUtils.stringifyException(e)); } finally { @@ -3653,7 +3663,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { } catch (SQLException e) { LOG.debug("Going to rollback: ", e); rollbackDBConn(dbConn); - checkRetryable(dbConn, e, "COMPACT(" + rqst + ")"); + checkRetryable(e, "COMPACT(" + rqst + ")"); throw new MetaException("Unable to select from transaction database " + StringUtils.stringifyException(e)); } finally { @@ -3740,7 +3750,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { response.addToCompacts(e); } } catch (SQLException e) { - checkRetryable(dbConn, e, "showCompact(" + rqst + ")"); + checkRetryable(e, "showCompact(" + rqst + ")"); throw new MetaException("Unable to select from transaction database " + StringUtils.stringifyException(e)); } finally { @@ -3823,7 +3833,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { } } catch (SQLException e) { LOG.error("Unable to execute query " + e.getMessage()); - checkRetryable(dbConn, e, "getLatestCommittedCompactionInfo"); + checkRetryable(e, "getLatestCommittedCompactionInfo"); } finally { close(rs, pst, dbConn); } @@ -3875,7 +3885,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { return metrics; } catch (SQLException e) { LOG.error("Unable to getMetricsInfo", e); - checkRetryable(dbConn, e, "getMetricsInfo"); + checkRetryable(e, "getMetricsInfo"); throw new MetaException("Unable to execute getMetricsInfo() " + StringUtils.stringifyException(e)); } finally { closeDbConn(dbConn); @@ -3955,7 +3965,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { } catch (SQLException e) { LOG.debug("Going to rollback: ", e); rollbackDBConn(dbConn); - checkRetryable(dbConn, e, "addDynamicPartitions(" + rqst + ")"); + checkRetryable(e, "addDynamicPartitions(" + rqst + ")"); throw new MetaException("Unable to insert into from transaction database " + StringUtils.stringifyException(e)); } finally { @@ -4174,7 +4184,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { } catch (SQLException e) { LOG.debug("Going to rollback: ", e); rollbackDBConn(dbConn); - checkRetryable(dbConn, e, "cleanupRecords"); + checkRetryable(e, "cleanupRecords"); if (e.getMessage().contains("does not exist")) { LOG.warn("Cannot perform cleanup since metastore table does not exist"); } else { @@ -4348,7 +4358,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { } catch (SQLException e) { LOG.debug("Going to rollback: " + callSig); rollbackDBConn(dbConn); - checkRetryable(dbConn, e, callSig); + checkRetryable(e, callSig); if (e.getMessage().contains("does not exist")) { LOG.warn("Cannot perform " + callSig + " since metastore table does not exist"); } else { @@ -4487,10 +4497,10 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { } /** - * See {@link #checkRetryable(Connection, SQLException, String, boolean)}. + * See {@link #checkRetryable(SQLException, String, boolean)}. */ - protected void checkRetryable(Connection conn, SQLException e, String caller) throws RetryException { - checkRetryable(conn, e, caller, false); + void checkRetryable(SQLException e, String caller) throws RetryException { + checkRetryable(e, caller, false); } /** @@ -4498,13 +4508,12 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { * this, so we have to inspect the error messages and catch the telltale signs for each * different database. This method will throw {@code RetryException} * if the error is retry-able. - * @param conn database connection * @param e exception that was thrown. * @param caller name of the method calling this (and other info useful to log) * @param retryOnDuplicateKey whether to retry on unique key constraint violation * @throws org.apache.hadoop.hive.metastore.txn.TxnHandler.RetryException when the operation should be retried */ - protected void checkRetryable(Connection conn, SQLException e, String caller, boolean retryOnDuplicateKey) + void checkRetryable(SQLException e, String caller, boolean retryOnDuplicateKey) throws RetryException { // If you change this function, remove the @Ignore from TestTxnHandler.deadlockIsDetected() @@ -5492,7 +5501,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { } } catch (SQLException e) { LOG.info("Failed to update number of open transactions"); - checkRetryable(dbConn, e, "countOpenTxns()"); + checkRetryable(e, "countOpenTxns()"); } finally { close(rs, stmt, dbConn); } @@ -5515,7 +5524,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { } // Need to register minimum open txnid for current transactions into MIN_HISTORY table. try (Statement stmt = dbConn.createStatement()) { - List<String> rows = txnIds.stream().map(txnId -> txnId + ", " + minOpenTxnId).collect(Collectors.toList()); // Insert transaction entries into MIN_HISTORY_LEVEL. @@ -5727,7 +5735,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { //OK, so now we have a lock return new LockHandleImpl(dbConn, stmt, rs, key, derbySemaphore); } catch (SQLException ex) { - checkRetryable(dbConn, ex, "acquireLock(" + key + ")"); + checkRetryable(ex, "acquireLock(" + key + ")"); throw new MetaException("Unable to lock " + quoteString(key) + " due to: " + getMessage(ex) + "; " + StringUtils.stringifyException(ex)); } catch(InterruptedException ex) {