HIVE-13249 : Hard upper bound on number of open transactions (Wei Zheng, reviewed by Eugene Koifman)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/259e8be1 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/259e8be1 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/259e8be1 Branch: refs/heads/master Commit: 259e8be1d4486c6a17b8c240e43154c5a839524e Parents: 360dfa0 Author: Wei Zheng <w...@apache.org> Authored: Fri May 20 09:50:44 2016 -0700 Committer: Wei Zheng <w...@apache.org> Committed: Fri May 20 09:50:44 2016 -0700 ---------------------------------------------------------------------- .../org/apache/hadoop/hive/conf/HiveConf.java | 6 + .../hadoop/hive/metastore/txn/TxnHandler.java | 79 + .../hadoop/hive/metastore/txn/TxnStore.java | 6 + .../metastore/txn/TestCompactionTxnHandler.java | 466 ------ .../hive/metastore/txn/TestTxnHandler.java | 1484 ------------------ .../hive/ql/txn/AcidOpenTxnsCounterService.java | 69 + .../metastore/txn/TestCompactionTxnHandler.java | 466 ++++++ .../hive/metastore/txn/TestTxnHandler.java | 1484 ++++++++++++++++++ .../apache/hadoop/hive/ql/TestTxnCommands2.java | 41 +- 9 files changed, 2150 insertions(+), 1951 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/259e8be1/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 9cc8fbe..4cfa5f1 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -1681,6 +1681,12 @@ public class HiveConf extends Configuration { " of the lock manager is dumped to log file. This is for debugging. See also " + "hive.lock.numretries and hive.lock.sleep.between.retries."), + HIVE_MAX_OPEN_TXNS("hive.max.open.txns", 100000, "Maximum number of open transactions. If \n" + + "current open transactions reach this limit, future open transaction requests will be \n" + + "rejected, until this number goes below the limit."), + HIVE_COUNT_OPEN_TXNS_INTERVAL("hive.count.open.txns.interval", "1s", + new TimeValidator(TimeUnit.SECONDS), "Time in seconds between checks to count open transactions."), + HIVE_TXN_MAX_OPEN_BATCH("hive.txn.max.open.batch", 1000, "Maximum number of transactions that can be fetched in one call to open_txns().\n" + "This controls how many transactions streaming agents such as Flume or Storm open\n" + http://git-wip-us.apache.org/repos/asf/hive/blob/259e8be1/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index abaff34..82d685d 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -28,6 +28,7 @@ import org.apache.commons.lang.NotImplementedException; import org.apache.hadoop.hive.common.ServerUtils; import org.apache.hadoop.hive.common.classification.InterfaceAudience; import org.apache.hadoop.hive.common.classification.InterfaceStability; +import org.apache.hadoop.hive.metastore.HouseKeeperService; import org.apache.hadoop.hive.metastore.Warehouse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -167,6 +168,15 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { } } + // Maximum number of open transactions that's allowed + private static volatile int maxOpenTxns = 0; + // Current number of open txns + private static volatile long numOpenTxns = 0; + // Whether number of open transactions reaches the threshold + private static volatile boolean tooManyOpenTxns = false; + // The AcidHouseKeeperService for counting open transactions + private static volatile HouseKeeperService openTxnsCounter = null; + /** * Number of consecutive deadlocks we have seen */ @@ -236,6 +246,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { TimeUnit.MILLISECONDS); retryLimit = HiveConf.getIntVar(conf, HiveConf.ConfVars.HMSHANDLERATTEMPTS); deadlockRetryInterval = retryInterval / 10; + maxOpenTxns = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_MAX_OPEN_TXNS); } public GetOpenTxnsInfoResponse getOpenTxnsInfo() throws MetaException { @@ -362,7 +373,45 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { return getOpenTxns(); } } + + private static void startHouseKeeperService(HiveConf conf, Class c){ + try { + openTxnsCounter = (HouseKeeperService)c.newInstance(); + openTxnsCounter.start(conf); + } catch (Exception ex) { + LOG.error("Failed to start {}" , openTxnsCounter.getClass() + + ". The system will not handle {} " , openTxnsCounter.getServiceDescription(), + ". Root Cause: ", ex); + } + } + public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException { + if (openTxnsCounter == null) { + synchronized (TxnHandler.class) { + try { + if (openTxnsCounter == null) { + startHouseKeeperService(conf, Class.forName("org.apache.hadoop.hive.ql.txn.AcidOpenTxnsCounterService")); + } + } catch (ClassNotFoundException e) { + throw new MetaException(e.getMessage()); + } + } + } + + if (!tooManyOpenTxns && numOpenTxns >= maxOpenTxns) { + tooManyOpenTxns = true; + } + if (tooManyOpenTxns) { + if (numOpenTxns < maxOpenTxns * 0.9) { + tooManyOpenTxns = false; + } else { + LOG.warn("Maximum allowed number of open transactions (" + maxOpenTxns + ") has been " + + "reached. Current number of open transactions: " + numOpenTxns); + throw new MetaException("Maximum allowed number of open transactions has been reached. " + + "See hive.max.open.txns."); + } + } + int numTxns = rqst.getNum_txns(); try { Connection dbConn = null; @@ -2856,6 +2905,36 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { } } + public void countOpenTxns() throws MetaException { + Connection dbConn = null; + Statement stmt = null; + ResultSet rs = null; + try { + try { + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + stmt = dbConn.createStatement(); + String s = "select count(*) from TXNS where txn_state = '" + TXN_OPEN + "'"; + LOG.debug("Going to execute query <" + s + ">"); + rs = stmt.executeQuery(s); + if (!rs.next()) { + LOG.error("Transaction database not properly configured, " + + "can't find txn_state from TXNS."); + } else { + numOpenTxns = rs.getLong(1); + } + } catch (SQLException e) { + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + LOG.info("Failed to update number of open transactions"); + checkRetryable(dbConn, e, "countOpenTxns()"); + } finally { + close(rs, stmt, dbConn); + } + } catch (RetryException e) { + countOpenTxns(); + } + } + private static synchronized void setupJdbcConnectionPool(HiveConf conf) throws SQLException { if (connPool != null) return; http://git-wip-us.apache.org/repos/asf/hive/blob/259e8be1/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java index 12be862..5b56aaf 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java @@ -77,6 +77,12 @@ public interface TxnStore { public GetOpenTxnsResponse getOpenTxns() throws MetaException; /** + * Get the count for open transactions. + * @throws MetaException + */ + public void countOpenTxns() throws MetaException; + + /** * Open a set of transactions * @param rqst request to open transactions * @return information on opened transactions http://git-wip-us.apache.org/repos/asf/hive/blob/259e8be1/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java ---------------------------------------------------------------------- diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java deleted file mode 100644 index f513d0f..0000000 --- a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java +++ /dev/null @@ -1,466 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.metastore.txn; - -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.api.AbortTxnRequest; -import org.apache.hadoop.hive.metastore.api.AddDynamicPartitions; -import org.apache.hadoop.hive.metastore.api.CommitTxnRequest; -import org.apache.hadoop.hive.metastore.api.CompactionRequest; -import org.apache.hadoop.hive.metastore.api.CompactionType; -import org.apache.hadoop.hive.metastore.api.DataOperationType; -import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse; -import org.apache.hadoop.hive.metastore.api.LockComponent; -import org.apache.hadoop.hive.metastore.api.LockLevel; -import org.apache.hadoop.hive.metastore.api.LockRequest; -import org.apache.hadoop.hive.metastore.api.LockResponse; -import org.apache.hadoop.hive.metastore.api.LockState; -import org.apache.hadoop.hive.metastore.api.LockType; -import org.apache.hadoop.hive.metastore.api.MetaException; -import org.apache.hadoop.hive.metastore.api.OpenTxnRequest; -import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse; -import org.apache.hadoop.hive.metastore.api.ShowCompactRequest; -import org.apache.hadoop.hive.metastore.api.ShowCompactResponse; -import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Set; -import java.util.SortedSet; -import java.util.TreeSet; - -import static junit.framework.Assert.assertEquals; -import static junit.framework.Assert.assertNotNull; -import static junit.framework.Assert.assertNull; -import static junit.framework.Assert.assertTrue; -import static junit.framework.Assert.fail; - -/** - * Tests for TxnHandler. - */ -public class TestCompactionTxnHandler { - - private HiveConf conf = new HiveConf(); - private TxnStore txnHandler; - - public TestCompactionTxnHandler() throws Exception { - TxnDbUtil.setConfValues(conf); - tearDown(); - } - - @Test - public void testFindNextToCompact() throws Exception { - CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR); - rqst.setPartitionname("ds=today"); - txnHandler.compact(rqst); - long now = System.currentTimeMillis(); - CompactionInfo ci = txnHandler.findNextToCompact("fred"); - assertNotNull(ci); - assertEquals("foo", ci.dbname); - assertEquals("bar", ci.tableName); - assertEquals("ds=today", ci.partName); - assertEquals(CompactionType.MINOR, ci.type); - assertNull(ci.runAs); - assertNull(txnHandler.findNextToCompact("fred")); - - txnHandler.setRunAs(ci.id, "bob"); - - ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); - List<ShowCompactResponseElement> compacts = rsp.getCompacts(); - assertEquals(1, compacts.size()); - ShowCompactResponseElement c = compacts.get(0); - assertEquals("foo", c.getDbname()); - assertEquals("bar", c.getTablename()); - assertEquals("ds=today", c.getPartitionname()); - assertEquals(CompactionType.MINOR, c.getType()); - assertEquals("working", c.getState()); - assertTrue(c.getStart() - 5000 < now && c.getStart() + 5000 > now); - assertEquals("fred", c.getWorkerid()); - assertEquals("bob", c.getRunAs()); - } - - @Test - public void testFindNextToCompact2() throws Exception { - CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR); - rqst.setPartitionname("ds=today"); - txnHandler.compact(rqst); - - rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR); - rqst.setPartitionname("ds=yesterday"); - txnHandler.compact(rqst); - - long now = System.currentTimeMillis(); - boolean expectToday = false; - CompactionInfo ci = txnHandler.findNextToCompact("fred"); - assertNotNull(ci); - assertEquals("foo", ci.dbname); - assertEquals("bar", ci.tableName); - if ("ds=today".equals(ci.partName)) expectToday = false; - else if ("ds=yesterday".equals(ci.partName)) expectToday = true; - else fail("partition name should have been today or yesterday but was " + ci.partName); - assertEquals(CompactionType.MINOR, ci.type); - - ci = txnHandler.findNextToCompact("fred"); - assertNotNull(ci); - assertEquals("foo", ci.dbname); - assertEquals("bar", ci.tableName); - if (expectToday) assertEquals("ds=today", ci.partName); - else assertEquals("ds=yesterday", ci.partName); - assertEquals(CompactionType.MINOR, ci.type); - - assertNull(txnHandler.findNextToCompact("fred")); - - ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); - List<ShowCompactResponseElement> compacts = rsp.getCompacts(); - assertEquals(2, compacts.size()); - for (ShowCompactResponseElement e : compacts) { - assertEquals("working", e.getState()); - assertTrue(e.getStart() - 5000 < now && e.getStart() + 5000 > now); - assertEquals("fred", e.getWorkerid()); - } - } - - @Test - public void testFindNextToCompactNothingToCompact() throws Exception { - assertNull(txnHandler.findNextToCompact("fred")); - } - - @Test - public void testMarkCompacted() throws Exception { - CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR); - rqst.setPartitionname("ds=today"); - txnHandler.compact(rqst); - CompactionInfo ci = txnHandler.findNextToCompact("fred"); - assertNotNull(ci); - - txnHandler.markCompacted(ci); - assertNull(txnHandler.findNextToCompact("fred")); - - - - ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); - List<ShowCompactResponseElement> compacts = rsp.getCompacts(); - assertEquals(1, compacts.size()); - ShowCompactResponseElement c = compacts.get(0); - assertEquals("foo", c.getDbname()); - assertEquals("bar", c.getTablename()); - assertEquals("ds=today", c.getPartitionname()); - assertEquals(CompactionType.MINOR, c.getType()); - assertEquals("ready for cleaning", c.getState()); - assertNull(c.getWorkerid()); - } - - @Test - public void testFindNextToClean() throws Exception { - CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR); - rqst.setPartitionname("ds=today"); - txnHandler.compact(rqst); - assertEquals(0, txnHandler.findReadyToClean().size()); - CompactionInfo ci = txnHandler.findNextToCompact("fred"); - assertNotNull(ci); - - assertEquals(0, txnHandler.findReadyToClean().size()); - txnHandler.markCompacted(ci); - assertNull(txnHandler.findNextToCompact("fred")); - - List<CompactionInfo> toClean = txnHandler.findReadyToClean(); - assertEquals(1, toClean.size()); - assertNull(txnHandler.findNextToCompact("fred")); - - ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); - List<ShowCompactResponseElement> compacts = rsp.getCompacts(); - assertEquals(1, compacts.size()); - ShowCompactResponseElement c = compacts.get(0); - assertEquals("foo", c.getDbname()); - assertEquals("bar", c.getTablename()); - assertEquals("ds=today", c.getPartitionname()); - assertEquals(CompactionType.MINOR, c.getType()); - assertEquals("ready for cleaning", c.getState()); - assertNull(c.getWorkerid()); - } - - @Test - public void testMarkCleaned() throws Exception { - CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR); - rqst.setPartitionname("ds=today"); - txnHandler.compact(rqst); - assertEquals(0, txnHandler.findReadyToClean().size()); - CompactionInfo ci = txnHandler.findNextToCompact("fred"); - assertNotNull(ci); - - assertEquals(0, txnHandler.findReadyToClean().size()); - txnHandler.markCompacted(ci); - assertNull(txnHandler.findNextToCompact("fred")); - - List<CompactionInfo> toClean = txnHandler.findReadyToClean(); - assertEquals(1, toClean.size()); - assertNull(txnHandler.findNextToCompact("fred")); - txnHandler.markCleaned(ci); - assertNull(txnHandler.findNextToCompact("fred")); - assertEquals(0, txnHandler.findReadyToClean().size()); - - ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); - assertEquals(1, rsp.getCompactsSize()); - assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState())); - } - - @Test - public void testRevokeFromLocalWorkers() throws Exception { - CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR); - txnHandler.compact(rqst); - rqst = new CompactionRequest("foo", "baz", CompactionType.MINOR); - txnHandler.compact(rqst); - rqst = new CompactionRequest("foo", "bazzoo", CompactionType.MINOR); - txnHandler.compact(rqst); - assertNotNull(txnHandler.findNextToCompact("fred-193892")); - assertNotNull(txnHandler.findNextToCompact("bob-193892")); - assertNotNull(txnHandler.findNextToCompact("fred-193893")); - txnHandler.revokeFromLocalWorkers("fred"); - - ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); - List<ShowCompactResponseElement> compacts = rsp.getCompacts(); - assertEquals(3, compacts.size()); - boolean sawWorkingBob = false; - int initiatedCount = 0; - for (ShowCompactResponseElement c : compacts) { - if (c.getState().equals("working")) { - assertEquals("bob-193892", c.getWorkerid()); - sawWorkingBob = true; - } else if (c.getState().equals("initiated")) { - initiatedCount++; - } else { - fail("Unexpected state"); - } - } - assertTrue(sawWorkingBob); - assertEquals(2, initiatedCount); - } - - @Test - public void testRevokeTimedOutWorkers() throws Exception { - CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR); - txnHandler.compact(rqst); - rqst = new CompactionRequest("foo", "baz", CompactionType.MINOR); - txnHandler.compact(rqst); - - assertNotNull(txnHandler.findNextToCompact("fred-193892")); - Thread.sleep(200); - assertNotNull(txnHandler.findNextToCompact("fred-193892")); - txnHandler.revokeTimedoutWorkers(100); - - ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); - List<ShowCompactResponseElement> compacts = rsp.getCompacts(); - assertEquals(2, compacts.size()); - boolean sawWorking = false, sawInitiated = false; - for (ShowCompactResponseElement c : compacts) { - if (c.getState().equals("working")) sawWorking = true; - else if (c.getState().equals("initiated")) sawInitiated = true; - else fail("Unexpected state"); - } - assertTrue(sawWorking); - assertTrue(sawInitiated); - } - - @Test - public void testFindPotentialCompactions() throws Exception { - // Test that committing unlocks - long txnid = openTxn(); - LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, - "mydb"); - comp.setTablename("mytable"); - comp.setOperationType(DataOperationType.UPDATE); - List<LockComponent> components = new ArrayList<LockComponent>(1); - components.add(comp); - comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, - "mydb"); - comp.setTablename("yourtable"); - comp.setPartitionname("mypartition"); - comp.setOperationType(DataOperationType.UPDATE); - components.add(comp); - LockRequest req = new LockRequest(components, "me", "localhost"); - req.setTxnid(txnid); - LockResponse res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.ACQUIRED); - txnHandler.commitTxn(new CommitTxnRequest(txnid)); - assertEquals(0, txnHandler.numLocksInLockTable()); - - Set<CompactionInfo> potentials = txnHandler.findPotentialCompactions(100); - assertEquals(2, potentials.size()); - boolean sawMyTable = false, sawYourTable = false; - for (CompactionInfo ci : potentials) { - sawMyTable |= (ci.dbname.equals("mydb") && ci.tableName.equals("mytable") && - ci.partName == null); - sawYourTable |= (ci.dbname.equals("mydb") && ci.tableName.equals("yourtable") && - ci.partName.equals("mypartition")); - } - assertTrue(sawMyTable); - assertTrue(sawYourTable); - } - - // TODO test changes to mark cleaned to clean txns and txn_components - - @Test - public void testMarkCleanedCleansTxnsAndTxnComponents() - throws Exception { - long txnid = openTxn(); - LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, - "mydb"); - comp.setTablename("mytable"); - comp.setOperationType(DataOperationType.INSERT); - List<LockComponent> components = new ArrayList<LockComponent>(1); - components.add(comp); - LockRequest req = new LockRequest(components, "me", "localhost"); - req.setTxnid(txnid); - LockResponse res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.ACQUIRED); - txnHandler.abortTxn(new AbortTxnRequest(txnid)); - - txnid = openTxn(); - comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); - comp.setTablename("yourtable"); - comp.setOperationType(DataOperationType.DELETE); - components = new ArrayList<LockComponent>(1); - components.add(comp); - req = new LockRequest(components, "me", "localhost"); - req.setTxnid(txnid); - res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.ACQUIRED); - txnHandler.abortTxn(new AbortTxnRequest(txnid)); - - txnid = openTxn(); - comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); - comp.setTablename("foo"); - comp.setPartitionname("bar"); - comp.setOperationType(DataOperationType.UPDATE); - components = new ArrayList<LockComponent>(1); - components.add(comp); - req = new LockRequest(components, "me", "localhost"); - req.setTxnid(txnid); - res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.ACQUIRED); - - comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); - comp.setTablename("foo"); - comp.setPartitionname("baz"); - comp.setOperationType(DataOperationType.UPDATE); - components = new ArrayList<LockComponent>(1); - components.add(comp); - req = new LockRequest(components, "me", "localhost"); - req.setTxnid(txnid); - res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.ACQUIRED); - txnHandler.abortTxn(new AbortTxnRequest(txnid)); - - CompactionInfo ci = new CompactionInfo(); - - // Now clean them and check that they are removed from the count. - CompactionRequest rqst = new CompactionRequest("mydb", "mytable", CompactionType.MAJOR); - txnHandler.compact(rqst); - assertEquals(0, txnHandler.findReadyToClean().size()); - ci = txnHandler.findNextToCompact("fred"); - assertNotNull(ci); - txnHandler.markCompacted(ci); - - List<CompactionInfo> toClean = txnHandler.findReadyToClean(); - assertEquals(1, toClean.size()); - txnHandler.markCleaned(ci); - - // Check that we are cleaning up the empty aborted transactions - GetOpenTxnsResponse txnList = txnHandler.getOpenTxns(); - assertEquals(3, txnList.getOpen_txnsSize()); - txnHandler.cleanEmptyAbortedTxns(); - txnList = txnHandler.getOpenTxns(); - assertEquals(2, txnList.getOpen_txnsSize()); - - rqst = new CompactionRequest("mydb", "foo", CompactionType.MAJOR); - rqst.setPartitionname("bar"); - txnHandler.compact(rqst); - assertEquals(0, txnHandler.findReadyToClean().size()); - ci = txnHandler.findNextToCompact("fred"); - assertNotNull(ci); - txnHandler.markCompacted(ci); - - toClean = txnHandler.findReadyToClean(); - assertEquals(1, toClean.size()); - txnHandler.markCleaned(ci); - - txnHandler.openTxns(new OpenTxnRequest(1, "me", "localhost")); - txnHandler.cleanEmptyAbortedTxns(); - txnList = txnHandler.getOpenTxns(); - assertEquals(3, txnList.getOpen_txnsSize()); - } - - @Test - public void addDynamicPartitions() throws Exception { - String dbName = "default"; - String tableName = "adp_table"; - OpenTxnsResponse openTxns = txnHandler.openTxns(new OpenTxnRequest(1, "me", "localhost")); - long txnId = openTxns.getTxn_ids().get(0); - // lock a table, as in dynamic partitions - LockComponent lc = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, dbName); - lc.setTablename(tableName); - DataOperationType dop = DataOperationType.UPDATE; - lc.setOperationType(dop); - LockRequest lr = new LockRequest(Arrays.asList(lc), "me", "localhost"); - lr.setTxnid(txnId); - LockResponse lock = txnHandler.lock(lr); - assertEquals(LockState.ACQUIRED, lock.getState()); - - AddDynamicPartitions adp = new AddDynamicPartitions(txnId, dbName, tableName, - Arrays.asList("ds=yesterday", "ds=today")); - adp.setOperationType(dop); - txnHandler.addDynamicPartitions(adp); - txnHandler.commitTxn(new CommitTxnRequest(txnId)); - - Set<CompactionInfo> potentials = txnHandler.findPotentialCompactions(1000); - assertEquals(2, potentials.size()); - SortedSet<CompactionInfo> sorted = new TreeSet<CompactionInfo>(potentials); - - int i = 0; - for (CompactionInfo ci : sorted) { - assertEquals(dbName, ci.dbname); - assertEquals(tableName, ci.tableName); - switch (i++) { - case 0: assertEquals("ds=today", ci.partName); break; - case 1: assertEquals("ds=yesterday", ci.partName); break; - default: throw new RuntimeException("What?"); - } - } - } - - @Before - public void setUp() throws Exception { - TxnDbUtil.prepDb(); - txnHandler = TxnUtils.getTxnStore(conf); - } - - @After - public void tearDown() throws Exception { - TxnDbUtil.cleanDb(); - } - - private long openTxn() throws MetaException { - List<Long> txns = txnHandler.openTxns(new OpenTxnRequest(1, "me", "localhost")).getTxn_ids(); - return txns.get(0); - } - -}