http://git-wip-us.apache.org/repos/asf/hive/blob/93b9cdd6/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --cc standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 0000000,3785f89..4ef6786
mode 000000,100644..100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@@ -1,0 -1,4906 +1,4949 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.hadoop.hive.metastore.txn;
+ 
+ import java.io.PrintWriter;
+ import java.nio.ByteBuffer;
+ import java.sql.Connection;
+ import java.sql.Driver;
+ import java.sql.ResultSet;
+ import java.sql.SQLException;
+ import java.sql.SQLFeatureNotSupportedException;
+ import java.sql.Savepoint;
+ import java.sql.Statement;
+ import java.util.ArrayList;
+ import java.util.Arrays;
+ import java.util.BitSet;
+ import java.util.Calendar;
+ import java.util.Collections;
+ import java.util.Comparator;
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.Iterator;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Properties;
+ import java.util.Set;
+ import java.util.SortedSet;
+ import java.util.TimeZone;
+ import java.util.TreeSet;
+ import java.util.concurrent.ConcurrentHashMap;
+ import java.util.concurrent.Semaphore;
+ import java.util.concurrent.TimeUnit;
+ import java.util.concurrent.atomic.AtomicInteger;
+ import java.util.concurrent.locks.ReentrantLock;
+ import java.util.regex.Pattern;
+ 
+ import javax.sql.DataSource;
+ 
+ import org.apache.commons.lang.ArrayUtils;
+ import org.apache.commons.lang.NotImplementedException;
+ import org.apache.hadoop.classification.InterfaceAudience;
+ import org.apache.hadoop.classification.InterfaceStability;
+ import org.apache.hadoop.conf.Configuration;
+ import org.apache.hadoop.hive.common.ValidReadTxnList;
+ import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
+ import org.apache.hadoop.hive.common.ValidTxnList;
+ import org.apache.hadoop.hive.common.ValidWriteIdList;
+ import org.apache.hadoop.hive.common.classification.RetrySemantics;
+ import org.apache.hadoop.hive.metastore.DatabaseProduct;
+ import org.apache.hadoop.hive.metastore.MaterializationsInvalidationCache;
+ import org.apache.hadoop.hive.metastore.MaterializationsRebuildLockHandler;
+ import org.apache.hadoop.hive.metastore.Warehouse;
+ import org.apache.hadoop.hive.metastore.MetaStoreListenerNotifier;
+ import org.apache.hadoop.hive.metastore.TransactionalMetaStoreEventListener;
+ import org.apache.hadoop.hive.metastore.api.*;
+ import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+ import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
+ import org.apache.hadoop.hive.metastore.datasource.DataSourceProvider;
+ import org.apache.hadoop.hive.metastore.datasource.DataSourceProviderFactory;
+ import org.apache.hadoop.hive.metastore.events.AbortTxnEvent;
+ import org.apache.hadoop.hive.metastore.events.AllocWriteIdEvent;
+ import org.apache.hadoop.hive.metastore.events.CommitTxnEvent;
+ import org.apache.hadoop.hive.metastore.events.OpenTxnEvent;
+ import org.apache.hadoop.hive.metastore.events.AcidWriteEvent;
+ import org.apache.hadoop.hive.metastore.messaging.EventMessage;
+ import org.apache.hadoop.hive.metastore.metrics.Metrics;
+ import org.apache.hadoop.hive.metastore.metrics.MetricsConstants;
+ import org.apache.hadoop.hive.metastore.tools.SQLGenerator;
+ import org.apache.hadoop.hive.metastore.utils.JavaUtils;
+ import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+ import org.apache.hadoop.hive.metastore.utils.StringableMap;
+ import org.apache.hadoop.util.StringUtils;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
++
+ import com.google.common.annotations.VisibleForTesting;
+ 
+ /**
+  * A handler to answer transaction related calls that come into the metastore
+  * server.
+  *
+  * Note on log messages:  Please include txnid:X and lockid info using
+  * {@link JavaUtils#txnIdToString(long)}
+  * and {@link JavaUtils#lockIdToString(long)} in all messages.
+  * The txnid:X and lockid:Y match how Thrift object toString() methods are generated,
+  * so keeping the format consistent makes grep'ing the logs much easier.
+  *
+  * Note on HIVE_LOCKS.hl_last_heartbeat.
+  * For locks that are part of a transaction, we set this to 0 (we would rather set it to NULL, but
+  * currently the DB schema has it NOT NULL) and only update/read the heartbeat from the corresponding
+  * transaction in TXNS.
+  *
+  * In general there can be multiple metastores where this logic can execute, thus the DB is
+  * used to ensure proper mutexing of operations.
+  * Select ... For Update (or equivalent: either MsSql with(updlock) or an actual Update stmt) is
+  * used to properly sequence operations.  Most notably:
+  * 1. various sequence IDs are generated with the aid of this mutex
+  * 2. ensuring that each (Hive) Transaction state is transitioned atomically.  Transaction state
+  *  includes its actual state (Open, Aborted) as well as its lock list/component list.  Thus all
+  *  per-transaction ops either start by update/delete of the relevant TXNS row or do S4U on that row.
+  *  This allows almost all operations to run at READ_COMMITTED and minimizes DB deadlocks.
+  * 3. checkLock() - this is mutexed entirely since we must ensure that while we check if some lock
+  *  can be granted, no other (strictly speaking "earlier") lock can change state.
+  *
+  * The exception to this is Derby, which doesn't support proper S4U.  Derby always runs embedded
+  * (this is the only supported configuration for Derby)
+  * in the same JVM as HiveMetaStoreHandler, thus we use a JVM-wide lock to properly sequence the operations.
+  *
+  * {@link #derbyLock}
+  *
+  * If we ever decide to run a remote Derby server, according to
+  * https://db.apache.org/derby/docs/10.0/manuals/develop/develop78.html all transactions will be
+  * serialized, so that would also work, though it has not been tested.
+  *
+  * General design note:
+  * It's imperative that any operation on a txn (e.g. commit) ensure (atomically) that this txn is
+  * still valid and active.  In the code this is usually achieved at the same time the txn record
+  * is locked for some operation.
+  *
+  * Note on retry logic:
+  * Metastore has retry logic in both {@link org.apache.hadoop.hive.metastore.RetryingMetaStoreClient}
+  * and {@link org.apache.hadoop.hive.metastore.RetryingHMSHandler}.  The retry logic there is very
+  * generic and is not aware whether the operations are idempotent or not.  (This is separate from
+  * the retry logic here in TxnHandler, which can/does retry DB errors intelligently.)  The worst case is
+  * when an op here issues a successful commit against the RDBMS but the calling stack doesn't
+  * receive the ack and retries.  (If an op fails before commit, it's trivially idempotent.)
+  * Thus the ops here need to be made idempotent as much as possible, or
+  * the metastore call stack should have logic not to retry.  There are {@link RetrySemantics}
+  * annotations to document the behavior.
+  */
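
The S4U mutex described above reduces, in code, to a small idiom that recurs throughout this file. A minimal sketch, assuming the sqlGenerator and getDbConn helpers behave exactly as the methods below use them, with error handling elided:

    // S4U idiom: lock the NEXT_TXN_ID row until this DB txn commits/rolls back.
    // SQLGenerator.addForUpdateClause() renders the product-specific form
    // ("... for update" on most RDBMSs, "with (updlock)" on SQL Server); on
    // Derby the JVM-wide lockInternal()/unlockInternal() pair stands in for it.
    Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
    Statement stmt = dbConn.createStatement();
    ResultSet rs = stmt.executeQuery(
        sqlGenerator.addForUpdateClause("select ntxn_next from NEXT_TXN_ID"));
    // ... read/update under the row lock; dbConn.commit() releases it
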
+ @InterfaceAudience.Private
+ @InterfaceStability.Evolving
+ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
+ 
+   static final protected char INITIATED_STATE = 'i';
+   static final protected char WORKING_STATE = 'w';
+   static final protected char READY_FOR_CLEANING = 'r';
+   static final char FAILED_STATE = 'f';
+   static final char SUCCEEDED_STATE = 's';
+   static final char ATTEMPTED_STATE = 'a';
+ 
+   // Compactor types
+   static final protected char MAJOR_TYPE = 'a';
+   static final protected char MINOR_TYPE = 'i';
+ 
+   // Transaction states
+   static final protected char TXN_ABORTED = 'a';
+   static final protected char TXN_OPEN = 'o';
+   //todo: make these like OperationType and remove the above char constants
+   enum TxnStatus {OPEN, ABORTED, COMMITTED, UNKNOWN}
+ 
+   public enum TxnType {
+     DEFAULT(0), REPL_CREATED(1), READ_ONLY(2);
+ 
+     private final int value;
+     TxnType(int value) {
+       this.value = value;
+     }
+ 
+     public int getValue() {
+       return value;
+     }
+   }
+ 
+   // Lock states
+   static final protected char LOCK_ACQUIRED = 'a';
+   static final protected char LOCK_WAITING = 'w';
+ 
+   // Lock types
+   static final protected char LOCK_EXCLUSIVE = 'e';
+   static final protected char LOCK_SHARED = 'r';
+   static final protected char LOCK_SEMI_SHARED = 'w';
+ 
+   static final private int ALLOWED_REPEATED_DEADLOCKS = 10;
+   static final private Logger LOG = LoggerFactory.getLogger(TxnHandler.class.getName());
+ 
+   static private DataSource connPool;
+   private static DataSource connPoolMutex;
+   static private boolean doRetryOnConnPool = false;
+ 
+   private List<TransactionalMetaStoreEventListener> transactionalListeners;
+   
+   private enum OpertaionType {
+     SELECT('s'), INSERT('i'), UPDATE('u'), DELETE('d');
+     private final char sqlConst;
+     OpertaionType(char sqlConst) {
+       this.sqlConst = sqlConst;
+     }
+     public String toString() {
+       return Character.toString(sqlConst);
+     }
+     public static OpertaionType fromString(char sqlConst) {
+       switch (sqlConst) {
+         case 's':
+           return SELECT;
+         case 'i':
+           return INSERT;
+         case 'u':
+           return UPDATE;
+         case 'd':
+           return DELETE;
+         default:
+           throw new IllegalArgumentException(quoteChar(sqlConst));
+       }
+     }
+     public static OpertaionType fromDataOperationType(DataOperationType dop) {
+       switch (dop) {
+         case SELECT:
+           return OpertaionType.SELECT;
+         case INSERT:
+           return OpertaionType.INSERT;
+         case UPDATE:
+           return OpertaionType.UPDATE;
+         case DELETE:
+           return OpertaionType.DELETE;
+         default:
+           throw new IllegalArgumentException("Unexpected value: " + dop);
+       }
+     }
+   }
+ 
+   // Maximum number of open transactions that's allowed
+   private static volatile int maxOpenTxns = 0;
+   // Whether number of open transactions reaches the threshold
+   private static volatile boolean tooManyOpenTxns = false;
+ 
+   /**
+    * Number of consecutive deadlocks we have seen
+    */
+   private int deadlockCnt;
+   private long deadlockRetryInterval;
+   protected Configuration conf;
+   private static DatabaseProduct dbProduct;
+   private static SQLGenerator sqlGenerator;
+ 
+   // (End user) Transaction timeout, in milliseconds.
+   private long timeout;
+ 
+   private String identifierQuoteString; // quotes to use for quoting tables, where necessary
+   private long retryInterval;
+   private int retryLimit;
+   private int retryNum;
+   // Current number of open txns
+   private AtomicInteger numOpenTxns;
+ 
+   /**
+    * Derby specific concurrency control
+    */
+   private static final ReentrantLock derbyLock = new ReentrantLock(true);
+   /**
+    * must be static since even in UT there may be > 1 instance of TxnHandler
+    * (e.g. via Compactor services)
+    */
+   private final static ConcurrentHashMap<String, Semaphore> derbyKey2Lock = new ConcurrentHashMap<>();
+   private static final String hostname = JavaUtils.hostname();
+ 
+   // Private methods should never catch SQLException and then throw MetaException.  The public
+   // methods depend on SQLException coming back so they can detect and handle deadlocks.  Private
+   // methods should only throw MetaException when they explicitly know there's a logic error and
+   // they want to throw past the public methods.
+   //
+   // All public methods that write to the database have to check for deadlocks when a SQLException
+   // comes back and handle it if they see one.  This has to be done with the connection pooling
+   // in mind.  To do this they should call checkRetryable() AFTER rolling back the db transaction,
+   // and then they should catch RetryException and call themselves recursively. See commitTxn for an example.
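
The retry contract spelled out above is easiest to see as a skeleton. A hedged sketch of the shape every public write method below follows (someWriteOp and SomeRequest are placeholder names, not real API):

    public void someWriteOp(SomeRequest rqst) throws MetaException { // placeholder names
      try {
        Connection dbConn = null;
        try {
          dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
          // ... do the work, then dbConn.commit();
        } catch (SQLException e) {
          rollbackDBConn(dbConn);                    // roll back FIRST
          checkRetryable(dbConn, e, "someWriteOp");  // throws RetryException if transient
          throw new MetaException("Unable to update transaction database "
              + StringUtils.stringifyException(e));
        } finally {
          closeDbConn(dbConn);
        }
      } catch (RetryException e) {
        someWriteOp(rqst);                           // retry by calling itself
      }
    }
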
+ 
+   public TxnHandler() {
+   }
+ 
+   /**
+    * This is logically part of c'tor and must be called prior to any other method.
+    * Not physically part of c'tor due to use of reflection
+    */
+   public void setConf(Configuration conf) {
+     this.conf = conf;
+ 
+     checkQFileTestHack();
+ 
+     synchronized (TxnHandler.class) {
+       if (connPool == null) {
+         Connection dbConn = null;
+         // Set up the JDBC connection pool
+         try {
+           int maxPoolSize = MetastoreConf.getIntVar(conf, ConfVars.CONNECTION_POOLING_MAX_CONNECTIONS);
+           long getConnectionTimeoutMs = 30000;
+           connPool = setupJdbcConnectionPool(conf, maxPoolSize, getConnectionTimeoutMs);
+           /*the mutex pools should ideally be somewhat larger since some operations require 1
+            connection from each pool and we want to avoid taking a connection from the primary pool
+            and then blocking because the mutex pool is empty.  There is only 1 thread in any HMS trying
+            to mutex on each MUTEX_KEY except MUTEX_KEY.CheckLock.  The CheckLock operation gets a
+            connection from connPool first, then connPoolMutex.  All others go in the opposite
+            order (not very elegant...).  So the number of connection requests for connPoolMutex cannot
+            exceed (size of connPool + MUTEX_KEY.values().length - 1).*/
+           connPoolMutex = setupJdbcConnectionPool(conf, maxPoolSize + MUTEX_KEY.values().length, getConnectionTimeoutMs);
+           dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+           determineDatabaseProduct(dbConn);
+           sqlGenerator = new SQLGenerator(dbProduct, conf);
+         } catch (SQLException e) {
+           String msg = "Unable to instantiate JDBC connection pooling, " + 
e.getMessage();
+           LOG.error(msg);
+           throw new RuntimeException(e);
+         } finally {
+           closeDbConn(dbConn);
+         }
+       }
+     }
+ 
+     numOpenTxns = Metrics.getOrCreateGauge(MetricsConstants.NUM_OPEN_TXNS);
+ 
+     timeout = MetastoreConf.getTimeVar(conf, ConfVars.TXN_TIMEOUT, TimeUnit.MILLISECONDS);
+     buildJumpTable();
+     retryInterval = MetastoreConf.getTimeVar(conf, ConfVars.HMS_HANDLER_INTERVAL, TimeUnit.MILLISECONDS);
+     retryLimit = MetastoreConf.getIntVar(conf, ConfVars.HMS_HANDLER_ATTEMPTS);
+     deadlockRetryInterval = retryInterval / 10;
+     maxOpenTxns = MetastoreConf.getIntVar(conf, ConfVars.MAX_OPEN_TXNS);
+ 
+     try {
+       transactionalListeners = MetaStoreUtils.getMetaStoreListeners(
+               TransactionalMetaStoreEventListener.class,
+                       conf, MetastoreConf.getVar(conf, ConfVars.TRANSACTIONAL_EVENT_LISTENERS));
+     } catch(MetaException e) {
+       String msg = "Unable to get transaction listeners, " + e.getMessage();
+       LOG.error(msg);
+       throw new RuntimeException(e);
+     }
+   }
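
A quick worked instance of the pool-sizing comment inside setConf(), as a hedged illustration only, with the max pool size picked as an example value:

    // With an example maxPoolSize of 10 and, say, 8 MUTEX_KEY values, the
    // mutex pool gets 10 + 8 = 18 connections: enough for every primary-pool
    // holder plus one mutexer per key, so a CheckLock caller (which takes
    // connPool first, then connPoolMutex) can never starve the mutex pool.
    int maxPoolSize = 10; // example value, not the actual default
    int mutexPoolSize = maxPoolSize + MUTEX_KEY.values().length;
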
+ 
+   @Override
+   public Configuration getConf() {
+     return conf;
+   }
+ 
+   @Override
+   @RetrySemantics.ReadOnly
+   public GetOpenTxnsInfoResponse getOpenTxnsInfo() throws MetaException {
+     try {
+       // We need to figure out the current transaction number and the list of
+       // open transactions.  To avoid needing a transaction on the underlying
+       // database we'll look at the current transaction number first.  If it
+       // subsequently shows up in the open list that's ok.
+       Connection dbConn = null;
+       Statement stmt = null;
+       ResultSet rs = null;
+       try {
+         /**
+          * This method can run at READ_COMMITTED as long as
+          * {@link #openTxns(org.apache.hadoop.hive.metastore.api.OpenTxnRequest)} is atomic.
+          * More specifically, as long as advancing TransactionID in NEXT_TXN_ID is atomic with
+          * adding corresponding entries into TXNS.  The reason is that any txnid below HWM
+          * is either in TXNS and thus considered open (Open/Aborted) or it's considered Committed.
+          */
+         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+         stmt = dbConn.createStatement();
+         String s = "select ntxn_next - 1 from NEXT_TXN_ID";
+         LOG.debug("Going to execute query <" + s + ">");
+         rs = stmt.executeQuery(s);
+         if (!rs.next()) {
+           throw new MetaException("Transaction tables not properly " +
+             "initialized, no record found in next_txn_id");
+         }
+         long hwm = rs.getLong(1);
+         if (rs.wasNull()) {
+           throw new MetaException("Transaction tables not properly " +
+             "initialized, null record found in next_txn_id");
+         }
+         close(rs);
+         List<TxnInfo> txnInfos = new ArrayList<>();
+         //need the WHERE clause below to ensure consistent results with READ_COMMITTED
+         s = "select txn_id, txn_state, txn_user, txn_host, txn_started, txn_last_heartbeat from " +
+             "TXNS where txn_id <= " + hwm;
+         LOG.debug("Going to execute query<" + s + ">");
+         rs = stmt.executeQuery(s);
+         while (rs.next()) {
+           char c = rs.getString(2).charAt(0);
+           TxnState state;
+           switch (c) {
+             case TXN_ABORTED:
+               state = TxnState.ABORTED;
+               break;
+ 
+             case TXN_OPEN:
+               state = TxnState.OPEN;
+               break;
+ 
+             default:
+               throw new MetaException("Unexpected transaction state " + c +
+                 " found in txns table");
+           }
+           TxnInfo txnInfo = new TxnInfo(rs.getLong(1), state, rs.getString(3), rs.getString(4));
+           txnInfo.setStartedTime(rs.getLong(5));
+           txnInfo.setLastHeartbeatTime(rs.getLong(6));
+           txnInfos.add(txnInfo);
+         }
+         LOG.debug("Going to rollback");
+         dbConn.rollback();
+         return new GetOpenTxnsInfoResponse(hwm, txnInfos);
+       } catch (SQLException e) {
+         LOG.debug("Going to rollback");
+         rollbackDBConn(dbConn);
+         checkRetryable(dbConn, e, "getOpenTxnsInfo");
+         throw new MetaException("Unable to select from transaction database: 
" + getMessage(e)
+           + StringUtils.stringifyException(e));
+       } finally {
+         close(rs, stmt, dbConn);
+       }
+     } catch (RetryException e) {
+       return getOpenTxnsInfo();
+     }
+   }
++
+   @Override
+   @RetrySemantics.ReadOnly
+   public GetOpenTxnsResponse getOpenTxns() throws MetaException {
+     try {
+       // We need to figure out the current transaction number and the list of
+       // open transactions.  To avoid needing a transaction on the underlying
+       // database we'll look at the current transaction number first.  If it
+       // subsequently shows up in the open list that's ok.
+       Connection dbConn = null;
+       Statement stmt = null;
+       ResultSet rs = null;
+       try {
+         /**
+          * This runs at READ_COMMITTED for exactly the same reason as {@link #getOpenTxnsInfo()}
+          */
+         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+         stmt = dbConn.createStatement();
+         String s = "select ntxn_next - 1 from NEXT_TXN_ID";
+         LOG.debug("Going to execute query <" + s + ">");
+         rs = stmt.executeQuery(s);
+         if (!rs.next()) {
+           throw new MetaException("Transaction tables not properly " +
+             "initialized, no record found in next_txn_id");
+         }
+         long hwm = rs.getLong(1);
+         if (rs.wasNull()) {
+           throw new MetaException("Transaction tables not properly " +
+             "initialized, null record found in next_txn_id");
+         }
+         close(rs);
+         List<Long> openList = new ArrayList<>();
+         //need the WHERE clause below to ensure consistent results with READ_COMMITTED
+         s = "select txn_id, txn_state from TXNS where txn_id <= " + hwm + " order by txn_id";
+         LOG.debug("Going to execute query<" + s + ">");
+         rs = stmt.executeQuery(s);
+         long minOpenTxn = Long.MAX_VALUE;
+         BitSet abortedBits = new BitSet();
+         while (rs.next()) {
+           long txnId = rs.getLong(1);
+           openList.add(txnId);
+           char c = rs.getString(2).charAt(0);
+           if(c == TXN_OPEN) {
+             minOpenTxn = Math.min(minOpenTxn, txnId);
+           } else if (c == TXN_ABORTED) {
+             abortedBits.set(openList.size() - 1);
+           }
+         }
+         LOG.debug("Going to rollback");
+         dbConn.rollback();
+         ByteBuffer byteBuffer = ByteBuffer.wrap(abortedBits.toByteArray());
+         GetOpenTxnsResponse otr = new GetOpenTxnsResponse(hwm, openList, byteBuffer);
+         if(minOpenTxn < Long.MAX_VALUE) {
+           otr.setMin_open_txn(minOpenTxn);
+         }
+         return otr;
+       } catch (SQLException e) {
+         LOG.debug("Going to rollback");
+         rollbackDBConn(dbConn);
+         checkRetryable(dbConn, e, "getOpenTxns");
+         throw new MetaException("Unable to select from transaction database, "
+           + StringUtils.stringifyException(e));
+       } finally {
+         close(rs, stmt, dbConn);
+       }
+     } catch (RetryException e) {
+       return getOpenTxns();
+     }
+   }
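
On the wire, the aborted/open distinction travels as a bitmap parallel to the txn id list. A hedged sketch of how a caller can decode it (Thrift getter names assumed from the field names above):

    GetOpenTxnsResponse resp = store.getOpenTxns();         // 'store' is a TxnStore (assumed)
    BitSet aborted = BitSet.valueOf(resp.getAbortedBits()); // bit i matches open_txns[i]
    List<Long> txns = resp.getOpen_txns();
    for (int i = 0; i < txns.size(); i++) {
      boolean isAborted = aborted.get(i); // set bit = aborted, clear bit = open;
      // ids <= hwm that are absent from the list are committed
    }
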
+ 
+   /**
+    * Retry-by-caller note:
+    * Worst case, it will leave an open txn which will timeout.
+    */
+   @Override
+   @RetrySemantics.Idempotent
+   public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException {
+     if (!tooManyOpenTxns && numOpenTxns.get() >= maxOpenTxns) {
+       tooManyOpenTxns = true;
+     }
+     if (tooManyOpenTxns) {
+       if (numOpenTxns.get() < maxOpenTxns * 0.9) {
+         tooManyOpenTxns = false;
+       } else {
+         LOG.warn("Maximum allowed number of open transactions (" + 
maxOpenTxns + ") has been " +
+             "reached. Current number of open transactions: " + numOpenTxns);
+         throw new MetaException("Maximum allowed number of open transactions 
has been reached. " +
+             "See hive.max.open.txns.");
+       }
+     }
+ 
+     int numTxns = rqst.getNum_txns();
+     if (numTxns <= 0) {
+       throw new MetaException("Invalid input for number of txns: " + numTxns);
+     }
+ 
+     try {
+       Connection dbConn = null;
+       Statement stmt = null;
+       try {
+         lockInternal();
+         /**
+          * To make {@link #getOpenTxns()}/{@link #getOpenTxnsInfo()} work correctly, this operation must ensure
+          * that advancing the counter in NEXT_TXN_ID and adding appropriate entries to TXNS is atomic.
+          * Also, advancing the counter must work when multiple metastores are running.
+          * SELECT ... FOR UPDATE is used to prevent
+          * concurrent DB transactions being rolled back due to Write-Write conflict on NEXT_TXN_ID.
+          *
+          * In the current design, there can be several metastore instances running in a given Warehouse.
+          * This makes ideas like reserving a range of IDs to save trips to DB impossible.  For example,
+          * a client may go to MS1 and start a transaction with ID 500 to update a particular row.
+          * Now the same client will start another transaction, except it ends up on MS2 and may get
+          * transaction ID 400 and update the same row.  Now the merge that happens to materialize the snapshot
+          * on read will think the version of the row from transaction ID 500 is the latest one.
+          *
+          * Longer term we can consider running Active-Passive MS (at least wrt to ACID operations).  This
+          * set could support a write-through cache for added performance.
+          */
+         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+         // Make sure the user has not requested an insane amount of txns.
+         int maxTxns = MetastoreConf.getIntVar(conf, ConfVars.TXN_MAX_OPEN_BATCH);
+         if (numTxns > maxTxns) numTxns = maxTxns;
+ 
+         stmt = dbConn.createStatement();
+         List<Long> txnIds = openTxns(dbConn, stmt, rqst);
+ 
+         LOG.debug("Going to commit");
+         dbConn.commit();
+         return new OpenTxnsResponse(txnIds);
+       } catch (SQLException e) {
+         LOG.debug("Going to rollback");
+         rollbackDBConn(dbConn);
+         checkRetryable(dbConn, e, "openTxns(" + rqst + ")");
+         throw new MetaException("Unable to select from transaction database "
+           + StringUtils.stringifyException(e));
+       } finally {
+         close(null, stmt, dbConn);
+         unlockInternal();
+       }
+     } catch (RetryException e) {
+       return openTxns(rqst);
+     }
+   }
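
A hedged usage sketch of this call from the client side (the constructor arity is the same one replTableWriteIdState uses later in this file; the Thrift getter name is assumed):

    OpenTxnRequest rqst = new OpenTxnRequest(5, "hive", JavaUtils.hostname());
    OpenTxnsResponse resp = txnHandler.openTxns(rqst);
    List<Long> ids = resp.getTxn_ids(); // a contiguous block [first, first+5),
                                        // capped by ConfVars.TXN_MAX_OPEN_BATCH
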
+ 
+   private List<Long> openTxns(Connection dbConn, Statement stmt, OpenTxnRequest rqst)
+           throws SQLException, MetaException {
+     int numTxns = rqst.getNum_txns();
+     ResultSet rs = null;
+     TxnType txnType = TxnType.DEFAULT;
+     try {
+       if (rqst.isSetReplPolicy()) {
+         List<Long> targetTxnIdList = getTargetTxnIdList(rqst.getReplPolicy(), rqst.getReplSrcTxnIds(), stmt);
+ 
+         if (!targetTxnIdList.isEmpty()) {
+           if (targetTxnIdList.size() != rqst.getReplSrcTxnIds().size()) {
+             LOG.warn("target txn id number " + targetTxnIdList.toString() +
+                     " is not matching with source txn id number " + 
rqst.getReplSrcTxnIds().toString());
+           }
+           LOG.info("Target transactions " + targetTxnIdList.toString() + " 
are present for repl policy :" +
+                   rqst.getReplPolicy() + " and Source transaction id : " + 
rqst.getReplSrcTxnIds().toString());
+           return targetTxnIdList;
+         }
+         txnType = TxnType.REPL_CREATED;
+       }
+ 
+       String s = sqlGenerator.addForUpdateClause("select ntxn_next from NEXT_TXN_ID");
+       LOG.debug("Going to execute query <" + s + ">");
+       rs = stmt.executeQuery(s);
+       if (!rs.next()) {
+         throw new MetaException("Transaction database not properly " +
+                 "configured, can't find next transaction id.");
+       }
+       long first = rs.getLong(1);
+       s = "update NEXT_TXN_ID set ntxn_next = " + (first + numTxns);
+       LOG.debug("Going to execute update <" + s + ">");
+       stmt.executeUpdate(s);
+ 
+       long now = getDbTime(dbConn);
+       List<Long> txnIds = new ArrayList<>(numTxns);
+ 
+       List<String> rows = new ArrayList<>();
+       for (long i = first; i < first + numTxns; i++) {
+         txnIds.add(i);
+         rows.add(i + "," + quoteChar(TXN_OPEN) + "," + now + "," + now + ","
+                 + quoteString(rqst.getUser()) + "," + quoteString(rqst.getHostname()) + "," + txnType.getValue());
+       }
+       List<String> queries = sqlGenerator.createInsertValuesStmt(
+             "TXNS (txn_id, txn_state, txn_started, txn_last_heartbeat, txn_user, txn_host, txn_type)", rows);
+       for (String q : queries) {
+         LOG.debug("Going to execute update <" + q + ">");
+         stmt.execute(q);
+       }
+ 
+       // Need to register the minimum open txnid of current transactions into the MIN_HISTORY table.
+       s = "select min(txn_id) from TXNS where txn_state = " + quoteChar(TXN_OPEN);
+       LOG.debug("Going to execute query <" + s + ">");
+       rs = stmt.executeQuery(s);
+       if (!rs.next()) {
+         throw new IllegalStateException("Scalar query returned no rows?!?!!");
+       }
+ 
+       // TXNS table should have at least one entry because we just inserted the newly opened txns.
+       // So, min(txn_id) would be a non-zero txnid.
+       long minOpenTxnId = rs.getLong(1);
+       assert (minOpenTxnId > 0);
+       rows.clear();
+       for (long txnId = first; txnId < first + numTxns; txnId++) {
+         rows.add(txnId + ", " + minOpenTxnId);
+       }
+ 
+       // Insert transaction entries into MIN_HISTORY_LEVEL.
+       List<String> inserts = sqlGenerator.createInsertValuesStmt(
+               "MIN_HISTORY_LEVEL (mhl_txnid, mhl_min_open_txnid)", rows);
+       for (String insert : inserts) {
+         LOG.debug("Going to execute insert <" + insert + ">");
+         stmt.execute(insert);
+       }
+       LOG.info("Added entries to MIN_HISTORY_LEVEL for current txns: (" + 
txnIds
+               + ") with min_open_txn: " + minOpenTxnId);
+ 
+       if (rqst.isSetReplPolicy()) {
+         List<String> rowsRepl = new ArrayList<>();
+ 
+         for (int i = 0; i < numTxns; i++) {
+           rowsRepl.add(
+                   quoteString(rqst.getReplPolicy()) + "," + rqst.getReplSrcTxnIds().get(i) + "," + txnIds.get(i));
+         }
+ 
+         List<String> queriesRepl = sqlGenerator.createInsertValuesStmt(
+                 "REPL_TXN_MAP (RTM_REPL_POLICY, RTM_SRC_TXN_ID, 
RTM_TARGET_TXN_ID)", rowsRepl);
+ 
+         for (String query : queriesRepl) {
+           LOG.info("Going to execute insert <" + query + ">");
+           stmt.execute(query);
+         }
+       }
+ 
+       if (transactionalListeners != null) {
+         MetaStoreListenerNotifier.notifyEventWithDirectSql(transactionalListeners,
+                 EventMessage.EventType.OPEN_TXN, new OpenTxnEvent(txnIds, null), dbConn, sqlGenerator);
+       }
+       return txnIds;
+     } finally {
+       close(rs);
+     }
+   }
+ 
+   private List<Long> getTargetTxnIdList(String replPolicy, List<Long> sourceTxnIdList, Statement stmt)
+           throws SQLException {
+     ResultSet rs = null;
+     try {
+       List<String> inQueries = new ArrayList<>();
+       StringBuilder prefix = new StringBuilder();
+       StringBuilder suffix = new StringBuilder();
+       List<Long> targetTxnIdList = new ArrayList<>();
+       prefix.append("select RTM_TARGET_TXN_ID from REPL_TXN_MAP where ");
+       suffix.append(" and RTM_REPL_POLICY = " + quoteString(replPolicy));
+       TxnUtils.buildQueryWithINClause(conf, inQueries, prefix, suffix, sourceTxnIdList,
+               "RTM_SRC_TXN_ID", false, false);
+       for (String query : inQueries) {
+         LOG.debug("Going to execute select <" + query + ">");
+         rs = stmt.executeQuery(query);
+         while (rs.next()) {
+           targetTxnIdList.add(rs.getLong(1));
+         }
+       }
+       LOG.debug("targetTxnid for srcTxnId " + sourceTxnIdList.toString() + " 
is " + targetTxnIdList.toString());
+       return targetTxnIdList;
+     }  catch (SQLException e) {
+       LOG.warn("failed to get target txn ids " + e.getMessage());
+       throw e;
+     } finally {
+       close(rs);
+     }
+   }
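
The one subtlety above is TxnUtils.buildQueryWithINClause: rather than one statement with an unbounded IN (...) list, it emits several bounded ones that the caller unions. A hedged sketch of the resulting shape (where the batches split is governed by configuration, not shown here):

    // For a long sourceTxnIdList, inQueries ends up holding statements like:
    //   select RTM_TARGET_TXN_ID from REPL_TXN_MAP
    //     where RTM_SRC_TXN_ID in (id1, ..., idK) and RTM_REPL_POLICY = '<policy>'
    //   select RTM_TARGET_TXN_ID from REPL_TXN_MAP
    //     where RTM_SRC_TXN_ID in (idK+1, ..., idN) and RTM_REPL_POLICY = '<policy>'
    List<Long> result = new ArrayList<>();
    for (String query : inQueries) {          // execute each batch, merge results
      ResultSet r = stmt.executeQuery(query);
      while (r.next()) {
        result.add(r.getLong(1));
      }
      r.close();
    }
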
+ 
+   @Override
+   @RetrySemantics.Idempotent
+   public long getTargetTxnId(String replPolicy, long sourceTxnId) throws MetaException {
+     try {
+       Connection dbConn = null;
+       Statement stmt = null;
+       try {
+         lockInternal();
+         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+         stmt = dbConn.createStatement();
+         List<Long> targetTxnIds = getTargetTxnIdList(replPolicy, Collections.singletonList(sourceTxnId), stmt);
+         if (targetTxnIds.isEmpty()) {
+           LOG.info("Txn {} not present for repl policy {}", sourceTxnId, replPolicy);
+           return -1;
+         }
+         assert (targetTxnIds.size() == 1);
+         return targetTxnIds.get(0);
+       } catch (SQLException e) {
+         LOG.debug("Going to rollback");
+         rollbackDBConn(dbConn);
+         checkRetryable(dbConn, e, "getTargetTxnId(" + replPolicy + 
sourceTxnId + ")");
+         throw new MetaException("Unable to get target transaction id "
+                 + StringUtils.stringifyException(e));
+       } finally {
+         close(null, stmt, dbConn);
+         unlockInternal();
+       }
+     } catch (RetryException e) {
+       return getTargetTxnId(replPolicy, sourceTxnId);
+     }
+   }
+ 
+   @Override
+   @RetrySemantics.Idempotent
+   public void abortTxn(AbortTxnRequest rqst) throws NoSuchTxnException, MetaException, TxnAbortedException {
+     long txnid = rqst.getTxnid();
+     long sourceTxnId = -1;
+     try {
+       Connection dbConn = null;
+       Statement stmt = null;
+       try {
+         lockInternal();
+         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+         stmt = dbConn.createStatement();
+ 
+         if (rqst.isSetReplPolicy()) {
+           sourceTxnId = rqst.getTxnid();
+           List<Long> targetTxnIds = getTargetTxnIdList(rqst.getReplPolicy(),
+                   Collections.singletonList(sourceTxnId), stmt);
+           if (targetTxnIds.isEmpty()) {
+             LOG.info("Target txn id is missing for source txn id : " + 
sourceTxnId +
+                     " and repl policy " + rqst.getReplPolicy());
+             return;
+           }
+           assert targetTxnIds.size() == 1;
+           txnid = targetTxnIds.get(0);
+         }
+ 
+         if (abortTxns(dbConn, Collections.singletonList(txnid), true) != 1) {
+           TxnStatus status = findTxnState(txnid, stmt);
+           if(status == TxnStatus.ABORTED) {
+             if (rqst.isSetReplPolicy()) {
+               // in case of replication, idempotency is taken care of by getTargetTxnId
+               LOG.warn("Invalid state ABORTED for transactions started using replication replay task");
+               String s = "delete from REPL_TXN_MAP where RTM_SRC_TXN_ID = " + sourceTxnId +
+                       " and RTM_REPL_POLICY = " + quoteString(rqst.getReplPolicy());
+               LOG.info("Going to execute  <" + s + ">");
+               stmt.executeUpdate(s);
+             }
+             LOG.info("abortTxn(" + JavaUtils.txnIdToString(txnid) +
+               ") requested by it is already " + TxnStatus.ABORTED);
+             return;
+           }
+           raiseTxnUnexpectedState(status, txnid);
+         }
+ 
+         if (rqst.isSetReplPolicy()) {
+           String s = "delete from REPL_TXN_MAP where RTM_SRC_TXN_ID = " + 
sourceTxnId +
+               " and RTM_REPL_POLICY = " + quoteString(rqst.getReplPolicy());
+           LOG.info("Going to execute  <" + s + ">");
+           stmt.executeUpdate(s);
+         }
+ 
+         if (transactionalListeners != null) {
+           MetaStoreListenerNotifier.notifyEventWithDirectSql(transactionalListeners,
+                   EventMessage.EventType.ABORT_TXN, new AbortTxnEvent(txnid, null), dbConn, sqlGenerator);
+         }
+ 
+         LOG.debug("Going to commit");
+         dbConn.commit();
+       } catch (SQLException e) {
+         LOG.debug("Going to rollback");
+         rollbackDBConn(dbConn);
+         checkRetryable(dbConn, e, "abortTxn(" + rqst + ")");
+         throw new MetaException("Unable to update transaction database "
+           + StringUtils.stringifyException(e));
+       } finally {
+         close(null, stmt, dbConn);
+         unlockInternal();
+       }
+     } catch (RetryException e) {
+       abortTxn(rqst);
+     }
+   }
+ 
+   @Override
+   @RetrySemantics.Idempotent
+   public void abortTxns(AbortTxnsRequest rqst) throws NoSuchTxnException, MetaException {
+     List<Long> txnids = rqst.getTxn_ids();
+     try {
+       Connection dbConn = null;
+       try {
+         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+         int numAborted = abortTxns(dbConn, txnids, false);
+         if (numAborted != txnids.size()) {
+           LOG.warn("Abort Transactions command only aborted " + numAborted + 
" out of " +
+               txnids.size() + " transactions. It's possible that the other " +
+               (txnids.size() - numAborted) +
+               " transactions have been aborted or committed, or the 
transaction ids are invalid.");
+         }
+ 
+         for (Long txnId : txnids) {
+           if (transactionalListeners != null) {
+             MetaStoreListenerNotifier.notifyEventWithDirectSql(transactionalListeners,
+                     EventMessage.EventType.ABORT_TXN, new AbortTxnEvent(txnId, null), dbConn, sqlGenerator);
+           }
+         }
+         LOG.debug("Going to commit");
+         dbConn.commit();
+       } catch (SQLException e) {
+         LOG.debug("Going to rollback");
+         rollbackDBConn(dbConn);
+         checkRetryable(dbConn, e, "abortTxns(" + rqst + ")");
+         throw new MetaException("Unable to update transaction database "
+             + StringUtils.stringifyException(e));
+       } finally {
+         closeDbConn(dbConn);
+       }
+     } catch (RetryException e) {
+       abortTxns(rqst);
+     }
+   }
+ 
+   /**
+    * Concurrency/isolation notes:
+    * This is mutexed with {@link #openTxns(OpenTxnRequest)} and other {@link #commitTxn(CommitTxnRequest)}
+    * operations using select4update on NEXT_TXN_ID.  Also, mutexes on the TXNS table for specific txnid:X
+    * see more notes below.
+    * In order to prevent lost updates, we need to determine if any 2 transactions overlap.  Each txn
+    * is viewed as an interval [M,N]. M is the txnid and N is taken from the same NEXT_TXN_ID sequence
+    * so that we can compare the commit time of txn T with the start time of txn S.  This sequence can be thought of
+    * as a logical time counter.  If S.commitTime < T.startTime, T and S do NOT overlap.
+    *
+    * Motivating example:
+    * Suppose we have multi-statement transactions T and S, both of which are attempting x = x + 1.
+    * In order to prevent the lost update problem, the non-overlapping txns must lock in the snapshot
+    * that they read appropriately.  In particular, if txns do not overlap, then one follows the other
+    * (assuming they write the same entity), and thus the 2nd must see the changes of the 1st.  We ensure
+    * this by locking in the snapshot after the
+    * {@link #openTxns(OpenTxnRequest)} call is made (see org.apache.hadoop.hive.ql.Driver.acquireLocksAndOpenTxn)
+    * and mutexing openTxn() with commit().  In other words, once S.commit() starts we must ensure
+    * that txn T, which will be considered a later txn, locks in a snapshot that includes the result
+    * of S's commit (assuming no other txns).
+    * As a counter example, suppose we have S[3,3] and T[4,4] (commitId=txnid means no other transactions
+    * were running in parallel).  If T and S both locked in the same snapshot (for example the commit of
+    * txnid:2, which is possible if commitTxn() and openTxns() are not mutexed),
+    * 'x' would be updated to the same value by both, i.e. a lost update.
+    */
+   @Override
+   @RetrySemantics.Idempotent("No-op if already committed")
+   public void commitTxn(CommitTxnRequest rqst)
+     throws NoSuchTxnException, TxnAbortedException, MetaException {
+     MaterializationsRebuildLockHandler materializationsRebuildLockHandler =
+         MaterializationsRebuildLockHandler.get();
+     List<TransactionRegistryInfo> txnComponents = new ArrayList<>();
+     boolean isUpdateDelete = false;
+     long txnid = rqst.getTxnid();
+     long sourceTxnId = -1;
+ 
+     try {
+       Connection dbConn = null;
+       Statement stmt = null;
+       ResultSet lockHandle = null;
+       ResultSet commitIdRs = null, rs;
+       try {
+         lockInternal();
+         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+         stmt = dbConn.createStatement();
+ 
+         if (rqst.isSetReplPolicy()) {
+           sourceTxnId = rqst.getTxnid();
+           List<Long> targetTxnIds = getTargetTxnIdList(rqst.getReplPolicy(),
+                   Collections.singletonList(sourceTxnId), stmt);
+           if (targetTxnIds.isEmpty()) {
+             LOG.info("Target txn id is missing for source txn id : " + 
sourceTxnId +
+                     " and repl policy " + rqst.getReplPolicy());
+             return;
+           }
+           assert targetTxnIds.size() == 1;
+           txnid = targetTxnIds.get(0);
+         }
+ 
+         /**
+          * Runs at READ_COMMITTED with S4U on the TXNS row for "txnid".  S4U ensures that no other
+          * operation can change this txn (such as acquiring locks).  lock() and commitTxn()
+          * should not normally run concurrently (for the same txn) but could due to bugs in the client,
+          * which could then corrupt internal transaction manager state.  Also competes with abortTxn().
+          */
+         lockHandle = lockTransactionRecord(stmt, txnid, TXN_OPEN);
+         if (lockHandle == null) {
+           //if here, txn was not found (in expected state)
+           TxnStatus actualTxnStatus = findTxnState(txnid, stmt);
+           if(actualTxnStatus == TxnStatus.COMMITTED) {
+             if (rqst.isSetReplPolicy()) {
+               // in case of replication, idempotency is taken care of by getTargetTxnId
+               LOG.warn("Invalid state COMMITTED for transactions started using replication replay task");
+             }
+             /**
+              * This makes the operation idempotent
+              * (assume that this is most likely due to retry logic)
+              */
+             LOG.info("Nth commitTxn(" + JavaUtils.txnIdToString(txnid) + ") 
msg");
+             return;
+           }
+           raiseTxnUnexpectedState(actualTxnStatus, txnid);
+           shouldNeverHappen(txnid);
+           //dbConn is rolled back in finally{}
+         }
+ 
+         String conflictSQLSuffix = null;
+         if (rqst.isSetReplPolicy()) {
+           rs = null;
+         } else {
+           conflictSQLSuffix = "from TXN_COMPONENTS where tc_txnid=" + txnid + 
" and tc_operation_type IN(" +
+                   quoteChar(OpertaionType.UPDATE.sqlConst) + "," + 
quoteChar(OpertaionType.DELETE.sqlConst) + ")";
+           rs = stmt.executeQuery(sqlGenerator.addLimitClause(1,
+                   "tc_operation_type " + conflictSQLSuffix));
+         }
+         if (rs != null && rs.next()) {
+           isUpdateDelete = true;
+           close(rs);
+           //if here it means the currently committing txn performed update/delete and we should check for a WW conflict
+           /**
+            * This S4U will mutex with other commitTxn() and openTxns().
+            * -1 below makes txn intervals look like [3,3] [4,4] if all txns are serial
+            * Note: it's possible to have several txns have the same commit id.  Suppose 3 txns start
+            * at the same time and no new txns start until all 3 commit.
+            * We could've incremented the sequence for commitId as well but it doesn't add anything functionally.
+            */
+           commitIdRs = stmt.executeQuery(sqlGenerator.addForUpdateClause("select ntxn_next - 1 from NEXT_TXN_ID"));
+           if (!commitIdRs.next()) {
+             throw new IllegalStateException("No rows found in NEXT_TXN_ID");
+           }
+           long commitId = commitIdRs.getLong(1);
+           Savepoint undoWriteSetForCurrentTxn = dbConn.setSavepoint();
+           /**
+            * "select distinct" is used below because
+            * 1. once we get to multi-statement txns, we only care to record that something was updated once
+            * 2. if {@link #addDynamicPartitions(AddDynamicPartitions)} is retried by the caller it may create
+            *  duplicate entries in TXN_COMPONENTS
+            * but we want to add a PK on WRITE_SET which won't have unique rows w/o this distinct
+            * even if it includes all of its columns
+            */
+           int numCompsWritten = stmt.executeUpdate(
+             "insert into WRITE_SET (ws_database, ws_table, ws_partition, ws_txnid, ws_commit_id, ws_operation_type)" +
+             " select distinct tc_database, tc_table, tc_partition, tc_txnid, " + commitId + ", tc_operation_type " + conflictSQLSuffix);
+           /**
+            * see if any overlapping txns wrote the same element, i.e. have a conflict
+            * Since the entire commit operation is mutexed wrt other start/commit ops,
+            * committed.ws_commit_id <= current.ws_commit_id for all txns
+            * thus if committed.ws_commit_id < current.ws_txnid, transactions do NOT overlap
+            * For example, [17,20] is committed, [6,80] is being committed right now - these overlap
+            * [17,20] committed and [21,21] committing now - these do not overlap.
+            * [17,18] committed and [18,19] committing now - these overlap (here 18 started while 17 was still running)
+            */
+           rs = stmt.executeQuery
+             (sqlGenerator.addLimitClause(1, "committed.ws_txnid, committed.ws_commit_id, committed.ws_database," +
+               "committed.ws_table, committed.ws_partition, cur.ws_commit_id cur_ws_commit_id, " +
+               "cur.ws_operation_type cur_op, committed.ws_operation_type committed_op " +
+               "from WRITE_SET committed INNER JOIN WRITE_SET cur " +
+               "ON committed.ws_database=cur.ws_database and committed.ws_table=cur.ws_table " +
+               //For partitioned tables we always track writes at partition level (never at table)
+               //and for non-partitioned - always at table level, thus the same table should never
+               //have entries with a partition key and w/o
+               "and (committed.ws_partition=cur.ws_partition or (committed.ws_partition is null and cur.ws_partition is null)) " +
+               "where cur.ws_txnid <= committed.ws_commit_id" + //txns overlap; could replace ws_txnid
+               // with txnid, though any decent DB should infer this
+               " and cur.ws_txnid=" + txnid + //make sure RHS of join only has rows we just inserted as
+               // part of this commitTxn() op
+               " and committed.ws_txnid <> " + txnid + //and LHS only has committed txns
+               //U+U and U+D is a conflict but D+D is not and we don't currently track I in WRITE_SET at all
+               " and (committed.ws_operation_type=" + quoteChar(OpertaionType.UPDATE.sqlConst) +
+               " OR cur.ws_operation_type=" + quoteChar(OpertaionType.UPDATE.sqlConst) + ")"));
+           if (rs.next()) {
+             //found a conflict
+             String committedTxn = "[" + JavaUtils.txnIdToString(rs.getLong(1)) + "," + rs.getLong(2) + "]";
+             StringBuilder resource = new StringBuilder(rs.getString(3)).append("/").append(rs.getString(4));
+             String partitionName = rs.getString(5);
+             if (partitionName != null) {
+               resource.append('/').append(partitionName);
+             }
+             String msg = "Aborting [" + JavaUtils.txnIdToString(txnid) + "," + rs.getLong(6) + "]" + " due to a write conflict on " + resource +
+               " committed by " + committedTxn + " " + rs.getString(7) + "/" + rs.getString(8);
+             close(rs);
+             //remove WRITE_SET info for current txn since it's about to abort
+             dbConn.rollback(undoWriteSetForCurrentTxn);
+             LOG.info(msg);
+             //todo: should make abortTxns() write something into TXNS.TXN_META_INFO about this
+             if (abortTxns(dbConn, Collections.singletonList(txnid), true) != 1) {
+               throw new IllegalStateException(msg + " FAILED!");
+             }
+             dbConn.commit();
+             close(null, stmt, dbConn);
+             throw new TxnAbortedException(msg);
+           } else {
+             //no conflicting operations, proceed with the rest of the commit sequence
+           }
+         }
+         else {
+           /**
+            * current txn didn't update/delete anything (may have inserted), so just proceed with commit
+            *
+            * We only care about commit ids for write txns, so for RO (when supported) txns we don't
+            * have to mutex on NEXT_TXN_ID.
+            * Consider: if an RO txn is after a W txn, then RO's openTxns() will be mutexed with W's
+            * commitTxn() because both do S4U on NEXT_TXN_ID, and thus RO will see the result of the W txn.
+            * If RO < W, then there is no reads-from relationship.
+            * In the replication flow we don't expect any write-write conflict as it should have been handled at the source.
+            */
+         }
+ 
+         String s;
+         if (!rqst.isSetReplPolicy()) {
+           // Move the record from txn_components into completed_txn_components so that the compactor
+           // knows where to look to compact.
+           s = "insert into COMPLETED_TXN_COMPONENTS (ctc_txnid, ctc_database, " +
+                   "ctc_table, ctc_partition, ctc_writeid) select tc_txnid, tc_database, tc_table, " +
+                   "tc_partition, tc_writeid from TXN_COMPONENTS where tc_txnid = " + txnid;
+           LOG.debug("Going to execute insert <" + s + ">");
+ 
+           if ((stmt.executeUpdate(s)) < 1) {
+             //this can be reasonable for an empty txn START/COMMIT or read-only txn
+             //also an IUD with DP that didn't match any rows.
+             LOG.info("Expected to move at least one record from txn_components to " +
+                     "completed_txn_components when committing txn! " + JavaUtils.txnIdToString(txnid));
+           }
+         } else {
+           if (rqst.isSetWriteEventInfos()) {
+             List<String> rows = new ArrayList<>();
+             for (WriteEventInfo writeEventInfo : rqst.getWriteEventInfos()) {
+               rows.add(txnid + "," + quoteString(writeEventInfo.getDatabase()) + "," +
+                       quoteString(writeEventInfo.getTable()) + "," +
+                       quoteString(writeEventInfo.getPartition()) + "," +
+                       writeEventInfo.getWriteId());
+             }
+             List<String> queries = sqlGenerator.createInsertValuesStmt("COMPLETED_TXN_COMPONENTS " +
+                     "(ctc_txnid," + " ctc_database, ctc_table, ctc_partition, ctc_writeid)", rows);
+             for (String q : queries) {
+               LOG.debug("Going to execute insert  <" + q + "> ");
+               stmt.execute(q);
+             }
+           }
+ 
+           s = "delete from REPL_TXN_MAP where RTM_SRC_TXN_ID = " + sourceTxnId +
+                   " and RTM_REPL_POLICY = " + quoteString(rqst.getReplPolicy());
+           LOG.info("Repl going to execute  <" + s + ">");
+           stmt.executeUpdate(s);
+         }
+ 
+         // Obtain information that we need to update registry
+         s = "select ctc_database, ctc_table, ctc_writeid, ctc_timestamp from 
COMPLETED_TXN_COMPONENTS" +
+                 " where ctc_txnid = " + txnid;
+ 
+         LOG.debug("Going to extract table modification information for 
invalidation cache <" + s + ">");
+         rs = stmt.executeQuery(s);
+         while (rs.next()) {
+           // We only enter in this loop if the transaction actually affected 
any table
+           txnComponents.add(new TransactionRegistryInfo(rs.getString(1), 
rs.getString(2),
+               rs.getLong(3), rs.getTimestamp(4, 
Calendar.getInstance(TimeZone.getTimeZone("UTC"))).getTime()));
+         }
+ 
+         // cleanup all txn related metadata
+         s = "delete from TXN_COMPONENTS where tc_txnid = " + txnid;
+         LOG.debug("Going to execute update <" + s + ">");
+         stmt.executeUpdate(s);
+         s = "delete from HIVE_LOCKS where hl_txnid = " + txnid;
+         LOG.debug("Going to execute update <" + s + ">");
+         stmt.executeUpdate(s);
+         s = "delete from TXNS where txn_id = " + txnid;
+         LOG.debug("Going to execute update <" + s + ">");
+         stmt.executeUpdate(s);
+         s = "delete from MIN_HISTORY_LEVEL where mhl_txnid = " + txnid;
+         LOG.debug("Going to execute update <" + s + ">");
+         stmt.executeUpdate(s);
+         LOG.info("Removed committed transaction: (" + txnid + ") from 
MIN_HISTORY_LEVEL");
+         if (transactionalListeners != null) {
+           
MetaStoreListenerNotifier.notifyEventWithDirectSql(transactionalListeners,
+                   EventMessage.EventType.COMMIT_TXN, new 
CommitTxnEvent(txnid, null), dbConn, sqlGenerator);
+         }
+ 
+         MaterializationsInvalidationCache materializationsInvalidationCache =
+             MaterializationsInvalidationCache.get();
+         for (TransactionRegistryInfo info : txnComponents) {
+           if (materializationsInvalidationCache.containsMaterialization(info.dbName, info.tblName) &&
+               !materializationsRebuildLockHandler.readyToCommitResource(info.dbName, info.tblName, txnid)) {
+             throw new MetaException(
+                 "Another process is rebuilding the materialized view " + info.fullyQualifiedName);
+           }
+         }
+         LOG.debug("Going to commit");
+         close(rs);
+         dbConn.commit();
+ 
+         // Update registry with modifications
+         for (TransactionRegistryInfo info : txnComponents) {
+           materializationsInvalidationCache.notifyTableModification(
+               info.dbName, info.tblName, info.writeId, info.timestamp, isUpdateDelete);
+         }
+       } catch (SQLException e) {
+         LOG.debug("Going to rollback");
+         rollbackDBConn(dbConn);
+         checkRetryable(dbConn, e, "commitTxn(" + rqst + ")");
+         throw new MetaException("Unable to update transaction database "
+           + StringUtils.stringifyException(e));
+       } finally {
+         close(commitIdRs);
+         close(lockHandle, stmt, dbConn);
+         unlockInternal();
+         for (TransactionRegistryInfo info : txnComponents) {
+           materializationsRebuildLockHandler.unlockResource(info.dbName, info.tblName, txnid);
+         }
+       }
+     } catch (RetryException e) {
+       commitTxn(rqst);
+     }
+   }
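
The WRITE_SET query in the middle of commitTxn() packs the whole conflict rule into SQL. Restated as plain Java for readability, this is a hedged sketch, not code from this class:

    // A committed txn C = [c.txnid, c.commitId] conflicts with the committing
    // txn T iff they overlap in logical time (T started before C committed)
    // AND at least one side ran an UPDATE: U+U and U+D conflict, D+D does not,
    // and INSERTs are never recorded in WRITE_SET at all.
    static boolean writeWriteConflict(long committedCommitId, char committedOp,
                                      long curTxnId, char curOp) {
      boolean overlap = curTxnId <= committedCommitId; // else C fully precedes T
      boolean updateInvolved = committedOp == 'u' || curOp == 'u';
      return overlap && updateInvolved;
    }
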
+ 
+   /**
+    * Replicate Table Write Ids state to mark aborted write ids and the writeid high water mark.
+    * @param rqst info on table/partitions and the writeid snapshot to replicate.
+    * @throws MetaException
+    */
+   @Override
+   @RetrySemantics.Idempotent("No-op if already replicated the writeid state")
+   public void replTableWriteIdState(ReplTblWriteIdStateRequest rqst) throws MetaException {
+     String dbName = rqst.getDbName().toLowerCase();
+     String tblName = rqst.getTableName().toLowerCase();
+     ValidWriteIdList validWriteIdList = new ValidReaderWriteIdList(rqst.getValidWriteIdlist());
+ 
+     // Get the abortedWriteIds which are already sorted in ascending order.
+     List<Long> abortedWriteIds = getAbortedWriteIds(validWriteIdList);
+     int numAbortedWrites = abortedWriteIds.size();
+     try {
+       Connection dbConn = null;
+       Statement stmt = null;
+       ResultSet rs = null;
+       TxnStore.MutexAPI.LockHandle handle = null;
+       try {
+         lockInternal();
+         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+         stmt = dbConn.createStatement();
+ 
+         // Check if this txn state is already replicated for this given table. If yes, then it is
+         // an idempotent case and we just return.
+         String sql = "select nwi_next from NEXT_WRITE_ID where nwi_database = " + quoteString(dbName)
+                         + " and nwi_table = " + quoteString(tblName);
+         LOG.debug("Going to execute query <" + sql + ">");
+ 
+         rs = stmt.executeQuery(sql);
+         if (rs.next()) {
+           LOG.info("Idempotent flow: WriteId state <" + validWriteIdList + "> 
is already applied for the table: "
+                   + dbName + "." + tblName);
+           rollbackDBConn(dbConn);
+           return;
+         }
+ 
+         if (numAbortedWrites > 0) {
+           // Allocate/Map one txn per aborted writeId and abort the txn to mark the writeid as aborted.
+           List<Long> txnIds = openTxns(dbConn, stmt,
+                   new OpenTxnRequest(numAbortedWrites, rqst.getUser(), rqst.getHostName()));
+           assert(numAbortedWrites == txnIds.size());
+ 
+           // Map each aborted write id with each allocated txn.
+           List<String> rows = new ArrayList<>();
+           int i = 0;
+           for (long txn : txnIds) {
+             long writeId = abortedWriteIds.get(i++);
+             rows.add(txn + ", " + quoteString(dbName) + ", " + 
quoteString(tblName) + ", " + writeId);
+             LOG.info("Allocated writeID: " + writeId + " for txnId: " + txn);
+           }
+ 
+           // Insert entries to TXN_TO_WRITE_ID for aborted write ids
+           List<String> inserts = sqlGenerator.createInsertValuesStmt(
+                   "TXN_TO_WRITE_ID (t2w_txnid, t2w_database, t2w_table, 
t2w_writeid)", rows);
+           for (String insert : inserts) {
+             LOG.debug("Going to execute insert <" + insert + ">");
+             stmt.execute(insert);
+           }
+ 
+           // Abort all the allocated txns so that the mapped write ids are treated as aborted.
+           int numAborts = abortTxns(dbConn, txnIds, true);
+           assert(numAborts == numAbortedWrites);
+         }
+         handle = getMutexAPI().acquireLock(MUTEX_KEY.WriteIdAllocator.name());
+ 
+         // Some txns in the list have no write id allocated yet, so allocate them now.
+         // Get the next write id for the given table and store the new next write id.
+         // NEXT_WRITE_ID is not expected to have an entry for this table yet, so insert it directly.
+         long nextWriteId = validWriteIdList.getHighWatermark() + 1;
+ 
+         // First allocation of write id (hwm+1) should add the table to the 
next_write_id meta table.
+         sql = "insert into NEXT_WRITE_ID (nwi_database, nwi_table, nwi_next) 
values ("
+                 + quoteString(dbName) + "," + quoteString(tblName) + ","
+                 + Long.toString(nextWriteId) + ")";
+         LOG.debug("Going to execute insert <" + sql + ">");
+         stmt.execute(sql);
+ 
+         LOG.info("WriteId state <" + validWriteIdList + "> is applied for the 
table: " + dbName + "." + tblName);
+         LOG.debug("Going to commit");
+         dbConn.commit();
+       } catch (SQLException e) {
+         LOG.debug("Going to rollback");
+         rollbackDBConn(dbConn);
+         checkRetryable(dbConn, e, "replTableWriteIdState(" + rqst + ")");
+         throw new MetaException("Unable to update transaction database "
+                 + StringUtils.stringifyException(e));
+       } finally {
+         close(rs, stmt, dbConn);
+         if(handle != null) {
+           handle.releaseLocks();
+         }
+         unlockInternal();
+       }
+     } catch (RetryException e) {
+       replTableWriteIdState(rqst);
+     }
+ 
+     // Schedule Major compaction on all the partitions/table to clean aborted 
data
+     if (numAbortedWrites > 0) {
+       CompactionRequest compactRqst = new CompactionRequest(rqst.getDbName(), 
rqst.getTableName(),
+               CompactionType.MAJOR);
+       if (rqst.isSetPartNames()) {
+         for (String partName : rqst.getPartNames()) {
+           compactRqst.setPartitionname(partName);
+           compact(compactRqst);
+         }
+       } else {
+         compact(compactRqst);
+       }
+     }
+   }
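
Because each aborted source write id is paired positionally with a freshly opened txn before the batch abort, the TXN_TO_WRITE_ID rows above amount to a simple zip of the two sorted lists. A hedged sketch of that pairing (helper name is hypothetical):

    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    class ReplAbortPairingSketch {
      // Pair the i-th freshly opened txn with the i-th aborted write id; the txns
      // are subsequently aborted so the write ids read back as aborted.
      static Map<Long, Long> pairTxnsWithAbortedWriteIds(List<Long> txnIds,
                                                         List<Long> abortedWriteIds) {
        Map<Long, Long> txnToWriteId = new LinkedHashMap<>();
        for (int i = 0; i < txnIds.size(); i++) {
          txnToWriteId.put(txnIds.get(i), abortedWriteIds.get(i));
        }
        return txnToWriteId;
      }
    }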
+ 
+   private List<Long> getAbortedWriteIds(ValidWriteIdList validWriteIdList) {
+     List<Long> abortedWriteIds = new ArrayList<>();
+     for (long writeId : validWriteIdList.getInvalidWriteIds()) {
+       if (validWriteIdList.isWriteIdAborted(writeId)) {
+         abortedWriteIds.add(writeId);
+       }
+     }
+     return abortedWriteIds;
+   }
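
getAbortedWriteIds() leans on the ValidWriteIdList encoding, where each invalid (exception) write id carries an aborted-vs-open bit. A hedged round-trip example using ValidReaderWriteIdList; the constructor signature is as found in the storage-api module, but verify it against your version:

    import java.util.BitSet;
    import org.apache.hadoop.hive.common.ValidReaderWriteIdList;

    public class AbortedWriteIdsExample {
      public static void main(String[] args) {
        // exceptions {3, 5, 7} under hwm 10; bits 0 and 2 set => 3 and 7 aborted, 5 open
        BitSet abortedBits = new BitSet();
        abortedBits.set(0);
        abortedBits.set(2);
        ValidReaderWriteIdList ids =
            new ValidReaderWriteIdList("db.tbl", new long[]{3, 5, 7}, abortedBits, 10);
        for (long w : ids.getInvalidWriteIds()) {
          System.out.println(w + (ids.isWriteIdAborted(w) ? " -> aborted" : " -> open"));
        }
      }
    }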
+ 
+   @Override
+   @RetrySemantics.ReadOnly
+   public GetValidWriteIdsResponse getValidWriteIds(GetValidWriteIdsRequest 
rqst)
+           throws NoSuchTxnException, MetaException {
+     try {
+       Connection dbConn = null;
+       Statement stmt = null;
+       ValidTxnList validTxnList;
+ 
+       // We should prepare the valid write ids list based on the validTxnList of the current txn.
+       // If the caller has no txn, it passes null for validTxnList, in which case we must
+       // snapshot the current state of txns to build one.
+       if (rqst.isSetValidTxnList()) {
+         validTxnList = new ValidReadTxnList(rqst.getValidTxnList());
+       } else {
+         // Passing 0 for currentTxn means this validTxnList is not relative to any txn.
+         validTxnList = TxnUtils.createValidReadTxnList(getOpenTxns(), 0);
+       }
+       try {
+         /**
+          * This runs at READ_COMMITTED for exactly the same reason as {@link 
#getOpenTxnsInfo()}
+          */
+         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+         stmt = dbConn.createStatement();
+ 
+         // Get the valid write id list for all the tables read by the current 
txn
+         List<TableValidWriteIds> tblValidWriteIdsList = new ArrayList<>();
+         for (String fullTableName : rqst.getFullTableNames()) {
+           tblValidWriteIdsList.add(getValidWriteIdsForTable(stmt, 
fullTableName, validTxnList));
+         }
+ 
+         LOG.debug("Going to rollback");
+         dbConn.rollback();
+         GetValidWriteIdsResponse owr = new 
GetValidWriteIdsResponse(tblValidWriteIdsList);
+         return owr;
+       } catch (SQLException e) {
+         LOG.debug("Going to rollback");
+         rollbackDBConn(dbConn);
+         checkRetryable(dbConn, e, "getValidWriteIds");
+         throw new MetaException("Unable to select from transaction database, "
+                 + StringUtils.stringifyException(e));
+       } finally {
+         close(null, stmt, dbConn);
+       }
+     } catch (RetryException e) {
+       return getValidWriteIds(rqst);
+     }
+   }
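
The validTxnList that drives this (whether passed in or snapshotted from getOpenTxns()) travels in its string form. A hedged sketch of that round-trip, assuming the storage-api constructors behave as described here:

    import java.util.BitSet;
    import org.apache.hadoop.hive.common.ValidReadTxnList;

    public class ValidTxnListExample {
      public static void main(String[] args) {
        // hwm 9 with open/aborted exceptions {5, 7}; Long.MAX_VALUE = no open txn recorded
        ValidReadTxnList txns =
            new ValidReadTxnList(new long[]{5, 7}, new BitSet(), 9, Long.MAX_VALUE);
        String wire = txns.writeToString();        // what the request would carry
        ValidReadTxnList back = new ValidReadTxnList(wire);
        System.out.println(back.isTxnValid(4));    // true: under hwm, not an exception
        System.out.println(back.isTxnValid(5));    // false: an open/aborted exception
        System.out.println(back.isTxnValid(10));   // false: above the high water mark
      }
    }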
+ 
+   // Method to get the Valid write ids list for the given table
+   // Input fullTableName is expected to be of format <db_name>.<table_name>
+   private TableValidWriteIds getValidWriteIdsForTable(Statement stmt, String 
fullTableName,
+                                                ValidTxnList validTxnList) 
throws SQLException {
+     ResultSet rs = null;
+     String[] names = TxnUtils.getDbTableName(fullTableName);
+     try {
+       // Need to initialize to 0 so that, if nobody has modified this table, the current txn
+       // doesn't read any data.
+       // On a conversion from a non-acid to an acid table, write id 0 is assigned by default to
+       // the pre-existing data, so writeIdHwm=0 keeps that data readable by any txn.
+       long writeIdHwm = 0;
+       List<Long> invalidWriteIdList = new ArrayList<>();
+       long minOpenWriteId = Long.MAX_VALUE;
+       BitSet abortedBits = new BitSet();
+       long txnHwm = validTxnList.getHighWatermark();
+ 
+       // Find the writeId high water mark based upon the txnId high water mark. If found, we
+       // then traverse all write ids below the writeId HWM to build the exceptions list.
+       // writeIdHwm = min(NEXT_WRITE_ID.nwi_next-1, max(TXN_TO_WRITE_ID.t2w_writeid under txnHwm))
+       String s = "select max(t2w_writeid) from TXN_TO_WRITE_ID where 
t2w_txnid <= " + txnHwm
+               + " and t2w_database = " + quoteString(names[0])
+               + " and t2w_table = " + quoteString(names[1]);
+       LOG.debug("Going to execute query<" + s + ">");
+       rs = stmt.executeQuery(s);
+       if (rs.next()) {
+         writeIdHwm = rs.getLong(1);
+       }
+ 
+       // If no writeIds were allocated by txns under txnHwm, then find the writeIdHwm from NEXT_WRITE_ID.
+       if (writeIdHwm <= 0) {
+         // Need to subtract 1 as nwi_next is the next write id to be allocated,
+         // but we need the highest allocated write id.
+         s = "select nwi_next-1 from NEXT_WRITE_ID where nwi_database = " + 
quoteString(names[0])
+                 + " and nwi_table = " + quoteString(names[1]);
+         LOG.debug("Going to execute query<" + s + ">");
+         rs = stmt.executeQuery(s);
+         if (rs.next()) {
+           long maxWriteId = rs.getLong(1);
+           if (maxWriteId > 0) {
+             writeIdHwm = (writeIdHwm > 0) ? Math.min(maxWriteId, writeIdHwm) 
: maxWriteId;
+           }
+         }
+       }
+ 
+       // As writeIdHwm is known, query all writeIds under the writeId HWM.
+       // If any writeId under the HWM was allocated by a txn > txnId HWM, or belongs to an
+       // open/aborted txn, it is added to the invalid list. The results must be sorted in
+       // ascending order of write id, because the exceptions list in ValidWriteIdList is
+       // looked up with binary search.
+       s = "select t2w_txnid, t2w_writeid from TXN_TO_WRITE_ID where 
t2w_writeid <= " + writeIdHwm
+               + " and t2w_database = " + quoteString(names[0])
+               + " and t2w_table = " + quoteString(names[1])
+               + " order by t2w_writeid asc";
+ 
+       LOG.debug("Going to execute query<" + s + ">");
+       rs = stmt.executeQuery(s);
+       while (rs.next()) {
+         long txnId = rs.getLong(1);
+         long writeId = rs.getLong(2);
+         if (validTxnList.isTxnValid(txnId)) {
+           // Skip if the transaction under evaluation is already committed.
+           continue;
+         }
+ 
+         // The current txn is either in open or aborted state.
+         // Mark the write ids state as per the txn state.
+         invalidWriteIdList.add(writeId);
+         if (validTxnList.isTxnAborted(txnId)) {
+           abortedBits.set(invalidWriteIdList.size() - 1);
+         } else {
+           minOpenWriteId = Math.min(minOpenWriteId, writeId);
+         }
+       }
+ 
+       ByteBuffer byteBuffer = ByteBuffer.wrap(abortedBits.toByteArray());
+       TableValidWriteIds owi = new TableValidWriteIds(fullTableName, 
writeIdHwm, invalidWriteIdList, byteBuffer);
+       if (minOpenWriteId < Long.MAX_VALUE) {
+         owi.setMinOpenWriteId(minOpenWriteId);
+       }
+       return owi;
+     } finally {
+       close(rs);
+     }
+   }
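
The two queries above implement the rule: take the max t2w_writeid over txns at or below the txn HWM, falling back to nwi_next - 1 (the highest write id ever allocated) when no such txn exists, with 0 meaning nobody ever wrote the table. A small pure-Java sketch of just that rule (helper name is hypothetical):

    class WriteIdHwmSketch {
      // maxWriteIdUnderTxnHwm is 0 when the first query found no rows.
      static long writeIdHwm(long maxWriteIdUnderTxnHwm, long nwiNext) {
        if (maxWriteIdUnderTxnHwm > 0) {
          return maxWriteIdUnderTxnHwm;
        }
        long maxAllocated = nwiNext - 1;
        return maxAllocated > 0 ? maxAllocated : 0; // 0 => table never written
      }
    }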
+ 
+   @Override
+   @RetrySemantics.Idempotent
+   public AllocateTableWriteIdsResponse 
allocateTableWriteIds(AllocateTableWriteIdsRequest rqst)
+           throws NoSuchTxnException, TxnAbortedException, MetaException {
+     List<Long> txnIds;
+     String dbName = rqst.getDbName().toLowerCase();
+     String tblName = rqst.getTableName().toLowerCase();
+     try {
+       Connection dbConn = null;
+       Statement stmt = null;
+       ResultSet rs = null;
+       TxnStore.MutexAPI.LockHandle handle = null;
+       List<TxnToWriteId> txnToWriteIds = new ArrayList<>();
+       List<TxnToWriteId> srcTxnToWriteIds = null;
+       try {
+         lockInternal();
+         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+         stmt = dbConn.createStatement();
+ 
+         if (rqst.isSetReplPolicy()) {
+           srcTxnToWriteIds = rqst.getSrcTxnToWriteIdList();
+           List<Long> srcTxnIds = new ArrayList<>();
+           assert (rqst.isSetSrcTxnToWriteIdList());
+           assert (!rqst.isSetTxnIds());
+           assert (!srcTxnToWriteIds.isEmpty());
+ 
+           for (TxnToWriteId txnToWriteId :  srcTxnToWriteIds) {
+             srcTxnIds.add(txnToWriteId.getTxnId());
+           }
+           txnIds = getTargetTxnIdList(rqst.getReplPolicy(), srcTxnIds, stmt);
+           if (srcTxnIds.size() != txnIds.size()) {
+             LOG.warn("Target txn id is missing for source txn id : " + 
srcTxnIds.toString() +
+                     " and repl policy " + rqst.getReplPolicy());
+             throw new RuntimeException("This should never happen for txnIds: 
" + txnIds);
+           }
+         } else {
+           assert (!rqst.isSetSrcTxnToWriteIdList());
+           assert (rqst.isSetTxnIds());
+           txnIds = rqst.getTxnIds();
+         }
+ 
+         Collections.sort(txnIds); // easier to read logs; also required by an assumption made in the replication flow
+ 
+         // Check if all the input txns are in open state. Write ID should be 
allocated only for open transactions.
+         if (!isTxnsInOpenState(txnIds, stmt)) {
+           ensureAllTxnsValid(dbName, tblName, txnIds, stmt);
+           throw new RuntimeException("This should never happen for txnIds: " 
+ txnIds);
+         }
+ 
+         long writeId;
+         String s;
+         long allocatedTxnsCount = 0;
+         long txnId;
+         List<String> queries = new ArrayList<>();
+         StringBuilder prefix = new StringBuilder();
+         StringBuilder suffix = new StringBuilder();
+ 
+         // Traverse TXN_TO_WRITE_ID to see if any of the input txns already have a write id
+         // allocated for the same db.table; if so, reuse it, else allocate a new one.
+         // A write id may already be allocated for multi-statement txns, where the first write
+         // on a table allocates the write id and the rest of the writes re-use it.
+         prefix.append("select t2w_txnid, t2w_writeid from TXN_TO_WRITE_ID 
where"
+                         + " t2w_database = " + quoteString(dbName)
+                         + " and t2w_table = " + quoteString(tblName) + " and 
");
+         suffix.append("");
+         TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix,
+                 txnIds, "t2w_txnid", false, false);
+         for (String query : queries) {
+           LOG.debug("Going to execute query <" + query + ">");
+           rs = stmt.executeQuery(query);
+           while (rs.next()) {
+             // If table write ID is already allocated for the given 
transaction, then just use it
+             txnId = rs.getLong(1);
+             writeId = rs.getLong(2);
+             txnToWriteIds.add(new TxnToWriteId(txnId, writeId));
+             allocatedTxnsCount++;
+             LOG.info("Reused already allocated writeID: " + writeId + " for 
txnId: " + txnId);
+           }
+         }
+ 
+         // Batch allocation must be atomic: either write ids are allocated for all txns or for none.
+         long numOfWriteIds = txnIds.size();
+         assert ((allocatedTxnsCount == 0) || (numOfWriteIds == 
allocatedTxnsCount));
+         if (allocatedTxnsCount == numOfWriteIds) {
+           // If all the txns in the list have pre-allocated write ids for the given table,
+           // just return them; this is the idempotent case.
+           return new AllocateTableWriteIdsResponse(txnToWriteIds);
+         }
+ 
+         handle = getMutexAPI().acquireLock(MUTEX_KEY.WriteIdAllocator.name());
+ 
+         // Some txns in the list do not have a write id allocated yet, so allocate them now.
+         // Get the next write id for the given table and update it with new 
next write id.
+         // This is select for update query which takes a lock if the table 
entry is already there in NEXT_WRITE_ID
+         s = sqlGenerator.addForUpdateClause(
+                 "select nwi_next from NEXT_WRITE_ID where nwi_database = " + 
quoteString(dbName)
+                         + " and nwi_table = " + quoteString(tblName));
+         LOG.debug("Going to execute query <" + s + ">");
+         rs = stmt.executeQuery(s);
+         if (!rs.next()) {
+           // The first allocation of a write id adds the table to the NEXT_WRITE_ID meta table.
+           // The initial write id is 1, hence we store 1 plus the number of write ids allocated here.
+           writeId = 1;
+           s = "insert into NEXT_WRITE_ID (nwi_database, nwi_table, nwi_next) 
values ("
+                   + quoteString(dbName) + "," + quoteString(tblName) + "," + 
Long.toString(numOfWriteIds + 1) + ")";
+           LOG.debug("Going to execute insert <" + s + ">");
+           stmt.execute(s);
+         } else {
+           writeId = rs.getLong(1);
+           // Update the NEXT_WRITE_ID for the given table after incrementing 
by number of write ids allocated
+           s = "update NEXT_WRITE_ID set nwi_next = " + (writeId + 
numOfWriteIds)
+                   + " where nwi_database = " + quoteString(dbName)
+                   + " and nwi_table = " + quoteString(tblName);
+           LOG.debug("Going to execute update <" + s + ">");
+           stmt.executeUpdate(s);
+         }
+ 
+         // Map the newly allocated write ids against the txns that had no pre-allocated
+         // write ids.
+         List<String> rows = new ArrayList<>();
+         for (long txn : txnIds) {
+           rows.add(txn + ", " + quoteString(dbName) + ", " + 
quoteString(tblName) + ", " + writeId);
+           txnToWriteIds.add(new TxnToWriteId(txn, writeId));
+           LOG.info("Allocated writeID: " + writeId + " for txnId: " + txn);
+           writeId++;
+         }
+ 
+         if (rqst.isSetReplPolicy()) {
+           int lastIdx = txnToWriteIds.size()-1;
+           if ((txnToWriteIds.get(0).getWriteId() != 
srcTxnToWriteIds.get(0).getWriteId()) ||
+               (txnToWriteIds.get(lastIdx).getWriteId() != 
srcTxnToWriteIds.get(lastIdx).getWriteId())) {
+             LOG.error("Allocated write id range {} is not matching with the 
input write id range {}.",
+                     txnToWriteIds, srcTxnToWriteIds);
+             throw new IllegalStateException("Write id allocation failed for: 
" + srcTxnToWriteIds);
+           }
+         }
+ 
+         // Insert entries to TXN_TO_WRITE_ID for newly allocated write ids
+         List<String> inserts = sqlGenerator.createInsertValuesStmt(
+                 "TXN_TO_WRITE_ID (t2w_txnid, t2w_database, t2w_table, 
t2w_writeid)", rows);
+         for (String insert : inserts) {
+           LOG.debug("Going to execute insert <" + insert + ">");
+           stmt.execute(insert);
+         }
+ 
+         if (transactionalListeners != null) {
+           
MetaStoreListenerNotifier.notifyEventWithDirectSql(transactionalListeners,
+                   EventMessage.EventType.ALLOC_WRITE_ID,
+                   new AllocWriteIdEvent(txnToWriteIds, rqst.getDbName(), 
rqst.getTableName(), null),
+                   dbConn, sqlGenerator);
+         }
+ 
+         LOG.debug("Going to commit");
+         dbConn.commit();
+         return new AllocateTableWriteIdsResponse(txnToWriteIds);
+       } catch (SQLException e) {
+         LOG.debug("Going to rollback");
+         rollbackDBConn(dbConn);
+         checkRetryable(dbConn, e, "allocateTableWriteIds(" + rqst + ")");
+         throw new MetaException("Unable to update transaction database "
+                 + StringUtils.stringifyException(e));
+       } finally {
+         close(rs, stmt, dbConn);
+         if(handle != null) {
+           handle.releaseLocks();
+         }
+         unlockInternal();
+       }
+     } catch (RetryException e) {
+       return allocateTableWriteIds(rqst);
+     }
+   }
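
The allocation step above is plain counter arithmetic: n txns receive the contiguous range starting at the current nwi_next, and the stored counter advances by n (or is seeded with n + 1 for a fresh table). A hedged sketch:

    public class WriteIdRangeSketch {
      // n txns get [nwiNext, nwiNext + n - 1]; NEXT_WRITE_ID then holds nwiNext + n.
      static long[] allocate(long nwiNext, int n) {
        long[] ids = new long[n];
        for (int i = 0; i < n; i++) {
          ids[i] = nwiNext + i;
        }
        return ids;
      }

      public static void main(String[] args) {
        for (long id : allocate(42, 3)) {
          System.out.println(id); // 42, 43, 44; the counter would now hold 45
        }
      }
    }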
+   @Override
+   public void seedWriteIdOnAcidConversion(InitializeTableWriteIdsRequest rqst)
+       throws MetaException {
+     try {
+       Connection dbConn = null;
+       Statement stmt = null;
+       TxnStore.MutexAPI.LockHandle handle = null;
+       try {
+         lockInternal();
+         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+         stmt = dbConn.createStatement();
+ 
+         handle = getMutexAPI().acquireLock(MUTEX_KEY.WriteIdAllocator.name());
+         //since this is on conversion from non-acid to acid, NEXT_WRITE_ID 
should not have an entry
+         //for this table.  It also has a unique index in case 'should not' is 
violated
+ 
+         // The first allocation of a write id adds the table to the NEXT_WRITE_ID meta table,
+         // seeded with the given seed write id plus 1 (the next id to hand out).
+         String s = "insert into NEXT_WRITE_ID (nwi_database, nwi_table, nwi_next) values ("
+             + quoteString(rqst.getDbName()) + "," + quoteString(rqst.getTblName()) + ","
+             + Long.toString(rqst.getSeedWriteId() + 1) + ")";
+         LOG.debug("Going to execute insert <" + s + ">");
+         stmt.execute(s);
+         LOG.debug("Going to commit");
+         dbConn.commit();
+       } catch (SQLException e) {
+         LOG.debug("Going to rollback");
+         rollbackDBConn(dbConn);
+         checkRetryable(dbConn, e, "seedWriteIdOnAcidConversion(" + rqst + 
")");
+         throw new MetaException("Unable to update transaction database "
+             + StringUtils.stringifyException(e));
+       } finally {
+         close(null, stmt, dbConn);
+         if(handle != null) {
+           handle.releaseLocks();
+         }
+         unlockInternal();
+       }
+     } catch (RetryException e) {
+       seedWriteIdOnAcidConversion(rqst);
+     }
+ 
+   }
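
The seeding rule reduces to: the pre-existing data is treated as write id seedWriteId, so the next id to hand out is seedWriteId + 1. For instance, seeding with 0 stores nwi_next = 1, matching the fresh-table branch of allocateTableWriteIds(). A one-line sketch of the invariant (helper name hypothetical):

    class SeedWriteIdSketch {
      static long initialNwiNext(long seedWriteId) {
        return seedWriteId + 1; // the first id handed out after conversion
      }
    }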
+   @Override
+   @RetrySemantics.Idempotent
+   public void addWriteNotificationLog(AcidWriteEvent acidWriteEvent)
+           throws MetaException {
+     Connection dbConn = null;
+     try {
+       try {
+         // The idempotent case is handled by the event notification itself
+         lockInternal();
+         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+         
MetaStoreListenerNotifier.notifyEventWithDirectSql(transactionalListeners,
+                 EventMessage.EventType.ACID_WRITE, acidWriteEvent, dbConn, 
sqlGenerator);
+         LOG.debug("Going to commit");
+         dbConn.commit();
+         return;
+       } catch (SQLException e) {
+         LOG.debug("Going to rollback");
+         rollbackDBConn(dbConn);
+         if (isDuplicateKeyError(e)) {
+           // on a duplicate key error, retry, as it might be due to a race condition
+           if (waitForRetry("addWriteNotificationLog(" + acidWriteEvent + ")", 
e.getMessage())) {
+             throw new RetryException();
+           }
+           retryNum = 0;
+           throw new MetaException(e.getMessage());
+         }
+         checkRetryable(dbConn, e, "addWriteNotificationLog(" + acidWriteEvent 
+ ")");
+         throw new MetaException("Unable to add write notification event " + 
StringUtils.stringifyException(e));
+       } finally{
+         closeDbConn(dbConn);
+         unlockInternal();
+       }
+     } catch (RetryException e) {
+       addWriteNotificationLog(acidWriteEvent);
+     }
+   }
+ 
+   @Override
+   @RetrySemantics.SafeToRetry
+   public void performWriteSetGC() {
+     Connection dbConn = null;
+     Statement stmt = null;
+     ResultSet rs = null;
+     try {
+       dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+       stmt = dbConn.createStatement();
+       rs = stmt.executeQuery("select ntxn_next - 1 from NEXT_TXN_ID");
+       if(!rs.next()) {
+         throw new IllegalStateException("NEXT_TXN_ID is empty: DB is 
corrupted");
+       }
+       long highestAllocatedTxnId = rs.getLong(1);
+       close(rs);
+       rs = stmt.executeQuery("select min(txn_id) from TXNS where txn_state=" 
+ quoteChar(TXN_OPEN));
+       if(!rs.next()) {
+         throw new IllegalStateException("Scalar query returned no rows?!?!!");
+       }
+       long commitHighWaterMark;//all currently open txns (if any) have txnid >= commitHighWaterMark
+       long lowestOpenTxnId = rs.getLong(1);
+       if(rs.wasNull()) {
+         //if here then there are no open txns and highestAllocatedTxnId must be
+         //resolved (i.e. committed or aborted); either way
+         //there are no open txns with id <= highestAllocatedTxnId
+         //the +1 is there because the "delete ..." below uses < (which is correct
+         //for the case when there is an open txn)
+         //Concurrency: even if a new txn starts (or starts + commits) it is still true that
+         //there are no currently open txns that overlap with any committed txn with
+         //commitId <= commitHighWaterMark (as set on the next line).  So plain READ_COMMITTED is enough.
+         commitHighWaterMark = highestAllocatedTxnId + 1;
+       }
+       else {
+         commitHighWaterMark = lowestOpenTxnId;
+       }
+       int delCnt = stmt.executeUpdate("delete from WRITE_SET where 
ws_commit_id < " + commitHighWaterMark);
+       LOG.info("Deleted " + delCnt + " obsolete rows from WRTIE_SET");
+       dbConn.commit();
+     } catch (SQLException ex) {
+       LOG.warn("WriteSet GC failed due to " + getMessage(ex), ex);
+     }
+     finally {
+       close(rs, stmt, dbConn);
+     }
+   }
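
The GC boundary above reduces to: with open txns, everything strictly below the lowest open txn id is resolved; with none, everything up to the highest allocated id is (hence the +1, since the DELETE uses <). A hedged sketch, where a null lowestOpenTxnId models the SQL NULL from min(txn_id):

    public class WriteSetGcWatermarkSketch {
      static long commitHighWaterMark(long highestAllocatedTxnId, Long lowestOpenTxnId) {
        return (lowestOpenTxnId == null) ? highestAllocatedTxnId + 1 : lowestOpenTxnId;
      }

      public static void main(String[] args) {
        System.out.println(commitHighWaterMark(100, null)); // 101: delete ws_commit_id < 101
        System.out.println(commitHighWaterMark(100, 97L));  // 97:  delete ws_commit_id < 97
      }
    }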
+ 
+   /**
+    * Gets the information of the first transaction for the given table
+    * after the transaction with the input id was committed (if any). 
+    */
+   @Override
+   @RetrySemantics.ReadOnly
+   public BasicTxnInfo getFirstCompletedTransactionForTableAfterCommit(
+       String inputDbName, String inputTableName, ValidWriteIdList txnList)
+           throws MetaException {
+     final List<Long> openTxns = 
Arrays.asList(ArrayUtils.toObject(txnList.getInvalidWriteIds()));
+ 
+     Connection dbConn = null;
+     Statement stmt = null;
+     ResultSet rs = null;
+     try {
+       dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+       stmt = dbConn.createStatement();
+       stmt.setMaxRows(1);
+       String s = "select ctc_timestamp, ctc_writeid, ctc_database, ctc_table "
+           + "from COMPLETED_TXN_COMPONENTS "
+           + "where ctc_database=" + quoteString(inputDbName) + " and 
ctc_table=" + quoteString(inputTableName)
+           + " and ctc_writeid > " + txnList.getHighWatermark()
+           + (txnList.getInvalidWriteIds().length == 0 ?
+               " " : " or ctc_writeid IN(" + StringUtils.join(",", openTxns) + 
") ")
+           + "order by ctc_timestamp asc";
+       if (LOG.isDebugEnabled()) {
+         LOG.debug("Going to execute query <" + s + ">");
+       }
+       rs = stmt.executeQuery(s);
+ 
+       if(!rs.next()) {
+         return new BasicTxnInfo(true);
+       }
+       final BasicTxnInfo txnInfo = new BasicTxnInfo(false);
+       txnInfo.setTime(rs.getTimestamp(1, 
Calendar.getInstance(TimeZone.getTimeZone("UTC"))).getTime());
+       txnInfo.setTxnid(rs.getLong(2));
+       txnInfo.setDbname(rs.getString(3));
+       txnInfo.setTablename(rs.getString(4));
+       return txnInfo;
+     } catch (SQLException ex) {
+       LOG.warn("getLastCompletedTransactionForTable failed due to " + 
getMessage(ex), ex);
+       throw new MetaException("Unable to retrieve commits information due to 
" + StringUtils.stringifyException(ex));
+     } finally {
+       close(rs, stmt, dbConn);
+     }
+   }
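
With the parenthesization above, the high-watermark and exceptions predicates are grouped together before the ORDER BY. A hedged sketch of a helper producing the two shapes the clause can take (helper name hypothetical):

    import java.util.Arrays;
    import java.util.List;

    public class CommitsAfterPredicateSketch {
      static String predicate(long hwm, List<Long> invalidWriteIds) {
        StringBuilder sb = new StringBuilder(" and (ctc_writeid > ").append(hwm);
        if (!invalidWriteIds.isEmpty()) {
          sb.append(" or ctc_writeid IN(");
          for (int i = 0; i < invalidWriteIds.size(); i++) {
            if (i > 0) sb.append(",");
            sb.append(invalidWriteIds.get(i));
          }
          sb.append(")");
        }
        return sb.append(") ").toString();
      }

      public static void main(String[] args) {
        System.out.println(predicate(41, Arrays.asList(35L, 38L)));
        // " and (ctc_writeid > 41 or ctc_writeid IN(35,38)) "
        System.out.println(predicate(41, Arrays.<Long>asList()));
        // " and (ctc_writeid > 41) "
      }
    }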
+ 
+   /**
+    * As much as possible (i.e. in absence of retries) we want both operations 
to be done on the same
+    * connection (but separate transactions).  This avoids some flakiness in BONECP where if you
+    * perform an operation on 1 connection and immediately get another from 
the pool, the 2nd one
+    * doesn't see results of the first.
+    * 
+    * Retry-by-caller note: If the call to lock is from a transaction, then in 
the worst case
+    * there will be a duplicate set of locks but both sets will belong to the 
same txn so they 
+    * will not conflict with each other.  For locks w/o txn context (i.e. 
read-only query), this
+    * may lead to deadlock (at least a long wait).  (e.g. 1st call creates 
locks in {@code LOCK_WAITING}
+    * mode and response gets lost.  Then {@link 
org.apache.hadoop.hive.metastore.RetryingMetaStoreClient}
+    * retries, and enqueues another set of locks in LOCK_WAITING.  The 2nd 
LockResponse is delivered
+    * to the DbLockManager, which will keep doing {@link #checkLock(CheckLockRequest)} until the 1st
+    * set of locks times out.)
+    */
+   @RetrySemantics.CannotRetry
+   public LockResponse lock(LockRequest rqst) throws NoSuchTxnException, 
TxnAbortedException, MetaException {
+     ConnectionLockIdPair connAndLockId = enqueueLockWithRetry(rqst);
+     try {
+       return checkLockWithRetry(connAndLockId.dbConn, 
connAndLockId.extLockId, rqst.getTxnid());
+     }
+     catch(NoSuchLockException e) {
+       // This should never happen, as we just added the lock id
+       throw new MetaException("Couldn't find a lock we just created! " + 
e.getMessage());
+     }
+   }
+   private static final class ConnectionLockIdPair {
+     private final Connection dbConn;
+     private final long extLockId;
+     private ConnectionLockIdPair(Connection dbConn, long extLockId) {
+       this.dbConn = dbConn;
+       this.extLockId = extLockId;
+     }
+   }
+ 
+   /**
+    * Note that by definition select for update is divorced from update, i.e. 
you executeQuery() to read
+    * and then executeUpdate().  One other alternative would be to actually 
update the row in TXNS but
+    * to the same value as before thus forcing db to acquire write lock for 
duration of the transaction.
+    *
+    * There is no real reason to return the ResultSet here other than to make sure the reference to it
+    * is retained for the duration of the intended lock scope and is not GC'd, which could
+    * (however unlikely) cause the lock to be released.
+    * @param txnState the state this txn is expected to be in.  may be null
+    * @return null if no row was found
+    * @throws SQLException
+    * @throws MetaException
+    */
+   private ResultSet lockTransactionRecord(Statement stmt, long txnId, 
Character txnState) throws SQLException, MetaException {
+     String query = "select TXN_STATE from TXNS where TXN_ID = " + txnId + 
(txnState != null ? " AND TXN_STATE=" + quoteChar(txnState) : "");
+     ResultSet rs = stmt.executeQuery(sqlGenerator.addForUpdateClause(query));
+     if(rs.next()) {
+       return rs;
+     }
+     close(rs);
+     return null;
+   }
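
The select-for-update trick above works only while the ResultSet (and hence the row lock) stays open for the remainder of the database transaction. A self-contained JDBC sketch of the same pinning pattern; the table and the literal FOR UPDATE syntax are illustrative, since the real code goes through sqlGenerator.addForUpdateClause() to handle per-database dialects:

    import java.sql.Connection;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.sql.Statement;

    public class SelectForUpdateSketch {
      // Returns an open ResultSet holding the row lock, or null if no row matched.
      // The caller must keep the reference until commit/rollback releases the lock.
      static ResultSet lockRow(Connection conn, long txnId) throws SQLException {
        conn.setAutoCommit(false);
        Statement stmt = conn.createStatement();
        ResultSet rs = stmt.executeQuery(
            "select TXN_STATE from TXNS where TXN_ID = " + txnId + " for update");
        if (rs.next()) {
          return rs; // row exists and is now locked for this transaction
        }
        stmt.close(); // closes the ResultSet too; nothing matched, nothing locked
        return null;
      }
    }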
+ 
+   /**
+    * This enters locks into the queue in {@link #LOCK_WAITING} mode.
+    *
+    * Isolation Level Notes:
+    * 1. We use S4U (with read_committed) to generate the next (ext) lock id.  This serializes
+    * any 2 {@code enqueueLockWithRetry()} calls.
+    * 2. We use S4U on the relevant TXNS row to block any concurrent 
abort/commit/etc operations
+    * @see #checkLockWithRetry(Connection, long, long)
+    */
+   private ConnectionLockIdPair enqueueLockWithRetry(LockRequest rqst) throws 
NoSuchTxnException, TxnAbortedException, MetaException {
+     boolean success = false;
+     Connection dbConn = null;
+     try {
+       Statement stmt = null;
+       ResultSet rs = null;
+       ResultSet lockHandle = null;
+       try {
+         lockInternal();
+         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+         long txnid = rqst.getTxnid();
+         stmt = dbConn.createStatement();
+         if (isValidTxn(txnid)) {
+           //this also ensures that txn is still there in expected state
+           lockHandle = lockTransactionRecord(stmt, txnid, TXN_OPEN);
+           if(lockHandle == null) {
+             ensureValidTxn(dbConn, txnid, stmt);
+             shouldNeverHappen(txnid);
+           }
+         }
+         /** Get the next lock id.
+          * This has to be atomic with adding entries to HIVE_LOCKS (1st add in W state) to prevent a race.
+          * Suppose ID gen is a separate txn and 2 concurrent lock() methods are running.  1st one generates nl_next=7,
+          * 2nd nl_next=8.  Then 8 goes first to insert into HIVE_LOCKS and acquires the locks.  Then 7 unblocks,
+          * and adds its W locks, but it won't see locks from 8 since, to be 'fair', {@link #checkLock(java.sql.Connection, long)}
+          * doesn't block on locks acquired later than the one it's checking.*/
+         String s = sqlGenerator.addForUpdateClause("select nl_next from 
NEXT_LOCK_ID");
+         LOG.debug("Going to execute query <" + s + ">");
+         rs = stmt.executeQuery(s);
+         if (!rs.next()) {
+           LOG.debug("Going to rollback");
+           dbConn.rollback();
+           throw new MetaException("Transaction tables not properly " +
+             "initialized, no record found in next_lock_id");
+         }
+         long extLockId = rs.getLong(1);
+         s = "update NEXT_LOCK_ID set nl_next = " + (extLockId + 1);
+         LOG.debug("Going to execute update <" + s + ">");
+         stmt.executeUpdate(s);
+ 
+         if (txnid > 0) {
+           List<String> rows = new ArrayList<>();
+           // For each component in this lock request,
+           // add an entry to the txn_components table
+           for (LockComponent lc : rqst.getComponent()) {
+             if(lc.isSetIsTransactional() && !lc.isIsTransactional()) {
+               //we don't prevent using non-acid resources in a txn but we do 
lock them
+               continue;
+             }
+             boolean updateTxnComponents;
+             if(!lc.isSetOperationType()) {
+               //request came from old version of the client
+               updateTxnComponents = true;//this matches old behavior
+             }
+             else {


<TRUNCATED>
