Repository: hive Updated Branches: refs/heads/master ed487ac40 -> 53a590b53
HIVE-15077 - Acid LockManager is unfair (Eugene Koifman, reviewed by Alan Gates) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/53a590b5 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/53a590b5 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/53a590b5 Branch: refs/heads/master Commit: 53a590b5372c30369aae8c7b32895edc6026112e Parents: ed487ac Author: Eugene Koifman <ekoif...@apache.org> Authored: Fri Feb 23 13:35:06 2018 -0800 Committer: Eugene Koifman <ekoif...@apache.org> Committed: Fri Feb 23 13:35:06 2018 -0800 ---------------------------------------------------------------------- .../hadoop/hive/ql/txn/compactor/Cleaner.java | 2 +- .../hive/ql/lockmgr/TestDbTxnManager2.java | 109 ++++++++++++++++++- .../hadoop/hive/metastore/txn/TxnHandler.java | 51 ++++----- 3 files changed, 130 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/53a590b5/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java index 31f50fa..af0884c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java @@ -246,7 +246,7 @@ public class Cleaner extends CompactorThread { * are resolved (i.e. not opened). This is what "highestWriteId" tracks. This is only tracked * since Hive 1.3.0/2.0 - thus may be 0. See ValidCompactorWriteIdList and uses for more info. * - * We only want to clean up to the highestWriteId - otherwise we risk deleteing deltas from + * We only want to clean up to the highestWriteId - otherwise we risk deleting deltas from * under an active reader. 
* * Suppose we have deltas D2 D3 for table T, i.e. the last compaction created D3 so now there is a http://git-wip-us.apache.org/repos/asf/hive/blob/53a590b5/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java index d411a8b..d9a5feb 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java @@ -72,7 +72,7 @@ import java.util.Map; * each statement and can also simulate concurrent (but very controlled) work but w/o forking any * threads. The limitation here is that not all statements are allowed in an explicit transaction. * For example, "drop table foo". This approach will also cause the query to execute which will - * make tests slower but will exericise the code path that is much closer to the actual user calls. + * make tests slower but will exercise the code path that is much closer to the actual user calls. * * In either approach, each logical "session" should use it's own Transaction Manager. 
This requires * using {@link #swapTxnManager(HiveTxnManager)} since in the SessionState the TM is associated with @@ -1467,7 +1467,7 @@ public class TestDbTxnManager2 { 3, TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPLETED_TXN_COMPONENTS where ctc_table='tab1' and ctc_partition is not null")); } /** - * Concurrent delte/detele of same partition - should pass + * Concurrent delete/delete of same partition - should pass */ @Test public void testWriteSetTracking11() throws Exception { @@ -2230,4 +2230,109 @@ public class TestDbTxnManager2 { Assert.assertEquals("Lock remained", 0, getLocks().size()); Assert.assertEquals("Lock remained", 0, getLocks(txnMgr2).size()); } + @Test + public void testFairness() throws Exception { + dropTable(new String[] {"T6"}); + CommandProcessorResponse cpr = driver.run("create table if not exists T6(a int)"); + checkCmdOnDriver(cpr); + cpr = driver.compileAndRespond("select a from T6"); + checkCmdOnDriver(cpr); + txnMgr.acquireLocks(driver.getPlan(), ctx, "Fifer");//gets S lock on T6 + HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); + swapTxnManager(txnMgr2); + cpr = driver.compileAndRespond("drop table if exists T6"); + checkCmdOnDriver(cpr); + //tries to get X lock on T6 and gets Waiting state + LockState lockState = ((DbTxnManager) txnMgr2).acquireLocks(driver.getPlan(), ctx, "Fiddler", false); + List<ShowLocksResponseElement> locks = getLocks(); + Assert.assertEquals("Unexpected lock count", 2, locks.size()); + checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T6", null, locks); + checkLock(LockType.EXCLUSIVE, LockState.WAITING, "default", "T6", null, locks); + + HiveTxnManager txnMgr3 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); + swapTxnManager(txnMgr3); + //this should block behind the X lock on T6 + //this is a contrived example, in practice this query would of course fail after drop table + cpr = driver.compileAndRespond("select a from T6"); +
checkCmdOnDriver(cpr); + ((DbTxnManager)txnMgr3).acquireLocks(driver.getPlan(), ctx, "Fifer", false);//tries to get S lock on T6 and gets Waiting state + locks = getLocks(); + Assert.assertEquals("Unexpected lock count", 3, locks.size()); + checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T6", null, locks); + checkLock(LockType.SHARED_READ, LockState.WAITING, "default", "T6", null, locks); + checkLock(LockType.EXCLUSIVE, LockState.WAITING, "default", "T6", null, locks); + } + + /** + * T7 is a table with 2 partitions + * 1. run select from T7 + * 2. run drop partition from T7 + * concurrently with 1 starting first so that 2 blocks + * 3. start another concurrent select on T7 - it should block behind waiting X (from drop) - the lock manager should be fair + * 4. finish #1 so that drop unblocks + * 5. rollback the drop to release its X lock + * 6. #3 should unblock + */ + @Test + public void testFairness2() throws Exception { + dropTable(new String[]{"T7"}); + CommandProcessorResponse cpr = driver.run("create table if not exists T7 (a int) " + + "partitioned by (p int) stored as orc TBLPROPERTIES ('transactional'='true')"); + checkCmdOnDriver(cpr); + checkCmdOnDriver(driver.run( + "insert into T7 partition(p) values(1,1),(1,2)"));//create 2 partitions + cpr = driver.compileAndRespond("select a from T7 "); + checkCmdOnDriver(cpr); + txnMgr.acquireLocks(driver.getPlan(), ctx, "Fifer");//gets S lock on T7 + HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); + swapTxnManager(txnMgr2); + cpr = driver.compileAndRespond("alter table T7 drop partition (p=1)"); + checkCmdOnDriver(cpr); + //tries to get X lock on T7.p=1 and gets Waiting state + LockState lockState = ((DbTxnManager) txnMgr2).acquireLocks(driver.getPlan(), ctx, + "Fiddler", false); + List<ShowLocksResponseElement> locks = getLocks(); + Assert.assertEquals("Unexpected lock count", 4, locks.size()); + checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T7", null, locks); +
checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T7", "p=1", locks); + checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T7", "p=2", locks); + checkLock(LockType.EXCLUSIVE, LockState.WAITING, "default", "T7", "p=1", locks); + + HiveTxnManager txnMgr3 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); + swapTxnManager(txnMgr3); + //this should block behind the X lock on T7.p=1 + cpr = driver.compileAndRespond("select a from T7"); + checkCmdOnDriver(cpr); + //tries to get S lock on T7, S on T7.p=1 and S on T7.p=2 + ((DbTxnManager)txnMgr3).acquireLocks(driver.getPlan(), ctx, "Fifer", false); + locks = getLocks(); + Assert.assertEquals("Unexpected lock count", 7, locks.size()); + checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T7", null, locks); + checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T7", "p=1", locks); + checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T7", "p=2", locks); + checkLock(LockType.SHARED_READ, LockState.WAITING, "default", "T7", null, locks); + checkLock(LockType.SHARED_READ, LockState.WAITING, "default", "T7", "p=1", locks); + checkLock(LockType.SHARED_READ, LockState.WAITING, "default", "T7", "p=2", locks); + checkLock(LockType.EXCLUSIVE, LockState.WAITING, "default", "T7", "p=1", locks); + + txnMgr.commitTxn();//release locks from "select a from T7" - to unblock the drop partition + //re-test the "drop partition" X lock + lockState = ((DbLockManager)txnMgr2.getLockManager()).checkLock(locks.get(6).getLockid()); + locks = getLocks(); + Assert.assertEquals("Unexpected lock count", 4, locks.size()); + checkLock(LockType.EXCLUSIVE, LockState.ACQUIRED, "default", "T7", "p=1", locks); + checkLock(LockType.SHARED_READ, LockState.WAITING, "default", "T7", null, locks); + checkLock(LockType.SHARED_READ, LockState.WAITING, "default", "T7", "p=1", locks); + checkLock(LockType.SHARED_READ, LockState.WAITING, "default", "T7", "p=2", locks); +
txnMgr2.rollbackTxn();//release the X lock on T7.p=1 + //re-test the locks + lockState = ((DbLockManager)txnMgr2.getLockManager()).checkLock(locks.get(1).getLockid());//S lock on T7 + locks = getLocks(); + Assert.assertEquals("Unexpected lock count", 3, locks.size()); + checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T7", null, locks); + checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T7", "p=1", locks); + checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T7", "p=2", locks); + + } } http://git-wip-us.apache.org/repos/asf/hive/blob/53a590b5/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index ac61715..6a74594 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -2690,7 +2690,14 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { } /** - * Sort more restrictive locks after less restrictive ones + * Sort more restrictive locks after less restrictive ones. Why? + * Consider insert overwrite table DB.T1 select ... from T2: + * this takes X lock on DB.T1 and S lock on T2. + * Also, create table DB.T3: takes S lock on DB. + * So the state of the lock manager is {X(T1), S(T2), S(DB)}, all in acquired state. + * This is made possible by HIVE-10242. + * Now a select * from T1 will try to get S(T1) which, according to the 'jumpTable', will + * be acquired once it sees S(DB). So we need to check stricter locks first.
*/ private final static class LockTypeComparator implements Comparator<LockType> { public boolean equals(Object other) { @@ -2842,7 +2849,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { * Lock acquisition is meant to be fair, so every lock can only block on some lock with smaller * hl_lock_ext_id by only checking earlier locks. * - * For any given SQL statment all locks required by it are grouped under single extLockId and are + * For any given SQL statement all locks required by it are grouped under single extLockId and are * granted all at once or all locks wait. * * This is expected to run at READ_COMMITTED. @@ -2871,7 +2878,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { boolean isPartOfDynamicPartitionInsert = true; try { /** - * checkLock() must be mutexed against any other checkLock to make sure 2 conflicting locks + * checkLock() must be mutex'd against any other checkLock to make sure 2 conflicting locks * are not granted by parallel checkLock() calls. */ handle = getMutexAPI().acquireLock(MUTEX_KEY.CheckLock.name()); @@ -3007,7 +3014,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { query.append("))"); } } - query.append(" and hl_lock_ext_id <= ").append(extLockId); + query.append(" and hl_lock_ext_id < ").append(extLockId); LOG.debug("Going to execute query <" + query.toString() + ">"); stmt = dbConn.createStatement(); @@ -3027,57 +3034,43 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { } for (LockInfo info : locksBeingChecked) { - // Find the lock record we're checking - int index = -1; - for (int i = 0; i < locks.length; i++) { - if (locks[i].equals(info)) { - index = i; - break; - } - } - - // If we didn't find the lock, then it must not be in the table - if (index == -1) { - LOG.debug("Going to rollback"); - dbConn.rollback(); - throw new MetaException("How did we get here, we heartbeated our lock before we started! 
( " + info + ")"); - } - - // If we've found it and it's already been marked acquired, // then just look at the other locks. - if (locks[index].state == LockState.ACQUIRED) { + if (info.state == LockState.ACQUIRED) { /**this is what makes this method @SafeToRetry*/ continue; } // Look at everything in front of this lock to see if it should block // it or not. - for (int i = index - 1; i >= 0; i--) { + for (int i = locks.length - 1; i >= 0; i--) { // Check if we're operating on the same database, if not, move on - if (!locks[index].db.equals(locks[i].db)) { + if (!info.db.equals(locks[i].db)) { continue; } // If table is null on either of these, then they are claiming to // lock the whole database and we need to check it. Otherwise, // check if they are operating on the same table, if not, move on. - if (locks[index].table != null && locks[i].table != null - && !locks[index].table.equals(locks[i].table)) { + if (info.table != null && locks[i].table != null + && !info.table.equals(locks[i].table)) { continue; } + // if here, we may be checking a DB level lock against a Table level lock. Alternatively, + // we could have used intention locks (for example a request for S lock on table would + // cause an IS lock on the DB that contains the table). Similarly, at partition level. // If partition is null on either of these, then they are claiming to // lock the whole table and we need to check it. Otherwise, // check if they are operating on the same partition, if not, move on. - if (locks[index].partition != null && locks[i].partition != null - && !locks[index].partition.equals(locks[i].partition)) { + if (info.partition != null && locks[i].partition != null + && !info.partition.equals(locks[i].partition)) { continue; } // We've found something that matches what we're trying to lock, // so figure out if we can lock it too.
- LockAction lockAction = jumpTable.get(locks[index].type).get(locks[i].type).get(locks[i].state); + LockAction lockAction = jumpTable.get(info.type).get(locks[i].type).get(locks[i].state); LOG.debug("desired Lock: " + info + " checked Lock: " + locks[i] + " action: " + lockAction); switch (lockAction) { case WAIT: