Repository: hive Updated Branches: refs/heads/master-txnstats 37a1907be -> 174c6748f
HIVE-20061 : add a config flag to turn off txn stats (Sergey Shelukhin) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/174c6748 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/174c6748 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/174c6748 Branch: refs/heads/master-txnstats Commit: 174c6748fc04ded51720c55be8848f9067c46399 Parents: 37a1907 Author: sergey <ser...@apache.org> Authored: Wed Jul 18 14:39:31 2018 -0700 Committer: sergey <ser...@apache.org> Committed: Wed Jul 18 14:39:31 2018 -0700 ---------------------------------------------------------------------- .../org/apache/hadoop/hive/conf/HiveConf.java | 2 + .../java/org/apache/hadoop/hive/ql/Driver.java | 1 - .../hive/ql/stats/StatsUpdaterThread.java | 4 +- .../apache/hadoop/hive/ql/TestTxnCommands.java | 56 ++++++ .../hadoop/hive/metastore/ObjectStore.java | 179 +++++++++++-------- .../hive/metastore/conf/MetastoreConf.java | 2 + 6 files changed, 167 insertions(+), 77 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/174c6748/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 4ed1636..2073edf 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -2471,6 +2471,8 @@ public class HiveConf extends Configuration { "Ensures commands with OVERWRITE (such as INSERT OVERWRITE) acquire Exclusive locks for\b" + "transactional tables. This ensures that inserts (w/o overwrite) running concurrently\n" + "are not hidden by the INSERT OVERWRITE."), + HIVE_TXN_STATS_ENABLED("hive.txn.stats.enabled", true, + "Whether Hive supports transactional stats (accurate stats for transactional tables)"), /** * @deprecated Use MetastoreConf.TXN_TIMEOUT */ http://git-wip-us.apache.org/repos/asf/hive/blob/174c6748/ql/src/java/org/apache/hadoop/hive/ql/Driver.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java index 342dffb..4436130 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -1430,7 +1430,6 @@ public class Driver implements IDriver { JavaUtils.txnIdToString(txnMgr.getCurrentTxnId())); } List<String> txnTables = getTransactionalTableList(plan); - LOG.error("TODO# txnTables " + txnTables); ValidTxnWriteIdList txnWriteIds = null; if (compactionWriteIds != null) { if (txnTables.size() != 1) { http://git-wip-us.apache.org/repos/asf/hive/blob/174c6748/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java b/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java index 838d277..2e4ce11 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java @@ -82,7 +82,7 @@ public class StatsUpdaterThread extends Thread implements MetaStoreThread { // Configuration /** Whether to only update stats that already exist and are out of date. */ - private boolean isExistingOnly; + private boolean isExistingOnly, areTxnStatsEnabled; private long noUpdatesWaitMs; private int batchSize; @@ -101,6 +101,7 @@ public class StatsUpdaterThread extends Thread implements MetaStoreThread { } noUpdatesWaitMs = MetastoreConf.getTimeVar( conf, ConfVars.STATS_AUTO_UPDATE_NOOP_WAIT, TimeUnit.MILLISECONDS); + areTxnStatsEnabled = MetastoreConf.getBoolVar(conf, ConfVars.HIVE_TXN_STATS_ENABLED); batchSize = MetastoreConf.getIntVar(conf, ConfVars.BATCH_RETRIEVE_MAX); int workerCount = MetastoreConf.getIntVar(conf, ConfVars.STATS_AUTO_UPDATE_WORKER_COUNT); if (workerCount <= 0) { @@ -220,6 +221,7 @@ public class StatsUpdaterThread extends Thread implements MetaStoreThread { String writeIdString = null; boolean isTxn = AcidUtils.isTransactionalTable(table); if (isTxn) { + if (!areTxnStatsEnabled) return null; // Skip transactional tables. ValidReaderWriteIdList writeIds = getWriteIds(fullTableName); if (writeIds == null) { LOG.error("Cannot get writeIds for transactional table " + fullTableName + "; skipping"); http://git-wip-us.apache.org/repos/asf/hive/blob/174c6748/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java index cd4b670..236bb7a 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java @@ -29,6 +29,7 @@ import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.TimeUnit; +import org.apache.curator.shaded.com.google.common.collect.Lists; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -36,9 +37,12 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.MetastoreTaskThread; +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse; import org.apache.hadoop.hive.metastore.api.LockState; import org.apache.hadoop.hive.metastore.api.LockType; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.ShowCompactRequest; import org.apache.hadoop.hive.metastore.api.ShowCompactResponse; import org.apache.hadoop.hive.metastore.api.ShowLocksRequest; @@ -56,6 +60,7 @@ import org.apache.hadoop.hive.ql.io.BucketCodec; import org.apache.hadoop.hive.ql.lockmgr.TestDbTxnManager2; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; +import org.apache.thrift.TException; import org.junit.Assert; import org.junit.Ignore; import org.junit.Test; @@ -225,6 +230,57 @@ public class TestTxnCommands extends TxnCommandsBaseForTests { msClient.close(); } + + @Test + public void testTxnStatsOnOff() throws Exception { + String tableName = "mm_table"; + hiveConf.setBoolean("hive.stats.autogather", true); + hiveConf.setBoolean("hive.stats.column.autogather", true); + runStatementOnDriver("drop table if exists " + tableName); + runStatementOnDriver(String.format("create table %s (a int) stored as orc " + + "TBLPROPERTIES ('transactional'='true', 'transactional_properties'='insert_only')", + tableName)); + + runStatementOnDriver(String.format("insert into %s (a) values (1)", tableName)); + IMetaStoreClient msClient = new HiveMetaStoreClient(hiveConf); + List<ColumnStatisticsObj> stats = getTxnTableStats(msClient, tableName); + Assert.assertEquals(1, stats.size()); + runStatementOnDriver(String.format("insert into %s (a) values (1)", tableName)); + stats = getTxnTableStats(msClient, tableName); + Assert.assertEquals(1, stats.size()); + msClient.close(); + hiveConf.setBoolean(MetastoreConf.ConfVars.HIVE_TXN_STATS_ENABLED.getVarname(), false); + msClient = new HiveMetaStoreClient(hiveConf); + // Even though the stats are valid in metastore, txn stats are disabled. + stats = getTxnTableStats(msClient, tableName); + Assert.assertEquals(0, stats.size()); + msClient.close(); + hiveConf.setBoolean(MetastoreConf.ConfVars.HIVE_TXN_STATS_ENABLED.getVarname(), true); + msClient = new HiveMetaStoreClient(hiveConf); + stats = getTxnTableStats(msClient, tableName); + // Now the stats are visible again. + Assert.assertEquals(1, stats.size()); + msClient.close(); + hiveConf.setBoolean(MetastoreConf.ConfVars.HIVE_TXN_STATS_ENABLED.getVarname(), false); + // Running the query with stats disabled will cause stats in metastore itself to become invalid. + runStatementOnDriver(String.format("insert into %s (a) values (1)", tableName)); + hiveConf.setBoolean(MetastoreConf.ConfVars.HIVE_TXN_STATS_ENABLED.getVarname(), true); + msClient = new HiveMetaStoreClient(hiveConf); + stats = getTxnTableStats(msClient, tableName); + Assert.assertEquals(0, stats.size()); + msClient.close(); + } + + public List<ColumnStatisticsObj> getTxnTableStats(IMetaStoreClient msClient, + String tableName) throws TException, NoSuchObjectException, MetaException { + String validWriteIds; + List<ColumnStatisticsObj> stats; + validWriteIds = msClient.getValidWriteIds("default." + tableName).toString(); + stats = msClient.getTableColumnStatistics( + "default", tableName, Lists.newArrayList("a"), -1, validWriteIds); + return stats; + } + private void assertIsDelta(FileStatus stat) { Assert.assertTrue(stat.toString(), stat.getPath().getName().startsWith(AcidUtils.DELTA_PREFIX)); http://git-wip-us.apache.org/repos/asf/hive/blob/174c6748/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java index f25516e..9eb8424 100644 --- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java @@ -192,6 +192,7 @@ public class ObjectStore implements RawStore, Configurable { private TXN_STATUS transactionStatus = TXN_STATUS.NO_STATE; private Pattern partitionValidationPattern; private Counter directSqlErrors; + private boolean areTxnStatsSupported = false; /** * A Autocloseable wrapper around Query class to pass the Query object to the caller and let the caller release @@ -235,6 +236,7 @@ public class ObjectStore implements RawStore, Configurable { try { isInitialized = false; this.conf = conf; + this.areTxnStatsSupported = MetastoreConf.getBoolVar(conf, ConfVars.HIVE_TXN_STATS_ENABLED); configureSSL(conf); Properties propsFromConf = getDataSourceProps(conf); boolean propsChanged = !propsFromConf.equals(prop); @@ -1434,9 +1436,11 @@ public class ObjectStore implements RawStore, Configurable { // in the metastore comply with the client query's snapshot isolation. // Note: a partitioned table has table stats and table snapshot in MPartiiton. if (writeIdList != null) { - if (tbl != null - && TxnUtils.isTransactionalTable(tbl) - && tbl.getPartitionKeysSize() == 0) { + boolean isTxn = tbl != null && TxnUtils.isTransactionalTable(tbl); + if (isTxn && !areTxnStatsSupported) { + StatsSetupConst.setBasicStatsState(tbl.getParameters(), StatsSetupConst.FALSE); + LOG.info("Removed COLUMN_STATS_ACCURATE from Table's parameters."); + } else if (isTxn && tbl.getPartitionKeysSize() == 0) { if (isCurrentStatsValidForTheQuery(mtable, txnId, writeIdList, false)) { tbl.setIsStatsCompliant(true); } else { @@ -2441,24 +2445,25 @@ public class ObjectStore implements RawStore, Configurable { + part_vals.toString()); } part.setValues(part_vals); - setPartitionStatsParam(part, table.getParameters(), mpart.getWriteId(), txnId, writeIdList); - return part; - } - - private void setPartitionStatsParam(Partition part, Map<String, String> tableParams, - long partWriteId, long reqTxnId, String reqWriteIdList) throws MetaException { // If transactional table partition, check whether the current version partition // statistics in the metastore comply with the client query's snapshot isolation. - if (reqWriteIdList == null) return; - if (!TxnUtils.isTransactionalTable(tableParams)) return; - if (isCurrentStatsValidForTheQuery(part, partWriteId, reqTxnId, reqWriteIdList, false)) { - part.setIsStatsCompliant(true); - } else { - part.setIsStatsCompliant(false); - // Do not make persistent the following state since it is query specific (not global). - StatsSetupConst.setBasicStatsState(part.getParameters(), StatsSetupConst.FALSE); - LOG.info("Removed COLUMN_STATS_ACCURATE from Partition object's parameters."); + if (TxnUtils.isTransactionalTable(table.getParameters())) { + if (!areTxnStatsSupported) { + // Do not make persistent the following state since it is query specific (not global). + StatsSetupConst.setBasicStatsState(part.getParameters(), StatsSetupConst.FALSE); + LOG.info("Removed COLUMN_STATS_ACCURATE from Partition object's parameters."); + } else if (writeIdList != null) { + if (isCurrentStatsValidForTheQuery(part, mpart.getWriteId(), txnId, writeIdList, false)) { + part.setIsStatsCompliant(true); + } else { + part.setIsStatsCompliant(false); + // Do not make persistent the following state since it is query specific (not global). + StatsSetupConst.setBasicStatsState(part.getParameters(), StatsSetupConst.FALSE); + LOG.info("Removed COLUMN_STATS_ACCURATE from Partition object's parameters."); + } + } } + return part; } /** @@ -4109,7 +4114,7 @@ public class ObjectStore implements RawStore, Configurable { oldt.setDatabase(newt.getDatabase()); oldt.setTableName(normalizeIdentifier(newt.getTableName())); boolean isTxn = TxnUtils.isTransactionalTable(newTable); - if (isTxn) { + if (isTxn && areTxnStatsSupported) { // Transactional table is altered without a txn. Make sure there are no changes to the flag. String errorMsg = verifyStatsChangeCtx(oldt.getParameters(), newTable.getParameters(), newTable.getWriteId(), queryValidWriteIds, false); @@ -4138,19 +4143,22 @@ public class ObjectStore implements RawStore, Configurable { oldt.setViewExpandedText(newt.getViewExpandedText()); oldt.setRewriteEnabled(newt.isRewriteEnabled()); - // If transactional, update MTable to have txnId and the writeIdList - // for the current Stats updater query. + // If transactional, update the stats state for the current Stats updater query. // Don't update for conversion to acid - it doesn't modify stats but passes in qVWIds. // The fact that it doesn't update stats is verified above. - if (isTxn && queryValidWriteIds != null && (!isToTxn || newTable.getWriteId() > 0)) { - // Check concurrent INSERT case and set false to the flag. - if (!isCurrentStatsValidForTheQuery(oldt, queryTxnId, queryValidWriteIds, true)) { + if (isTxn) { + if (!areTxnStatsSupported) { StatsSetupConst.setBasicStatsState(oldt.getParameters(), StatsSetupConst.FALSE); - LOG.info("Removed COLUMN_STATS_ACCURATE from the parameters of the table " + - dbname + "." + name + ". will be made persistent."); + } else if (queryValidWriteIds != null && (!isToTxn || newTable.getWriteId() > 0)) { + // Check concurrent INSERT case and set false to the flag. + if (!isCurrentStatsValidForTheQuery(oldt, queryTxnId, queryValidWriteIds, true)) { + StatsSetupConst.setBasicStatsState(oldt.getParameters(), StatsSetupConst.FALSE); + LOG.info("Removed COLUMN_STATS_ACCURATE from the parameters of the table " + + dbname + "." + name + ". will be made persistent."); + } + assert newTable.getWriteId() > 0; + oldt.setWriteId(newTable.getWriteId()); } - assert newTable.getWriteId() > 0; - oldt.setWriteId(newTable.getWriteId()); } // commit the changes @@ -4242,7 +4250,7 @@ public class ObjectStore implements RawStore, Configurable { oldp.setValues(newp.getValues()); oldp.setPartitionName(newp.getPartitionName()); boolean isTxn = TxnUtils.isTransactionalTable(table.getParameters()); - if (isTxn) { + if (isTxn && areTxnStatsSupported) { // Transactional table is altered without a txn. Make sure there are no changes to the flag. String errorMsg = verifyStatsChangeCtx(oldp.getParameters(), newPart.getParameters(), newPart.getWriteId(), queryValidWriteIds, false); @@ -4263,14 +4271,18 @@ public class ObjectStore implements RawStore, Configurable { // If transactional, add/update the MUPdaterTransaction // for the current updater query. - if (isTxn && queryValidWriteIds != null && newPart.getWriteId() > 0) { - // Check concurrent INSERT case and set false to the flag. - if (!isCurrentStatsValidForTheQuery(oldp, queryTxnId, queryValidWriteIds, true)) { + if (isTxn) { + if (!areTxnStatsSupported) { StatsSetupConst.setBasicStatsState(oldp.getParameters(), StatsSetupConst.FALSE); - LOG.info("Removed COLUMN_STATS_ACCURATE from the parameters of the partition " + - dbname + "." + name + "." + oldp.getPartitionName() + " will be made persistent."); + } else if (queryValidWriteIds != null && newPart.getWriteId() > 0) { + // Check concurrent INSERT case and set false to the flag. + if (!isCurrentStatsValidForTheQuery(oldp, queryTxnId, queryValidWriteIds, true)) { + StatsSetupConst.setBasicStatsState(oldp.getParameters(), StatsSetupConst.FALSE); + LOG.info("Removed COLUMN_STATS_ACCURATE from the parameters of the partition " + + dbname + "." + name + "." + oldp.getPartitionName() + " will be made persistent."); + } + oldp.setWriteId(newPart.getWriteId()); } - oldp.setWriteId(newPart.getWriteId()); } return oldCD; @@ -4320,7 +4332,6 @@ public class ObjectStore implements RawStore, Configurable { Set<MColumnDescriptor> oldCds = new HashSet<>(); for (Partition tmpPart: newParts) { List<String> tmpPartVals = part_val_itr.next(); - // We don't reset write ID when we invalidate stats; we unset the json boolean. if (writeId > 0) { tmpPart.setWriteId(writeId); } @@ -8436,18 +8447,22 @@ public class ObjectStore implements RawStore, Configurable { StatsSetupConst.setColumnStatsState(newParams, colNames); boolean isTxn = TxnUtils.isTransactionalTable(oldt.getParameters()); if (isTxn) { - String errorMsg = verifyStatsChangeCtx( - oldt.getParameters(), newParams, writeId, validWriteIds, true); - if (errorMsg != null) { - throw new MetaException(errorMsg); - } - if (!isCurrentStatsValidForTheQuery(oldt, txnId, validWriteIds, true)) { - // Make sure we set the flag to invalid regardless of the current value. + if (!areTxnStatsSupported) { StatsSetupConst.setBasicStatsState(newParams, StatsSetupConst.FALSE); - LOG.info("Removed COLUMN_STATS_ACCURATE from the parameters of the table " - + dbname + "." + name); + } else { + String errorMsg = verifyStatsChangeCtx( + oldt.getParameters(), newParams, writeId, validWriteIds, true); + if (errorMsg != null) { + throw new MetaException(errorMsg); + } + if (!isCurrentStatsValidForTheQuery(oldt, txnId, validWriteIds, true)) { + // Make sure we set the flag to invalid regardless of the current value. + StatsSetupConst.setBasicStatsState(newParams, StatsSetupConst.FALSE); + LOG.info("Removed COLUMN_STATS_ACCURATE from the parameters of the table " + + dbname + "." + name); + } + oldt.setWriteId(writeId); } - oldt.setWriteId(writeId); } oldt.setParameters(newParams); @@ -8526,18 +8541,22 @@ public class ObjectStore implements RawStore, Configurable { StatsSetupConst.setColumnStatsState(newParams, colNames); boolean isTxn = TxnUtils.isTransactionalTable(table); if (isTxn) { - String errorMsg = verifyStatsChangeCtx( - mPartition.getParameters(), newParams, writeId, validWriteIds, true); - if (errorMsg != null) { - throw new MetaException(errorMsg); - } - if (!isCurrentStatsValidForTheQuery(mPartition, txnId, validWriteIds, true)) { - // Make sure we set the flag to invalid regardless of the current value. + if (!areTxnStatsSupported) { StatsSetupConst.setBasicStatsState(newParams, StatsSetupConst.FALSE); - LOG.info("Removed COLUMN_STATS_ACCURATE from the parameters of the partition " - + statsDesc.getDbName() + "." + statsDesc.getTableName() + "." + statsDesc.getPartName()); + } else { + String errorMsg = verifyStatsChangeCtx( + mPartition.getParameters(), newParams, writeId, validWriteIds, true); + if (errorMsg != null) { + throw new MetaException(errorMsg); + } + if (!isCurrentStatsValidForTheQuery(mPartition, txnId, validWriteIds, true)) { + // Make sure we set the flag to invalid regardless of the current value. + StatsSetupConst.setBasicStatsState(newParams, StatsSetupConst.FALSE); + LOG.info("Removed COLUMN_STATS_ACCURATE from the parameters of the partition " + + statsDesc.getDbName() + "." + statsDesc.getTableName() + "." + statsDesc.getPartName()); + } + mPartition.setWriteId(writeId); } - mPartition.setWriteId(writeId); } mPartition.setParameters(newParams); @@ -8649,7 +8668,7 @@ public class ObjectStore implements RawStore, Configurable { if (writeIdList != null) { MTable table = this.getMTable(catName, dbName, tableName); isCompliant = !TxnUtils.isTransactionalTable(table.getParameters()) - || isCurrentStatsValidForTheQuery(table, txnId, writeIdList, false); + || (areTxnStatsSupported && isCurrentStatsValidForTheQuery(table, txnId, writeIdList, false)); } ColumnStatistics stats = getTableColumnStatisticsInternal( catName, dbName, tableName, colNames, true, true); @@ -8714,26 +8733,31 @@ public class ObjectStore implements RawStore, Configurable { long txnId, String writeIdList) throws MetaException, NoSuchObjectException { if (partNames == null && partNames.isEmpty()) { - LOG.warn("The given partNames does not have any name."); return null; } List<ColumnStatistics> allStats = getPartitionColumnStatisticsInternal( catName, dbName, tableName, partNames, colNames, true, true); if (writeIdList != null) { - // TODO## this could be improved to get partitions in bulk - for (ColumnStatistics cs : allStats) { - MPartition mpart = getMPartition(catName, dbName, tableName, - Warehouse.getPartValuesFromPartName(cs.getStatsDesc().getPartName())); - if (mpart == null - || !isCurrentStatsValidForTheQuery(mpart, txnId, writeIdList, false)) { - if (mpart != null) { - LOG.debug("The current metastore transactional partition column statistics for {}.{}.{} " - + "(write ID {}) are not valid for current query ({} {})", dbName, tableName, - mpart.getPartitionName(), mpart.getWriteId(), txnId, writeIdList); - } + if (!areTxnStatsSupported) { + for (ColumnStatistics cs : allStats) { cs.setIsStatsCompliant(false); - } else { - cs.setIsStatsCompliant(true); + } + } else { + // TODO## this could be improved to get partitions in bulk + for (ColumnStatistics cs : allStats) { + MPartition mpart = getMPartition(catName, dbName, tableName, + Warehouse.getPartValuesFromPartName(cs.getStatsDesc().getPartName())); + if (mpart == null + || !isCurrentStatsValidForTheQuery(mpart, txnId, writeIdList, false)) { + if (mpart != null) { + LOG.debug("The current metastore transactional partition column statistics for {}.{}.{} " + + "(write ID {}) are not valid for current query ({} {})", dbName, tableName, + mpart.getPartitionName(), mpart.getWriteId(), txnId, writeIdList); + } + cs.setIsStatsCompliant(false); + } else { + cs.setIsStatsCompliant(true); + } } } } @@ -8796,7 +8820,12 @@ public class ObjectStore implements RawStore, Configurable { // the isolation level of the query, return null. if (writeIdList != null) { if (partNames == null && partNames.isEmpty()) { - LOG.warn("The given partNames does not have any name."); + return null; + } + + MTable table = getMTable(catName, dbName, tblName); + boolean isTxn = TxnUtils.isTransactionalTable(table.getParameters()); + if (isTxn && !areTxnStatsSupported) { return null; } @@ -8804,8 +8833,9 @@ public class ObjectStore implements RawStore, Configurable { // and no stats for partitions with invalid stats. // Loop through the given "partNames" list // checking isolation-level-compliance of each partition column stats. - for(String partName : partNames) { - MPartition mpart = getMPartition(catName, dbName, tblName, Warehouse.getPartValuesFromPartName(partName)); + for (String partName : partNames) { + MPartition mpart = getMPartition( + catName, dbName, tblName, Warehouse.getPartValuesFromPartName(partName)); if (!isCurrentStatsValidForTheQuery(mpart, txnId, writeIdList, false)) { LOG.debug("The current metastore transactional partition column statistics " + "for " + dbName + "." + tblName + "." + mpart.getPartitionName() + " is not valid " + @@ -12434,7 +12464,6 @@ public class ObjectStore implements RawStore, Configurable { LOG.info("isCurrentStatsValidForTheQuery with stats write ID {}; query {}, {}; writer: {} params {}", statsWriteId, queryTxnId, queryValidWriteIdList, isCompleteStatsWriter, statsParams); // return true since the stats does not seem to be transactional. - // stats write ID 1; query 2, default.stats_part:1:9223372036854775807::; if (statsWriteId < 1) { return true; } http://git-wip-us.apache.org/repos/asf/hive/blob/174c6748/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java index c2bbba5..7b32c08 100644 --- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java @@ -918,6 +918,8 @@ public class MetastoreConf { HIVE_SUPPORT_CONCURRENCY("hive.support.concurrency", "hive.support.concurrency", false, "Whether Hive supports concurrency control or not. \n" + "A ZooKeeper instance must be up and running when using zookeeper Hive lock manager "), + HIVE_TXN_STATS_ENABLED("hive.txn.stats.enabled", "hive.txn.stats.enabled", true, + "Whether Hive supports transactional stats (accurate stats for transactional tables)"), // Deprecated Hive values that we are keeping for backwards compatibility. @Deprecated