[01/24] hive git commit: HIVE-18839: Implement incremental rebuild for materialized views (only insert operations in source tables) (Jesus Camacho Rodriguez, reviewed by Ashutosh Chauhan)
Repository: hive Updated Branches: refs/heads/branch-3 d1a935816 -> c695c70b1 http://git-wip-us.apache.org/repos/asf/hive/blob/c695c70b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java -- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index a79242b..9256b7a 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -63,9 +63,11 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.ValidReadTxnList; import org.apache.hadoop.hive.common.ValidTxnList; +import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.common.classification.RetrySemantics; import org.apache.hadoop.hive.metastore.DatabaseProduct; import org.apache.hadoop.hive.metastore.MaterializationsInvalidationCache; +import org.apache.hadoop.hive.metastore.MaterializationsRebuildLockHandler; import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.MetaStoreListenerNotifier; import org.apache.hadoop.hive.metastore.TransactionalMetaStoreEventListener; @@ -854,9 +856,18 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { @Override @RetrySemantics.Idempotent("No-op if already committed") public void commitTxn(CommitTxnRequest rqst) -throws NoSuchTxnException, TxnAbortedException, MetaException { +throws NoSuchTxnException, TxnAbortedException, MetaException { +MaterializationsRebuildLockHandler materializationsRebuildLockHandler = +MaterializationsRebuildLockHandler.get(); +String fullyQualifiedName = null; +String dbName = null; +String tblName = null; +long writeId = 0L; +long timestamp = 0L; +boolean isUpdateDelete = false; long txnid = rqst.getTxnid(); long sourceTxnId = -1; + try { Connection dbConn = null; Statement stmt = null; @@ -905,6 +916,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { quoteChar(OpertaionType.UPDATE.sqlConst) + "," + quoteChar(OpertaionType.DELETE.sqlConst) + ")"; rs = stmt.executeQuery(sqlGenerator.addLimitClause(1, "tc_operation_type " + conflictSQLSuffix)); if (rs.next()) { + isUpdateDelete = true; close(rs); //if here it means currently committing txn performed update/delete and we should check WW conflict /** @@ -1007,6 +1019,17 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { LOG.info("Expected to move at least one record from txn_components to " + "completed_txn_components when committing txn! " + JavaUtils.txnIdToString(txnid)); } +// Obtain information that we need to update registry +s = "select ctc_database, ctc_table, ctc_writeid, ctc_timestamp from COMPLETED_TXN_COMPONENTS where ctc_txnid = " + txnid; +LOG.debug("Going to extract table modification information for invalidation cache <" + s + ">"); +rs = stmt.executeQuery(s); +if (rs.next()) { + dbName = rs.getString(1); + tblName = rs.getString(2); + fullyQualifiedName = Warehouse.getQualifiedName(dbName, tblName); + writeId = rs.getLong(3); + timestamp = rs.getTimestamp(4, Calendar.getInstance(TimeZone.getTimeZone("UTC"))).getTime(); +} s = "delete from TXN_COMPONENTS where tc_txnid = " + txnid; LOG.debug("Going to execute update <" + s + ">"); modCount = stmt.executeUpdate(s); @@ -1021,16 +1044,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { modCount = stmt.executeUpdate(s); LOG.info("Removed committed transaction: (" + txnid + ") from MIN_HISTORY_LEVEL"); -// Update registry with modifications -s = "select ctc_database, ctc_table, ctc_timestamp from COMPLETED_TXN_COMPONENTS where ctc_txnid = " + txnid; -rs = stmt.executeQuery(s); -if (rs.next()) { - LOG.debug("Going to register table modification in invalidation cache <" + s + ">"); - MaterializationsInvalidationCache.get().notifyTableModification( - rs.getString(1), rs.getString(2), txnid, - rs.getTimestamp(3, Calendar.getInstance(TimeZone.getTimeZone("UTC"))).getTime()); -} - if (rqst.isSetReplPolicy()) { s = "delete from REPL_TXN_MAP where RTM_SRC_TXN_ID = " + sourceTxnId + " and RTM_REPL_POLICY = " + quoteString(rqst.getReplPolicy()); @@ -1043,10 +1056,20 @@ abstract class TxnHandler implements TxnStore, TxnStor
[01/24] hive git commit: HIVE-18839: Implement incremental rebuild for materialized views (only insert operations in source tables) (Jesus Camacho Rodriguez, reviewed by Ashutosh Chauhan)
Repository: hive Updated Branches: refs/heads/master dcd9b5941 -> be420098f http://git-wip-us.apache.org/repos/asf/hive/blob/be420098/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java -- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index a79242b..9256b7a 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -63,9 +63,11 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.ValidReadTxnList; import org.apache.hadoop.hive.common.ValidTxnList; +import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.common.classification.RetrySemantics; import org.apache.hadoop.hive.metastore.DatabaseProduct; import org.apache.hadoop.hive.metastore.MaterializationsInvalidationCache; +import org.apache.hadoop.hive.metastore.MaterializationsRebuildLockHandler; import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.MetaStoreListenerNotifier; import org.apache.hadoop.hive.metastore.TransactionalMetaStoreEventListener; @@ -854,9 +856,18 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { @Override @RetrySemantics.Idempotent("No-op if already committed") public void commitTxn(CommitTxnRequest rqst) -throws NoSuchTxnException, TxnAbortedException, MetaException { +throws NoSuchTxnException, TxnAbortedException, MetaException { +MaterializationsRebuildLockHandler materializationsRebuildLockHandler = +MaterializationsRebuildLockHandler.get(); +String fullyQualifiedName = null; +String dbName = null; +String tblName = null; +long writeId = 0L; +long timestamp = 0L; +boolean isUpdateDelete = false; long txnid = rqst.getTxnid(); long sourceTxnId = -1; + try { Connection dbConn = null; Statement stmt = null; @@ -905,6 +916,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { quoteChar(OpertaionType.UPDATE.sqlConst) + "," + quoteChar(OpertaionType.DELETE.sqlConst) + ")"; rs = stmt.executeQuery(sqlGenerator.addLimitClause(1, "tc_operation_type " + conflictSQLSuffix)); if (rs.next()) { + isUpdateDelete = true; close(rs); //if here it means currently committing txn performed update/delete and we should check WW conflict /** @@ -1007,6 +1019,17 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { LOG.info("Expected to move at least one record from txn_components to " + "completed_txn_components when committing txn! " + JavaUtils.txnIdToString(txnid)); } +// Obtain information that we need to update registry +s = "select ctc_database, ctc_table, ctc_writeid, ctc_timestamp from COMPLETED_TXN_COMPONENTS where ctc_txnid = " + txnid; +LOG.debug("Going to extract table modification information for invalidation cache <" + s + ">"); +rs = stmt.executeQuery(s); +if (rs.next()) { + dbName = rs.getString(1); + tblName = rs.getString(2); + fullyQualifiedName = Warehouse.getQualifiedName(dbName, tblName); + writeId = rs.getLong(3); + timestamp = rs.getTimestamp(4, Calendar.getInstance(TimeZone.getTimeZone("UTC"))).getTime(); +} s = "delete from TXN_COMPONENTS where tc_txnid = " + txnid; LOG.debug("Going to execute update <" + s + ">"); modCount = stmt.executeUpdate(s); @@ -1021,16 +1044,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { modCount = stmt.executeUpdate(s); LOG.info("Removed committed transaction: (" + txnid + ") from MIN_HISTORY_LEVEL"); -// Update registry with modifications -s = "select ctc_database, ctc_table, ctc_timestamp from COMPLETED_TXN_COMPONENTS where ctc_txnid = " + txnid; -rs = stmt.executeQuery(s); -if (rs.next()) { - LOG.debug("Going to register table modification in invalidation cache <" + s + ">"); - MaterializationsInvalidationCache.get().notifyTableModification( - rs.getString(1), rs.getString(2), txnid, - rs.getTimestamp(3, Calendar.getInstance(TimeZone.getTimeZone("UTC"))).getTime()); -} - if (rqst.isSetReplPolicy()) { s = "delete from REPL_TXN_MAP where RTM_SRC_TXN_ID = " + sourceTxnId + " and RTM_REPL_POLICY = " + quoteString(rqst.getReplPolicy()); @@ -1043,10 +1056,20 @@ abstract class TxnHandler implements TxnStore, TxnStore.