HIVE-12902 Refactor TxnHandler to be an interface (gates reviewed by Eugene Koifman)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/1b2583ba Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/1b2583ba Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/1b2583ba Branch: refs/heads/master Commit: 1b2583ba87b80e04d861d4fa91f938ba0998fa4d Parents: be9735e Author: Alan Gates <ga...@hortonworks.com> Authored: Fri Jan 29 11:46:43 2016 -0800 Committer: Alan Gates <ga...@hortonworks.com> Committed: Fri Jan 29 11:46:43 2016 -0800 ---------------------------------------------------------------------- .../org/apache/hadoop/hive/conf/HiveConf.java | 44 +-- .../hive/ql/txn/compactor/TestCompactor.java | 25 +- .../hadoop/hive/metastore/HiveMetaStore.java | 126 ++++++- .../hive/metastore/HiveMetaStoreClient.java | 69 ++-- .../metastore/txn/CompactionTxnHandler.java | 46 +-- .../hadoop/hive/metastore/txn/TxnHandler.java | 26 +- .../hadoop/hive/metastore/txn/TxnStore.java | 349 +++++++++++++++++++ .../hadoop/hive/metastore/txn/TxnUtils.java | 97 ++++++ .../metastore/txn/TestCompactionTxnHandler.java | 31 +- .../hive/metastore/txn/TestTxnHandler.java | 195 ++++++----- .../metastore/txn/TestTxnHandlerNegative.java | 2 +- .../ql/txn/AcidCompactionHistoryService.java | 16 +- .../hive/ql/txn/AcidHouseKeeperService.java | 9 +- .../hive/ql/txn/compactor/CompactorThread.java | 11 +- .../hadoop/hive/ql/txn/compactor/Initiator.java | 12 +- .../hadoop/hive/ql/txn/compactor/Worker.java | 6 +- .../apache/hadoop/hive/ql/TestTxnCommands2.java | 22 +- .../hive/ql/lockmgr/TestDbTxnManager.java | 15 +- .../hive/ql/txn/compactor/CompactorTest.java | 38 +- .../hive/ql/txn/compactor/TestCleaner.java | 40 ++- .../hive/ql/txn/compactor/TestInitiator.java | 28 +- .../hive/ql/txn/compactor/TestWorker.java | 32 +- 22 files changed, 914 insertions(+), 325 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/hive/blob/1b2583ba/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index a78f78a..f4ca4a0 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -18,25 +18,7 @@ package org.apache.hadoop.hive.conf; -import java.io.ByteArrayOutputStream; -import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.io.PrintStream; -import java.net.URL; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Properties; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.regex.Matcher; -import java.util.regex.Pattern; -import javax.security.auth.login.LoginException; +import com.google.common.base.Joiner; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.classification.InterfaceAudience.LimitedPrivate; @@ -56,7 +38,25 @@ import org.apache.hive.common.HiveCompat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Joiner; +import javax.security.auth.login.LoginException; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.PrintStream; +import java.net.URL; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.regex.Matcher; 
+import java.util.regex.Pattern; /** * Hive Configuration. @@ -655,6 +655,10 @@ public class HiveConf extends Configuration { METASTORE_RAW_STORE_IMPL("hive.metastore.rawstore.impl", "org.apache.hadoop.hive.metastore.ObjectStore", "Name of the class that implements org.apache.hadoop.hive.metastore.rawstore interface. \n" + "This class is used to store and retrieval of raw metadata objects such as table, database"), + METASTORE_TXN_STORE_IMPL("hive.metastore.txn.store.impl", + "org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler", + "Name of class that implements org.apache.hadoop.hive.metastore.txn.TxnStore. This " + + "class is used to store and retrieve transactions and locks"), METASTORE_CONNECTION_DRIVER("javax.jdo.option.ConnectionDriverName", "org.apache.derby.jdbc.EmbeddedDriver", "Driver class name for a JDBC metastore"), METASTORE_MANAGER_FACTORY_CLASS("javax.jdo.PersistenceManagerFactoryClass", http://git-wip-us.apache.org/repos/asf/hive/blob/1b2583ba/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java index 071a17e..568f75a 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java @@ -25,7 +25,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.cli.CliSessionState; import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; 
@@ -37,10 +36,10 @@ import org.apache.hadoop.hive.metastore.api.ShowCompactResponse; import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement; import org.apache.hadoop.hive.metastore.api.StringColumnStatsData; import org.apache.hadoop.hive.metastore.api.Table; -import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.metastore.txn.CompactionInfo; -import org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler; import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; +import org.apache.hadoop.hive.metastore.txn.TxnStore; +import org.apache.hadoop.hive.metastore.txn.TxnUtils; import org.apache.hadoop.hive.ql.CommandNeedRetryException; import org.apache.hadoop.hive.ql.Driver; import org.apache.hadoop.hive.ql.io.AcidInputFormat; @@ -205,7 +204,7 @@ public class TestCompactor { initiator.init(stop, new AtomicBoolean()); initiator.run(); - CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf); + TxnStore txnHandler = TxnUtils.getTxnStore(conf); ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); List<ShowCompactResponseElement> compacts = rsp.getCompacts(); Assert.assertEquals(4, compacts.size()); @@ -308,7 +307,7 @@ public class TestCompactor { initiator.init(stop, new AtomicBoolean()); initiator.run(); - CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf); + TxnStore txnHandler = TxnUtils.getTxnStore(conf); ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); List<ShowCompactResponseElement> compacts = rsp.getCompacts(); Assert.assertEquals(4, compacts.size()); @@ -382,7 +381,7 @@ public class TestCompactor { execSelectAndDumpData("select * from " + tblName, driver, "Dumping data for " + tblName + " after load:"); - CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf); + TxnStore txnHandler = TxnUtils.getTxnStore(conf); CompactionInfo ci = new CompactionInfo("default", tblName, "bkt=0", CompactionType.MAJOR); LOG.debug("List of 
stats columns before analyze Part1: " + txnHandler.findColumnsWithStats(ci)); Worker.StatsUpdater su = Worker.StatsUpdater.init(ci, colNames, conf, @@ -517,7 +516,7 @@ public class TestCompactor { initiator.init(stop, new AtomicBoolean()); initiator.run(); - CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf); + TxnStore txnHandler = TxnUtils.getTxnStore(conf); ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); List<ShowCompactResponseElement> compacts = rsp.getCompacts(); Assert.assertEquals(2, compacts.size()); @@ -557,7 +556,7 @@ public class TestCompactor { initiator.init(stop, new AtomicBoolean()); initiator.run(); - CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf); + TxnStore txnHandler = TxnUtils.getTxnStore(conf); ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); List<ShowCompactResponseElement> compacts = rsp.getCompacts(); Assert.assertEquals(2, compacts.size()); @@ -599,7 +598,7 @@ public class TestCompactor { initiator.init(stop, new AtomicBoolean()); initiator.run(); - CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf); + TxnStore txnHandler = TxnUtils.getTxnStore(conf); ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); List<ShowCompactResponseElement> compacts = rsp.getCompacts(); Assert.assertEquals(1, compacts.size()); @@ -639,7 +638,7 @@ public class TestCompactor { writeBatch(connection, writer, true); // Now, compact - CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf); + TxnStore txnHandler = TxnUtils.getTxnStore(conf); txnHandler.compact(new CompactionRequest(dbName, tblName, CompactionType.MINOR)); Worker t = new Worker(); t.setThreadId((int) t.getId()); @@ -701,7 +700,7 @@ public class TestCompactor { writeBatch(connection, writer, true); // Now, compact - CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf); + TxnStore txnHandler = TxnUtils.getTxnStore(conf); txnHandler.compact(new 
CompactionRequest(dbName, tblName, CompactionType.MAJOR)); Worker t = new Worker(); t.setThreadId((int) t.getId()); @@ -757,7 +756,7 @@ public class TestCompactor { txnBatch.abort(); // Now, compact - CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf); + TxnStore txnHandler = TxnUtils.getTxnStore(conf); txnHandler.compact(new CompactionRequest(dbName, tblName, CompactionType.MINOR)); Worker t = new Worker(); t.setThreadId((int) t.getId()); @@ -823,7 +822,7 @@ public class TestCompactor { // Now, compact - CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf); + TxnStore txnHandler = TxnUtils.getTxnStore(conf); txnHandler.compact(new CompactionRequest(dbName, tblName, CompactionType.MAJOR)); Worker t = new Worker(); t.setThreadId((int) t.getId()); http://git-wip-us.apache.org/repos/asf/hive/blob/1b2583ba/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java index 7830f17..dde253a 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java @@ -26,11 +26,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableListMultimap; import com.google.common.collect.Lists; import com.google.common.collect.Multimaps; - import org.apache.commons.cli.OptionBuilder; -import org.apache.hadoop.hive.common.metrics.common.MetricsVariable; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -44,9 +40,113 @@ import org.apache.hadoop.hive.common.cli.CommonCliOptions; import org.apache.hadoop.hive.common.metrics.common.Metrics; import 
org.apache.hadoop.hive.common.metrics.common.MetricsConstant; import org.apache.hadoop.hive.common.metrics.common.MetricsFactory; +import org.apache.hadoop.hive.common.metrics.common.MetricsVariable; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; -import org.apache.hadoop.hive.metastore.api.*; +import org.apache.hadoop.hive.metastore.api.AbortTxnRequest; +import org.apache.hadoop.hive.metastore.api.AddDynamicPartitions; +import org.apache.hadoop.hive.metastore.api.AddPartitionsRequest; +import org.apache.hadoop.hive.metastore.api.AddPartitionsResult; +import org.apache.hadoop.hive.metastore.api.AggrStats; +import org.apache.hadoop.hive.metastore.api.AlreadyExistsException; +import org.apache.hadoop.hive.metastore.api.CacheFileMetadataRequest; +import org.apache.hadoop.hive.metastore.api.CacheFileMetadataResult; +import org.apache.hadoop.hive.metastore.api.CheckLockRequest; +import org.apache.hadoop.hive.metastore.api.ClearFileMetadataRequest; +import org.apache.hadoop.hive.metastore.api.ClearFileMetadataResult; +import org.apache.hadoop.hive.metastore.api.ColumnStatistics; +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc; +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; +import org.apache.hadoop.hive.metastore.api.CommitTxnRequest; +import org.apache.hadoop.hive.metastore.api.CompactionRequest; +import org.apache.hadoop.hive.metastore.api.ConfigValSecurityException; +import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.DropPartitionsExpr; +import org.apache.hadoop.hive.metastore.api.DropPartitionsRequest; +import org.apache.hadoop.hive.metastore.api.DropPartitionsResult; +import org.apache.hadoop.hive.metastore.api.EnvironmentContext; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.FileMetadataExprType; 
+import org.apache.hadoop.hive.metastore.api.FireEventRequest; +import org.apache.hadoop.hive.metastore.api.FireEventResponse; +import org.apache.hadoop.hive.metastore.api.Function; +import org.apache.hadoop.hive.metastore.api.GetAllFunctionsResponse; +import org.apache.hadoop.hive.metastore.api.GetFileMetadataByExprRequest; +import org.apache.hadoop.hive.metastore.api.GetFileMetadataByExprResult; +import org.apache.hadoop.hive.metastore.api.GetFileMetadataRequest; +import org.apache.hadoop.hive.metastore.api.GetFileMetadataResult; +import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse; +import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse; +import org.apache.hadoop.hive.metastore.api.GetPrincipalsInRoleRequest; +import org.apache.hadoop.hive.metastore.api.GetPrincipalsInRoleResponse; +import org.apache.hadoop.hive.metastore.api.GetRoleGrantsForPrincipalRequest; +import org.apache.hadoop.hive.metastore.api.GetRoleGrantsForPrincipalResponse; +import org.apache.hadoop.hive.metastore.api.GrantRevokePrivilegeRequest; +import org.apache.hadoop.hive.metastore.api.GrantRevokePrivilegeResponse; +import org.apache.hadoop.hive.metastore.api.GrantRevokeRoleRequest; +import org.apache.hadoop.hive.metastore.api.GrantRevokeRoleResponse; +import org.apache.hadoop.hive.metastore.api.HeartbeatRequest; +import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeRequest; +import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeResponse; +import org.apache.hadoop.hive.metastore.api.HiveObjectPrivilege; +import org.apache.hadoop.hive.metastore.api.HiveObjectRef; +import org.apache.hadoop.hive.metastore.api.HiveObjectType; +import org.apache.hadoop.hive.metastore.api.Index; +import org.apache.hadoop.hive.metastore.api.InvalidInputException; +import org.apache.hadoop.hive.metastore.api.InvalidObjectException; +import org.apache.hadoop.hive.metastore.api.InvalidOperationException; +import org.apache.hadoop.hive.metastore.api.InvalidPartitionException; 
+import org.apache.hadoop.hive.metastore.api.LockRequest; +import org.apache.hadoop.hive.metastore.api.LockResponse; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.MetadataPpdResult; +import org.apache.hadoop.hive.metastore.api.NoSuchLockException; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.metastore.api.NoSuchTxnException; +import org.apache.hadoop.hive.metastore.api.NotificationEventRequest; +import org.apache.hadoop.hive.metastore.api.NotificationEventResponse; +import org.apache.hadoop.hive.metastore.api.OpenTxnRequest; +import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.PartitionEventType; +import org.apache.hadoop.hive.metastore.api.PartitionListComposingSpec; +import org.apache.hadoop.hive.metastore.api.PartitionSpec; +import org.apache.hadoop.hive.metastore.api.PartitionSpecWithSharedSD; +import org.apache.hadoop.hive.metastore.api.PartitionWithoutSD; +import org.apache.hadoop.hive.metastore.api.PartitionsByExprRequest; +import org.apache.hadoop.hive.metastore.api.PartitionsByExprResult; +import org.apache.hadoop.hive.metastore.api.PartitionsStatsRequest; +import org.apache.hadoop.hive.metastore.api.PartitionsStatsResult; +import org.apache.hadoop.hive.metastore.api.PrincipalPrivilegeSet; +import org.apache.hadoop.hive.metastore.api.PrincipalType; +import org.apache.hadoop.hive.metastore.api.PrivilegeBag; +import org.apache.hadoop.hive.metastore.api.PrivilegeGrantInfo; +import org.apache.hadoop.hive.metastore.api.PutFileMetadataRequest; +import org.apache.hadoop.hive.metastore.api.PutFileMetadataResult; +import org.apache.hadoop.hive.metastore.api.RequestPartsSpec; +import org.apache.hadoop.hive.metastore.api.Role; +import org.apache.hadoop.hive.metastore.api.RolePrincipalGrant; +import 
org.apache.hadoop.hive.metastore.api.SetPartitionsStatsRequest; +import org.apache.hadoop.hive.metastore.api.ShowCompactRequest; +import org.apache.hadoop.hive.metastore.api.ShowCompactResponse; +import org.apache.hadoop.hive.metastore.api.ShowLocksRequest; +import org.apache.hadoop.hive.metastore.api.ShowLocksResponse; +import org.apache.hadoop.hive.metastore.api.SkewedInfo; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.api.TableMeta; +import org.apache.hadoop.hive.metastore.api.TableStatsRequest; +import org.apache.hadoop.hive.metastore.api.TableStatsResult; +import org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore; +import org.apache.hadoop.hive.metastore.api.TxnAbortedException; +import org.apache.hadoop.hive.metastore.api.TxnOpenException; +import org.apache.hadoop.hive.metastore.api.Type; +import org.apache.hadoop.hive.metastore.api.UnknownDBException; +import org.apache.hadoop.hive.metastore.api.UnknownPartitionException; +import org.apache.hadoop.hive.metastore.api.UnknownTableException; +import org.apache.hadoop.hive.metastore.api.UnlockRequest; +import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.metastore.events.AddIndexEvent; import org.apache.hadoop.hive.metastore.events.AddPartitionEvent; import org.apache.hadoop.hive.metastore.events.AlterIndexEvent; @@ -80,7 +180,8 @@ import org.apache.hadoop.hive.metastore.events.PreReadDatabaseEvent; import org.apache.hadoop.hive.metastore.events.PreReadTableEvent; import org.apache.hadoop.hive.metastore.filemeta.OrcFileMetadataHandler; import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy; -import org.apache.hadoop.hive.metastore.txn.TxnHandler; +import org.apache.hadoop.hive.metastore.txn.TxnStore; +import org.apache.hadoop.hive.metastore.txn.TxnUtils; import org.apache.hadoop.hive.serde2.Deserializer; import 
org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.shims.HadoopShims; @@ -108,9 +209,10 @@ import org.apache.thrift.transport.TServerSocket; import org.apache.thrift.transport.TServerTransport; import org.apache.thrift.transport.TTransport; import org.apache.thrift.transport.TTransportFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.jdo.JDOException; - import java.io.IOException; import java.nio.ByteBuffer; import java.text.DateFormat; @@ -229,9 +331,9 @@ public class HiveMetaStore extends ThriftHiveMetastore { } }; - private static final ThreadLocal<TxnHandler> threadLocalTxn = new ThreadLocal<TxnHandler>() { + private static final ThreadLocal<TxnStore> threadLocalTxn = new ThreadLocal<TxnStore>() { @Override - protected TxnHandler initialValue() { + protected TxnStore initialValue() { return null; } }; @@ -538,10 +640,10 @@ public class HiveMetaStore extends ThriftHiveMetastore { return ms; } - private TxnHandler getTxnHandler() { - TxnHandler txn = threadLocalTxn.get(); + private TxnStore getTxnHandler() { + TxnStore txn = threadLocalTxn.get(); if (txn == null) { - txn = new TxnHandler(hiveConf); + txn = TxnUtils.getTxnStore(hiveConf); threadLocalTxn.set(txn); } return txn; http://git-wip-us.apache.org/repos/asf/hive/blob/1b2583ba/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java index 09a6aea..0c30262 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java @@ -18,37 +18,6 @@ package org.apache.hadoop.hive.metastore; -import static org.apache.hadoop.hive.metastore.MetaStoreUtils.DEFAULT_DATABASE_NAME; -import static 
org.apache.hadoop.hive.metastore.MetaStoreUtils.isIndexTable; - -import java.io.IOException; -import java.lang.reflect.Constructor; -import java.lang.reflect.InvocationHandler; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.lang.reflect.Proxy; -import java.net.InetAddress; -import java.net.URI; -import java.net.UnknownHostException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.NoSuchElementException; -import java.util.Random; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import javax.security.auth.login.LoginException; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.common.ObjectPair; import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.common.classification.InterfaceAudience; @@ -82,9 +51,9 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.FireEventRequest; import org.apache.hadoop.hive.metastore.api.FireEventResponse; import org.apache.hadoop.hive.metastore.api.Function; +import org.apache.hadoop.hive.metastore.api.GetAllFunctionsResponse; import org.apache.hadoop.hive.metastore.api.GetFileMetadataRequest; import org.apache.hadoop.hive.metastore.api.GetFileMetadataResult; -import org.apache.hadoop.hive.metastore.api.GetAllFunctionsResponse; import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse; import org.apache.hadoop.hive.metastore.api.GetPrincipalsInRoleRequest; import org.apache.hadoop.hive.metastore.api.GetPrincipalsInRoleResponse; @@ -144,7 +113,7 @@ import org.apache.hadoop.hive.metastore.api.UnknownPartitionException; import 
org.apache.hadoop.hive.metastore.api.UnknownTableException; import org.apache.hadoop.hive.metastore.api.UnlockRequest; import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy; -import org.apache.hadoop.hive.metastore.txn.TxnHandler; +import org.apache.hadoop.hive.metastore.txn.TxnUtils; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.hive.shims.Utils; import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge; @@ -159,6 +128,36 @@ import org.apache.thrift.transport.TFramedTransport; import org.apache.thrift.transport.TSocket; import org.apache.thrift.transport.TTransport; import org.apache.thrift.transport.TTransportException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.security.auth.login.LoginException; +import java.io.IOException; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; +import java.net.InetAddress; +import java.net.URI; +import java.net.UnknownHostException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.NoSuchElementException; +import java.util.Random; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.hadoop.hive.metastore.MetaStoreUtils.DEFAULT_DATABASE_NAME; +import static org.apache.hadoop.hive.metastore.MetaStoreUtils.isIndexTable; /** * Hive Metastore Client. 
@@ -1941,12 +1940,12 @@ public class HiveMetaStoreClient implements IMetaStoreClient { @Override public ValidTxnList getValidTxns() throws TException { - return TxnHandler.createValidReadTxnList(client.get_open_txns(), 0); + return TxnUtils.createValidReadTxnList(client.get_open_txns(), 0); } @Override public ValidTxnList getValidTxns(long currentTxn) throws TException { - return TxnHandler.createValidReadTxnList(client.get_open_txns(), currentTxn); + return TxnUtils.createValidReadTxnList(client.get_open_txns(), currentTxn); } @Override http://git-wip-us.apache.org/repos/asf/hive/blob/1b2583ba/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java index 18b288d..70cbab7 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java @@ -17,30 +17,35 @@ */ package org.apache.hadoop.hive.metastore.txn; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.api.*; +import org.apache.hadoop.hive.metastore.api.CompactionType; +import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.util.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import java.sql.*; -import java.util.*; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; /** * Extends the transaction handler with methods needed only by 
the compactor threads. These * methods are not available through the thrift interface. */ -public class CompactionTxnHandler extends TxnHandler { +class CompactionTxnHandler extends TxnHandler { static final private String CLASS_NAME = CompactionTxnHandler.class.getName(); static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME); // Always access COMPACTION_QUEUE before COMPLETED_TXN_COMPONENTS // See TxnHandler for notes on how to deal with deadlocks. Follow those notes. - public CompactionTxnHandler(HiveConf conf) { - super(conf); + public CompactionTxnHandler() { } /** @@ -621,27 +626,6 @@ public class CompactionTxnHandler extends TxnHandler { } /** - * Transform a {@link org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse} to a - * {@link org.apache.hadoop.hive.common.ValidTxnList}. This assumes that the caller intends to - * compact the files, and thus treats only open transactions as invalid. Additionally any - * txnId > highestOpenTxnId is also invalid. This is avoid creating something like - * delta_17_120 where txnId 80, for example, is still open. - * @param txns txn list from the metastore - * @return a valid txn list. - */ - public static ValidTxnList createValidCompactTxnList(GetOpenTxnsInfoResponse txns) { - long highWater = txns.getTxn_high_water_mark(); - long minOpenTxn = Long.MAX_VALUE; - long[] exceptions = new long[txns.getOpen_txnsSize()]; - int i = 0; - for (TxnInfo txn : txns.getOpen_txns()) { - if (txn.getState() == TxnState.OPEN) minOpenTxn = Math.min(minOpenTxn, txn.getId()); - exceptions[i++] = txn.getId(); - } - highWater = minOpenTxn == Long.MAX_VALUE ? highWater : minOpenTxn - 1; - return new ValidCompactorTxnList(exceptions, -1, highWater); - } - /** * Record the highest txn id that the {@code ci} compaction job will pay attention to. 
*/ public void setCompactionHighestTxnId(CompactionInfo ci, long highestTxnId) throws MetaException { http://git-wip-us.apache.org/repos/asf/hive/blob/1b2583ba/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index c836f80..79c4f7a 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -63,14 +63,7 @@ import java.util.concurrent.TimeUnit; */ @InterfaceAudience.Private @InterfaceStability.Evolving -public class TxnHandler { - // Compactor states (Should really be enum) - static final public String INITIATED_RESPONSE = "initiated"; - static final public String WORKING_RESPONSE = "working"; - static final public String CLEANING_RESPONSE = "ready for cleaning"; - static final public String FAILED_RESPONSE = "failed"; - static final public String SUCCEEDED_RESPONSE = "succeeded"; - static final public String ATTEMPTED_RESPONSE = "attempted"; +abstract class TxnHandler implements TxnStore { static final protected char INITIATED_STATE = 'i'; static final protected char WORKING_STATE = 'w'; @@ -97,7 +90,6 @@ public class TxnHandler { static final protected char LOCK_SEMI_SHARED = 'w'; static final private int ALLOWED_REPEATED_DEADLOCKS = 10; - public static final int TIMED_OUT_TXN_ABORT_BATCH_SIZE = 1000; static final private Logger LOG = LoggerFactory.getLogger(TxnHandler.class.getName()); static private DataSource connPool; @@ -109,7 +101,7 @@ public class TxnHandler { * Number of consecutive deadlocks we have seen */ private int deadlockCnt; - private final long deadlockRetryInterval; + private long deadlockRetryInterval; protected HiveConf conf; protected DatabaseProduct dbProduct; @@ -117,8 +109,8 @@ public 
class TxnHandler { private long timeout; private String identifierQuoteString; // quotes to use for quoting tables, where necessary - private final long retryInterval; - private final int retryLimit; + private long retryInterval; + private int retryLimit; private int retryNum; // DEADLOCK DETECTION AND HANDLING @@ -135,7 +127,10 @@ public class TxnHandler { // in mind. To do this they should call checkRetryable() AFTER rolling back the db transaction, // and then they should catch RetryException and call themselves recursively. See commitTxn for an example. - public TxnHandler(HiveConf conf) { + public TxnHandler() { + } + + public void setConf(HiveConf conf) { this.conf = conf; checkQFileTestHack(); @@ -155,7 +150,6 @@ public class TxnHandler { TimeUnit.MILLISECONDS); retryLimit = HiveConf.getIntVar(conf, HiveConf.ConfVars.HMSHANDLERATTEMPTS); deadlockRetryInterval = retryInterval / 10; - } public GetOpenTxnsInfoResponse getOpenTxnsInfo() throws MetaException { @@ -961,7 +955,7 @@ public class TxnHandler { * For testing only, do not use. */ @VisibleForTesting - int numLocksInLockTable() throws SQLException, MetaException { + public int numLocksInLockTable() throws SQLException, MetaException { Connection dbConn = null; Statement stmt = null; ResultSet rs = null; @@ -984,7 +978,7 @@ public class TxnHandler { /** * For testing only, do not use. 
*/ - long setTimeout(long milliseconds) { + public long setTimeout(long milliseconds) { long previous_timeout = timeout; timeout = milliseconds; return previous_timeout; http://git-wip-us.apache.org/repos/asf/hive/blob/1b2583ba/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java new file mode 100644 index 0000000..5e0306a --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java @@ -0,0 +1,349 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.metastore.txn; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.hive.common.classification.InterfaceAudience; +import org.apache.hadoop.hive.common.classification.InterfaceStability; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.AbortTxnRequest; +import org.apache.hadoop.hive.metastore.api.AddDynamicPartitions; +import org.apache.hadoop.hive.metastore.api.CheckLockRequest; +import org.apache.hadoop.hive.metastore.api.CommitTxnRequest; +import org.apache.hadoop.hive.metastore.api.CompactionRequest; +import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse; +import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse; +import org.apache.hadoop.hive.metastore.api.HeartbeatRequest; +import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeRequest; +import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeResponse; +import org.apache.hadoop.hive.metastore.api.LockRequest; +import org.apache.hadoop.hive.metastore.api.LockResponse; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.NoSuchLockException; +import org.apache.hadoop.hive.metastore.api.NoSuchTxnException; +import org.apache.hadoop.hive.metastore.api.OpenTxnRequest; +import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse; +import org.apache.hadoop.hive.metastore.api.ShowCompactRequest; +import org.apache.hadoop.hive.metastore.api.ShowCompactResponse; +import org.apache.hadoop.hive.metastore.api.ShowLocksRequest; +import org.apache.hadoop.hive.metastore.api.ShowLocksResponse; +import org.apache.hadoop.hive.metastore.api.TxnAbortedException; +import org.apache.hadoop.hive.metastore.api.TxnOpenException; +import org.apache.hadoop.hive.metastore.api.UnlockRequest; + +import java.sql.SQLException; +import java.util.List; +import java.util.Set; + +/** + * A handler to answer transaction related calls that come into the 
metastore + * server. + * + * Note on log messages: Please include txnid:X and lockid info using + * {@link org.apache.hadoop.hive.common.JavaUtils#txnIdToString(long)} + * and {@link org.apache.hadoop.hive.common.JavaUtils#lockIdToString(long)} in all messages. + * The txnid:X and lockid:Y format matches how Thrift object toString() methods are generated, + * so keeping the format consistent makes grep'ing the logs much easier. + * + * Note on HIVE_LOCKS.hl_last_heartbeat. + * For locks that are part of a transaction, we set this to 0 (would rather set it to NULL but + * currently the DB schema has this NOT NULL) and only update/read heartbeat from corresponding + * transaction in TXNS. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public interface TxnStore { + + // Compactor states (Should really be enum) + static final public String INITIATED_RESPONSE = "initiated"; + static final public String WORKING_RESPONSE = "working"; + static final public String CLEANING_RESPONSE = "ready for cleaning"; + static final public String FAILED_RESPONSE = "failed"; + static final public String SUCCEEDED_RESPONSE = "succeeded"; + static final public String ATTEMPTED_RESPONSE = "attempted"; + + public static final int TIMED_OUT_TXN_ABORT_BATCH_SIZE = 1000; + + public void setConf(HiveConf conf); + + /** + * Get information about open transactions. This gives extensive information about the + * transactions rather than just the list of transactions. This should be used when the need + * is to see information about the transactions (e.g. show transactions). + * @return information about open transactions + * @throws MetaException + */ + public GetOpenTxnsInfoResponse getOpenTxnsInfo() throws MetaException; + + /** + * Get list of valid transactions. This gives just the list of transactions that are open. + * @return list of open transactions, as well as a high water mark.
+ * @throws MetaException + */ + public GetOpenTxnsResponse getOpenTxns() throws MetaException; + + /** + * Open a set of transactions + * @param rqst request to open transactions + * @return information on opened transactions + * @throws MetaException + */ + public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException; + + /** + * Abort (rollback) a transaction. + * @param rqst info on transaction to abort + * @throws NoSuchTxnException + * @throws MetaException + */ + public void abortTxn(AbortTxnRequest rqst) throws NoSuchTxnException, MetaException; + + /** + * Commit a transaction + * @param rqst info on transaction to commit + * @throws NoSuchTxnException + * @throws TxnAbortedException + * @throws MetaException + */ + public void commitTxn(CommitTxnRequest rqst) + throws NoSuchTxnException, TxnAbortedException, MetaException; + + /** + * Obtain a lock. + * @param rqst information on the lock to obtain. If the requester is part of a transaction + * the txn information must be included in the lock request. + * @return info on the lock, including whether it was obtained. + * @throws NoSuchTxnException + * @throws TxnAbortedException + * @throws MetaException + */ + public LockResponse lock(LockRequest rqst) + throws NoSuchTxnException, TxnAbortedException, MetaException; + + /** + * Check whether a lock has been obtained. This is used after {@link #lock} returned a wait + * state. + * @param rqst info on the lock to check + * @return info on the state of the lock + * @throws NoSuchTxnException + * @throws NoSuchLockException + * @throws TxnAbortedException + * @throws MetaException + */ + public LockResponse checkLock(CheckLockRequest rqst) + throws NoSuchTxnException, NoSuchLockException, TxnAbortedException, MetaException; + + /** + * Unlock a lock. It is not legal to call this if the caller is part of a txn. In that case + * the txn should be committed or aborted instead. 
(Note: someday this will change since + * multi-statement transactions will allow unlocking in the transaction.) + * @param rqst lock to unlock + * @throws NoSuchLockException + * @throws TxnOpenException + * @throws MetaException + */ + public void unlock(UnlockRequest rqst) + throws NoSuchLockException, TxnOpenException, MetaException; + + /** + * Get information on current locks. + * @param rqst lock information to retrieve + * @return lock information. + * @throws MetaException + */ + public ShowLocksResponse showLocks(ShowLocksRequest rqst) throws MetaException; + + /** + * Send a heartbeat for a lock or a transaction + * @param ids lock and/or txn id to heartbeat + * @throws NoSuchTxnException + * @throws NoSuchLockException + * @throws TxnAbortedException + * @throws MetaException + */ + public void heartbeat(HeartbeatRequest ids) + throws NoSuchTxnException, NoSuchLockException, TxnAbortedException, MetaException; + + /** + * Heartbeat a group of transactions together + * @param rqst set of transactions to heartbeat + * @return info on txns that were heartbeated + * @throws MetaException + */ + public HeartbeatTxnRangeResponse heartbeatTxnRange(HeartbeatTxnRangeRequest rqst) + throws MetaException; + + /** + * Submit a compaction request into the queue. This is called when a user manually requests a + * compaction. + * @param rqst information on what to compact + * @return id of the compaction that has been started + * @throws MetaException + */ + public long compact(CompactionRequest rqst) throws MetaException; + + /** + * Show list of current compactions + * @param rqst info on which compactions to show + * @return compaction information + * @throws MetaException + */ + public ShowCompactResponse showCompact(ShowCompactRequest rqst) throws MetaException; + + /** + * Add information on a set of dynamic partitions that participated in a transaction. + * @param rqst dynamic partition info.
+ * @throws NoSuchTxnException + * @throws TxnAbortedException + * @throws MetaException + */ + public void addDynamicPartitions(AddDynamicPartitions rqst) + throws NoSuchTxnException, TxnAbortedException, MetaException; + + /** + * Timeout transactions and/or locks. This should only be called by the compactor. + */ + public void performTimeOuts(); + + /** + * This will look through the completed_txn_components table and look for partitions or tables + * that may be ready for compaction. Also, look through txns and txn_components tables for + * aborted transactions that we should add to the list. + * @param maxAborted Maximum number of aborted queries to allow before marking this as a + * potential compaction. + * @return list of CompactionInfo structs. These will not have id, type, + * or runAs set since these are only potential compactions not actual ones. + */ + public Set<CompactionInfo> findPotentialCompactions(int maxAborted) throws MetaException; + + /** + * Sets the user to run as. This is for the case + * where the request was generated by the user and so the worker must set this value later. + * @param cq_id id of this entry in the queue + * @param user user to run the jobs as + */ + public void setRunAs(long cq_id, String user) throws MetaException; + + /** + * This will grab the next compaction request off of + * the queue, and assign it to the worker. + * @param workerId id of the worker calling this, will be recorded in the db + * @return an info element for this compaction request, or null if there is no work to do now. + */ + public CompactionInfo findNextToCompact(String workerId) throws MetaException; + + /** + * This will mark an entry in the queue as compacted + * and put it in the ready to clean state. + * @param info info on the compaction entry to mark as compacted. + */ + public void markCompacted(CompactionInfo info) throws MetaException; + + /** + * Find entries in the queue that are ready to + * be cleaned. 
+ * @return information on the entry in the queue. + */ + public List<CompactionInfo> findReadyToClean() throws MetaException; + + /** + * This will remove an entry from the queue after + * it has been compacted. + * + * @param info info on the compaction entry to remove + */ + public void markCleaned(CompactionInfo info) throws MetaException; + + /** + * Mark a compaction entry as failed. This will move it to the compaction history queue with a + * failed status. It will NOT clean up aborted transactions in the table/partition associated + * with this compaction. + * @param info information on the compaction that failed. + * @throws MetaException + */ + public void markFailed(CompactionInfo info) throws MetaException; + + /** + * Clean up aborted transactions from txns that have no components in txn_components. The reason such + * txns exist can be that no work was done in this txn (e.g. Streaming opened TransactionBatch and + * abandoned it w/o doing any work) or due to {@link #markCleaned(CompactionInfo)} being called. + */ + public void cleanEmptyAbortedTxns() throws MetaException; + + /** + * This will take all entries assigned to workers + * on a host and return them to INITIATED state. The initiator should use this at start up to + * clean entries from any workers that were in the middle of compacting when the metastore + * shut down. It does not reset entries from worker threads on other hosts as those may still + * be working. + * @param hostname Name of this host. It is assumed this prefixes the thread's worker id, + * so that like hostname% will match the worker id. + */ + public void revokeFromLocalWorkers(String hostname) throws MetaException; + + /** + * This call will return all compaction queue + * entries assigned to a worker but over the timeout back to the initiated state. + * This should be called by the initiator on start up and occasionally when running to clean up + * after dead threads.
At start up {@link #revokeFromLocalWorkers(String)} should be called + * first. + * @param timeout number of milliseconds since start time that should elapse before a worker is + * declared dead. + */ + public void revokeTimedoutWorkers(long timeout) throws MetaException; + + /** + * Queries metastore DB directly to find columns in the table which have statistics information. + * If {@code ci} includes partition info then per partition stats info is examined, otherwise + * table level stats are examined. + * @throws MetaException + */ + public List<String> findColumnsWithStats(CompactionInfo ci) throws MetaException; + + /** + * Record the highest txn id that the {@code ci} compaction job will pay attention to. + */ + public void setCompactionHighestTxnId(CompactionInfo ci, long highestTxnId) throws MetaException; + + /** + * For any given compactable entity (partition, table if not partitioned) the history of compactions + * may look like "sssfffaaasffss", for example. The idea is to retain the tail (most recent) of the + * history such that a configurable number of each type of state is present. Any other entries + * can be purged. This scheme has the advantage of always retaining the last failure/success even if + * it's not recent. + * @throws MetaException + */ + public void purgeCompactionHistory() throws MetaException; + + /** + * Determine if there are enough consecutive failures compacting a table or partition that no + * new automatic compactions should be scheduled. User-initiated compactions do not do this + * check. + * @param ci Table or partition to check. + * @return true if it is ok to compact, false if there have been too many failures.
+ * @throws MetaException + */ + public boolean checkFailedCompactions(CompactionInfo ci) throws MetaException; + + + @VisibleForTesting + public int numLocksInLockTable() throws SQLException, MetaException; + + @VisibleForTesting + long setTimeout(long milliseconds); +} http://git-wip-us.apache.org/repos/asf/hive/blob/1b2583ba/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java new file mode 100644 index 0000000..b7502c2 --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.metastore.txn; + +import org.apache.hadoop.hive.common.ValidReadTxnList; +import org.apache.hadoop.hive.common.ValidTxnList; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.MetaStoreUtils; +import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse; +import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse; +import org.apache.hadoop.hive.metastore.api.TxnInfo; +import org.apache.hadoop.hive.metastore.api.TxnState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Set; + +public class TxnUtils { + private static final Logger LOG = LoggerFactory.getLogger(TxnUtils.class); + + /** + * Transform a {@link org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse} to a + * {@link org.apache.hadoop.hive.common.ValidTxnList}. This assumes that the caller intends to + * read the files, and thus treats both open and aborted transactions as invalid. + * @param txns txn list from the metastore + * @param currentTxn Current transaction that the user has open. If this is greater than 0 it + * will be removed from the exceptions list so that the user sees his own + * transaction as valid. + * @return a valid txn list. + */ + public static ValidTxnList createValidReadTxnList(GetOpenTxnsResponse txns, long currentTxn) { + long highWater = txns.getTxn_high_water_mark(); + Set<Long> open = txns.getOpen_txns(); + long[] exceptions = new long[open.size() - (currentTxn > 0 ? 1 : 0)]; + int i = 0; + for(long txn: open) { + if (currentTxn > 0 && currentTxn == txn) continue; + exceptions[i++] = txn; + } + return new ValidReadTxnList(exceptions, highWater); + } + + /** + * Transform a {@link org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse} to a + * {@link org.apache.hadoop.hive.common.ValidTxnList}. This assumes that the caller intends to + * compact the files, and thus treats only open transactions as invalid. 
Additionally any + * txnId >= the lowest open txnId is also invalid. This is to avoid creating something like + * delta_17_120 where txnId 80, for example, is still open. + * @param txns txn list from the metastore + * @return a valid txn list. + */ + public static ValidTxnList createValidCompactTxnList(GetOpenTxnsInfoResponse txns) { + long highWater = txns.getTxn_high_water_mark(); + long minOpenTxn = Long.MAX_VALUE; + long[] exceptions = new long[txns.getOpen_txnsSize()]; + int i = 0; + for (TxnInfo txn : txns.getOpen_txns()) { + if (txn.getState() == TxnState.OPEN) minOpenTxn = Math.min(minOpenTxn, txn.getId()); + exceptions[i++] = txn.getId(); + } + highWater = minOpenTxn == Long.MAX_VALUE ? highWater : minOpenTxn - 1; + return new ValidCompactorTxnList(exceptions, -1, highWater); + } + + /** + * Get an instance of the TxnStore that is appropriate for this configuration + * @param conf configuration + * @return txn store + */ + public static TxnStore getTxnStore(HiveConf conf) { + String className = conf.getVar(HiveConf.ConfVars.METASTORE_TXN_STORE_IMPL); + try { + TxnStore handler = ((Class<?
extends TxnHandler>) MetaStoreUtils.getClass( + className)).newInstance(); + handler.setConf(conf); + return handler; + } catch (Exception e) { + LOG.error("Unable to instantiate raw store directly in fastpath mode", e); + throw new RuntimeException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/1b2583ba/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java ---------------------------------------------------------------------- diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java index ff2c2c1..2c1560b 100644 --- a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java +++ b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java @@ -17,19 +17,6 @@ */ package org.apache.hadoop.hive.metastore.txn; -import static junit.framework.Assert.assertEquals; -import static junit.framework.Assert.assertNotNull; -import static junit.framework.Assert.assertNull; -import static junit.framework.Assert.assertTrue; -import static junit.framework.Assert.fail; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Set; -import java.util.SortedSet; -import java.util.TreeSet; - import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.AbortTxnRequest; import org.apache.hadoop.hive.metastore.api.AddDynamicPartitions; @@ -49,18 +36,30 @@ import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse; import org.apache.hadoop.hive.metastore.api.ShowCompactRequest; import org.apache.hadoop.hive.metastore.api.ShowCompactResponse; import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement; -import org.apache.hadoop.hive.metastore.api.UnlockRequest; import org.junit.After; import org.junit.Before; import org.junit.Test; +import java.util.ArrayList; +import 
java.util.Arrays; +import java.util.List; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; + +import static junit.framework.Assert.assertEquals; +import static junit.framework.Assert.assertNotNull; +import static junit.framework.Assert.assertNull; +import static junit.framework.Assert.assertTrue; +import static junit.framework.Assert.fail; + /** * Tests for TxnHandler. */ public class TestCompactionTxnHandler { private HiveConf conf = new HiveConf(); - private CompactionTxnHandler txnHandler; + private TxnStore txnHandler; public TestCompactionTxnHandler() throws Exception { TxnDbUtil.setConfValues(conf); @@ -440,7 +439,7 @@ public class TestCompactionTxnHandler { @Before public void setUp() throws Exception { TxnDbUtil.prepDb(); - txnHandler = new CompactionTxnHandler(conf); + txnHandler = TxnUtils.getTxnStore(conf); } @After http://git-wip-us.apache.org/repos/asf/hive/blob/1b2583ba/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java ---------------------------------------------------------------------- diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java index 4debd04..f0d23ba 100644 --- a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java +++ b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java @@ -17,20 +17,6 @@ */ package org.apache.hadoop.hive.metastore.txn; -import static junit.framework.Assert.assertEquals; -import static junit.framework.Assert.assertFalse; -import static junit.framework.Assert.assertNull; -import static junit.framework.Assert.assertTrue; -import static junit.framework.Assert.fail; - -import java.sql.Connection; -import java.sql.SQLException; -import java.sql.Statement; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - import 
org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.AbortTxnRequest; import org.apache.hadoop.hive.metastore.api.CheckLockRequest; @@ -75,6 +61,20 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import static junit.framework.Assert.assertEquals; +import static junit.framework.Assert.assertFalse; +import static junit.framework.Assert.assertNull; +import static junit.framework.Assert.assertTrue; +import static junit.framework.Assert.fail; + /** * Tests for TxnHandler. */ @@ -83,7 +83,7 @@ public class TestTxnHandler { private static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME); private HiveConf conf = new HiveConf(); - private TxnHandler txnHandler; + private TxnStore txnHandler; public TestTxnHandler() throws Exception { TxnDbUtil.setConfValues(conf); @@ -1153,99 +1153,102 @@ public class TestTxnHandler { @Ignore public void deadlockDetected() throws Exception { LOG.debug("Starting deadlock test"); - Connection conn = txnHandler.getDbConn(Connection.TRANSACTION_SERIALIZABLE); - Statement stmt = conn.createStatement(); - long now = txnHandler.getDbTime(conn); - stmt.executeUpdate("insert into TXNS (txn_id, txn_state, txn_started, txn_last_heartbeat, " + - "txn_user, txn_host) values (1, 'o', " + now + ", " + now + ", 'shagy', " + - "'scooby.com')"); - stmt.executeUpdate("insert into HIVE_LOCKS (hl_lock_ext_id, hl_lock_int_id, hl_txnid, " + - "hl_db, hl_table, hl_partition, hl_lock_state, hl_lock_type, hl_last_heartbeat, " + - "hl_user, hl_host) values (1, 1, 1, 'mydb', 'mytable', 'mypartition', '" + - txnHandler.LOCK_WAITING + "', '" + txnHandler.LOCK_EXCLUSIVE + "', " + now + ", 'fred', " + - "'scooby.com')"); - conn.commit(); - txnHandler.closeDbConn(conn); - - 
final AtomicBoolean sawDeadlock = new AtomicBoolean(); - - final Connection conn1 = txnHandler.getDbConn(Connection.TRANSACTION_SERIALIZABLE); - final Connection conn2 = txnHandler.getDbConn(Connection.TRANSACTION_SERIALIZABLE); - try { + if (txnHandler instanceof TxnHandler) { + final TxnHandler tHndlr = (TxnHandler)txnHandler; + Connection conn = tHndlr.getDbConn(Connection.TRANSACTION_SERIALIZABLE); + Statement stmt = conn.createStatement(); + long now = tHndlr.getDbTime(conn); + stmt.executeUpdate("insert into TXNS (txn_id, txn_state, txn_started, txn_last_heartbeat, " + + "txn_user, txn_host) values (1, 'o', " + now + ", " + now + ", 'shagy', " + + "'scooby.com')"); + stmt.executeUpdate("insert into HIVE_LOCKS (hl_lock_ext_id, hl_lock_int_id, hl_txnid, " + + "hl_db, hl_table, hl_partition, hl_lock_state, hl_lock_type, hl_last_heartbeat, " + + "hl_user, hl_host) values (1, 1, 1, 'mydb', 'mytable', 'mypartition', '" + + tHndlr.LOCK_WAITING + "', '" + tHndlr.LOCK_EXCLUSIVE + "', " + now + ", 'fred', " + + "'scooby.com')"); + conn.commit(); + tHndlr.closeDbConn(conn); + + final AtomicBoolean sawDeadlock = new AtomicBoolean(); + + final Connection conn1 = tHndlr.getDbConn(Connection.TRANSACTION_SERIALIZABLE); + final Connection conn2 = tHndlr.getDbConn(Connection.TRANSACTION_SERIALIZABLE); + try { - for (int i = 0; i < 5; i++) { - Thread t1 = new Thread() { - @Override - public void run() { - try { + for (int i = 0; i < 5; i++) { + Thread t1 = new Thread() { + @Override + public void run() { try { - updateTxns(conn1); - updateLocks(conn1); - Thread.sleep(1000); - conn1.commit(); - LOG.debug("no exception, no deadlock"); - } catch (SQLException e) { try { - txnHandler.checkRetryable(conn1, e, "thread t1"); - LOG.debug("Got an exception, but not a deadlock, SQLState is " + - e.getSQLState() + " class of exception is " + e.getClass().getName() + - " msg is <" + e.getMessage() + ">"); - } catch (TxnHandler.RetryException de) { - LOG.debug("Forced a deadlock, SQLState 
is " + e.getSQLState() + " class of " + - "exception is " + e.getClass().getName() + " msg is <" + e - .getMessage() + ">"); - sawDeadlock.set(true); + updateTxns(conn1); + updateLocks(conn1); + Thread.sleep(1000); + conn1.commit(); + LOG.debug("no exception, no deadlock"); + } catch (SQLException e) { + try { + tHndlr.checkRetryable(conn1, e, "thread t1"); + LOG.debug("Got an exception, but not a deadlock, SQLState is " + + e.getSQLState() + " class of exception is " + e.getClass().getName() + + " msg is <" + e.getMessage() + ">"); + } catch (TxnHandler.RetryException de) { + LOG.debug("Forced a deadlock, SQLState is " + e.getSQLState() + " class of " + + "exception is " + e.getClass().getName() + " msg is <" + e + .getMessage() + ">"); + sawDeadlock.set(true); + } } + conn1.rollback(); + } catch (Exception e) { + throw new RuntimeException(e); } - conn1.rollback(); - } catch (Exception e) { - throw new RuntimeException(e); } - } - }; + }; - Thread t2 = new Thread() { - @Override - public void run() { - try { + Thread t2 = new Thread() { + @Override + public void run() { try { - updateLocks(conn2); - updateTxns(conn2); - Thread.sleep(1000); - conn2.commit(); - LOG.debug("no exception, no deadlock"); - } catch (SQLException e) { try { - txnHandler.checkRetryable(conn2, e, "thread t2"); - LOG.debug("Got an exception, but not a deadlock, SQLState is " + - e.getSQLState() + " class of exception is " + e.getClass().getName() + - " msg is <" + e.getMessage() + ">"); - } catch (TxnHandler.RetryException de) { - LOG.debug("Forced a deadlock, SQLState is " + e.getSQLState() + " class of " + - "exception is " + e.getClass().getName() + " msg is <" + e - .getMessage() + ">"); - sawDeadlock.set(true); + updateLocks(conn2); + updateTxns(conn2); + Thread.sleep(1000); + conn2.commit(); + LOG.debug("no exception, no deadlock"); + } catch (SQLException e) { + try { + tHndlr.checkRetryable(conn2, e, "thread t2"); + LOG.debug("Got an exception, but not a deadlock, SQLState is " + + 
e.getSQLState() + " class of exception is " + e.getClass().getName() + + " msg is <" + e.getMessage() + ">"); + } catch (TxnHandler.RetryException de) { + LOG.debug("Forced a deadlock, SQLState is " + e.getSQLState() + " class of " + + "exception is " + e.getClass().getName() + " msg is <" + e + .getMessage() + ">"); + sawDeadlock.set(true); + } } + conn2.rollback(); + } catch (Exception e) { + throw new RuntimeException(e); } - conn2.rollback(); - } catch (Exception e) { - throw new RuntimeException(e); } - } - }; - - t1.start(); - t2.start(); - t1.join(); - t2.join(); - if (sawDeadlock.get()) break; + }; + + t1.start(); + t2.start(); + t1.join(); + t2.join(); + if (sawDeadlock.get()) break; + } + assertTrue(sawDeadlock.get()); + } finally { + conn1.rollback(); + tHndlr.closeDbConn(conn1); + conn2.rollback(); + tHndlr.closeDbConn(conn2); } - assertTrue(sawDeadlock.get()); - } finally { - conn1.rollback(); - txnHandler.closeDbConn(conn1); - conn2.rollback(); - txnHandler.closeDbConn(conn2); } } @@ -1262,7 +1265,7 @@ public class TestTxnHandler { @Before public void setUp() throws Exception { TxnDbUtil.prepDb(); - txnHandler = new TxnHandler(conf); + txnHandler = TxnUtils.getTxnStore(conf); } @After http://git-wip-us.apache.org/repos/asf/hive/blob/1b2583ba/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandlerNegative.java ---------------------------------------------------------------------- diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandlerNegative.java b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandlerNegative.java index a765f61..17bd01d 100644 --- a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandlerNegative.java +++ b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandlerNegative.java @@ -35,7 +35,7 @@ public class TestTxnHandlerNegative { public void testBadConnection() throws Exception { HiveConf conf = new HiveConf(); 
conf.setVar(HiveConf.ConfVars.METASTORECONNECTURLKEY, "blah"); - TxnHandler txnHandler1 = new TxnHandler(conf); + TxnStore txnHandler1 = TxnUtils.getTxnStore(conf); MetaException e = null; try { txnHandler1.getOpenTxns(); http://git-wip-us.apache.org/repos/asf/hive/blob/1b2583ba/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidCompactionHistoryService.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidCompactionHistoryService.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidCompactionHistoryService.java index a91ca5c..59c8fe4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidCompactionHistoryService.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidCompactionHistoryService.java @@ -18,20 +18,12 @@ package org.apache.hadoop.hive.ql.txn; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.HouseKeeperService; -import org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler; -import org.apache.hadoop.hive.metastore.txn.TxnHandler; -import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager; -import org.apache.hadoop.hive.ql.lockmgr.TxnManagerFactory; -import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.metastore.txn.TxnStore; +import org.apache.hadoop.hive.metastore.txn.TxnUtils; import org.apache.hadoop.hive.ql.txn.compactor.HouseKeeperServiceBase; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -60,10 +52,10 @@ public class AcidCompactionHistoryService extends HouseKeeperServiceBase { } private static final class ObsoleteEntryReaper implements Runnable { - private final CompactionTxnHandler txnHandler; + private final TxnStore 
txnHandler; private final AtomicInteger isAliveCounter; private ObsoleteEntryReaper(HiveConf hiveConf, AtomicInteger isAliveCounter) { - txnHandler = new CompactionTxnHandler(hiveConf); + txnHandler = TxnUtils.getTxnStore(hiveConf); this.isAliveCounter = isAliveCounter; } http://git-wip-us.apache.org/repos/asf/hive/blob/1b2583ba/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java index 96e4d40..882562b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java @@ -17,11 +17,12 @@ */ package org.apache.hadoop.hive.ql.txn; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.txn.TxnStore; +import org.apache.hadoop.hive.metastore.txn.TxnUtils; import org.apache.hadoop.hive.ql.txn.compactor.HouseKeeperServiceBase; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.txn.TxnHandler; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -52,10 +53,10 @@ public class AcidHouseKeeperService extends HouseKeeperServiceBase { } private static final class TimedoutTxnReaper implements Runnable { - private final TxnHandler txnHandler; + private final TxnStore txnHandler; private final AtomicInteger isAliveCounter; private TimedoutTxnReaper(HiveConf hiveConf, AtomicInteger isAliveCounter) { - txnHandler = new TxnHandler(hiveConf); + txnHandler = TxnUtils.getTxnStore(hiveConf); this.isAliveCounter = isAliveCounter; } @Override http://git-wip-us.apache.org/repos/asf/hive/blob/1b2583ba/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java 
---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java index 3f6b099..8495c66 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java @@ -17,8 +17,6 @@ */ package org.apache.hadoop.hive.ql.txn.compactor; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -31,9 +29,12 @@ import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.txn.CompactionInfo; -import org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler; +import org.apache.hadoop.hive.metastore.txn.TxnStore; +import org.apache.hadoop.hive.metastore.txn.TxnUtils; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.UserGroupInformation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.security.PrivilegedExceptionAction; @@ -50,7 +51,7 @@ abstract class CompactorThread extends Thread implements MetaStoreThread { static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME); protected HiveConf conf; - protected CompactionTxnHandler txnHandler; + protected TxnStore txnHandler; protected RawStore rs; protected int threadId; protected AtomicBoolean stop; @@ -75,7 +76,7 @@ abstract class CompactorThread extends Thread implements MetaStoreThread { setDaemon(true); // this means the process will exit without waiting for this thread // Get our own instance of the transaction handler - txnHandler = new CompactionTxnHandler(conf); + txnHandler = TxnUtils.getTxnStore(conf); 
// Get our own connection to the database so we can get table and partition information. rs = RawStoreProxy.getProxy(conf, conf, http://git-wip-us.apache.org/repos/asf/hive/blob/1b2583ba/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java index 2ef06de..3705a34 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java @@ -17,8 +17,6 @@ */ package org.apache.hadoop.hive.ql.txn.compactor; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -36,12 +34,14 @@ import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.metastore.txn.CompactionInfo; -import org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler; -import org.apache.hadoop.hive.metastore.txn.TxnHandler; +import org.apache.hadoop.hive.metastore.txn.TxnStore; +import org.apache.hadoop.hive.metastore.txn.TxnUtils; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.security.PrivilegedExceptionAction; @@ -81,7 +81,7 @@ public class Initiator extends CompactorThread { try {//todo: add method to only get current i.e. 
skip history - more efficient ShowCompactResponse currentCompactions = txnHandler.showCompact(new ShowCompactRequest()); ValidTxnList txns = - CompactionTxnHandler.createValidCompactTxnList(txnHandler.getOpenTxnsInfo()); + TxnUtils.createValidCompactTxnList(txnHandler.getOpenTxnsInfo()); Set<CompactionInfo> potentials = txnHandler.findPotentialCompactions(abortedThreshold); LOG.debug("Found " + potentials.size() + " potential compactions, " + "checking to see if we should compact any of them"); @@ -184,7 +184,7 @@ public class Initiator extends CompactorThread { CompactionInfo ci) { if (compactions.getCompacts() != null) { for (ShowCompactResponseElement e : compactions.getCompacts()) { - if ((e.getState().equals(TxnHandler.WORKING_RESPONSE) || e.getState().equals(TxnHandler.INITIATED_RESPONSE)) && + if ((e.getState().equals(TxnStore.WORKING_RESPONSE) || e.getState().equals(TxnStore.INITIATED_RESPONSE)) && e.getDbname().equals(ci.dbname) && e.getTablename().equals(ci.tableName) && (e.getPartitionname() == null && ci.partName == null || http://git-wip-us.apache.org/repos/asf/hive/blob/1b2583ba/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java index ce03c8e..adffa8c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java @@ -27,13 +27,15 @@ import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.txn.CompactionInfo; -import org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler; +import org.apache.hadoop.hive.metastore.txn.TxnUtils; import org.apache.hadoop.hive.ql.CommandNeedRetryException; import 
org.apache.hadoop.hive.ql.Driver; import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.InetAddress; @@ -135,7 +137,7 @@ public class Worker extends CompactorThread { final boolean isMajor = ci.isMajorCompaction(); final ValidTxnList txns = - CompactionTxnHandler.createValidCompactTxnList(txnHandler.getOpenTxnsInfo()); + TxnUtils.createValidCompactTxnList(txnHandler.getOpenTxnsInfo()); LOG.debug("ValidCompactTxnList: " + txns.writeToString()); txnHandler.setCompactionHighestTxnId(ci, txns.getHighWatermark()); final StringBuilder jobName = new StringBuilder(name); http://git-wip-us.apache.org/repos/asf/hive/blob/1b2583ba/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java index fa576fa..5ed3c44 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java @@ -31,10 +31,10 @@ import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.ShowCompactRequest; import org.apache.hadoop.hive.metastore.api.ShowCompactResponse; import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement; -import org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler; import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; -import org.apache.hadoop.hive.metastore.txn.TxnHandler; import org.apache.hadoop.hive.ql.io.HiveInputFormat; +import org.apache.hadoop.hive.metastore.txn.TxnStore; +import org.apache.hadoop.hive.metastore.txn.TxnUtils; import 
org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.txn.AcidCompactionHistoryService; @@ -486,7 +486,7 @@ public class TestTxnCommands2 { hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION, true); int numFailedCompactions = hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD); - CompactionTxnHandler txnHandler = new CompactionTxnHandler(hiveConf); + TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf); AtomicBoolean stop = new AtomicBoolean(true); //create failed compactions for(int i = 0; i < numFailedCompactions; i++) { @@ -556,27 +556,27 @@ public class TestTxnCommands2 { private int working; private int total; } - private static CompactionsByState countCompacts(TxnHandler txnHandler) throws MetaException { + private static CompactionsByState countCompacts(TxnStore txnHandler) throws MetaException { ShowCompactResponse resp = txnHandler.showCompact(new ShowCompactRequest()); CompactionsByState compactionsByState = new CompactionsByState(); compactionsByState.total = resp.getCompactsSize(); for(ShowCompactResponseElement compact : resp.getCompacts()) { - if(TxnHandler.FAILED_RESPONSE.equals(compact.getState())) { + if(TxnStore.FAILED_RESPONSE.equals(compact.getState())) { compactionsByState.failed++; } - else if(TxnHandler.CLEANING_RESPONSE.equals(compact.getState())) { + else if(TxnStore.CLEANING_RESPONSE.equals(compact.getState())) { compactionsByState.readyToClean++; } - else if(TxnHandler.INITIATED_RESPONSE.equals(compact.getState())) { + else if(TxnStore.INITIATED_RESPONSE.equals(compact.getState())) { compactionsByState.initiated++; } - else if(TxnHandler.SUCCEEDED_RESPONSE.equals(compact.getState())) { + else if(TxnStore.SUCCEEDED_RESPONSE.equals(compact.getState())) { compactionsByState.succeeded++; } - else if(TxnHandler.WORKING_RESPONSE.equals(compact.getState())) { + else 
if(TxnStore.WORKING_RESPONSE.equals(compact.getState())) { compactionsByState.working++; } - else if(TxnHandler.ATTEMPTED_RESPONSE.equals(compact.getState())) { + else if(TxnStore.ATTEMPTED_RESPONSE.equals(compact.getState())) { compactionsByState.attempted++; } } @@ -632,7 +632,7 @@ public class TestTxnCommands2 { runStatementOnDriver("update " + tblName + " set b = 'blah' where a = 3"); //run Worker to execute compaction - CompactionTxnHandler txnHandler = new CompactionTxnHandler(hiveConf); + TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf); txnHandler.compact(new CompactionRequest("default", tblName, CompactionType.MINOR)); Worker t = new Worker(); t.setThreadId((int) t.getId()); http://git-wip-us.apache.org/repos/asf/hive/blob/1b2583ba/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java index 88b379c..a3bf9d3 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java @@ -20,9 +20,10 @@ package org.apache.hadoop.hive.ql.lockmgr; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.FieldSchema; -import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement; +import org.apache.hadoop.hive.metastore.api.ShowLocksResponse; +import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement; import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; -import org.apache.hadoop.hive.metastore.txn.TxnHandler; +import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.QueryPlan; @@ -38,7 
+39,11 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import java.util.*; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; import java.util.concurrent.TimeUnit; /** @@ -237,10 +242,10 @@ public class TestDbTxnManager { } expireLocks(txnMgr, 5); //create a lot of locks - for(int i = 0; i < TxnHandler.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 17; i++) { + for(int i = 0; i < TxnStore.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 17; i++) { ((DbTxnManager)txnMgr).acquireLocks(qp, ctx, "PeterI" + i, true); // No heartbeat } - expireLocks(txnMgr, TxnHandler.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 17); + expireLocks(txnMgr, TxnStore.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 17); } private void expireLocks(HiveTxnManager txnMgr, int numLocksBefore) throws Exception { DbLockManager lockManager = (DbLockManager)txnMgr.getLockManager(); http://git-wip-us.apache.org/repos/asf/hive/blob/1b2583ba/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java index a929c95..2d1ecb5 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java @@ -17,18 +17,32 @@ */ package org.apache.hadoop.hive.ql.txn.compactor; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.*; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.conf.HiveConf; import 
org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.IMetaStoreClient; -import org.apache.hadoop.hive.metastore.MetaStoreThread; -import org.apache.hadoop.hive.metastore.api.*; -import org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler; +import org.apache.hadoop.hive.metastore.api.AbortTxnRequest; +import org.apache.hadoop.hive.metastore.api.CommitTxnRequest; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.NoSuchTxnException; +import org.apache.hadoop.hive.metastore.api.OpenTxnRequest; +import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse; +import org.apache.hadoop.hive.metastore.api.Order; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.SerDeInfo; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.api.TxnAbortedException; import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; +import org.apache.hadoop.hive.metastore.txn.TxnStore; +import org.apache.hadoop.hive.metastore.txn.TxnUtils; import org.apache.hadoop.hive.ql.io.AcidInputFormat; import org.apache.hadoop.hive.ql.io.AcidOutputFormat; import org.apache.hadoop.hive.ql.io.AcidUtils; @@ -39,9 +53,15 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; -import org.apache.hadoop.mapred.*; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.RecordWriter; +import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.util.Progressable; import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import 
java.io.EOFException; import java.io.File; @@ -62,7 +82,7 @@ public abstract class CompactorTest { static final private String CLASS_NAME = CompactorTest.class.getName(); static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME); - protected CompactionTxnHandler txnHandler; + protected TxnStore txnHandler; protected IMetaStoreClient ms; protected long sleepTime = 1000; protected HiveConf conf; @@ -75,7 +95,7 @@ public abstract class CompactorTest { TxnDbUtil.setConfValues(conf); TxnDbUtil.cleanDb(); ms = new HiveMetaStoreClient(conf); - txnHandler = new CompactionTxnHandler(conf); + txnHandler = TxnUtils.getTxnStore(conf); tmpdir = new File(System.getProperty("java.io.tmpdir") + System.getProperty("file.separator") + "compactor_test_tables"); tmpdir.mkdir();