HIVE-19820: add ACID stats support to the background stats updater and fix a bunch of edge cases found in StatsUpdater (SU) tests (Sergey Shelukhin)
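In short, the commit threads an ACID table snapshot (txn ID, allocated table write ID, valid write ID list) from the session's transaction manager through the metastore calls that can touch statistics — alter_table, alter_partition(s), truncateTable and the column-stats updaters — so that transactional stats can be validated against the writer's snapshot. A minimal sketch of the recurring pattern, pieced together from the Hive.java hunks below (variable names and error handling here are illustrative, not the exact committed code):

    // Obtain the writer's snapshot; returns null for non-ACID tables,
    // so the legacy code path is unchanged for them.
    AcidUtils.TableSnapshot snapshot =
        AcidUtils.getTableSnapshot(conf, tbl, /* isStatsUpdater */ true);
    if (snapshot != null) {
      // Stamp the allocated write ID onto the object being altered.
      tbl.getTTable().setWriteId(snapshot.getWriteId());
    }
    // Pass the txn ID and valid write ID list so the metastore can decide
    // whether existing stats are still valid for this writer.
    msc.alter_table(catName, dbName, tblName, tbl.getTTable(), envCtx,
        snapshot == null ? -1 : snapshot.getTxnId(),
        snapshot == null ? null : snapshot.getValidWriteIdList());
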
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/1c9947f3
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/1c9947f3
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/1c9947f3

Branch: refs/heads/master-txnstats
Commit: 1c9947f38ba44a3c37469490669b37dae2b19b4c
Parents: c8f4984
Author: sergey <ser...@apache.org>
Authored: Fri Jul 13 18:53:01 2018 -0700
Committer: sergey <ser...@apache.org>
Committed: Fri Jul 13 18:53:01 2018 -0700

----------------------------------------------------------------------
 .../listener/DummyRawStoreFailEvent.java | 30 +-
 .../metastore/SynchronizedMetaStoreClient.java | 4 +-
 .../java/org/apache/hadoop/hive/ql/Driver.java | 20 +-
 .../org/apache/hadoop/hive/ql/QueryPlan.java | 10 +
 .../org/apache/hadoop/hive/ql/exec/DDLTask.java | 9 +-
 .../ql/hooks/UpdateInputAccessTimeHook.java | 6 +-
 .../hadoop/hive/ql/hooks/WriteEntity.java | 9 +
 .../org/apache/hadoop/hive/ql/io/AcidUtils.java | 82 +-
 .../hadoop/hive/ql/lockmgr/DbTxnManager.java | 9 +-
 .../hadoop/hive/ql/lockmgr/DummyTxnManager.java | 1 +
 .../hadoop/hive/ql/lockmgr/HiveTxnManager.java | 1 +
 .../apache/hadoop/hive/ql/metadata/Hive.java | 125 +-
 .../ql/metadata/SessionHiveMetaStoreClient.java | 27 +
 .../hive/ql/parse/BaseSemanticAnalyzer.java | 4 +
 .../hive/ql/parse/DDLSemanticAnalyzer.java | 77 +-
 .../hadoop/hive/ql/parse/SemanticAnalyzer.java | 133 +-
 .../load/message/RenamePartitionHandler.java | 2 +-
 .../hive/ql/plan/AlterMaterializedViewDesc.java | 27 +-
 .../hive/ql/plan/AlterTableAlterPartDesc.java | 30 +-
 .../hadoop/hive/ql/plan/AlterTableDesc.java | 30 +-
 .../hive/ql/plan/ColumnStatsUpdateWork.java | 22 +-
 .../hive/ql/plan/RenamePartitionDesc.java | 35 +-
 .../hadoop/hive/ql/plan/TruncateTableDesc.java | 9 +-
 .../hive/ql/stats/StatsUpdaterThread.java | 90 +-
 .../hadoop/hive/ql/TestTxnConcatenate.java | 4 +-
 .../apache/hadoop/hive/ql/TestTxnNoBuckets.java | 22 +-
 .../hadoop/hive/ql/TxnCommandsBaseForTests.java | 2 +-
 .../hadoop/hive/ql/metadata/TestHive.java | 6 +-
 .../hive/ql/stats/TestStatsUpdaterThread.java | 154 +-
 ql/src/test/queries/clientpositive/acid_stats.q | 3 +-
 .../test/queries/clientpositive/acid_stats3.q | 56 +
 .../test/queries/clientpositive/acid_stats4.q | 70 +
 .../results/clientpositive/acid_stats.q.out | 23 +-
 .../results/clientpositive/acid_stats3.q.out | 343 +
 .../results/clientpositive/acid_stats4.q.out | 496 +
 .../clientpositive/acid_table_stats.q.out | 4 +-
 ...wincompatible_vectorization_false_date.q.out | 8 -
 .../llap/dynpart_sort_optimization_acid.q.out | 2 +-
 .../materialized_view_create_rewrite_3.q.out | 4 +-
 .../materialized_view_create_rewrite_4.q.out | 10 +-
 .../materialized_view_create_rewrite_5.q.out | 8 +-
 ...ized_view_create_rewrite_rebuild_dummy.q.out | 4 +-
 ...alized_view_create_rewrite_time_window.q.out | 4 +-
 .../llap/results_cache_truncate.q.out | 256 +-
 .../clientpositive/llap/sqlmerge_stats.q.out | 63 +-
 .../hive/metastore/api/AbortTxnsRequest.java | 32 +-
 .../api/AddCheckConstraintRequest.java | 36 +-
 .../api/AddDefaultConstraintRequest.java | 36 +-
 .../metastore/api/AddDynamicPartitions.java | 32 +-
 .../metastore/api/AddForeignKeyRequest.java | 36 +-
 .../api/AddNotNullConstraintRequest.java | 36 +-
 .../metastore/api/AddPartitionsRequest.java | 36 +-
 .../hive/metastore/api/AddPartitionsResult.java | 36 +-
 .../metastore/api/AddPrimaryKeyRequest.java | 36 +-
 .../api/AddUniqueConstraintRequest.java | 36 +-
 .../hadoop/hive/metastore/api/AggrStats.java | 36 +-
 .../api/AllocateTableWriteIdsRequest.java | 68 +-
 .../api/AllocateTableWriteIdsResponse.java | 36 +-
 .../metastore/api/AlterPartitionsRequest.java | 265 +-
 .../hive/metastore/api/AlterTableRequest.java | 1129 +
 .../hive/metastore/api/AlterTableResponse.java | 283 +
 .../metastore/api/CheckConstraintsResponse.java | 36 +-
 .../metastore/api/ClearFileMetadataRequest.java | 32 +-
 .../hive/metastore/api/ClientCapabilities.java | 32 +-
 .../hive/metastore/api/ColumnStatistics.java | 261 +-
 .../hive/metastore/api/CommitTxnRequest.java | 36 +-
 .../hive/metastore/api/CompactionRequest.java | 44 +-
 .../hive/metastore/api/CreationMetadata.java | 32 +-
 .../hadoop/hive/metastore/api/Database.java | 44 +-
 .../api/DefaultConstraintsResponse.java | 36 +-
 .../metastore/api/DropPartitionsResult.java | 36 +-
 .../hive/metastore/api/EnvironmentContext.java | 44 +-
 .../metastore/api/FindSchemasByColsResp.java | 36 +-
 .../hive/metastore/api/FireEventRequest.java | 32 +-
 .../hive/metastore/api/ForeignKeysResponse.java | 36 +-
 .../hadoop/hive/metastore/api/Function.java | 36 +-
 .../metastore/api/GetAllFunctionsResponse.java | 36 +-
 .../hive/metastore/api/GetCatalogsResponse.java | 32 +-
 .../api/GetFileMetadataByExprRequest.java | 32 +-
 .../api/GetFileMetadataByExprResult.java | 48 +-
 .../metastore/api/GetFileMetadataRequest.java | 32 +-
 .../metastore/api/GetFileMetadataResult.java | 44 +-
 .../metastore/api/GetOpenTxnsInfoResponse.java | 36 +-
 .../hive/metastore/api/GetOpenTxnsResponse.java | 32 +-
 .../api/GetPrincipalsInRoleResponse.java | 36 +-
 .../api/GetRoleGrantsForPrincipalResponse.java | 36 +-
 .../hive/metastore/api/GetTablesRequest.java | 32 +-
 .../hive/metastore/api/GetTablesResult.java | 36 +-
 .../metastore/api/GetValidWriteIdsRequest.java | 32 +-
 .../metastore/api/GetValidWriteIdsResponse.java | 36 +-
 .../api/HeartbeatTxnRangeResponse.java | 64 +-
 .../metastore/api/InsertEventRequestData.java | 96 +-
 .../hadoop/hive/metastore/api/LockRequest.java | 36 +-
 .../hive/metastore/api/Materialization.java | 32 +-
 .../api/NotNullConstraintsResponse.java | 36 +-
 .../api/NotificationEventResponse.java | 36 +-
 .../hive/metastore/api/OpenTxnRequest.java | 32 +-
 .../hive/metastore/api/OpenTxnsResponse.java | 32 +-
 .../hadoop/hive/metastore/api/Partition.java | 76 +-
 .../api/PartitionListComposingSpec.java | 36 +-
 .../api/PartitionSpecWithSharedSD.java | 36 +-
 .../metastore/api/PartitionValuesRequest.java | 72 +-
 .../metastore/api/PartitionValuesResponse.java | 36 +-
 .../hive/metastore/api/PartitionValuesRow.java | 32 +-
 .../hive/metastore/api/PartitionWithoutSD.java | 76 +-
 .../metastore/api/PartitionsByExprResult.java | 36 +-
 .../metastore/api/PartitionsStatsRequest.java | 64 +-
 .../metastore/api/PartitionsStatsResult.java | 76 +-
 .../hive/metastore/api/PrimaryKeysResponse.java | 36 +-
 .../metastore/api/PutFileMetadataRequest.java | 64 +-
 .../api/ReplTblWriteIdStateRequest.java | 32 +-
 .../hive/metastore/api/RequestPartsSpec.java | 68 +-
 .../hadoop/hive/metastore/api/Schema.java | 80 +-
 .../hive/metastore/api/SchemaVersion.java | 36 +-
 .../hadoop/hive/metastore/api/SerDeInfo.java | 44 +-
 .../api/SetPartitionsStatsRequest.java | 36 +-
 .../api/SetPartitionsStatsResponse.java | 387 +
 .../hive/metastore/api/ShowCompactResponse.java | 36 +-
 .../hive/metastore/api/ShowLocksResponse.java | 36 +-
 .../hadoop/hive/metastore/api/SkewedInfo.java | 164 +-
 .../hive/metastore/api/StorageDescriptor.java | 148 +-
 .../apache/hadoop/hive/metastore/api/Table.java | 80 +-
 .../hive/metastore/api/TableStatsRequest.java | 32 +-
 .../hive/metastore/api/TableStatsResult.java | 36 +-
 .../hive/metastore/api/TableValidWriteIds.java | 32 +-
 .../hive/metastore/api/ThriftHiveMetastore.java | 26382 ++++++++++-------
 .../metastore/api/TruncateTableRequest.java | 961 +
 .../metastore/api/TruncateTableResponse.java | 283 +
 .../api/UniqueConstraintsResponse.java | 36 +-
 .../hive/metastore/api/WMFullResourcePlan.java | 144 +-
 .../api/WMGetAllResourcePlanResponse.java | 36 +-
 .../WMGetTriggersForResourePlanResponse.java | 36 +-
 .../api/WMValidateResourcePlanResponse.java | 64 +-
 .../api/WriteNotificationLogRequest.java | 32 +-
 .../gen-php/metastore/ThriftHiveMetastore.php | 3335 ++-
 .../src/gen/thrift/gen-php/metastore/Types.php | 2920 +-
 .../hive_metastore/ThriftHiveMetastore-remote | 36 +-
 .../hive_metastore/ThriftHiveMetastore.py | 3201 +-
 .../gen/thrift/gen-py/hive_metastore/ttypes.py | 2004 +-
 .../gen/thrift/gen-rb/hive_metastore_types.rb | 133 +-
 .../gen/thrift/gen-rb/thrift_hive_metastore.rb | 309 +-
 .../hadoop/hive/common/StatsSetupConst.java | 9 +-
 .../hadoop/hive/metastore/AlterHandler.java | 7 +-
 .../hadoop/hive/metastore/HiveAlterHandler.java | 144 +-
 .../hadoop/hive/metastore/HiveMetaStore.java | 492 +-
 .../hive/metastore/HiveMetaStoreClient.java | 96 +-
 .../hadoop/hive/metastore/IMetaStoreClient.java | 18 +-
 .../hive/metastore/MetaStoreDirectSql.java | 26 +-
 .../hadoop/hive/metastore/ObjectStore.java | 281 +-
 .../apache/hadoop/hive/metastore/RawStore.java | 5 +-
 .../hive/metastore/cache/CachedStore.java | 10 +-
 .../src/main/thrift/hive_metastore.thrift | 64 +-
 .../DummyRawStoreControlledCommit.java | 8 +-
 .../DummyRawStoreForJdoConnection.java | 6 +-
 .../HiveMetaStoreClientPreCatalog.java | 30 +-
 .../hive/metastore/TestHiveAlterHandler.java | 6 +-
 .../hive/metastore/TestHiveMetaStore.java | 43 +-
 .../metastore/TestMetaStoreEventListener.java | 1 +
 .../hadoop/hive/metastore/TestObjectStore.java | 2 +-
 .../hadoop/hive/metastore/TestOldSchema.java | 2 +-
 .../hive/metastore/cache/TestCachedStore.java | 14 +-
 .../metastore/client/TestAlterPartitions.java | 48 +-
 .../metastore/client/TestAppendPartitions.java | 6 +
 .../TestTablesCreateDropAlterTruncate.java | 31 +-
 164 files changed, 31123 insertions(+), 18165 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/1c9947f3/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
----------------------------------------------------------------------
diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
index cd036e6..270aa6c 100644
--- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
+++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
@@ -399,7 +399,11 @@ public class DummyRawStoreFailEvent implements RawStore, Configurable {
       List<List<String>> partValsList, List<Partition> newParts,
       long writeId, long queryTxnId, String queryValidWriteIds)
       throws InvalidObjectException, MetaException {
-    objectStore.alterPartitions(catName, dbName, tblName, partValsList, newParts, writeId, queryTxnId, queryValidWriteIds);
+    if (shouldEventSucceed) {
+      objectStore.alterPartitions(catName, dbName, tblName, partValsList, newParts, writeId, queryTxnId, queryValidWriteIds);
+    } else {
+      throw new RuntimeException("Event 
failed."); + } } @Override @@ -422,8 +426,10 @@ public class DummyRawStoreFailEvent implements RawStore, Configurable { @Override public List<Partition> getPartitionsByNames(String catName, String dbName, String tblName, - List<String> partNames) throws MetaException, NoSuchObjectException { - return objectStore.getPartitionsByNames(catName, dbName, tblName, partNames); + List<String> partNames) + throws MetaException, NoSuchObjectException { + return objectStore.getPartitionsByNames( + catName, dbName, tblName, partNames); } @Override @@ -730,18 +736,16 @@ public class DummyRawStoreFailEvent implements RawStore, Configurable { } @Override - public boolean updateTableColumnStatistics(ColumnStatistics statsObj) - throws NoSuchObjectException, MetaException, InvalidObjectException, - InvalidInputException { - return objectStore.updateTableColumnStatistics(statsObj); + public boolean updateTableColumnStatistics(ColumnStatistics statsObj, long txnId, String validWriteIds, long writeId) + throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException { + return objectStore.updateTableColumnStatistics(statsObj, txnId, validWriteIds, writeId); } @Override public boolean updatePartitionColumnStatistics(ColumnStatistics statsObj, - List<String> partVals) - throws NoSuchObjectException, MetaException, InvalidObjectException, - InvalidInputException { - return objectStore.updatePartitionColumnStatistics(statsObj, partVals); + List<String> partVals, long txnId, String validWriteIds, long writeId) + throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException { + return objectStore.updatePartitionColumnStatistics(statsObj, partVals, txnId, validWriteIds, writeId); } @Override @@ -1305,4 +1309,6 @@ public class DummyRawStoreFailEvent implements RawStore, Configurable { String dbName, String tableName) throws MetaException, NoSuchObjectException { return null; - }} + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/1c9947f3/ql/src/java/org/apache/hadoop/hive/metastore/SynchronizedMetaStoreClient.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/metastore/SynchronizedMetaStoreClient.java b/ql/src/java/org/apache/hadoop/hive/metastore/SynchronizedMetaStoreClient.java index 2ba6d07..7eddc16 100644 --- a/ql/src/java/org/apache/hadoop/hive/metastore/SynchronizedMetaStoreClient.java +++ b/ql/src/java/org/apache/hadoop/hive/metastore/SynchronizedMetaStoreClient.java @@ -79,8 +79,8 @@ public final class SynchronizedMetaStoreClient { } public synchronized void alter_partition(String dbName, String tblName, - Partition newPart, EnvironmentContext environmentContext) throws TException { - client.alter_partition(dbName, tblName, newPart, environmentContext); + Partition newPart, EnvironmentContext environmentContext, long txnId, String writeIdList) throws TException { + client.alter_partition(dbName, tblName, newPart, environmentContext, txnId, writeIdList); } public synchronized LockResponse checkLock(long lockid) throws TException { http://git-wip-us.apache.org/repos/asf/hive/blob/1c9947f3/ql/src/java/org/apache/hadoop/hive/ql/Driver.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java index 78922f1..342dffb 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -1423,13 
+1423,14 @@ public class Driver implements IDriver { // Write the current set of valid write ids for the operated acid tables into the conf file so // that it can be read by the input format. - private void recordValidWriteIds(HiveTxnManager txnMgr) throws LockException { + private ValidTxnWriteIdList recordValidWriteIds(HiveTxnManager txnMgr) throws LockException { String txnString = conf.get(ValidTxnList.VALID_TXNS_KEY); if ((txnString == null) || (txnString.isEmpty())) { throw new IllegalStateException("calling recordValidWritsIdss() without initializing ValidTxnList " + JavaUtils.txnIdToString(txnMgr.getCurrentTxnId())); } List<String> txnTables = getTransactionalTableList(plan); + LOG.error("TODO# txnTables " + txnTables); ValidTxnWriteIdList txnWriteIds = null; if (compactionWriteIds != null) { if (txnTables.size() != 1) { @@ -1466,6 +1467,7 @@ public class Driver implements IDriver { } } LOG.debug("Encoding valid txn write ids info " + writeIdStr + " txnid:" + txnMgr.getCurrentTxnId()); + return txnWriteIds; } // Make the list of transactional tables list which are getting read or written by current txn @@ -1602,10 +1604,16 @@ public class Driver implements IDriver { } } - // Note: the sinks and DDL cannot coexist at this time; but if they could we would - // need to make sure we don't get two write IDs for the same table. + if (plan.getAcidAnalyzeTable() != null) { + // Allocate write ID for the table being analyzed. + Table t = plan.getAcidAnalyzeTable().getTable(); + queryTxnMgr.getTableWriteId(t.getDbName(), t.getTableName()); + } + + DDLDescWithWriteId acidDdlDesc = plan.getAcidDdlDesc(); - if (acidDdlDesc != null && acidDdlDesc.mayNeedWriteId()) { + boolean hasAcidDdl = acidDdlDesc != null && acidDdlDesc.mayNeedWriteId(); + if (hasAcidDdl) { String fqTableName = acidDdlDesc.getFullTableName(); long writeId = queryTxnMgr.getTableWriteId( Utilities.getDatabaseName(fqTableName), Utilities.getTableName(fqTableName)); @@ -1620,9 +1628,11 @@ public class Driver implements IDriver { throw new IllegalStateException("calling recordValidTxn() more than once in the same " + JavaUtils.txnIdToString(queryTxnMgr.getCurrentTxnId())); } - if (plan.hasAcidResourcesInQuery()) { + + if (plan.hasAcidResourcesInQuery() || hasAcidDdl) { recordValidWriteIds(queryTxnMgr); } + } catch (Exception e) { errorMessage = "FAILED: Error in acquiring locks: " + e.getMessage(); SQLState = ErrorMsg.findSQLState(e.getMessage()); http://git-wip-us.apache.org/repos/asf/hive/blob/1c9947f3/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java b/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java index 79e938a..f2201dd 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java @@ -35,6 +35,7 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; +import org.apache.curator.shaded.com.google.common.collect.Lists; import org.apache.hadoop.hive.metastore.api.Schema; import org.apache.hadoop.hive.ql.exec.ConditionalTask; import org.apache.hadoop.hive.ql.exec.ExplainTask; @@ -112,6 +113,7 @@ public class QueryPlan implements Serializable { private final HiveOperation operation; private final boolean acidResourcesInQuery; private final Set<FileSinkDesc> acidSinks; // Note: both full-ACID and insert-only sinks. 
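The Driver and QueryPlan changes above, together with the SemanticAnalyzer and DbTxnManager hunks further down, form a single flow for ANALYZE on transactional tables: the analyzer flags the analyzed table's WriteEntity, the plan exposes it, and the driver allocates a write ID for it before valid write IDs are recorded. A condensed sketch of that flow, using only names that appear in this commit (control flow simplified):

    // SemanticAnalyzer: mark the ANALYZE target so the txn manager can
    // record a txn component for it instead of treating it as NO_TXN DDL.
    WriteEntity we = new WriteEntity(tab, WriteEntity.WriteType.DDL_SHARED);
    we.setTxnAnalyze(true);
    outputs.add(we);

    // Driver: allocate a table write ID for the ANALYZE target so the
    // stats it writes are tied to this transaction's snapshot.
    if (plan.getAcidAnalyzeTable() != null) {
      Table t = plan.getAcidAnalyzeTable().getTable();
      queryTxnMgr.getTableWriteId(t.getDbName(), t.getTableName());
    }
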
+ private final WriteEntity acidAnalyzeTable; private final DDLDesc.DDLDescWithWriteId acidDdlDesc; private Boolean autoCommitValue; @@ -125,6 +127,7 @@ public class QueryPlan implements Serializable { this.acidResourcesInQuery = false; this.acidSinks = Collections.emptySet(); this.acidDdlDesc = null; + this.acidAnalyzeTable = null; } public QueryPlan(String queryString, BaseSemanticAnalyzer sem, Long startTime, String queryId, @@ -151,9 +154,11 @@ public class QueryPlan implements Serializable { this.operation = operation; this.autoCommitValue = sem.getAutoCommitValue(); this.resultSchema = resultSchema; + // TODO: all this ACID stuff should be in some sub-object this.acidResourcesInQuery = sem.hasTransactionalInQuery(); this.acidSinks = sem.getAcidFileSinks(); this.acidDdlDesc = sem.getAcidDdlDesc(); + this.acidAnalyzeTable = sem.getAcidAnalyzeTable(); } /** @@ -162,6 +167,11 @@ public class QueryPlan implements Serializable { public boolean hasAcidResourcesInQuery() { return acidResourcesInQuery; } + + public WriteEntity getAcidAnalyzeTable() { + return acidAnalyzeTable; + } + /** * @return Collection of FileSinkDesc representing writes to Acid resources */ http://git-wip-us.apache.org/repos/asf/hive/blob/1c9947f3/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java index d912d4d..397cee2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java @@ -3933,14 +3933,13 @@ public class DDLTask extends Task<DDLWork> implements Serializable { environmentContext = new EnvironmentContext(); } environmentContext.putToProperties(HiveMetaHook.ALTER_TABLE_OPERATION_TYPE, alterTbl.getOp().name()); - // Note: in the old default overloads that I've removed, "transactional" was true for tables, - // but false for partitions. Seems to be ok here because we are not updating - // partition-stats-related stuff from this call (alterTable). if (allPartitions == null) { db.alterTable(alterTbl.getOldName(), tbl, alterTbl.getIsCascade(), environmentContext, true); } else { - db.alterPartitions( - Warehouse.getQualifiedName(tbl.getTTable()), allPartitions, environmentContext, false); + // Note: this is necessary for UPDATE_STATISTICS command, that operates via ADDPROPS (why?). + // For any other updates, we don't want to do txn check on partitions when altering table. 
+ boolean isTxn = alterTbl.getPartSpec() != null && alterTbl.getOp() == AlterTableTypes.ADDPROPS; + db.alterPartitions(Warehouse.getQualifiedName(tbl.getTTable()), allPartitions, environmentContext, isTxn); } // Add constraints if necessary addConstraints(db, alterTbl); http://git-wip-us.apache.org/repos/asf/hive/blob/1c9947f3/ql/src/java/org/apache/hadoop/hive/ql/hooks/UpdateInputAccessTimeHook.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/UpdateInputAccessTimeHook.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/UpdateInputAccessTimeHook.java index 4cf7c25..ea0b2c3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/UpdateInputAccessTimeHook.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/UpdateInputAccessTimeHook.java @@ -63,7 +63,7 @@ public class UpdateInputAccessTimeHook { String tblName = re.getTable().getTableName(); Table t = db.getTable(dbName, tblName); t.setLastAccessTime(lastAccessTime); - db.alterTable(dbName + "." + tblName, t, false, null, true); + db.alterTable(dbName + "." + tblName, t, false, null, false); break; } case PARTITION: { @@ -73,9 +73,9 @@ public class UpdateInputAccessTimeHook { Table t = db.getTable(dbName, tblName); p = db.getPartition(t, p.getSpec(), false); p.setLastAccessTime(lastAccessTime); - db.alterPartition(dbName, tblName, p, null, true); + db.alterPartition(dbName, tblName, p, null, false); t.setLastAccessTime(lastAccessTime); - db.alterTable(dbName + "." + tblName, t, false, null, true); + db.alterTable(dbName + "." + tblName, t, false, null, false); break; } default: http://git-wip-us.apache.org/repos/asf/hive/blob/1c9947f3/ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java index f1cf113..3afa201 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java @@ -39,6 +39,7 @@ public class WriteEntity extends Entity implements Serializable { private boolean isTempURI = false; private transient boolean isDynamicPartitionWrite = false; + private transient boolean isTxnAnalyze = false; public static enum WriteType { DDL_EXCLUSIVE, // for use in DDL statements that require an exclusive lock, @@ -223,6 +224,7 @@ public class WriteEntity extends Entity implements Serializable { case ADDPARTITION: case ADDSERDEPROPS: case ADDPROPS: + case UPDATESTATS: return WriteType.DDL_SHARED; case COMPACT: @@ -242,4 +244,11 @@ public class WriteEntity extends Entity implements Serializable { return toString() + " Type=" + getTyp() + " WriteType=" + getWriteType() + " isDP=" + isDynamicPartitionWrite(); } + public boolean isTxnAnalyze() { + return isTxnAnalyze; + } + + public void setTxnAnalyze(boolean isTxnAnalyze) { + this.isTxnAnalyze = isTxnAnalyze; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/1c9947f3/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java index e54afc4..f356682 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java @@ -72,6 +72,7 @@ import org.slf4j.Logger; import 
org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; + import java.nio.charset.Charset; /** @@ -1648,7 +1649,7 @@ public class AcidUtils { @Override public String toString() { - return "[txnId=" + txnId + ", validWriteIdList=" + validWriteIdList + "]"; + return "[txnId=" + txnId + ", validWriteIdList=" + validWriteIdList + ", writeId=" + writeId + "]"; } } @@ -1661,49 +1662,60 @@ public class AcidUtils { public static TableSnapshot getTableSnapshot( Configuration conf, Table tbl, boolean isStatsUpdater) throws LockException { + return getTableSnapshot(conf, tbl, tbl.getDbName(), tbl.getTableName(), isStatsUpdater); + } + + public static TableSnapshot getTableSnapshot(Configuration conf, + Table tbl, String dbName, String tblName, boolean isStatsUpdater) + throws LockException, AssertionError { if (!isTransactionalTable(tbl)) { return null; - } else { - long txnId = -1; - long writeId = -1; - ValidWriteIdList validWriteIdList = null; + } + if (dbName == null) { + dbName = tbl.getDbName(); + } + if (tblName == null) { + tblName = tbl.getTableName(); + } + long txnId = -1; + long writeId = -1; + ValidWriteIdList validWriteIdList = null; - HiveTxnManager sessionTxnMgr = SessionState.get().getTxnMgr(); + HiveTxnManager sessionTxnMgr = SessionState.get().getTxnMgr(); - if (sessionTxnMgr != null) { - txnId = sessionTxnMgr.getCurrentTxnId(); - } - String fullTableName = getFullTableName(tbl.getDbName(), tbl.getTableName()); - if (txnId > 0 && isTransactionalTable(tbl)) { - validWriteIdList = getTableValidWriteIdList(conf, fullTableName); - if (isStatsUpdater) { - writeId = SessionState.get().getTxnMgr() != null ? - SessionState.get().getTxnMgr().getAllocatedTableWriteId( - tbl.getDbName(), tbl.getTableName()) : -1; - if (writeId < 1) { - // TODO: this is not ideal... stats updater that doesn't have write ID is currently - // "create table"; writeId would be 0/-1 here. No need to call this w/true. - LOG.debug("Stats updater for {}.{} doesn't have a write ID", - tbl.getDbName(), tbl.getTableName()); - } + if (sessionTxnMgr != null) { + txnId = sessionTxnMgr.getCurrentTxnId(); + } + String fullTableName = getFullTableName(dbName, tblName); + if (txnId > 0) { + validWriteIdList = getTableValidWriteIdList(conf, fullTableName); + if (isStatsUpdater) { + writeId = SessionState.get().getTxnMgr() != null ? + SessionState.get().getTxnMgr().getAllocatedTableWriteId( + dbName, tblName) : -1; + if (writeId < 1) { + // TODO: this is not ideal... stats updater that doesn't have write ID is currently + // "create table"; writeId would be 0/-1 here. No need to call this w/true. 
+ LOG.debug("Stats updater for {}.{} doesn't have a write ID ({})", + dbName, tblName, writeId); } + } - if (HiveConf.getBoolVar(conf, ConfVars.HIVE_IN_TEST) - && conf.get(ValidTxnList.VALID_TXNS_KEY) == null) { - return null; - } - if (validWriteIdList == null) { - validWriteIdList = getTableValidWriteIdListWithTxnList( - conf, tbl.getDbName(), tbl.getTableName()); - } - if (validWriteIdList == null) { - throw new AssertionError("Cannot find valid write ID list for " + tbl.getTableName()); - } + if (HiveConf.getBoolVar(conf, ConfVars.HIVE_IN_TEST) + && conf.get(ValidTxnList.VALID_TXNS_KEY) == null) { + return null; + } + if (validWriteIdList == null) { + validWriteIdList = getTableValidWriteIdListWithTxnList( + conf, dbName, tblName); + } + if (validWriteIdList == null) { + throw new AssertionError("Cannot find valid write ID list for " + tblName); } - return new TableSnapshot(txnId, writeId, - validWriteIdList != null ? validWriteIdList.toString() : null); } + return new TableSnapshot(txnId, writeId, + validWriteIdList != null ? validWriteIdList.toString() : null); } /** http://git-wip-us.apache.org/repos/asf/hive/blob/1c9947f3/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java index a05ae0c..d3eefb9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java @@ -19,10 +19,12 @@ package org.apache.hadoop.hive.ql.lockmgr; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.JavaUtils; import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.common.ValidTxnWriteIdList; +import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.LockComponentBuilder; @@ -569,7 +571,12 @@ public final class DbTxnManager extends HiveTxnManagerImpl { break; case DDL_SHARED: compBuilder.setShared(); - compBuilder.setOperationType(DataOperationType.NO_TXN); + if (!output.isTxnAnalyze()) { + // Analyze needs txn components to be present, otherwise an aborted analyze write ID + // might be rolled under the watermark by compactor while stats written by it are + // still present. 
+ compBuilder.setOperationType(DataOperationType.NO_TXN); + } break; case UPDATE: http://git-wip-us.apache.org/repos/asf/hive/blob/1c9947f3/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java index 03f2ff3..17a2d20 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java @@ -24,6 +24,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.common.ValidReadTxnList; +import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.ql.Context; http://git-wip-us.apache.org/repos/asf/hive/blob/1c9947f3/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java index 6a01abc..ba1f1ff 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.lockmgr; import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.common.ValidTxnWriteIdList; +import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.metastore.api.CommitTxnRequest; import org.apache.hadoop.hive.metastore.api.LockResponse; import org.apache.hadoop.hive.metastore.api.TxnToWriteId; http://git-wip-us.apache.org/repos/asf/hive/blob/1c9947f3/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index 07fe43f..7a1160d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -113,6 +113,7 @@ import org.apache.hadoop.hive.ql.exec.FunctionUtils; import org.apache.hadoop.hive.ql.exec.SerializationUtilities; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.ql.io.AcidUtils.TableSnapshot; import org.apache.hadoop.hive.ql.lockmgr.DbTxnManager; import org.apache.hadoop.hive.ql.lockmgr.LockException; import org.apache.hadoop.hive.ql.log.PerfLogger; @@ -584,7 +585,7 @@ public class Hive { public void alterTable(Table newTbl, boolean cascade, EnvironmentContext environmentContext, boolean transactional) throws HiveException { - alterTable(newTbl.getDbName(), + alterTable(newTbl.getCatName(), newTbl.getDbName(), newTbl.getTableName(), newTbl, cascade, environmentContext, transactional); } @@ -605,20 +606,23 @@ public class Hive { boolean transactional) throws HiveException { String[] names = Utilities.getDbTableName(fullyQlfdTblName); - alterTable(names[0], names[1], newTbl, false, environmentContext, transactional); + alterTable(null, names[0], names[1], newTbl, false, environmentContext, transactional); } public void alterTable(String fullyQlfdTblName, Table 
newTbl, boolean cascade, EnvironmentContext environmentContext, boolean transactional) throws HiveException { String[] names = Utilities.getDbTableName(fullyQlfdTblName); - alterTable(names[0], names[1], newTbl, cascade, environmentContext, transactional); + alterTable(null, names[0], names[1], newTbl, cascade, environmentContext, transactional); } - public void alterTable(String dbName, String tblName, Table newTbl, boolean cascade, + public void alterTable(String catName, String dbName, String tblName, Table newTbl, boolean cascade, EnvironmentContext environmentContext, boolean transactional) throws HiveException { + if (catName == null) { + catName = getDefaultCatalog(conf); + } try { // Remove the DDL_TIME so it gets refreshed if (newTbl.getParameters() != null) { @@ -633,12 +637,22 @@ public class Hive { } // Take a table snapshot and set it to newTbl. + AcidUtils.TableSnapshot tableSnapshot = null; if (transactional) { - setTableSnapshotForTransactionalTable(environmentContext, conf, newTbl, true); + // Make sure we pass in the names, so we can get the correct snapshot for rename table. + tableSnapshot = AcidUtils.getTableSnapshot(conf, newTbl, dbName, tblName, true); + if (tableSnapshot != null) { + newTbl.getTTable().setWriteId(tableSnapshot.getWriteId()); + } else { + LOG.warn("Cannot get a table snapshot for " + tblName); + } } - getMSC().alter_table_with_environmentContext( - dbName, tblName, newTbl.getTTable(), environmentContext); + // Why is alter_partitions synchronized while this isn't? + getMSC().alter_table( + catName, dbName, tblName, newTbl.getTTable(), environmentContext, + tableSnapshot == null ? -1 : tableSnapshot.getTxnId(), + tableSnapshot == null ? null : tableSnapshot.getValidWriteIdList()); } catch (MetaException e) { throw new HiveException("Unable to alter table. " + e.getMessage(), e); } catch (TException e) { @@ -703,11 +717,19 @@ public class Hive { if (environmentContext == null) { environmentContext = new EnvironmentContext(); } + AcidUtils.TableSnapshot tableSnapshot = null; if (transactional) { - setTableSnapshotForTransactionalPartition(environmentContext, conf, newPart, true); + tableSnapshot = AcidUtils.getTableSnapshot(conf, newPart.getTable(), true); + if (tableSnapshot != null) { + newPart.getTPartition().setWriteId(tableSnapshot.getWriteId()); + } else { + LOG.warn("Cannot get a table snapshot for " + tblName); + } } getSynchronizedMSC().alter_partition( - dbName, tblName, newPart.getTPartition(), environmentContext); + dbName, tblName, newPart.getTPartition(), environmentContext, + tableSnapshot == null ? -1 : tableSnapshot.getTxnId(), + tableSnapshot == null ? null : tableSnapshot.getValidWriteIdList()); } catch (MetaException e) { throw new HiveException("Unable to alter partition. " + e.getMessage(), e); @@ -895,7 +917,11 @@ public class Hive { } } // Set table snapshot to api.Table to make it persistent. 
- setTableSnapshotForTransactionalTable(null, conf, tbl, true); + TableSnapshot tableSnapshot = AcidUtils.getTableSnapshot(conf, tbl, true); + if (tableSnapshot != null) { + tbl.getTTable().setWriteId(tableSnapshot.getWriteId()); + } + if (primaryKeys == null && foreignKeys == null && uniqueConstraints == null && notNullConstraints == null && defaultConstraints == null && checkConstraints == null) { @@ -1028,10 +1054,20 @@ public class Hive { public void truncateTable(String dbDotTableName, Map<String, String> partSpec) throws HiveException { try { Table table = getTable(dbDotTableName, true); + // TODO: we should refactor code to make sure snapshot is always obtained in the same layer e.g. Hive.java + AcidUtils.TableSnapshot snapshot = null; + if (AcidUtils.isTransactionalTable(table)) { + snapshot = AcidUtils.getTableSnapshot(conf, table, true); + } List<String> partNames = ((null == partSpec) - ? null : getPartitionNames(table.getDbName(), table.getTableName(), partSpec, (short) -1)); - getMSC().truncateTable(table.getDbName(), table.getTableName(), partNames); + ? null : getPartitionNames(table.getDbName(), table.getTableName(), partSpec, (short) -1)); + if (snapshot == null) { + getMSC().truncateTable(table.getDbName(), table.getTableName(), partNames); + } else { + getMSC().truncateTable(table.getDbName(), table.getTableName(), partNames, + snapshot.getTxnId(), snapshot.getValidWriteIdList(), snapshot.getWriteId()); + } } catch (Exception e) { throw new HiveException(e); } @@ -1683,7 +1719,7 @@ public class Hive { * true if there is a following task which updates the stats, so, this method need not update. * @param writeId write ID allocated for the current load operation * @param stmtId statement ID of the current load statement - * @param isInsertOverwrite + * @param isInsertOverwrite * @return Partition object being loaded with data */ public Partition loadPartition(Path loadPath, Table tbl, Map<String, String> partSpec, @@ -1736,7 +1772,7 @@ public class Hive { List<Path> newFiles = Collections.synchronizedList(new ArrayList<Path>()); perfLogger.PerfLogBegin("MoveTask", PerfLogger.FILE_MOVES); - + // If config is set, table is not temporary and partition being inserted exists, capture // the list of files added. For not yet existing partitions (insert overwrite to new partition // or dynamic partition inserts), the add partition event will capture the list of files added. @@ -1799,8 +1835,11 @@ public class Hive { Partition newTPart = oldPart != null ? oldPart : new Partition(tbl, partSpec, newPartPath); alterPartitionSpecInMemory(tbl, partSpec, newTPart.getTPartition(), inheritTableSpecs, newPartPath.toString()); validatePartition(newTPart); - EnvironmentContext ec = new EnvironmentContext(); - setTableSnapshotForTransactionalPartition(ec, conf, newTPart, true); + AcidUtils.TableSnapshot tableSnapshot = null; + tableSnapshot = AcidUtils.getTableSnapshot(conf, newTPart.getTable(), true); + if (tableSnapshot != null) { + newTPart.getTPartition().setWriteId(tableSnapshot.getWriteId()); + } // If config is set, table is not temporary and partition being inserted exists, capture // the list of files added. For not yet existing partitions (insert overwrite to new partition @@ -1873,7 +1912,7 @@ public class Hive { // insert into table T partition (ds) values ('Joe', 'today'); -- will fail with AlreadyExistsException // In that case, we want to retry with alterPartition. 
LOG.debug("Caught AlreadyExistsException, trying to alter partition instead"); - setStatsPropAndAlterPartition(hasFollowingStatsTask, tbl, newTPart, ec); + setStatsPropAndAlterPartition(hasFollowingStatsTask, tbl, newTPart, tableSnapshot); } catch (Exception e) { try { final FileSystem newPathFileSystem = newPartPath.getFileSystem(this.getConf()); @@ -1892,7 +1931,7 @@ public class Hive { addWriteNotificationLog(tbl, partSpec, newFiles, writeId); } } else { - setStatsPropAndAlterPartition(hasFollowingStatsTask, tbl, newTPart, ec); + setStatsPropAndAlterPartition(hasFollowingStatsTask, tbl, newTPart, tableSnapshot); } perfLogger.PerfLogEnd("MoveTask", PerfLogger.LOAD_PARTITION); @@ -1989,13 +2028,16 @@ public class Hive { } private void setStatsPropAndAlterPartition(boolean hasFollowingStatsTask, Table tbl, - Partition newTPart, EnvironmentContext ec) throws MetaException, TException { + Partition newTPart, TableSnapshot tableSnapshot) throws MetaException, TException { + EnvironmentContext ec = new EnvironmentContext(); if (hasFollowingStatsTask) { ec.putToProperties(StatsSetupConst.DO_NOT_UPDATE_STATS, StatsSetupConst.TRUE); } LOG.debug("Altering existing partition " + newTPart.getSpec()); - getSynchronizedMSC().alter_partition(tbl.getDbName(), tbl.getTableName(), - newTPart.getTPartition(), ec); + getSynchronizedMSC().alter_partition( + tbl.getDbName(), tbl.getTableName(), newTPart.getTPartition(), new EnvironmentContext(), + tableSnapshot == null ? -1 : tableSnapshot.getTxnId(), + tableSnapshot == null ? null : tableSnapshot.getValidWriteIdList()); } /** @@ -2516,7 +2558,7 @@ private void constructOneLBLocationMap(FileStatus fSta, out.add(new Partition(tbl, outPart)); } getMSC().alter_partitions(addPartitionDesc.getDbName(), addPartitionDesc.getTableName(), - partsToAlter, new EnvironmentContext()); + partsToAlter, new EnvironmentContext(), -1, null, -1); for ( org.apache.hadoop.hive.metastore.api.Partition outPart : getMSC().getPartitionsByNames(addPartitionDesc.getDbName(), addPartitionDesc.getTableName(),part_names)){ @@ -5362,43 +5404,4 @@ private void constructOneLBLocationMap(FileStatus fSta, throw new HiveException(e); } } - - private void setTableSnapshotForTransactionalTable(EnvironmentContext ec, HiveConf conf, - Table newTbl, boolean isStatsUpdater) throws LockException { - - org.apache.hadoop.hive.metastore.api.Table newTTbl = newTbl.getTTable(); - AcidUtils.TableSnapshot tableSnapshot = - AcidUtils.getTableSnapshot(conf, newTbl, isStatsUpdater); - if (tableSnapshot == null) return; - if (ec != null) { // Can be null for create table case; we don't need to verify txn stats. 
- ec.putToProperties(StatsSetupConst.TXN_ID, Long.toString(tableSnapshot.getTxnId())); - if (tableSnapshot.getValidWriteIdList() != null) { - ec.putToProperties(StatsSetupConst.VALID_WRITE_IDS, tableSnapshot.getValidWriteIdList()); - } else { - LOG.warn("Table snapshot has null write IDs for " + newTbl); - } - } - - if (isStatsUpdater) { - newTTbl.setWriteId(tableSnapshot.getWriteId()); - } - } - - private void setTableSnapshotForTransactionalPartition(EnvironmentContext ec, HiveConf conf, - Partition partition, boolean isStatsUpdater) throws LockException { - AcidUtils.TableSnapshot tableSnapshot = - AcidUtils.getTableSnapshot(conf, partition.getTable(), isStatsUpdater); - org.apache.hadoop.hive.metastore.api.Partition tpartition = partition.getTPartition(); - if (tableSnapshot == null) return; - ec.putToProperties(StatsSetupConst.TXN_ID, Long.toString(tableSnapshot.getTxnId())); - if (tableSnapshot.getValidWriteIdList() != null) { - ec.putToProperties(StatsSetupConst.VALID_WRITE_IDS, tableSnapshot.getValidWriteIdList()); - } else { - LOG.warn("Table snapshot has null write IDs for " + partition); - } - - if (isStatsUpdater) { - tpartition.setWriteId(tableSnapshot.getWriteId()); - } - } } http://git-wip-us.apache.org/repos/asf/hive/blob/1c9947f3/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java index 5d382ae..f7c9009 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java @@ -158,6 +158,18 @@ public class SessionHiveMetaStoreClient extends HiveMetaStoreClient implements I } @Override + public void truncateTable(String dbName, String tableName, + List<String> partNames, long txnId, String validWriteIds, long writeId) + throws TException { + org.apache.hadoop.hive.metastore.api.Table table = getTempTable(dbName, tableName); + if (table != null) { + truncateTempTable(table); + return; + } + super.truncateTable(dbName, tableName, partNames, txnId, validWriteIds, writeId); + } + + @Override public org.apache.hadoop.hive.metastore.api.Table getTable(String dbname, String name) throws MetaException, TException, NoSuchObjectException { // First check temp tables @@ -348,6 +360,21 @@ public class SessionHiveMetaStoreClient extends HiveMetaStoreClient implements I } @Override + public void alter_table(String catName, String dbName, String tbl_name, + org.apache.hadoop.hive.metastore.api.Table new_tbl, + EnvironmentContext envContext, long txnId, String validWriteIds) + throws InvalidOperationException, MetaException, TException { + org.apache.hadoop.hive.metastore.api.Table old_tbl = getTempTable(dbName, tbl_name); + if (old_tbl != null) { + //actually temp table does not support partitions, cascade is not applicable here + alterTempTable(dbName, tbl_name, old_tbl, new_tbl, null); + return; + } + super.alter_table(catName, dbName, tbl_name, new_tbl, envContext, txnId, + validWriteIds); + } + + @Override public void alter_table(String dbname, String tbl_name, org.apache.hadoop.hive.metastore.api.Table new_tbl) throws InvalidOperationException, MetaException, TException { http://git-wip-us.apache.org/repos/asf/hive/blob/1c9947f3/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java 
---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java index be43686..40039ac 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java @@ -2297,4 +2297,8 @@ public abstract class BaseSemanticAnalyzer { public DDLDescWithWriteId getAcidDdlDesc() { return null; } + + public WriteEntity getAcidAnalyzeTable() { + return null; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/1c9947f3/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java index b6825ae..04c0808 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java @@ -661,6 +661,11 @@ public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer { new ColumnStatsUpdateWork(partName, mapProp, tbl.getDbName(), tbl.getTableName(), colName, colType); ColumnStatsUpdateTask cStatsUpdateTask = (ColumnStatsUpdateTask) TaskFactory .get(columnStatsUpdateWork); + // TODO: doesn't look like this path is actually ever exercised. Maybe this needs to be removed. + addInputsOutputsAlterTable(tblName, partSpec, AlterTableTypes.UPDATESTATS); + if (AcidUtils.isTransactionalTable(tbl)) { + setAcidDdlDesc(columnStatsUpdateWork); + } rootTasks.add(cStatsUpdateTask); } @@ -1479,11 +1484,8 @@ public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer { } TruncateTableDesc truncateTblDesc = new TruncateTableDesc(tableName, partSpec, null, table); - if(truncateTblDesc.mayNeedWriteId()) { - if(this.ddlDescWithWriteId != null) { - throw new IllegalStateException("ddlDescWithWriteId is already set: " + this.ddlDescWithWriteId); - } - this.ddlDescWithWriteId = truncateTblDesc; + if (truncateTblDesc.mayNeedWriteId()) { + setAcidDdlDesc(truncateTblDesc); } DDLWork ddlWork = new DDLWork(getInputs(), getOutputs(), truncateTblDesc); @@ -1757,22 +1759,41 @@ public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer { alterTblDesc.setEnvironmentContext(environmentContext); alterTblDesc.setOldName(tableName); - boolean isPotentialMmSwitch = AcidUtils.isTablePropertyTransactional(mapProp) + + + boolean isToTxn = AcidUtils.isTablePropertyTransactional(mapProp) || mapProp.containsKey(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES); - addInputsOutputsAlterTable(tableName, partSpec, alterTblDesc, isPotentialMmSwitch); + addInputsOutputsAlterTable(tableName, partSpec, alterTblDesc, isToTxn); + // This special handling is because we cannot generate write ID for full ACID conversion, + // it will break the weird 10000001-write-ID logic that is currently in use. However, we do + // want to generate a write ID for prop changes for existing txn tables, or MM conversion. 
+ boolean isAcidConversion = isToTxn && AcidUtils.isFullAcidTable(mapProp) + && !AcidUtils.isFullAcidTable(getTable(qualified, true)); DDLWork ddlWork = new DDLWork(getInputs(), getOutputs(), alterTblDesc); - if (isPotentialMmSwitch) { - if(this.ddlDescWithWriteId != null) { - throw new IllegalStateException("ddlDescWithWriteId is already set: " + this.ddlDescWithWriteId); - } - this.ddlDescWithWriteId = alterTblDesc; + if (isToTxn) { + alterTblDesc.setIsFullAcidConversion(isAcidConversion); + setAcidDdlDesc(alterTblDesc); ddlWork.setNeedLock(true); // Hmm... why don't many other operations here need locks? } + if (changeStatsSucceeded) { + Table table = getTable(qualified, true); + if (AcidUtils.isTransactionalTable(table)) { + alterTblDesc.setIsExplicitStatsUpdate(true); + setAcidDdlDesc(alterTblDesc); + } + } rootTasks.add(TaskFactory.get(ddlWork)); } + private void setAcidDdlDesc(DDLDescWithWriteId alterTblDesc) { + if(this.ddlDescWithWriteId != null) { + throw new IllegalStateException("ddlDescWithWriteId is already set: " + this.ddlDescWithWriteId); + } + this.ddlDescWithWriteId = alterTblDesc; + } + @Override public DDLDescWithWriteId getAcidDdlDesc() { return ddlDescWithWriteId; @@ -3148,6 +3169,10 @@ public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer { String targetName = getDotName(target); AlterTableDesc alterTblDesc = new AlterTableDesc(sourceName, targetName, expectView, null); + Table table = getTable(sourceName, true); + if (AcidUtils.isTransactionalTable(table)) { + setAcidDdlDesc(alterTblDesc); + } addInputsOutputsAlterTable(sourceName, null, alterTblDesc); rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), alterTblDesc))); @@ -3270,6 +3295,11 @@ public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer { primaryKeys, foreignKeys, uniqueConstraints, notNullConstraints, defaultConstraints, checkConstraints); } addInputsOutputsAlterTable(tblName, partSpec, alterTblDesc); + if (AcidUtils.isTransactionalTable(tab)) { + // Note: we might actually need it only when certain changes (e.g. name or type?) are made. 
+ setAcidDdlDesc(alterTblDesc); + } + rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), alterTblDesc))); @@ -3292,7 +3322,11 @@ public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer { partSpecs.add(oldPartSpec); partSpecs.add(newPartSpec); addTablePartsOutputs(tab, partSpecs, WriteEntity.WriteType.DDL_EXCLUSIVE); - RenamePartitionDesc renamePartitionDesc = new RenamePartitionDesc(tblName, oldPartSpec, newPartSpec, null); + RenamePartitionDesc renamePartitionDesc = new RenamePartitionDesc( + tblName, oldPartSpec, newPartSpec, null, tab); + if (AcidUtils.isTransactionalTable(tab)) { + setAcidDdlDesc(renamePartitionDesc); + } rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), renamePartitionDesc))); } @@ -3325,6 +3359,10 @@ public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer { AlterTableDesc alterTblDesc = new AlterTableDesc(tblName, partSpec, newCols, alterType, isCascade); + Table table = getTable(tblName, true); + if (AcidUtils.isTransactionalTable(table)) { + setAcidDdlDesc(alterTblDesc); + } addInputsOutputsAlterTable(tblName, partSpec, alterTblDesc); rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), @@ -3432,6 +3470,9 @@ public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer { AlterTableAlterPartDesc alterTblAlterPartDesc = new AlterTableAlterPartDesc(getDotName(qualified), newCol); + if (AcidUtils.isTransactionalTable(tab)) { + setAcidDdlDesc(alterTblAlterPartDesc); + } rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), alterTblAlterPartDesc))); @@ -4336,7 +4377,7 @@ public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer { } } - private void analyzeAlterMaterializedViewRewrite(String mvName, ASTNode ast) throws SemanticException { + private void analyzeAlterMaterializedViewRewrite(String fqMvName, ASTNode ast) throws SemanticException { // Value for the flag boolean enableFlag; switch (ast.getChild(0).getType()) { @@ -4352,11 +4393,11 @@ public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer { AlterMaterializedViewDesc alterMVDesc = new AlterMaterializedViewDesc(AlterMaterializedViewTypes.UPDATE_REWRITE_FLAG); - alterMVDesc.setMaterializedViewName(mvName); + alterMVDesc.setFqMaterializedViewName(fqMvName); alterMVDesc.setRewriteEnableFlag(enableFlag); // It can be fully qualified name or use default database - Table materializedViewTable = getTable(mvName, true); + Table materializedViewTable = getTable(fqMvName, true); // One last test: if we are enabling the rewrite, we need to check that query // only uses transactional (MM and ACID) tables @@ -4370,6 +4411,10 @@ public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer { } } + if (AcidUtils.isTransactionalTable(materializedViewTable)) { + setAcidDdlDesc(alterMVDesc); + } + inputs.add(new ReadEntity(materializedViewTable)); outputs.add(new WriteEntity(materializedViewTable, WriteEntity.WriteType.DDL_EXCLUSIVE)); rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), http://git-wip-us.apache.org/repos/asf/hive/blob/1c9947f3/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index 0ca9b58..91d0834 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -56,6 +56,7 @@ import 
org.antlr.runtime.tree.TreeWizard.ContextVisitor; import org.apache.calcite.rel.RelNode; import org.apache.calcite.util.ImmutableBitSet; import org.apache.commons.lang.StringUtils; +import org.apache.curator.shaded.com.google.common.collect.Lists; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -386,6 +387,8 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { protected AnalyzeRewriteContext analyzeRewrite; + private WriteEntity acidAnalyzeTable; + // A mapping from a tableName to a table object in metastore. Map<String, Table> tabNameToTabObject; @@ -11212,64 +11215,76 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { // if it is not analyze command and not column stats, then do not gatherstats if (!qbp.isAnalyzeCommand() && qbp.getAnalyzeRewrite() == null) { tsDesc.setGatherStats(false); - } else { - if (HiveConf.getVar(conf, HIVESTATSDBCLASS).equalsIgnoreCase(StatDB.fs.name())) { - String statsTmpLoc = ctx.getTempDirForInterimJobPath(tab.getPath()).toString(); - LOG.debug("Set stats collection dir : " + statsTmpLoc); - tsDesc.setTmpStatsDir(statsTmpLoc); - } - tsDesc.setGatherStats(true); - tsDesc.setStatsReliable(conf.getBoolVar(HiveConf.ConfVars.HIVE_STATS_RELIABLE)); + return; + } - // append additional virtual columns for storing statistics - Iterator<VirtualColumn> vcs = VirtualColumn.getStatsRegistry(conf).iterator(); - List<VirtualColumn> vcList = new ArrayList<VirtualColumn>(); - while (vcs.hasNext()) { - VirtualColumn vc = vcs.next(); - rwsch.put(alias, vc.getName(), new ColumnInfo(vc.getName(), - vc.getTypeInfo(), alias, true, vc.getIsHidden())); - vcList.add(vc); + if (HiveConf.getVar(conf, HIVESTATSDBCLASS).equalsIgnoreCase(StatDB.fs.name())) { + String statsTmpLoc = ctx.getTempDirForInterimJobPath(tab.getPath()).toString(); + LOG.debug("Set stats collection dir : " + statsTmpLoc); + tsDesc.setTmpStatsDir(statsTmpLoc); + } + tsDesc.setGatherStats(true); + tsDesc.setStatsReliable(conf.getBoolVar(HiveConf.ConfVars.HIVE_STATS_RELIABLE)); + + // append additional virtual columns for storing statistics + Iterator<VirtualColumn> vcs = VirtualColumn.getStatsRegistry(conf).iterator(); + List<VirtualColumn> vcList = new ArrayList<VirtualColumn>(); + while (vcs.hasNext()) { + VirtualColumn vc = vcs.next(); + rwsch.put(alias, vc.getName(), new ColumnInfo(vc.getName(), + vc.getTypeInfo(), alias, true, vc.getIsHidden())); + vcList.add(vc); + } + tsDesc.addVirtualCols(vcList); + + String tblName = tab.getTableName(); + // Theoretically the key prefix could be any unique string shared + // between TableScanOperator (when publishing) and StatsTask (when aggregating). + // Here we use + // db_name.table_name + partitionSec + // as the prefix for easy of read during explain and debugging. + // Currently, partition spec can only be static partition. 
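A worked example of the prefix computed just below (hypothetical table db1.tab partitioned by ds; the partition path suffix is assumed, not taken from this commit):

    // k      = encodeTableName("tab") + Path.SEPARATOR  ->  "tab/"
    // prefix = "db1" + "." + k                          ->  "db1.tab/"
    // A static partition would then publish its stats under something like
    // "db1.tab/ds=2018-07-13/", which StatsTask aggregates by prefix match.
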
+ String k = org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.encodeTableName(tblName) + Path.SEPARATOR; + tsDesc.setStatsAggPrefix(tab.getDbName()+"."+k); + + // set up WriteEntity for replication and txn stats + WriteEntity we = new WriteEntity(tab, WriteEntity.WriteType.DDL_SHARED); + we.setTxnAnalyze(true); + outputs.add(we); + if (AcidUtils.isTransactionalTable(tab)) { + if (acidAnalyzeTable != null) { + throw new IllegalStateException("Multiple ACID tables in analyze: " + + we + ", " + acidAnalyzeTable); + } + acidAnalyzeTable = we; + } + + // add WriteEntity for each matching partition + if (tab.isPartitioned()) { + List<String> cols = new ArrayList<String>(); + if (qbp.getAnalyzeRewrite() != null) { + List<FieldSchema> partitionCols = tab.getPartCols(); + for (FieldSchema fs : partitionCols) { + cols.add(fs.getName()); + } + tsDesc.setPartColumns(cols); + return; } - tsDesc.addVirtualCols(vcList); - - String tblName = tab.getTableName(); - // Theoretically the key prefix could be any unique string shared - // between TableScanOperator (when publishing) and StatsTask (when aggregating). - // Here we use - // db_name.table_name + partitionSec - // as the prefix for easy of read during explain and debugging. - // Currently, partition spec can only be static partition. - String k = org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.encodeTableName(tblName) + Path.SEPARATOR; - tsDesc.setStatsAggPrefix(tab.getDbName()+"."+k); - - // set up WriteEntity for replication - outputs.add(new WriteEntity(tab, WriteEntity.WriteType.DDL_SHARED)); - - // add WriteEntity for each matching partition - if (tab.isPartitioned()) { - List<String> cols = new ArrayList<String>(); - if (qbp.getAnalyzeRewrite() != null) { - List<FieldSchema> partitionCols = tab.getPartCols(); - for (FieldSchema fs : partitionCols) { - cols.add(fs.getName()); - } - tsDesc.setPartColumns(cols); - return; - } - TableSpec tblSpec = qbp.getTableSpec(alias); - Map<String, String> partSpec = tblSpec.getPartSpec(); - if (partSpec != null) { - cols.addAll(partSpec.keySet()); - tsDesc.setPartColumns(cols); - } else { - throw new SemanticException(ErrorMsg.NEED_PARTITION_SPECIFICATION.getMsg()); - } - List<Partition> partitions = qbp.getTableSpec().partitions; - if (partitions != null) { - for (Partition partn : partitions) { - // inputs.add(new ReadEntity(partn)); // is this needed at all? - outputs.add(new WriteEntity(partn, WriteEntity.WriteType.DDL_NO_LOCK)); - } + TableSpec tblSpec = qbp.getTableSpec(alias); + Map<String, String> partSpec = tblSpec.getPartSpec(); + if (partSpec != null) { + cols.addAll(partSpec.keySet()); + tsDesc.setPartColumns(cols); + } else { + throw new SemanticException(ErrorMsg.NEED_PARTITION_SPECIFICATION.getMsg()); + } + List<Partition> partitions = qbp.getTableSpec().partitions; + if (partitions != null) { + for (Partition partn : partitions) { + // inputs.add(new ReadEntity(partn)); // is this needed at all? 
+          WriteEntity pwe = new WriteEntity(partn, WriteEntity.WriteType.DDL_NO_LOCK);
+          pwe.setTxnAnalyze(true);
+          outputs.add(pwe);
         }
       }
     }
@@ -12745,7 +12760,6 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
 
   @Override
   public void validate() throws SemanticException {
-    LOG.debug("validation start");
     boolean wasAcidChecked = false;
     // Validate inputs and outputs have right protectmode to execute the query
     for (ReadEntity readEntity : getInputs()) {
@@ -14954,4 +14968,9 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
       rewrittenQueryStr.append(")");
     }
   }
+
+  @Override
+  public WriteEntity getAcidAnalyzeTable() {
+    return acidAnalyzeTable;
+  }
 }
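Note on the stats aggregation key used above: the prefix is simply <db_name>.<encoded_table_name>/ (plus the partition spec for partitioned tables), shared between the TableScanOperator that publishes stats and the StatsTask that aggregates them. A minimal, self-contained sketch follows; encodeTableName here is a hypothetical stand-in for MetaStoreUtils.encodeTableName, which does more thorough escaping.

    // Sketch of how the stats aggregation prefix above is assembled.
    public class StatsPrefixSketch {
      // Hypothetical stand-in for MetaStoreUtils.encodeTableName; the real
      // method escapes characters that are unsafe in file system paths.
      private static String encodeTableName(String name) {
        return name.replaceAll("[^a-zA-Z0-9_]", "-");
      }

      public static void main(String[] args) {
        String dbName = "default";                 // assumed example values
        String tblName = "acid_stats";
        String k = encodeTableName(tblName) + "/"; // Path.SEPARATOR
        System.out.println(dbName + "." + k);      // prints: default.acid_stats/
      }
    }
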
http://git-wip-us.apache.org/repos/asf/hive/blob/1c9947f3/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenamePartitionHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenamePartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenamePartitionHandler.java
index 43f2cbc..0035026 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenamePartitionHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenamePartitionHandler.java
@@ -60,7 +60,7 @@ public class RenamePartitionHandler extends AbstractMessageHandler {
     }
 
     RenamePartitionDesc renamePtnDesc = new RenamePartitionDesc(
-        tableName, oldPartSpec, newPartSpec, context.eventOnlyReplicationSpec());
+        tableName, oldPartSpec, newPartSpec, context.eventOnlyReplicationSpec(), null);
     Task<DDLWork> renamePtnTask = TaskFactory.get(
         new DDLWork(readEntitySet, writeEntitySet, renamePtnDesc), context.hiveConf);
     context.log.debug("Added rename ptn task : {}:{}->{}",

http://git-wip-us.apache.org/repos/asf/hive/blob/1c9947f3/ql/src/java/org/apache/hadoop/hive/ql/plan/AlterMaterializedViewDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/AlterMaterializedViewDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/AlterMaterializedViewDesc.java
index 8493368..865d143 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/AlterMaterializedViewDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/AlterMaterializedViewDesc.java
@@ -20,15 +20,16 @@ package org.apache.hadoop.hive.ql.plan;
 
 import java.io.Serializable;
 
+import org.apache.hadoop.hive.ql.plan.DDLDesc.DDLDescWithWriteId;
 import org.apache.hadoop.hive.ql.plan.Explain.Level;
 
 /**
  * AlterMaterializedViewDesc.
  */
 @Explain(displayName = "Alter Materialized View", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
-public class AlterMaterializedViewDesc extends DDLDesc implements Serializable {
+public class AlterMaterializedViewDesc extends DDLDesc implements Serializable, DDLDescWithWriteId {
   private static final long serialVersionUID = 1L;
 
-  private String materializedViewName;
+  private String fqMaterializedViewName;
   private boolean rewriteEnable;
 
   /**
@@ -40,6 +41,7 @@ public class AlterMaterializedViewDesc extends DDLDesc implements Serializable {
   };
 
   AlterMaterializedViewTypes op;
+  private long writeId;
 
   public AlterMaterializedViewDesc() {
   }
@@ -53,15 +55,15 @@ public class AlterMaterializedViewDesc extends DDLDesc implements Serializable {
    */
   @Explain(displayName = "name", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
   public String getMaterializedViewName() {
-    return materializedViewName;
+    return fqMaterializedViewName;
   }
 
   /**
    * @param materializedViewName
    *          the materializedViewName to set
    */
-  public void setMaterializedViewName(String materializedViewName) {
-    this.materializedViewName = materializedViewName;
+  public void setFqMaterializedViewName(String materializedViewName) {
+    this.fqMaterializedViewName = materializedViewName;
   }
 
   /**
@@ -102,4 +104,19 @@ public class AlterMaterializedViewDesc extends DDLDesc implements Serializable {
     this.op = op;
   }
 
+  @Override
+  public void setWriteId(long writeId) {
+    this.writeId = writeId;
+  }
+
+  @Override
+  public String getFullTableName() {
+    return fqMaterializedViewName;
+  }
+
+  @Override
+  public boolean mayNeedWriteId() {
+    return true; // Verified when this is set as DDL Desc for ACID.
+  }
+
 }
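All of the descs touched by this patch implement the same three-method contract. The interface itself (DDLDesc.DDLDescWithWriteId) is not part of this diff, so the following is only a reconstruction from the @Override methods shown in these hunks:

    // Reconstructed shape of DDLDesc.DDLDescWithWriteId, inferred from this
    // patch; the authoritative definition lives in DDLDesc.java, outside this diff.
    public interface DDLDescWithWriteId {
      // Invoked once the transaction manager has allocated a write ID for the DDL.
      void setWriteId(long writeId);
      // Fully qualified "db.table" name that the write ID applies to.
      String getFullTableName();
      // Cheap static check; callers still verify that the table is transactional.
      boolean mayNeedWriteId();
    }
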
http://git-wip-us.apache.org/repos/asf/hive/blob/1c9947f3/ql/src/java/org/apache/hadoop/hive/ql/plan/AlterTableAlterPartDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/AlterTableAlterPartDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/AlterTableAlterPartDesc.java
index 54687e0..652c007 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/AlterTableAlterPartDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/AlterTableAlterPartDesc.java
@@ -20,30 +20,31 @@ package org.apache.hadoop.hive.ql.plan;
 
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 
-public class AlterTableAlterPartDesc extends DDLDesc {
-  private String tableName;
+public class AlterTableAlterPartDesc extends DDLDesc implements DDLDesc.DDLDescWithWriteId {
+  private String fqTableName;
   private FieldSchema partKeySpec;
+  private long writeId;
 
   public AlterTableAlterPartDesc() {
   }
 
   /**
-   * @param tableName
+   * @param fqTableName
    *          table containing the partition
    * @param partKeySpec
    */
-  public AlterTableAlterPartDesc(String tableName, FieldSchema partKeySpec) {
+  public AlterTableAlterPartDesc(String fqTableName, FieldSchema partKeySpec) {
     super();
-    this.tableName = tableName;
+    this.fqTableName = fqTableName;
     this.partKeySpec = partKeySpec;
   }
 
   public String getTableName() {
-    return tableName;
+    return fqTableName;
  }
 
   public void setTableName(String tableName) {
-    this.tableName = tableName;
+    this.fqTableName = tableName;
   }
 
   public FieldSchema getPartKeySpec() {
@@ -53,4 +54,19 @@ public class AlterTableAlterPartDesc extends DDLDesc {
   public void setPartKeySpec(FieldSchema partKeySpec) {
     this.partKeySpec = partKeySpec;
   }
+
+  @Override
+  public void setWriteId(long writeId) {
+    this.writeId = writeId;
+  }
+
+  @Override
+  public String getFullTableName() {
+    return fqTableName;
+  }
+
+  @Override
+  public boolean mayNeedWriteId() {
+    return true; // Checked before this is set as the ACID desc.
+  }
 }
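One caveat in AlterTableAlterPartDesc as rewritten: setTableName() stores its argument directly into fqTableName, so callers must pass a fully qualified db.table string for getFullTableName() to stay meaningful. A small usage sketch, with hypothetical table and partition-key values:

    // Usage sketch; "mydb.mytab" and the "ds" partition key are hypothetical.
    import org.apache.hadoop.hive.metastore.api.FieldSchema;

    public class AlterPartDescUsage {
      public static void main(String[] args) {
        FieldSchema partKey = new FieldSchema("ds", "date", null);
        // Pass the fully qualified name; after this change it backs both
        // getTableName() and getFullTableName().
        AlterTableAlterPartDesc desc = new AlterTableAlterPartDesc("mydb.mytab", partKey);
        System.out.println(desc.getFullTableName()); // mydb.mytab
      }
    }
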
http://git-wip-us.apache.org/repos/asf/hive/blob/1c9947f3/ql/src/java/org/apache/hadoop/hive/ql/plan/AlterTableDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/AlterTableDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/AlterTableDesc.java
index ec04a01..680e029 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/AlterTableDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/AlterTableDesc.java
@@ -66,14 +66,14 @@ public class AlterTableDesc extends DDLDesc implements Serializable, DDLDesc.DDL
     ALTERSKEWEDLOCATION("alter skew location"), ALTERBUCKETNUM("alter bucket number"),
     ALTERPARTITION("alter partition"), COMPACT("compact"),
     TRUNCATE("truncate"), MERGEFILES("merge files"), DROPCONSTRAINT("drop constraint"),
     ADDCONSTRAINT("add constraint"),
-    UPDATECOLUMNS("update columns"), OWNER("set owner");
+    UPDATECOLUMNS("update columns"), OWNER("set owner"), UPDATESTATS("update stats");
     ;
 
     private final String name;
     private AlterTableTypes(String name) { this.name = name; }
     public String getName() { return name; }
 
-    public static final List<AlterTableTypes> nonNativeTableAllowedTypes = 
+    public static final List<AlterTableTypes> nonNativeTableAllowedTypes =
         ImmutableList.of(ADDPROPS, DROPPROPS, ADDCOLS);
   }
@@ -139,6 +139,7 @@ public class AlterTableDesc extends DDLDesc implements Serializable, DDLDesc.DDL
   ReplicationSpec replicationSpec;
   private Long writeId = null;
   PrincipalDesc ownerPrincipal;
+  private boolean isExplicitStatsUpdate, isFullAcidConversion;
 
   public AlterTableDesc() {
   }
@@ -960,8 +961,21 @@ public class AlterTableDesc extends DDLDesc implements Serializable, DDLDesc.DDL
 
   @Override
   public boolean mayNeedWriteId() {
-    return getOp() == AlterTableDesc.AlterTableTypes.ADDPROPS
-        && AcidUtils.isToInsertOnlyTable(null, getProps());
+    switch (getOp()) {
+    case ADDPROPS: {
+      return isExplicitStatsUpdate || AcidUtils.isToInsertOnlyTable(null, getProps())
+          || (AcidUtils.isTransactionalTable(getProps()) && !isFullAcidConversion);
+    }
+    case DROPPROPS: return isExplicitStatsUpdate;
+    // The checks for the following ops are performed before this desc is set as the ACID desc.
+    // These ops need a write ID because they invalidate column stats.
+    case RENAMECOLUMN: return true;
+    case RENAME: return true;
+    case REPLACECOLS: return true;
+    case ADDCOLS: return true;
+    // RENAMEPARTITION is handled in RenamePartitionDesc
+    default: return false;
+    }
   }
 
   public Long getWriteId() {
@@ -972,4 +986,12 @@ public class AlterTableDesc extends DDLDesc implements Serializable, DDLDesc.DDL
   public String toString() {
     return this.getClass().getSimpleName() + " for " + getFullTableName();
   }
+
+  public void setIsExplicitStatsUpdate(boolean b) {
+    this.isExplicitStatsUpdate = b;
+  }
+
+  public void setIsFullAcidConversion(boolean b) {
+    this.isFullAcidConversion = b;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/1c9947f3/ql/src/java/org/apache/hadoop/hive/ql/plan/ColumnStatsUpdateWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ColumnStatsUpdateWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ColumnStatsUpdateWork.java
index cbccd87..6de1a37 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ColumnStatsUpdateWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ColumnStatsUpdateWork.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.hive.ql.plan;
 
 import java.io.Serializable;
 import java.util.Map;
+
+import org.apache.hadoop.hive.ql.plan.DDLDesc.DDLDescWithWriteId;
 import org.apache.hadoop.hive.ql.plan.Explain.Level;
 
 
@@ -32,7 +34,7 @@ import org.apache.hadoop.hive.ql.plan.Explain.Level;
  * ('maxColLen'='4444','avgColLen'='44.4');
  */
 @Explain(displayName = "Column Stats Update Work", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
-public class ColumnStatsUpdateWork implements Serializable {
+public class ColumnStatsUpdateWork implements Serializable, DDLDescWithWriteId {
   private static final long serialVersionUID = 1L;
   private final String partName;
   private final Map<String, String> mapProp;
@@ -40,12 +42,13 @@ public class ColumnStatsUpdateWork implements Serializable {
   private final String tableName;
   private final String colName;
   private final String colType;
+  private long writeId;
 
   public ColumnStatsUpdateWork(String partName,
       Map<String, String> mapProp,
       String dbName,
       String tableName,
-      String colName, 
+      String colName,
       String colType) {
     this.partName = partName;
     this.mapProp = mapProp;
@@ -83,4 +86,19 @@ public class ColumnStatsUpdateWork implements Serializable {
   public String getColType() {
     return colType;
   }
+
+  @Override
+  public void setWriteId(long writeId) {
+    this.writeId = writeId;
+  }
+
+  @Override
+  public String getFullTableName() {
+    return dbName + "." + tableName;
+  }
+
+  @Override
+  public boolean mayNeedWriteId() {
+    return true; // Checked at setup time; if this is called, the table is transactional.
+  }
 }
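To make the new ADDPROPS branch of AlterTableDesc.mayNeedWriteId() easier to follow, here is the same decision as a standalone sketch. The two predicates are simplified stand-ins for AcidUtils.isToInsertOnlyTable and AcidUtils.isTransactionalTable, which inspect more properties in reality:

    import java.util.Map;

    public class AddPropsWriteIdSketch {
      // Simplified stand-in for AcidUtils.isToInsertOnlyTable(null, props).
      static boolean isToInsertOnlyTable(Map<String, String> props) {
        return "insert_only".equalsIgnoreCase(props.get("transactional_properties"));
      }

      // Simplified stand-in for AcidUtils.isTransactionalTable(props).
      static boolean isTransactionalTable(Map<String, String> props) {
        return "true".equalsIgnoreCase(props.get("transactional"));
      }

      static boolean mayNeedWriteId(Map<String, String> props,
          boolean isExplicitStatsUpdate, boolean isFullAcidConversion) {
        // Explicit stats updates always need a write ID; otherwise one is needed
        // when converting to insert-only, or when altering props of a
        // transactional table, unless this ALTER is the full-ACID conversion itself.
        return isExplicitStatsUpdate || isToInsertOnlyTable(props)
            || (isTransactionalTable(props) && !isFullAcidConversion);
      }
    }
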
http://git-wip-us.apache.org/repos/asf/hive/blob/1c9947f3/ql/src/java/org/apache/hadoop/hive/ql/plan/RenamePartitionDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/RenamePartitionDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/RenamePartitionDesc.java
index a13ac13..a4a31a5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/RenamePartitionDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/RenamePartitionDesc.java
@@ -17,7 +17,9 @@
  */
 package org.apache.hadoop.hive.ql.plan;
 
+import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
+import org.apache.hadoop.hive.ql.plan.DDLDesc.DDLDescWithWriteId;
 
 import java.io.Serializable;
 import java.util.LinkedHashMap;
@@ -26,7 +28,7 @@ import java.util.Map;
 
 /**
  * Contains the information needed to rename a partition.
  */
-public class RenamePartitionDesc extends DDLDesc implements Serializable {
+public class RenamePartitionDesc extends DDLDesc implements Serializable, DDLDescWithWriteId {
 
   private static final long serialVersionUID = 1L;
 
@@ -35,6 +37,8 @@ public class RenamePartitionDesc extends DDLDesc implements Serializable {
   private LinkedHashMap<String, String> oldPartSpec;
   private LinkedHashMap<String, String> newPartSpec;
   private ReplicationSpec replicationSpec;
+  private String fqTableName;
+  private long writeId;
 
   /**
    * For serialization only.
@@ -49,13 +53,15 @@ public class RenamePartitionDesc extends DDLDesc implements Serializable {
    *          old partition specification.
    * @param newPartSpec
    *          new partition specification.
+   * @param table
    */
-  public RenamePartitionDesc(String tableName,
-      Map<String, String> oldPartSpec, Map<String, String> newPartSpec, ReplicationSpec replicationSpec) {
+  public RenamePartitionDesc(String tableName, Map<String, String> oldPartSpec,
+      Map<String, String> newPartSpec, ReplicationSpec replicationSpec, Table table) {
     this.tableName = tableName;
     this.oldPartSpec = new LinkedHashMap<String,String>(oldPartSpec);
     this.newPartSpec = new LinkedHashMap<String,String>(newPartSpec);
     this.replicationSpec = replicationSpec;
+    this.fqTableName = table != null ? (table.getDbName() + "." + table.getTableName()) : tableName;
   }
 
@@ -66,14 +72,6 @@ public class RenamePartitionDesc extends DDLDesc implements Serializable {
   }
 
   /**
-   * @param tableName
-   *          the table we're going to add the partitions to.
-   */
-  public void setTableName(String tableName) {
-    this.tableName = tableName;
-  }
-
-  /**
    * @return location of partition in relation to table
    */
   public String getLocation() {
@@ -123,4 +121,19 @@ public class RenamePartitionDesc extends DDLDesc implements Serializable {
    * This can result in a "RENAME IF NEWER THAN" kind of semantic
    */
  public ReplicationSpec getReplicationSpec() { return this.replicationSpec; }
+
+  @Override
+  public void setWriteId(long writeId) {
+    this.writeId = writeId;
+  }
+
+  @Override
+  public String getFullTableName() {
+    return fqTableName;
+  }
+
+  @Override
+  public boolean mayNeedWriteId() {
+    return true; // The check is done when setting this as the ACID DDLDesc.
+  }
 }
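Usage note for the new RenamePartitionDesc constructor parameter: the Table may be null (as on the replication path in RenamePartitionHandler above), in which case fqTableName falls back to the tableName argument as-is. A sketch with hypothetical partition values:

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class RenamePartitionDescUsage {
      public static void main(String[] args) {
        Map<String, String> oldSpec = new LinkedHashMap<>();
        oldSpec.put("ds", "2018-07-13");
        Map<String, String> newSpec = new LinkedHashMap<>();
        newSpec.put("ds", "2018-07-14");
        // No Table object available here, so fqTableName falls back to the
        // tableName argument; pass a qualified name to keep it meaningful.
        RenamePartitionDesc desc = new RenamePartitionDesc(
            "mydb.mytab", oldSpec, newSpec, null, null);
        System.out.println(desc.getFullTableName()); // mydb.mytab
      }
    }
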
http://git-wip-us.apache.org/repos/asf/hive/blob/1c9947f3/ql/src/java/org/apache/hadoop/hive/ql/plan/TruncateTableDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/TruncateTableDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/TruncateTableDesc.java
index 8c3d852..9e83576 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/TruncateTableDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/TruncateTableDesc.java
@@ -27,6 +27,8 @@ import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.plan.Explain.Level;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
@@ -34,6 +36,7 @@ import org.apache.hadoop.hive.ql.plan.Explain.Level;
  */
 @Explain(displayName = "Truncate Table or Partition", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
 public class TruncateTableDesc extends DDLDesc implements DDLDesc.DDLDescWithWriteId {
+  private final static Logger LOG = LoggerFactory.getLogger(TruncateTableDesc.class);
 
   private static final long serialVersionUID = 1L;
 
@@ -51,9 +54,11 @@ public class TruncateTableDesc extends DDLDesc implements DDLDesc.DDLDescWithWri
   public TruncateTableDesc() {
   }
+
   public TruncateTableDesc(String tableName, Map<String, String> partSpec, ReplicationSpec replicationSpec) {
     this(tableName, partSpec, replicationSpec, null);
   }
+
   public TruncateTableDesc(String tableName, Map<String, String> partSpec,
       ReplicationSpec replicationSpec, Table table) {
     this.tableName = tableName;
@@ -124,10 +129,13 @@ public class TruncateTableDesc extends DDLDesc implements DDLDesc.DDLDescWithWri
   public void setWriteId(long writeId) {
     this.writeId = writeId;
   }
+
   @Override
   public String getFullTableName() {
     return fullTableName;
   }
+
+
   @Override
   public boolean mayNeedWriteId() {
     return isTransactional;
@@ -137,5 +145,4 @@ public class TruncateTableDesc extends DDLDesc implements DDLDesc.DDLDescWithWri
   public String toString() {
     return this.getClass().getSimpleName() + " for " + getFullTableName();
   }
-
 }