HIVE-13249 : Hard upper bound on number of open transactions (Wei Zheng, reviewed by Eugene Koifman)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/cb3636f3 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/cb3636f3 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/cb3636f3 Branch: refs/heads/branch-1 Commit: cb3636f3fe3e45744eed23a542de05f77a3dd356 Parents: 5fe252b Author: Wei Zheng <w...@apache.org> Authored: Fri May 20 10:25:07 2016 -0700 Committer: Wei Zheng <w...@apache.org> Committed: Fri May 20 10:25:07 2016 -0700 ---------------------------------------------------------------------- .../org/apache/hadoop/hive/conf/HiveConf.java | 6 + .../hadoop/hive/metastore/txn/TxnHandler.java | 77 + .../hadoop/hive/metastore/txn/TxnStore.java | 6 + .../metastore/txn/TestCompactionTxnHandler.java | 447 ----- .../hive/metastore/txn/TestTxnHandler.java | 1521 ------------------ .../hive/ql/txn/AcidOpenTxnsCounterService.java | 69 + .../metastore/txn/TestCompactionTxnHandler.java | 447 +++++ .../hive/metastore/txn/TestTxnHandler.java | 1521 ++++++++++++++++++ .../apache/hadoop/hive/ql/TestTxnCommands2.java | 41 +- 9 files changed, 2166 insertions(+), 1969 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/cb3636f3/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 4c6aa71..c63c2ca 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -1485,6 +1485,12 @@ public class HiveConf extends Configuration { " of the lock manager is dumped to log file. This is for debugging. See also " + "hive.lock.numretries and hive.lock.sleep.between.retries."), + HIVE_MAX_OPEN_TXNS("hive.max.open.txns", 100000, "Maximum number of open transactions. If \n" + + "current open transactions reach this limit, future open transaction requests will be \n" + + "rejected, until this number goes below the limit."), + HIVE_COUNT_OPEN_TXNS_INTERVAL("hive.count.open.txns.interval", "1s", + new TimeValidator(TimeUnit.SECONDS), "Time in seconds between checks to count open transactions."), + HIVE_TXN_MAX_OPEN_BATCH("hive.txn.max.open.batch", 1000, "Maximum number of transactions that can be fetched in one call to open_txns().\n" + "This controls how many transactions streaming agents such as Flume or Storm open\n" + http://git-wip-us.apache.org/repos/asf/hive/blob/cb3636f3/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index 4da5542..27fa820 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -37,6 +37,7 @@ import org.apache.hadoop.hive.common.JavaUtils; import org.apache.hadoop.hive.common.ValidReadTxnList; import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.HouseKeeperService; import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.api.*; import org.apache.hadoop.hive.shims.ShimLoader; @@ -169,6 +170,15 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { } } + // Maximum number of open transactions that's allowed + private static volatile int maxOpenTxns = 0; + // Current number of open txns + private static volatile long numOpenTxns = 0; + // Whether number of open transactions reaches the threshold + private static volatile boolean tooManyOpenTxns = false; + // The AcidHouseKeeperService for counting open transactions + private static volatile HouseKeeperService openTxnsCounter = null; + /** * Number of consecutive deadlocks we have seen */ @@ -234,6 +244,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { TimeUnit.MILLISECONDS); retryLimit = HiveConf.getIntVar(conf, HiveConf.ConfVars.HMSHANDLERATTEMPTS); deadlockRetryInterval = retryInterval / 10; + maxOpenTxns = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_MAX_OPEN_TXNS); } public GetOpenTxnsInfoResponse getOpenTxnsInfo() throws MetaException { @@ -383,7 +394,43 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { return new ValidReadTxnList(exceptions, highWater); } + private static void startHouseKeeperService(HiveConf conf, Class c){ + try { + openTxnsCounter = (HouseKeeperService)c.newInstance(); + openTxnsCounter.start(conf); + } catch (Exception ex) { + LOG.error("Failed to start {}" + openTxnsCounter.getClass() + + ". The system will not handle {} " + openTxnsCounter.getServiceDescription() + + ". Root Cause: ", ex); + } + } + public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException { + if (openTxnsCounter == null) { + synchronized (TxnHandler.class) { + try { + if (openTxnsCounter == null) { + startHouseKeeperService(conf, Class.forName("org.apache.hadoop.hive.ql.txn.AcidOpenTxnsCounterService")); + } + } catch (ClassNotFoundException e) { + throw new MetaException(e.getMessage()); + } + } + } + if (!tooManyOpenTxns && numOpenTxns >= maxOpenTxns) { + tooManyOpenTxns = true; + } + if (tooManyOpenTxns) { + if (numOpenTxns < maxOpenTxns * 0.9) { + tooManyOpenTxns = false; + } else { + LOG.warn("Maximum allowed number of open transactions (" + maxOpenTxns + ") has been " + + "reached. Current number of open transactions: " + numOpenTxns); + throw new MetaException("Maximum allowed number of open transactions has been reached. " + + "See hive.max.open.txns."); + } + } + int numTxns = rqst.getNum_txns(); try { Connection dbConn = null; @@ -2893,6 +2940,36 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { } } + public void countOpenTxns() throws MetaException { + Connection dbConn = null; + Statement stmt = null; + ResultSet rs = null; + try { + try { + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + stmt = dbConn.createStatement(); + String s = "select count(*) from TXNS where txn_state = '" + TXN_OPEN + "'"; + LOG.debug("Going to execute query <" + s + ">"); + rs = stmt.executeQuery(s); + if (!rs.next()) { + LOG.error("Transaction database not properly configured, " + + "can't find txn_state from TXNS."); + } else { + numOpenTxns = rs.getLong(1); + } + } catch (SQLException e) { + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + LOG.info("Failed to update number of open transactions"); + checkRetryable(dbConn, e, "countOpenTxns()"); + } finally { + close(rs, stmt, dbConn); + } + } catch (RetryException e) { + countOpenTxns(); + } + } + private static synchronized void setupJdbcConnectionPool(HiveConf conf) throws SQLException { if (connPool != null) return; http://git-wip-us.apache.org/repos/asf/hive/blob/cb3636f3/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java index dc807df..d739929 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java @@ -105,6 +105,12 @@ public interface TxnStore { public GetOpenTxnsResponse getOpenTxns() throws MetaException; /** + * Get the count for open transactions. + * @throws MetaException + */ + public void countOpenTxns() throws MetaException; + + /** * Open a set of transactions * @param rqst request to open transactions * @return information on opened transactions http://git-wip-us.apache.org/repos/asf/hive/blob/cb3636f3/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java ---------------------------------------------------------------------- diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java deleted file mode 100644 index 23ad54e..0000000 --- a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java +++ /dev/null @@ -1,447 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.metastore.txn; - -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.api.*; -import org.apache.log4j.Level; -import org.apache.log4j.LogManager; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Set; -import java.util.SortedSet; -import java.util.TreeSet; - -import static junit.framework.Assert.*; - -/** - * Tests for TxnHandler. - */ -public class TestCompactionTxnHandler { - - private HiveConf conf = new HiveConf(); - private TxnStore txnHandler; - - public TestCompactionTxnHandler() throws Exception { - TxnDbUtil.setConfValues(conf); - LogManager.getLogger(TxnHandler.class.getName()).setLevel(Level.DEBUG); - tearDown(); - } - - @Test - public void testFindNextToCompact() throws Exception { - CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR); - rqst.setPartitionname("ds=today"); - txnHandler.compact(rqst); - long now = System.currentTimeMillis(); - CompactionInfo ci = txnHandler.findNextToCompact("fred"); - assertNotNull(ci); - assertEquals("foo", ci.dbname); - assertEquals("bar", ci.tableName); - assertEquals("ds=today", ci.partName); - assertEquals(CompactionType.MINOR, ci.type); - assertNull(ci.runAs); - assertNull(txnHandler.findNextToCompact("fred")); - - txnHandler.setRunAs(ci.id, "bob"); - - ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); - List<ShowCompactResponseElement> compacts = rsp.getCompacts(); - assertEquals(1, compacts.size()); - ShowCompactResponseElement c = compacts.get(0); - assertEquals("foo", c.getDbname()); - assertEquals("bar", c.getTablename()); - assertEquals("ds=today", c.getPartitionname()); - assertEquals(CompactionType.MINOR, c.getType()); - assertEquals("working", c.getState()); - assertTrue(c.getStart() - 5000 < now && c.getStart() + 5000 > now); - assertEquals("fred", c.getWorkerid()); - assertEquals("bob", c.getRunAs()); - } - - @Test - public void testFindNextToCompact2() throws Exception { - CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR); - rqst.setPartitionname("ds=today"); - txnHandler.compact(rqst); - - rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR); - rqst.setPartitionname("ds=yesterday"); - txnHandler.compact(rqst); - - long now = System.currentTimeMillis(); - boolean expectToday = false; - CompactionInfo ci = txnHandler.findNextToCompact("fred"); - assertNotNull(ci); - assertEquals("foo", ci.dbname); - assertEquals("bar", ci.tableName); - if ("ds=today".equals(ci.partName)) expectToday = false; - else if ("ds=yesterday".equals(ci.partName)) expectToday = true; - else fail("partition name should have been today or yesterday but was " + ci.partName); - assertEquals(CompactionType.MINOR, ci.type); - - ci = txnHandler.findNextToCompact("fred"); - assertNotNull(ci); - assertEquals("foo", ci.dbname); - assertEquals("bar", ci.tableName); - if (expectToday) assertEquals("ds=today", ci.partName); - else assertEquals("ds=yesterday", ci.partName); - assertEquals(CompactionType.MINOR, ci.type); - - assertNull(txnHandler.findNextToCompact("fred")); - - ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); - List<ShowCompactResponseElement> compacts = rsp.getCompacts(); - assertEquals(2, compacts.size()); - for (ShowCompactResponseElement e : compacts) { - assertEquals("working", e.getState()); - assertTrue(e.getStart() - 5000 < now && e.getStart() + 5000 > now); - assertEquals("fred", e.getWorkerid()); - } - } - - @Test - public void testFindNextToCompactNothingToCompact() throws Exception { - assertNull(txnHandler.findNextToCompact("fred")); - } - - @Test - public void testMarkCompacted() throws Exception { - CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR); - rqst.setPartitionname("ds=today"); - txnHandler.compact(rqst); - CompactionInfo ci = txnHandler.findNextToCompact("fred"); - assertNotNull(ci); - - txnHandler.markCompacted(ci); - assertNull(txnHandler.findNextToCompact("fred")); - - - - ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); - List<ShowCompactResponseElement> compacts = rsp.getCompacts(); - assertEquals(1, compacts.size()); - ShowCompactResponseElement c = compacts.get(0); - assertEquals("foo", c.getDbname()); - assertEquals("bar", c.getTablename()); - assertEquals("ds=today", c.getPartitionname()); - assertEquals(CompactionType.MINOR, c.getType()); - assertEquals("ready for cleaning", c.getState()); - assertNull(c.getWorkerid()); - } - - @Test - public void testFindNextToClean() throws Exception { - CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR); - rqst.setPartitionname("ds=today"); - txnHandler.compact(rqst); - assertEquals(0, txnHandler.findReadyToClean().size()); - CompactionInfo ci = txnHandler.findNextToCompact("fred"); - assertNotNull(ci); - - assertEquals(0, txnHandler.findReadyToClean().size()); - txnHandler.markCompacted(ci); - assertNull(txnHandler.findNextToCompact("fred")); - - List<CompactionInfo> toClean = txnHandler.findReadyToClean(); - assertEquals(1, toClean.size()); - assertNull(txnHandler.findNextToCompact("fred")); - - ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); - List<ShowCompactResponseElement> compacts = rsp.getCompacts(); - assertEquals(1, compacts.size()); - ShowCompactResponseElement c = compacts.get(0); - assertEquals("foo", c.getDbname()); - assertEquals("bar", c.getTablename()); - assertEquals("ds=today", c.getPartitionname()); - assertEquals(CompactionType.MINOR, c.getType()); - assertEquals("ready for cleaning", c.getState()); - assertNull(c.getWorkerid()); - } - - @Test - public void testMarkCleaned() throws Exception { - CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR); - rqst.setPartitionname("ds=today"); - txnHandler.compact(rqst); - assertEquals(0, txnHandler.findReadyToClean().size()); - CompactionInfo ci = txnHandler.findNextToCompact("fred"); - assertNotNull(ci); - - assertEquals(0, txnHandler.findReadyToClean().size()); - txnHandler.markCompacted(ci); - assertNull(txnHandler.findNextToCompact("fred")); - - List<CompactionInfo> toClean = txnHandler.findReadyToClean(); - assertEquals(1, toClean.size()); - assertNull(txnHandler.findNextToCompact("fred")); - txnHandler.markCleaned(ci); - assertNull(txnHandler.findNextToCompact("fred")); - assertEquals(0, txnHandler.findReadyToClean().size()); - - ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); - assertEquals(1, rsp.getCompactsSize()); - assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState())); - } - - @Test - public void testRevokeFromLocalWorkers() throws Exception { - CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR); - txnHandler.compact(rqst); - rqst = new CompactionRequest("foo", "baz", CompactionType.MINOR); - txnHandler.compact(rqst); - rqst = new CompactionRequest("foo", "bazzoo", CompactionType.MINOR); - txnHandler.compact(rqst); - assertNotNull(txnHandler.findNextToCompact("fred-193892")); - assertNotNull(txnHandler.findNextToCompact("bob-193892")); - assertNotNull(txnHandler.findNextToCompact("fred-193893")); - txnHandler.revokeFromLocalWorkers("fred"); - - ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); - List<ShowCompactResponseElement> compacts = rsp.getCompacts(); - assertEquals(3, compacts.size()); - boolean sawWorkingBob = false; - int initiatedCount = 0; - for (ShowCompactResponseElement c : compacts) { - if (c.getState().equals("working")) { - assertEquals("bob-193892", c.getWorkerid()); - sawWorkingBob = true; - } else if (c.getState().equals("initiated")) { - initiatedCount++; - } else { - fail("Unexpected state"); - } - } - assertTrue(sawWorkingBob); - assertEquals(2, initiatedCount); - } - - @Test - public void testRevokeTimedOutWorkers() throws Exception { - CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR); - txnHandler.compact(rqst); - rqst = new CompactionRequest("foo", "baz", CompactionType.MINOR); - txnHandler.compact(rqst); - - assertNotNull(txnHandler.findNextToCompact("fred-193892")); - Thread.sleep(200); - assertNotNull(txnHandler.findNextToCompact("fred-193892")); - txnHandler.revokeTimedoutWorkers(100); - - ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); - List<ShowCompactResponseElement> compacts = rsp.getCompacts(); - assertEquals(2, compacts.size()); - boolean sawWorking = false, sawInitiated = false; - for (ShowCompactResponseElement c : compacts) { - if (c.getState().equals("working")) sawWorking = true; - else if (c.getState().equals("initiated")) sawInitiated = true; - else fail("Unexpected state"); - } - assertTrue(sawWorking); - assertTrue(sawInitiated); - } - - @Test - public void testFindPotentialCompactions() throws Exception { - // Test that committing unlocks - long txnid = openTxn(); - LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, - "mydb"); - comp.setTablename("mytable"); - comp.setOperationType(DataOperationType.UPDATE); - List<LockComponent> components = new ArrayList<LockComponent>(1); - components.add(comp); - comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, - "mydb"); - comp.setTablename("yourtable"); - comp.setPartitionname("mypartition"); - comp.setOperationType(DataOperationType.UPDATE); - components.add(comp); - LockRequest req = new LockRequest(components, "me", "localhost"); - req.setTxnid(txnid); - LockResponse res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.ACQUIRED); - txnHandler.commitTxn(new CommitTxnRequest(txnid)); - assertEquals(0, txnHandler.numLocksInLockTable()); - - Set<CompactionInfo> potentials = txnHandler.findPotentialCompactions(100); - assertEquals(2, potentials.size()); - boolean sawMyTable = false, sawYourTable = false; - for (CompactionInfo ci : potentials) { - sawMyTable |= (ci.dbname.equals("mydb") && ci.tableName.equals("mytable") && - ci.partName == null); - sawYourTable |= (ci.dbname.equals("mydb") && ci.tableName.equals("yourtable") && - ci.partName.equals("mypartition")); - } - assertTrue(sawMyTable); - assertTrue(sawYourTable); - } - - // TODO test changes to mark cleaned to clean txns and txn_components - - @Test - public void testMarkCleanedCleansTxnsAndTxnComponents() - throws Exception { - long txnid = openTxn(); - LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, - "mydb"); - comp.setTablename("mytable"); - comp.setOperationType(DataOperationType.INSERT); - List<LockComponent> components = new ArrayList<LockComponent>(1); - components.add(comp); - LockRequest req = new LockRequest(components, "me", "localhost"); - req.setTxnid(txnid); - LockResponse res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.ACQUIRED); - txnHandler.abortTxn(new AbortTxnRequest(txnid)); - - txnid = openTxn(); - comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); - comp.setTablename("yourtable"); - comp.setOperationType(DataOperationType.DELETE); - components = new ArrayList<LockComponent>(1); - components.add(comp); - req = new LockRequest(components, "me", "localhost"); - req.setTxnid(txnid); - res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.ACQUIRED); - txnHandler.abortTxn(new AbortTxnRequest(txnid)); - - txnid = openTxn(); - comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); - comp.setTablename("foo"); - comp.setPartitionname("bar"); - comp.setOperationType(DataOperationType.UPDATE); - components = new ArrayList<LockComponent>(1); - components.add(comp); - req = new LockRequest(components, "me", "localhost"); - req.setTxnid(txnid); - res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.ACQUIRED); - - comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); - comp.setTablename("foo"); - comp.setPartitionname("baz"); - comp.setOperationType(DataOperationType.UPDATE); - components = new ArrayList<LockComponent>(1); - components.add(comp); - req = new LockRequest(components, "me", "localhost"); - req.setTxnid(txnid); - res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.ACQUIRED); - txnHandler.abortTxn(new AbortTxnRequest(txnid)); - - CompactionInfo ci = new CompactionInfo(); - - // Now clean them and check that they are removed from the count. - CompactionRequest rqst = new CompactionRequest("mydb", "mytable", CompactionType.MAJOR); - txnHandler.compact(rqst); - assertEquals(0, txnHandler.findReadyToClean().size()); - ci = txnHandler.findNextToCompact("fred"); - assertNotNull(ci); - txnHandler.markCompacted(ci); - - List<CompactionInfo> toClean = txnHandler.findReadyToClean(); - assertEquals(1, toClean.size()); - txnHandler.markCleaned(ci); - - // Check that we are cleaning up the empty aborted transactions - GetOpenTxnsResponse txnList = txnHandler.getOpenTxns(); - assertEquals(3, txnList.getOpen_txnsSize()); - txnHandler.cleanEmptyAbortedTxns(); - txnList = txnHandler.getOpenTxns(); - assertEquals(2, txnList.getOpen_txnsSize()); - - rqst = new CompactionRequest("mydb", "foo", CompactionType.MAJOR); - rqst.setPartitionname("bar"); - txnHandler.compact(rqst); - assertEquals(0, txnHandler.findReadyToClean().size()); - ci = txnHandler.findNextToCompact("fred"); - assertNotNull(ci); - txnHandler.markCompacted(ci); - - toClean = txnHandler.findReadyToClean(); - assertEquals(1, toClean.size()); - txnHandler.markCleaned(ci); - - txnHandler.openTxns(new OpenTxnRequest(1, "me", "localhost")); - txnHandler.cleanEmptyAbortedTxns(); - txnList = txnHandler.getOpenTxns(); - assertEquals(3, txnList.getOpen_txnsSize()); - } - - @Test - public void addDynamicPartitions() throws Exception { - String dbName = "default"; - String tableName = "adp_table"; - OpenTxnsResponse openTxns = txnHandler.openTxns(new OpenTxnRequest(1, "me", "localhost")); - long txnId = openTxns.getTxn_ids().get(0); - // lock a table, as in dynamic partitions - LockComponent lc = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, dbName); - lc.setTablename(tableName); - DataOperationType dop = DataOperationType.UPDATE; - lc.setOperationType(dop); - LockRequest lr = new LockRequest(Arrays.asList(lc), "me", "localhost"); - lr.setTxnid(txnId); - LockResponse lock = txnHandler.lock(lr); - assertEquals(LockState.ACQUIRED, lock.getState()); - - AddDynamicPartitions adp = new AddDynamicPartitions(txnId, dbName, tableName, - Arrays.asList("ds=yesterday", "ds=today")); - adp.setOperationType(dop); - txnHandler.addDynamicPartitions(adp); - txnHandler.commitTxn(new CommitTxnRequest(txnId)); - - Set<CompactionInfo> potentials = txnHandler.findPotentialCompactions(1000); - assertEquals(2, potentials.size()); - SortedSet<CompactionInfo> sorted = new TreeSet<CompactionInfo>(potentials); - - int i = 0; - for (CompactionInfo ci : sorted) { - assertEquals(dbName, ci.dbname); - assertEquals(tableName, ci.tableName); - switch (i++) { - case 0: assertEquals("ds=today", ci.partName); break; - case 1: assertEquals("ds=yesterday", ci.partName); break; - default: throw new RuntimeException("What?"); - } - } - } - - @Before - public void setUp() throws Exception { - TxnDbUtil.prepDb(); - txnHandler = TxnUtils.getTxnStore(conf); - } - - @After - public void tearDown() throws Exception { - TxnDbUtil.cleanDb(); - } - - private long openTxn() throws MetaException { - List<Long> txns = txnHandler.openTxns(new OpenTxnRequest(1, "me", "localhost")).getTxn_ids(); - return txns.get(0); - } - -} http://git-wip-us.apache.org/repos/asf/hive/blob/cb3636f3/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java ---------------------------------------------------------------------- diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java deleted file mode 100644 index 0d4fc59..0000000 --- a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java +++ /dev/null @@ -1,1521 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.metastore.txn; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.api.*; -import org.apache.log4j.Level; -import org.apache.log4j.LogManager; -import org.junit.*; -import org.apache.hadoop.util.StringUtils; - -import java.sql.Connection; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.Statement; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - -import static junit.framework.Assert.assertEquals; -import static junit.framework.Assert.assertFalse; -import static junit.framework.Assert.assertNull; -import static junit.framework.Assert.assertTrue; -import static junit.framework.Assert.fail; - -/** - * Tests for TxnHandler. - */ -public class TestTxnHandler { - static final private String CLASS_NAME = TxnHandler.class.getName(); - static final private Log LOG = LogFactory.getLog(CLASS_NAME); - - private HiveConf conf = new HiveConf(); - private TxnStore txnHandler; - - public TestTxnHandler() throws Exception { - TxnDbUtil.setConfValues(conf); - LogManager.getLogger(TxnHandler.class.getName()).setLevel(Level.DEBUG); - tearDown(); - } - - @Test - public void testValidTxnsEmpty() throws Exception { - GetOpenTxnsInfoResponse txnsInfo = txnHandler.getOpenTxnsInfo(); - assertEquals(0L, txnsInfo.getTxn_high_water_mark()); - assertTrue(txnsInfo.getOpen_txns().isEmpty()); - GetOpenTxnsResponse txns = txnHandler.getOpenTxns(); - assertEquals(0L, txns.getTxn_high_water_mark()); - assertTrue(txns.getOpen_txns().isEmpty()); - } - - @Test - public void testOpenTxn() throws Exception { - long first = openTxn(); - assertEquals(1L, first); - long second = openTxn(); - assertEquals(2L, second); - GetOpenTxnsInfoResponse txnsInfo = txnHandler.getOpenTxnsInfo(); - assertEquals(2L, txnsInfo.getTxn_high_water_mark()); - assertEquals(2, txnsInfo.getOpen_txns().size()); - assertEquals(1L, txnsInfo.getOpen_txns().get(0).getId()); - assertEquals(TxnState.OPEN, txnsInfo.getOpen_txns().get(0).getState()); - assertEquals(2L, txnsInfo.getOpen_txns().get(1).getId()); - assertEquals(TxnState.OPEN, txnsInfo.getOpen_txns().get(1).getState()); - assertEquals("me", txnsInfo.getOpen_txns().get(1).getUser()); - assertEquals("localhost", txnsInfo.getOpen_txns().get(1).getHostname()); - - GetOpenTxnsResponse txns = txnHandler.getOpenTxns(); - assertEquals(2L, txns.getTxn_high_water_mark()); - assertEquals(2, txns.getOpen_txns().size()); - boolean[] saw = new boolean[3]; - for (int i = 0; i < saw.length; i++) saw[i] = false; - for (Long tid : txns.getOpen_txns()) { - saw[tid.intValue()] = true; - } - for (int i = 1; i < saw.length; i++) assertTrue(saw[i]); - } - - @Test - public void testAbortTxn() throws Exception { - OpenTxnsResponse openedTxns = txnHandler.openTxns(new OpenTxnRequest(2, "me", "localhost")); - List<Long> txnList = openedTxns.getTxn_ids(); - long first = txnList.get(0); - assertEquals(1L, first); - long second = txnList.get(1); - assertEquals(2L, second); - txnHandler.abortTxn(new AbortTxnRequest(1)); - GetOpenTxnsInfoResponse txnsInfo = txnHandler.getOpenTxnsInfo(); - assertEquals(2L, txnsInfo.getTxn_high_water_mark()); - assertEquals(2, txnsInfo.getOpen_txns().size()); - assertEquals(1L, txnsInfo.getOpen_txns().get(0).getId()); - assertEquals(TxnState.ABORTED, txnsInfo.getOpen_txns().get(0).getState()); - assertEquals(2L, txnsInfo.getOpen_txns().get(1).getId()); - assertEquals(TxnState.OPEN, txnsInfo.getOpen_txns().get(1).getState()); - - GetOpenTxnsResponse txns = txnHandler.getOpenTxns(); - assertEquals(2L, txns.getTxn_high_water_mark()); - assertEquals(2, txns.getOpen_txns().size()); - boolean[] saw = new boolean[3]; - for (int i = 0; i < saw.length; i++) saw[i] = false; - for (Long tid : txns.getOpen_txns()) { - saw[tid.intValue()] = true; - } - for (int i = 1; i < saw.length; i++) assertTrue(saw[i]); - } - - @Test - public void testAbortInvalidTxn() throws Exception { - boolean caught = false; - try { - txnHandler.abortTxn(new AbortTxnRequest(195L)); - } catch (NoSuchTxnException e) { - caught = true; - } - assertTrue(caught); - } - - @Test - public void testValidTxnsNoneOpen() throws Exception { - txnHandler.openTxns(new OpenTxnRequest(2, "me", "localhost")); - txnHandler.commitTxn(new CommitTxnRequest(1)); - txnHandler.commitTxn(new CommitTxnRequest(2)); - GetOpenTxnsInfoResponse txnsInfo = txnHandler.getOpenTxnsInfo(); - assertEquals(2L, txnsInfo.getTxn_high_water_mark()); - assertEquals(0, txnsInfo.getOpen_txns().size()); - GetOpenTxnsResponse txns = txnHandler.getOpenTxns(); - assertEquals(2L, txns.getTxn_high_water_mark()); - assertEquals(0, txns.getOpen_txns().size()); - } - - @Test - public void testValidTxnsSomeOpen() throws Exception { - txnHandler.openTxns(new OpenTxnRequest(3, "me", "localhost")); - txnHandler.abortTxn(new AbortTxnRequest(1)); - txnHandler.commitTxn(new CommitTxnRequest(2)); - GetOpenTxnsInfoResponse txnsInfo = txnHandler.getOpenTxnsInfo(); - assertEquals(3L, txnsInfo.getTxn_high_water_mark()); - assertEquals(2, txnsInfo.getOpen_txns().size()); - assertEquals(1L, txnsInfo.getOpen_txns().get(0).getId()); - assertEquals(TxnState.ABORTED, txnsInfo.getOpen_txns().get(0).getState()); - assertEquals(3L, txnsInfo.getOpen_txns().get(1).getId()); - assertEquals(TxnState.OPEN, txnsInfo.getOpen_txns().get(1).getState()); - - GetOpenTxnsResponse txns = txnHandler.getOpenTxns(); - assertEquals(3L, txns.getTxn_high_water_mark()); - assertEquals(2, txns.getOpen_txns().size()); - boolean[] saw = new boolean[4]; - for (int i = 0; i < saw.length; i++) saw[i] = false; - for (Long tid : txns.getOpen_txns()) { - saw[tid.intValue()] = true; - } - assertTrue(saw[1]); - assertFalse(saw[2]); - assertTrue(saw[3]); - } - - @Test - public void testLockDifferentDBs() throws Exception { - // Test that two different databases don't collide on their locks - LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); - comp.setOperationType(DataOperationType.NO_TXN); - List<LockComponent> components = new ArrayList<LockComponent>(1); - components.add(comp); - LockRequest req = new LockRequest(components, "me", "localhost"); - LockResponse res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.ACQUIRED); - - comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "yourdb"); - comp.setOperationType(DataOperationType.NO_TXN); - components.clear(); - components.add(comp); - req = new LockRequest(components, "me", "localhost"); - res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.ACQUIRED); - } - - @Test - public void testLockSameDB() throws Exception { - // Test that two different databases don't collide on their locks - LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); - comp.setOperationType(DataOperationType.NO_TXN); - List<LockComponent> components = new ArrayList<LockComponent>(1); - components.add(comp); - LockRequest req = new LockRequest(components, "me", "localhost"); - LockResponse res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.ACQUIRED); - - comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); - comp.setOperationType(DataOperationType.NO_TXN); - components.clear(); - components.add(comp); - req = new LockRequest(components, "me", "localhost"); - res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.WAITING); - } - - @Test - public void testLockDbLocksTable() throws Exception { - // Test that locking a database prevents locking of tables in the database - LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); - comp.setOperationType(DataOperationType.NO_TXN); - List<LockComponent> components = new ArrayList<LockComponent>(1); - components.add(comp); - LockRequest req = new LockRequest(components, "me", "localhost"); - LockResponse res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.ACQUIRED); - - comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); - comp.setOperationType(DataOperationType.NO_TXN); - comp.setTablename("mytable"); - components.clear(); - components.add(comp); - req = new LockRequest(components, "me", "localhost"); - res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.WAITING); - } - - @Test - public void testLockDbDoesNotLockTableInDifferentDB() throws Exception { - // Test that locking a database prevents locking of tables in the database - LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); - comp.setOperationType(DataOperationType.NO_TXN); - List<LockComponent> components = new ArrayList<LockComponent>(1); - components.add(comp); - LockRequest req = new LockRequest(components, "me", "localhost"); - LockResponse res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.ACQUIRED); - - comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "yourdb"); - comp.setOperationType(DataOperationType.NO_TXN); - comp.setTablename("mytable"); - components.clear(); - components.add(comp); - req = new LockRequest(components, "me", "localhost"); - res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.ACQUIRED); - } - - @Test - public void testLockDifferentTables() throws Exception { - // Test that two different tables don't collide on their locks - LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); - comp.setOperationType(DataOperationType.NO_TXN); - comp.setTablename("mytable"); - List<LockComponent> components = new ArrayList<LockComponent>(1); - components.add(comp); - LockRequest req = new LockRequest(components, "me", "localhost"); - LockResponse res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.ACQUIRED); - - comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); - comp.setOperationType(DataOperationType.NO_TXN); - comp.setTablename("yourtable"); - components.clear(); - components.add(comp); - req = new LockRequest(components, "me", "localhost"); - res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.ACQUIRED); - } - - @Test - public void testLockSameTable() throws Exception { - // Test that two different tables don't collide on their locks - LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - comp.setOperationType(DataOperationType.NO_TXN); - List<LockComponent> components = new ArrayList<LockComponent>(1); - components.add(comp); - LockRequest req = new LockRequest(components, "me", "localhost"); - LockResponse res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.ACQUIRED); - - comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - comp.setOperationType(DataOperationType.NO_TXN); - components.clear(); - components.add(comp); - req = new LockRequest(components, "me", "localhost"); - res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.WAITING); - } - - @Test - public void testLockTableLocksPartition() throws Exception { - // Test that locking a table prevents locking of partitions of the table - LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - comp.setOperationType(DataOperationType.NO_TXN); - List<LockComponent> components = new ArrayList<LockComponent>(1); - components.add(comp); - LockRequest req = new LockRequest(components, "me", "localhost"); - LockResponse res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.ACQUIRED); - - comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - comp.setPartitionname("mypartition"); - comp.setOperationType(DataOperationType.NO_TXN); - components.clear(); - components.add(comp); - req = new LockRequest(components, "me", "localhost"); - res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.WAITING); - } - - @Test - public void testLockDifferentTableDoesntLockPartition() throws Exception { - // Test that locking a table prevents locking of partitions of the table - LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - comp.setOperationType(DataOperationType.NO_TXN); - List<LockComponent> components = new ArrayList<LockComponent>(1); - components.add(comp); - LockRequest req = new LockRequest(components, "me", "localhost"); - LockResponse res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.ACQUIRED); - - comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); - comp.setTablename("yourtable"); - comp.setPartitionname("mypartition"); - comp.setOperationType(DataOperationType.NO_TXN); - components.clear(); - components.add(comp); - req = new LockRequest(components, "me", "localhost"); - res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.ACQUIRED); - } - - @Test - public void testLockDifferentPartitions() throws Exception { - // Test that two different partitions don't collide on their locks - LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - comp.setPartitionname("mypartition"); - comp.setOperationType(DataOperationType.NO_TXN); - List<LockComponent> components = new ArrayList<LockComponent>(1); - components.add(comp); - LockRequest req = new LockRequest(components, "me", "localhost"); - LockResponse res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.ACQUIRED); - - comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - comp.setPartitionname("yourpartition"); - comp.setOperationType(DataOperationType.NO_TXN); - components.clear(); - components.add(comp); - req = new LockRequest(components, "me", "localhost"); - res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.ACQUIRED); - } - - @Test - public void testLockSamePartition() throws Exception { - // Test that two different partitions don't collide on their locks - LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - comp.setPartitionname("mypartition"); - comp.setOperationType(DataOperationType.NO_TXN); - List<LockComponent> components = new ArrayList<LockComponent>(1); - components.add(comp); - LockRequest req = new LockRequest(components, "me", "localhost"); - LockResponse res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.ACQUIRED); - - comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - comp.setPartitionname("mypartition"); - comp.setOperationType(DataOperationType.NO_TXN); - components.clear(); - components.add(comp); - req = new LockRequest(components, "me", "localhost"); - res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.WAITING); - } - - @Test - public void testLockSRSR() throws Exception { - // Test that two shared read locks can share a partition - LockComponent comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - comp.setPartitionname("mypartition"); - comp.setOperationType(DataOperationType.INSERT); - List<LockComponent> components = new ArrayList<LockComponent>(1); - components.add(comp); - LockRequest req = new LockRequest(components, "me", "localhost"); - LockResponse res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.ACQUIRED); - - comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - comp.setPartitionname("mypartition"); - comp.setOperationType(DataOperationType.SELECT); - components.clear(); - components.add(comp); - req = new LockRequest(components, "me", "localhost"); - res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.ACQUIRED); - } - - @Test - public void testLockESRSR() throws Exception { - // Test that exclusive lock blocks shared reads - LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - comp.setPartitionname("mypartition"); - comp.setOperationType(DataOperationType.NO_TXN); - List<LockComponent> components = new ArrayList<LockComponent>(1); - components.add(comp); - LockRequest req = new LockRequest(components, "me", "localhost"); - LockResponse res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.ACQUIRED); - - comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - comp.setPartitionname("mypartition"); - comp.setOperationType(DataOperationType.INSERT); - components.clear(); - components.add(comp); - req = new LockRequest(components, "me", "localhost"); - res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.WAITING); - - comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - comp.setPartitionname("mypartition"); - comp.setOperationType(DataOperationType.SELECT); - components.clear(); - components.add(comp); - req = new LockRequest(components, "me", "localhost"); - res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.WAITING); - } - - @Test - public void testLockSRSW() throws Exception { - // Test that write can acquire after read - LockComponent comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - comp.setPartitionname("mypartition"); - comp.setOperationType(DataOperationType.INSERT); - List<LockComponent> components = new ArrayList<LockComponent>(1); - components.add(comp); - LockRequest req = new LockRequest(components, "me", "localhost"); - LockResponse res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.ACQUIRED); - - comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - comp.setPartitionname("mypartition"); - comp.setOperationType(DataOperationType.DELETE); - components.clear(); - components.add(comp); - req = new LockRequest(components, "me", "localhost"); - req.setTxnid(openTxn()); - res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.ACQUIRED); - } - - @Test - public void testLockESRSW() throws Exception { - // Test that exclusive lock blocks read and write - LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - comp.setPartitionname("mypartition"); - comp.setOperationType(DataOperationType.NO_TXN); - List<LockComponent> components = new ArrayList<LockComponent>(1); - components.add(comp); - LockRequest req = new LockRequest(components, "me", "localhost"); - LockResponse res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.ACQUIRED); - - comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - comp.setPartitionname("mypartition"); - comp.setOperationType(DataOperationType.SELECT); - components.clear(); - components.add(comp); - req = new LockRequest(components, "me", "localhost"); - res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.WAITING); - - comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - comp.setPartitionname("mypartition"); - comp.setOperationType(DataOperationType.UPDATE); - components.clear(); - components.add(comp); - req = new LockRequest(components, "me", "localhost"); - req.setTxnid(openTxn()); - res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.WAITING); - } - - @Test - public void testLockSRE() throws Exception { - // Test that read blocks exclusive - LockComponent comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - comp.setPartitionname("mypartition"); - comp.setOperationType(DataOperationType.SELECT); - List<LockComponent> components = new ArrayList<LockComponent>(1); - components.add(comp); - LockRequest req = new LockRequest(components, "me", "localhost"); - LockResponse res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.ACQUIRED); - - comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - comp.setPartitionname("mypartition"); - comp.setOperationType(DataOperationType.NO_TXN); - components.clear(); - components.add(comp); - req = new LockRequest(components, "me", "localhost"); - res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.WAITING); - } - - @Test - public void testLockESRE() throws Exception { - // Test that exclusive blocks read and exclusive - LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - comp.setPartitionname("mypartition"); - comp.setOperationType(DataOperationType.NO_TXN); - List<LockComponent> components = new ArrayList<LockComponent>(1); - components.add(comp); - LockRequest req = new LockRequest(components, "me", "localhost"); - LockResponse res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.ACQUIRED); - - comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - comp.setPartitionname("mypartition"); - comp.setOperationType(DataOperationType.SELECT); - components.clear(); - components.add(comp); - req = new LockRequest(components, "me", "localhost"); - res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.WAITING); - - comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - comp.setPartitionname("mypartition"); - comp.setOperationType(DataOperationType.NO_TXN); - components.clear(); - components.add(comp); - req = new LockRequest(components, "me", "localhost"); - res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.WAITING); - } - - @Test - public void testLockSWSR() throws Exception { - // Test that read can acquire after write - LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - comp.setPartitionname("mypartition"); - comp.setOperationType(DataOperationType.UPDATE); - List<LockComponent> components = new ArrayList<LockComponent>(1); - components.add(comp); - LockRequest req = new LockRequest(components, "me", "localhost"); - req.setTxnid(openTxn()); - LockResponse res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.ACQUIRED); - - comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - comp.setPartitionname("mypartition"); - comp.setOperationType(DataOperationType.SELECT); - components.clear(); - components.add(comp); - req = new LockRequest(components, "me", "localhost"); - res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.ACQUIRED); - } - - @Test - public void testLockSWSWSR() throws Exception { - // Test that write blocks write but read can still acquire - LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - comp.setPartitionname("mypartition"); - comp.setOperationType(DataOperationType.UPDATE); - List<LockComponent> components = new ArrayList<LockComponent>(1); - components.add(comp); - LockRequest req = new LockRequest(components, "me", "localhost"); - req.setTxnid(openTxn()); - LockResponse res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.ACQUIRED); - - comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - comp.setPartitionname("mypartition"); - comp.setOperationType(DataOperationType.DELETE); - components.clear(); - components.add(comp); - req = new LockRequest(components, "me", "localhost"); - req.setTxnid(openTxn()); - res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.WAITING); - - comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - comp.setPartitionname("mypartition"); - comp.setOperationType(DataOperationType.INSERT); - components.clear(); - components.add(comp); - req = new LockRequest(components, "me", "localhost"); - res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.ACQUIRED); - } - - @Test - public void testWrongLockForOperation() throws Exception { - LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - comp.setPartitionname("mypartition"); - comp.setOperationType(DataOperationType.NO_TXN); - List<LockComponent> components = new ArrayList<LockComponent>(1); - components.add(comp); - LockRequest req = new LockRequest(components, "me", "localhost"); - req.setTxnid(openTxn()); - Exception expectedError = null; - try { - LockResponse res = txnHandler.lock(req); - } - catch(Exception e) { - expectedError = e; - } - Assert.assertTrue(expectedError != null && expectedError.getMessage().contains("Unexpected DataOperationType")); - } - @Test - public void testLockSWSWSW() throws Exception { - // Test that write blocks two writes - LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - comp.setPartitionname("mypartition"); - comp.setOperationType(DataOperationType.DELETE); - List<LockComponent> components = new ArrayList<LockComponent>(1); - components.add(comp); - LockRequest req = new LockRequest(components, "me", "localhost"); - req.setTxnid(openTxn()); - LockResponse res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.ACQUIRED); - - comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - comp.setPartitionname("mypartition"); - comp.setOperationType(DataOperationType.DELETE); - components.clear(); - components.add(comp); - req = new LockRequest(components, "me", "localhost"); - req.setTxnid(openTxn()); - res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.WAITING); - - comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - comp.setPartitionname("mypartition"); - comp.setOperationType(DataOperationType.DELETE); - components.clear(); - components.add(comp); - req = new LockRequest(components, "me", "localhost"); - req.setTxnid(openTxn()); - res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.WAITING); - } - - @Test - public void testLockEESW() throws Exception { - // Test that exclusive blocks exclusive and write - LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - comp.setPartitionname("mypartition"); - comp.setOperationType(DataOperationType.NO_TXN); - List<LockComponent> components = new ArrayList<LockComponent>(1); - components.add(comp); - LockRequest req = new LockRequest(components, "me", "localhost"); - LockResponse res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.ACQUIRED); - - comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - comp.setPartitionname("mypartition"); - comp.setOperationType(DataOperationType.NO_TXN); - components.clear(); - components.add(comp); - req = new LockRequest(components, "me", "localhost"); - res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.WAITING); - - comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - comp.setPartitionname("mypartition"); - comp.setOperationType(DataOperationType.DELETE); - components.clear(); - components.add(comp); - req = new LockRequest(components, "me", "localhost"); - req.setTxnid(openTxn()); - res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.WAITING); - } - - @Test - public void testLockEESR() throws Exception { - // Test that exclusive blocks exclusive and read - LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - comp.setPartitionname("mypartition"); - comp.setOperationType(DataOperationType.NO_TXN); - List<LockComponent> components = new ArrayList<LockComponent>(1); - components.add(comp); - LockRequest req = new LockRequest(components, "me", "localhost"); - LockResponse res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.ACQUIRED); - - comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - comp.setPartitionname("mypartition"); - comp.setOperationType(DataOperationType.NO_TXN); - components.clear(); - components.add(comp); - req = new LockRequest(components, "me", "localhost"); - res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.WAITING); - - comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - comp.setPartitionname("mypartition"); - comp.setOperationType(DataOperationType.SELECT); - components.clear(); - components.add(comp); - req = new LockRequest(components, "me", "localhost"); - res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.WAITING); - } - - @Test - public void testCheckLockAcquireAfterWaiting() throws Exception { - LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - comp.setPartitionname("mypartition"); - comp.setOperationType(DataOperationType.DELETE); - List<LockComponent> components = new ArrayList<LockComponent>(1); - components.add(comp); - LockRequest req = new LockRequest(components, "me", "localhost"); - long txnId = openTxn(); - req.setTxnid(txnId); - LockResponse res = txnHandler.lock(req); - long lockid1 = res.getLockid(); - assertTrue(res.getState() == LockState.ACQUIRED); - - comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - comp.setPartitionname("mypartition"); - comp.setOperationType(DataOperationType.UPDATE); - components.clear(); - components.add(comp); - req = new LockRequest(components, "me", "localhost"); - req.setTxnid(openTxn()); - res = txnHandler.lock(req); - long lockid2 = res.getLockid(); - assertTrue(res.getState() == LockState.WAITING); - - txnHandler.abortTxn(new AbortTxnRequest(txnId)); - res = txnHandler.checkLock(new CheckLockRequest(lockid2)); - assertTrue(res.getState() == LockState.ACQUIRED); - } - - @Test - public void testCheckLockNoSuchLock() throws Exception { - try { - txnHandler.checkLock(new CheckLockRequest(23L)); - fail("Allowed to check lock on non-existent lock"); - } catch (NoSuchLockException e) { - } - } - - @Test - public void testCheckLockTxnAborted() throws Exception { - // Test that when a transaction is aborted, the heartbeat fails - long txnid = openTxn(); - LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - comp.setPartitionname("mypartition"); - comp.setOperationType(DataOperationType.DELETE); - List<LockComponent> components = new ArrayList<LockComponent>(1); - components.add(comp); - LockRequest req = new LockRequest(components, "me", "localhost"); - req.setTxnid(txnid); - LockResponse res = txnHandler.lock(req); - long lockid = res.getLockid(); - txnHandler.abortTxn(new AbortTxnRequest(txnid)); - try { - // This will throw NoSuchLockException (even though it's the - // transaction we've closed) because that will have deleted the lock. - txnHandler.checkLock(new CheckLockRequest(lockid)); - fail("Allowed to check lock on aborted transaction."); - } catch (NoSuchLockException e) { - } - } - - @Test - public void testMultipleLock() throws Exception { - // Test more than one lock can be handled in a lock request - LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - comp.setPartitionname("mypartition"); - comp.setOperationType(DataOperationType.NO_TXN); - List<LockComponent> components = new ArrayList<LockComponent>(2); - components.add(comp); - - comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - comp.setPartitionname("anotherpartition"); - comp.setOperationType(DataOperationType.NO_TXN); - components.add(comp); - LockRequest req = new LockRequest(components, "me", "localhost"); - LockResponse res = txnHandler.lock(req); - long lockid = res.getLockid(); - assertTrue(res.getState() == LockState.ACQUIRED); - res = txnHandler.checkLock(new CheckLockRequest(lockid)); - assertTrue(res.getState() == LockState.ACQUIRED); - txnHandler.unlock(new UnlockRequest(lockid)); - assertEquals(0, txnHandler.numLocksInLockTable()); - } - - @Test - public void testMultipleLockWait() throws Exception { - // Test that two shared read locks can share a partition - LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - comp.setPartitionname("mypartition"); - comp.setOperationType(DataOperationType.NO_TXN); - List<LockComponent> components = new ArrayList<LockComponent>(2); - components.add(comp); - - comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - comp.setPartitionname("anotherpartition"); - comp.setOperationType(DataOperationType.NO_TXN); - components.add(comp); - LockRequest req = new LockRequest(components, "me", "localhost"); - LockResponse res = txnHandler.lock(req); - long lockid1 = res.getLockid(); - assertTrue(res.getState() == LockState.ACQUIRED); - - - comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - comp.setPartitionname("mypartition"); - comp.setOperationType(DataOperationType.NO_TXN); - components = new ArrayList<LockComponent>(1); - components.add(comp); - req = new LockRequest(components, "me", "localhost"); - res = txnHandler.lock(req); - long lockid2 = res.getLockid(); - assertTrue(res.getState() == LockState.WAITING); - - txnHandler.unlock(new UnlockRequest(lockid1)); - - res = txnHandler.checkLock(new CheckLockRequest(lockid2)); - assertTrue(res.getState() == LockState.ACQUIRED); - } - - @Test - public void testUnlockOnCommit() throws Exception { - // Test that committing unlocks - long txnid = openTxn(); - LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - comp.setOperationType(DataOperationType.DELETE); - List<LockComponent> components = new ArrayList<LockComponent>(1); - components.add(comp); - LockRequest req = new LockRequest(components, "me", "localhost"); - req.setTxnid(txnid); - LockResponse res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.ACQUIRED); - txnHandler.commitTxn(new CommitTxnRequest(txnid)); - assertEquals(0, txnHandler.numLocksInLockTable()); - } - - @Test - public void testUnlockOnAbort() throws Exception { - // Test that committing unlocks - long txnid = openTxn(); - LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); - comp.setOperationType(DataOperationType.UPDATE); - List<LockComponent> components = new ArrayList<LockComponent>(1); - components.add(comp); - LockRequest req = new LockRequest(components, "me", "localhost"); - req.setTxnid(txnid); - LockResponse res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.ACQUIRED); - txnHandler.abortTxn(new AbortTxnRequest(txnid)); - assertEquals(0, txnHandler.numLocksInLockTable()); - } - - @Test - public void testUnlockWithTxn() throws Exception { - LOG.debug("Starting testUnlockWithTxn"); - // Test that attempting to unlock locks associated with a transaction - // generates an error - long txnid = openTxn(); - LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - comp.setPartitionname("mypartition"); - comp.setOperationType(DataOperationType.DELETE); - List<LockComponent> components = new ArrayList<LockComponent>(1); - components.add(comp); - LockRequest req = new LockRequest(components, "me", "localhost"); - req.setTxnid(txnid); - LockResponse res = txnHandler.lock(req); - long lockid = res.getLockid(); - try { - txnHandler.unlock(new UnlockRequest(lockid)); - fail("Allowed to unlock lock associated with transaction."); - } catch (TxnOpenException e) { - } - } - - @Test - public void testHeartbeatTxnAborted() throws Exception { - // Test that when a transaction is aborted, the heartbeat fails - openTxn(); - txnHandler.abortTxn(new AbortTxnRequest(1)); - HeartbeatRequest h = new HeartbeatRequest(); - h.setTxnid(1); - try { - txnHandler.heartbeat(h); - fail("Told there was a txn, when it should have been aborted."); - } catch (TxnAbortedException e) { - } - } - - @Test - public void testHeartbeatNoTxn() throws Exception { - // Test that when a transaction is aborted, the heartbeat fails - HeartbeatRequest h = new HeartbeatRequest(); - h.setTxnid(939393L); - try { - txnHandler.heartbeat(h); - fail("Told there was a txn, when there wasn't."); - } catch (NoSuchTxnException e) { - } - } - - @Test - public void testHeartbeatLock() throws Exception { - conf.setTimeVar(HiveConf.ConfVars.HIVE_TXN_TIMEOUT, 1, TimeUnit.SECONDS); - HeartbeatRequest h = new HeartbeatRequest(); - LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - comp.setPartitionname("mypartition"); - comp.setOperationType(DataOperationType.NO_TXN); - List<LockComponent> components = new ArrayList<LockComponent>(1); - components.add(comp); - LockRequest req = new LockRequest(components, "me", "localhost"); - LockResponse res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.ACQUIRED); - h.setLockid(res.getLockid()); - for (int i = 0; i < 30; i++) { - try { - txnHandler.heartbeat(h); - } catch (NoSuchLockException e) { - fail("Told there was no lock, when the heartbeat should have kept it."); - } - } - } - - @Test - public void heartbeatTxnRange() throws Exception { - long txnid = openTxn(); - assertEquals(1, txnid); - txnid = openTxn(); - txnid = openTxn(); - HeartbeatTxnRangeResponse rsp = - txnHandler.heartbeatTxnRange(new HeartbeatTxnRangeRequest(1, 3)); - assertEquals(0, rsp.getAborted().size()); - assertEquals(0, rsp.getNosuch().size()); - } - - @Test - public void heartbeatTxnRangeOneCommitted() throws Exception { - long txnid = openTxn(); - assertEquals(1, txnid); - txnHandler.commitTxn(new CommitTxnRequest(1)); - txnid = openTxn(); - txnid = openTxn(); - HeartbeatTxnRangeResponse rsp = - txnHandler.heartbeatTxnRange(new HeartbeatTxnRangeRequest(1, 3)); - assertEquals(1, rsp.getNosuchSize()); - Long txn = rsp.getNosuch().iterator().next(); - assertEquals(1L, (long)txn); - assertEquals(0, rsp.getAborted().size()); - } - - @Test - public void heartbeatTxnRangeOneAborted() throws Exception { - long txnid = openTxn(); - assertEquals(1, txnid); - txnid = openTxn(); - txnid = openTxn(); - txnHandler.abortTxn(new AbortTxnRequest(3)); - HeartbeatTxnRangeResponse rsp = - txnHandler.heartbeatTxnRange(new HeartbeatTxnRangeRequest(1, 3)); - assertEquals(1, rsp.getAbortedSize()); - Long txn = rsp.getAborted().iterator().next(); - assertEquals(3L, (long)txn); - assertEquals(0, rsp.getNosuch().size()); - } - - @Test - public void testLockTimeout() throws Exception { - long timeout = txnHandler.setTimeout(1); - try { - LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - comp.setPartitionname("mypartition"); - comp.setOperationType(DataOperationType.NO_TXN); - List<LockComponent> components = new ArrayList<LockComponent>(1); - components.add(comp); - LockRequest req = new LockRequest(components, "me", "localhost"); - LockResponse res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.ACQUIRED); - Thread.sleep(10); - txnHandler.performTimeOuts(); - txnHandler.checkLock(new CheckLockRequest(res.getLockid())); - fail("Told there was a lock, when it should have timed out."); - } catch (NoSuchLockException e) { - } finally { - txnHandler.setTimeout(timeout); - } - } - - @Test - public void testRecoverManyTimeouts() throws Exception { - long timeout = txnHandler.setTimeout(1); - try { - txnHandler.openTxns(new OpenTxnRequest(503, "me", "localhost")); - Thread.sleep(10); - txnHandler.performTimeOuts(); - GetOpenTxnsInfoResponse rsp = txnHandler.getOpenTxnsInfo(); - int numAborted = 0; - for (TxnInfo txnInfo : rsp.getOpen_txns()) { - assertEquals(TxnState.ABORTED, txnInfo.getState()); - numAborted++; - } - assertEquals(503, numAborted); - } finally { - txnHandler.setTimeout(timeout); - } - - - } - - @Test - public void testHeartbeatNoLock() throws Exception { - HeartbeatRequest h = new HeartbeatRequest(); - h.setLockid(29389839L); - try { - txnHandler.heartbeat(h); - fail("Told there was a lock, when there wasn't."); - } catch (NoSuchLockException e) { - } - } - - @Test - public void testCompactMajorWithPartition() throws Exception { - CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MAJOR); - rqst.setPartitionname("ds=today"); - txnHandler.compact(rqst); - ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); - List<ShowCompactResponseElement> compacts = rsp.getCompacts(); - assertEquals(1, compacts.size()); - ShowCompactResponseElement c = compacts.get(0); - assertEquals("foo", c.getDbname()); - assertEquals("bar", c.getTablename()); - assertEquals("ds=today", c.getPartitionname()); - assertEquals(CompactionType.MAJOR, c.getType()); - assertEquals("initiated", c.getState()); - assertEquals(0L, c.getStart()); - } - - @Test - public void testCompactMinorNoPartition() throws Exception { - CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR); - rqst.setRunas("fred"); - txnHandler.compact(rqst); - - ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); - List<ShowCompactResponseElement> compacts = rsp.getCompacts(); - assertEquals(1, compacts.size()); - ShowCompactResponseElement c = compacts.get(0); - assertEquals("foo", c.getDbname()); - assertEquals("bar", c.getTablename()); - assertNull(c.getPartitionname()); - assertEquals(CompactionType.MINOR, c.getType()); - assertEquals("initiated", c.getState()); - assertEquals(0L, c.getStart()); - assertEquals("fred", c.getRunAs()); - } - - @Test - public void showLocks() throws Exception { - long begining = System.currentTimeMillis(); - LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); - comp.setOperationType(DataOperationType.NO_TXN); - List<LockComponent> components = new ArrayList<LockComponent>(1); - components.add(comp); - LockRequest req = new LockRequest(components, "me", "localhost"); - LockResponse res = txnHandler.lock(req); - - // Open txn - long txnid = openTxn(); - comp = new LockComponent(LockType.SHARED_READ, LockLevel.TABLE, "mydb"); - comp.setTablename("mytable"); - comp.setOperationType(DataOperationType.SELECT); - components = new ArrayList<LockComponent>(1); - components.add(comp); - req = new LockRequest(components, "me", "localhost"); - req.setTxnid(txnid); - res = txnHandler.lock(req); - - // Locks not associated with a txn - components = new ArrayList<LockComponent>(1); - comp = new LockComponent(LockType.SHARED_READ, LockLevel.PARTITION, "yourdb"); - comp.setTablename("yourtable"); - comp.setPartitionname("yourpartition"); - comp.setOperationType(DataOperationType.INSERT); - components.add(comp); - req = new LockRequest(components, "you", "remotehost"); - res = txnHandler.lock(req); - - ShowLocksResponse rsp = txnHandler.showLocks(new ShowLocksRequest()); - List<ShowLocksResponseElement> locks = rsp.getLocks(); - assertEquals(3, locks.size()); - boolean[] saw = new boolean[locks.size()]; - for (int i = 0; i < saw.length; i++) saw[i] = false; - for (ShowLocksResponseElement lock : locks) { - if (lock.getLockid() == 1) { - assertEquals(0, lock.getTxnid()); - assertEquals("mydb", lock.getDbname()); - assertNull(lock.getTablename()); - assertNull(lock.getPartname()); - assertEquals(LockState.ACQUIRED, lock.getState()); - assertEquals(LockType.EXCLUSIVE, lock.getType()); - assertTrue(lock.toString(), 0 != lock.getLastheartbeat()); - assertTrue("Expected acquired at " + lock.getAcquiredat() + " to be between " + begining - + " and " + System.currentTimeMillis(), - begining <= lock.getAcquiredat() && System.currentTimeMillis() >= lock.getAcquiredat()); - assertEquals("me", lock.getUser()); - assertEquals("localhost", lock.getHostname()); - saw[0] = true; - } else if (lock.getLockid() == 2) { - assertEquals(1, lock.getTxnid()); - assertEquals("mydb", lock.getDbname()); - assertEquals("mytable", lock.getTablename()); - assertNull(lock.getPartname()); - assertEquals(LockState.WAITING, lock.getState()); - assertEquals(LockType.SHARED_READ, lock.getType()); - assertTrue(lock.toString(), 0 == lock.getLastheartbeat() && - lock.getTxnid() != 0); - assertEquals(0, lock.getAcquiredat()); - assertEquals("me", lock.getUser()); - assertEquals("localhost", lock.getHostname()); - saw[1] = true; - } else if (lock.getLockid() == 3) { - assertEquals(0, lock.getTxnid()); - assertEquals("yourdb", lock.getDbname()); - assertEquals("yourtable", lock.getTablename()); - assertEquals("yourpartition", lock.getPartname()); - assertEquals(LockState.ACQUIRED, lock.getState()); - assertEquals(LockType.SHARED_READ, lock.getType()); - assertTrue(lock.toString(), begining <= lock.getLastheartbeat() && - System.currentTimeMillis() >= lock.getLastheartbeat()); - assertTrue(begining <= lock.getAcquiredat() && - System.currentTimeMillis() >= lock.getAcquiredat()); - assertEquals("you", lock.getUser()); - assertEquals("remotehost", lock.getHostname()); - saw[2] = true; - } else { - fail("Unknown lock id"); - } - } - for (int i = 0; i < saw.length; i++) assertTrue("Didn't see lock id " + i, saw[i]); - } - - @Test - @Ignore("Wedges Derby") - public void deadlockDetected() throws Exception { - LOG.debug("Starting deadlock test"); - if (txnHandler instanceof TxnHandler) { - final TxnHandler tHndlr = (TxnHandler)txnHandler; - Connection conn = tHndlr.getDbConn(Connection.TRANSACTION_SERIALIZABLE); - Statement stmt = conn.createStatement(); - long now = tHndlr.getDbTime(conn); - stmt.executeUpdate("insert into TXNS (txn_id, txn_state, txn_started, txn_last_heartbeat, " + - "txn_user, txn_host) values (1, 'o', " + now + ", " + now + ", 'shagy', " + - "'scooby.com')"); - stmt.executeUpdate("insert into HIVE_LOCKS (hl_lock_ext_id, hl_lock_int_id, hl_txnid, " + - "hl_db, hl_table, hl_partition, hl_lock_state, hl_lock_type, hl_last_heartbeat, " + - "hl_user, hl_host) values (1, 1, 1, 'mydb', 'mytable', 'mypartition', '" + - tHndlr.LOCK_WAITING + "', '" + tHndlr.LOCK_EXCLUSIVE + "', " + now + ", 'fred', " + - "'scooby.com')"); - conn.commit(); - tHndlr.closeDbConn(conn); - - final AtomicBoolean sawDeadlock = new AtomicBoolean(); - - final Connection conn1 = tHndlr.getDbConn(Connection.TRANSACTION_SERIALIZABLE); - final Connection conn2 = tHndlr.getDbConn(Connection.TRANSACTION_SERIALIZABLE); - try { - - for (int i = 0; i < 5; i++) { - Thread t1 = new Thread() { - @Override - public void run() { - try { - try { - updateTxns(conn1); - updateLocks(conn1); - Thread.sleep(1000); - conn1.commit(); - LOG.debug("no exception, no deadlock"); - } catch (SQLException e) { - try { - tHndlr.checkRetryable(conn1, e, "thread t1"); - LOG.debug("Got an exception, but not a deadlock, SQLState is " + - e.getSQLState() + " class of exception is " + e.getClass().getName() + - " msg is <" + e.getMessage() + ">"); - } catch (TxnHandler.RetryException de) { - LOG.debug("Forced a deadlock, SQLState is " + e.getSQLState() + " class of " + - "exception is " + e.getClass().getName() + " msg is <" + e - .getMessage() + ">"); - sawDeadlock.set(true); - } - } - conn1.rollback(); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - }; - - Thread t2 = new Thread() { - @Override - public void run() { - try { - try { - updateLocks(conn2); - updateTxns(conn2); - Thread.sleep(1000); - conn2.commit(); - LOG.debug("no exception, no deadlock"); - } catch (SQLException e) { - try { - tHndlr.checkRetryable(conn2, e, "thread t2"); - LOG.debug("Got an exception, but not a deadlock, SQLState is " + - e.getSQLState() + " class of exception is " + e.getClass().getName() + - " msg is <" + e.getMessage() + ">"); - } catch (TxnHandler.RetryException de) { - LOG.debug("Forced a deadlock, SQLState is " + e.getSQLState() + " class of " + - "exception is " + e.getClass().getName() + " msg is <" + e - .getMessage() + ">"); - sawDeadlock.set(true); - } - } - conn2.rollback(); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - }; - - t1.start(); - t2.start(); - t1.join(); - t2.join(); - if (sawDeadlock.get()) break; - } - assertTrue(sawDeadlock.get()); - } finally { - conn1.rollback(); - tHndlr.closeDbConn(conn1); - conn2.rollback(); - tHndlr.closeDbConn(conn2); - } - } - } - - /** - * This cannnot be run against Derby (thus in UT) but it can run againt MySQL. - * 1. add to metastore/pom.xml - * <dependency> - * <groupId>mysql</groupId> - * <artifactId>mysql-connector-java</artifactId> - * <version>5.1.30</version> - * </dependency> - * 2. Hack in the c'tor of this class - * conf.setVar(HiveConf.ConfVars.METASTORECONNECTURLKEY, "jdbc:mysql://localhost/metastore"); - * conf.setVar(HiveConf.ConfVars.METASTORE_CONNECTION_USER_NAME, "hive"); - * conf.setVar(HiveConf.ConfVars.METASTOREPWD, "hive"); - * conf.setVar(HiveConf.ConfVars.METASTORE_CONNECTION_DRIVER, "com.mysql.jdbc.Driver"); - * 3. Remove TxnDbUtil.prepDb(); in TxnHandler.checkQFileTestHack() - * - */ - @Ignore("multiple threads wedge Derby") - @Test - public void testMutexAPI() throws Exception { - final TxnStore.MutexAPI api = txnHandler.getMutexAPI(); - final AtomicInteger stepTracker = new AtomicInteger(0); - /** - * counter = 0; - * Thread1 counter=1, lock, wait 3s, check counter(should be 2), counter=3, unlock - * Thread2 counter=2, lock (and block), inc counter, should be 4 - */ - Thread t1 = new Thread("MutexTest1") { - public void run() { - try { - stepTracker.incrementAndGet();//now 1 - TxnStore.MutexAPI.LockHandle handle = api.acquireLock(TxnHandler.MUTEX_KEY.HouseKeeper.name()); - Thread.sleep(4000); - //stepTracker should now be 2 which indicates t2 has started - Assert.assertEquals("Thread2 should have started by now but not done work", 2, stepTracker.get()); - stepTracker.incrementAndGet();//now 3 - handle.releaseLocks(); - } - catch(Exception ex) { - throw new RuntimeException(ex.getMessage(), ex); - } - } - }; - t1.setDaemon(true); - ErrorHandle ueh1 = new ErrorHandle(); - t1.setUncaughtExceptionHandler(ueh1); - Thread t2 = new Thread("MutexTest2") { - public void run() { - try { - stepTracker.incrementAndGet();//now 2 - //this should block until t1 unlocks - TxnStore.MutexAPI.LockHandle handle = api.acquireLock(TxnHandler.MUTEX_KEY.HouseKeeper.name()); - stepTracker.incrementAndGet();//now 4 - Assert.assertEquals(4, stepTracker.get()); - handle.releaseLocks(); - stepTracker.incrementAndGet();//now 5 - } - catch(Exception ex) { - throw new RuntimeException(ex.getMessage(), ex); - } - } - }; - t2.setDaemon(true); - ErrorHandle ueh2 = new ErrorHandle(); - t2.setUncaughtExceptionHandler(ueh2); - t1.start(); - try { - Thread.sleep(1000); - } - catch(InterruptedException ex) { - LOG.info("Sleep was interrupted"); - } - t2.start(); - t1.join(6000);//so that test doesn't block - t2.join(6000); - - if(ueh1.error != null) { - Assert.assertTrue("Unexpected error from t1: " + StringUtils.stringifyException(ueh1.error), false); - } - if (ueh2.error != null) { - Assert.assertTrue("Unexpected error from t2: " + StringUtils.stringifyException(ueh2.error), false); - } - Assert.assertEquals("5 means both threads have completed", 5, stepTracker.get()); - } - private final static class ErrorHandle implements Thread.UncaughtExceptionHandler { - Throwable error = null; - @Override - public void uncaughtException(Thread t, Throwable e) { - LOG.error("Uncaught exception from " + t.getName() + ": " + e.getMessage()); - error = e; - } - } - - @Test - public void testRetryableRegex() throws Exception { - SQLException sqlException = new SQLException("ORA-08177: can't serialize access for this transaction", "72000"); - // Note that we have 3 regex'es below - conf.setVar(HiveConf.ConfVars.HIVE_TXN_RETRYABLE_SQLEX_REGEX, "^Deadlock detected, roll back,.*08177.*,.*08178.*"); - boolean result = TxnHandler.isRetryable(conf, sqlException); - Assert.assertTrue("regex should be retryable", result); - - sqlException = new SQLException("This error message, has comma in it"); - conf.setVar(HiveConf.ConfVars.HIVE_TXN_RETRYABLE_SQLEX_REGEX, ".*comma.*"); - result = TxnHandler.isRetryable(conf, sqlException); - Assert.assertTrue("regex should be retryable", result); - } - - private void updateTxns(Connection conn) throws SQLException { - Statement stmt = conn.createStatement(); - stmt.executeUpdate("update TXNS set txn_last_heartbeat = txn_last_heartbeat + 1"); - } - - private void updateLocks(Connection conn) throws SQLException { - Statement stmt = conn.createStatement(); - stmt.executeUpdate("update HIVE_LOCKS set hl_last_heartbeat = hl_last_heartbeat + 1"); - } - - @Test - public void testBuildQueryWithINClause() throws Exception { - List<String> queries = new ArrayList<String>(); - - StringBuilder prefix = new StringBuilder(); - StringBuilder suffix = new StringBuilder(); - - // Note, this is a "real" query that depends on one of the metastore tables - prefix.append("select count(*) from TXNS where "); - suffix.append(" and TXN_STATE = 'o'"); - - // Case 1 - Max in list members: 10; Max query string length: 1KB - // The first query happens to have 2 full batches. - conf.setIntVar(HiveConf.ConfVars.METASTORE_DIRECT_SQL_MAX_QUERY_LENGTH, 1); - conf.setIntVar(HiveConf.ConfVars.METASTORE_DIRECT_SQL_MAX_ELEMENTS_IN_CLAUSE, 10); - List<Long> inList = new ArrayList<Long>(); - for (long i = 1; i <= 200; i++) { - inList.add(i); - } - TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false); - Assert.assertEquals(1, queries.size()); - runAgainstDerby(queries); - - // Case 2 - Max in list members: 10; Max query string length: 1KB - // The first query has 2 full batches, and the second query only has 1 batch which only contains 1 member - queries.clear(); - inList.add((long)201); - TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false); - Assert.assertEquals(2, queries.size()); - runAgainstDerby(queries); - - // Case 3 - Max in list members: 1000; Max query string length: 5KB - conf.setIntVar(HiveConf.ConfVars.METASTORE_DIRECT_SQL_MAX_QUERY_LENGTH, 10); - conf.setIntVar(HiveConf.ConfVars.METASTORE_DIRECT_SQL_MAX_ELEMENTS_IN_CLAUSE, 1000); - queries.clear(); - for (long i = 202; i <= 4321; i++) { - inList.add(i); - } - TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false); - Assert.assertEquals(3, queries.size()); - runAgainstDerby(queries); - - // Case 4 - NOT IN list - queries.clear(); - TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, true); - Assert.assertEquals(3, queries.size()); - runAgainstDerby(queries); - - // Case 5 - No parenthesis - queries.clear(); - suffix.setLength(0); - suffix.append(""); - TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", false, false); - Assert.assertEquals(3, queries.size()); - runAgainstDerby(queries); - } - - /** Verify queries can run against Derby DB. - * As long as Derby doesn't complain, we assume the query is syntactically/semantically correct. - */ - private void runAgainstDerby(List<String> queries) throws Exception { - Connection conn = null; - Statement stmt = null; - ResultSet rs = null; - - try { - conn = TxnDbUtil.getConnection(); - stmt = conn.createStatement(); - for (String query : queries) { - rs = stmt.executeQuery(query); - Assert.assertTrue("The query is not valid", rs.next()); - } - } finally { - TxnDbUtil.closeResources(conn, stmt, rs); - } - } - - @Before - public void setUp() throws Exception { - TxnDbUtil.prepDb(); - txnHandler = TxnUtils.getTxnStore(conf); - } - - @After - public void tearDown() throws Exception { - TxnDbUtil.cleanDb(); - } - - private long openTxn() throws MetaException { - List<Long> txns = txnHandler.openTxns(new OpenTxnRequest(1, "me", "localhost")).getTxn_ids(); - return txns.get(0); - } - -}