This is an automated email from the ASF dual-hosted git repository.

dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new a67485b  HIVE-24125: Incorrect transaction snapshot invalidation with unnecessary writeset check for exclusive operations (Denys Kuzmenko, reviewed by Peter Varga and Peter Vary)
a67485b is described below

commit a67485bf9d0e1bbc9660226dba3c6ee89ab832fd
Author: Denys Kuzmenko <dkuzme...@cloudera.com>
AuthorDate: Wed Sep 9 12:01:51 2020 +0200

    HIVE-24125: Incorrect transaction snapshot invalidation with unnecessary writeset check for exclusive operations (Denys Kuzmenko, reviewed by Peter Varga and Peter Vary)
    
    Revert "HIVE-23725: ValidTxnManager snapshot outdating causing partial reads" (reviewed by Zoltan Haindrich and Peter Varga)
    This reverts commit e2a02f1b43cba657d4d1c16ead091072be5fe834.
    
    HIVE-24125: Incorrect transaction snapshot invalidation with unnecessary writeset check for exclusive operations.
    
    Closes (#1474)
---
 .../java/org/apache/hadoop/hive/conf/HiveConf.java |  11 +-
 .../java/org/apache/hadoop/hive/ql/Compiler.java   |   1 +
 ql/src/java/org/apache/hadoop/hive/ql/Driver.java  |  69 ++++++++++--
 .../org/apache/hadoop/hive/ql/DriverContext.java   |  18 ++++
 .../org/apache/hadoop/hive/ql/DriverFactory.java   |   3 -
 .../apache/hadoop/hive/ql/DriverTxnHandler.java    |  29 +-----
 .../hadoop/hive/ql/lockmgr/DbTxnManager.java       |   6 +-
 .../hadoop/hive/ql/reexec/IReExecutionPlugin.java  |   3 +-
 .../apache/hadoop/hive/ql/reexec/ReExecDriver.java |  14 ++-
 .../hive/ql/reexec/ReExecuteLostAMQueryPlugin.java |   8 +-
 .../hive/ql/reexec/ReExecutionDagSubmitPlugin.java |  11 +-
 .../hive/ql/reexec/ReExecutionOverlayPlugin.java   |   4 +-
 .../hive/ql/reexec/ReExecutionRetryLockPlugin.java |  59 -----------
 .../hadoop/hive/ql/reexec/ReOptimizePlugin.java    |   8 +-
 .../org/apache/hadoop/hive/ql/TestTxnCommands.java |   3 +-
 .../apache/hadoop/hive/ql/TestTxnCommands3.java    |   2 +
 .../ql/lockmgr/DbTxnManagerEndToEndTestBase.java   |   7 +-
 .../hadoop/hive/ql/lockmgr/TestDbTxnManager2.java  | 100 ++++++++++++++----
 .../gen/thrift/gen-cpp/hive_metastore_types.cpp    |  24 ++++-
 .../src/gen/thrift/gen-cpp/hive_metastore_types.h  |  12 ++-
 .../hive/metastore/api/CommitTxnRequest.java       | 116 +++++++++++++++++++--
 .../thrift/gen-php/metastore/CommitTxnRequest.php  |  26 ++++-
 .../src/gen/thrift/gen-py/hive_metastore/ttypes.py |  18 +++-
 .../src/gen/thrift/gen-rb/hive_metastore_types.rb  |   4 +-
 .../hadoop/hive/metastore/HiveMetaStoreClient.java |   2 +-
 .../hadoop/hive/metastore/IMetaStoreClient.java    |   2 +-
 .../src/main/thrift/hive_metastore.thrift          |   3 +-
 .../hadoop/hive/metastore/txn/TxnHandler.java      | 104 +++++++++---------
 .../metastore/HiveMetaStoreClientPreCatalog.java   |   2 +-
 29 files changed, 432 insertions(+), 237 deletions(-)

diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index f4712d3..06fff9d 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -5163,8 +5163,7 @@ public class HiveConf extends Configuration {
         "comma separated list of plugin can be used:\n"
             + "  overlay: hiveconf subtree 'reexec.overlay' is used as an 
overlay in case of an execution errors out\n"
             + "  reoptimize: collects operator statistics during execution and 
recompile the query after a failure\n"
-            + "  reexecute_lost_am: reexecutes query if it failed due to tez 
am node gets decommissioned\n"
-            + "  The retrylock strategy is always enabled: recompiles the 
query if snapshot becomes outdated before lock acquisition"),
+            + "  reexecute_lost_am: reexecutes query if it failed due to tez 
am node gets decommissioned"),
     
HIVE_QUERY_REEXECUTION_STATS_PERSISTENCE("hive.query.reexecution.stats.persist.scope",
 "metastore",
        new StringSet("query", "hiveserver", "metastore"),
        "Sets the persistence scope of runtime statistics\n"
@@ -5172,13 +5171,11 @@
            + "  hiveserver: runtime statistics are persisted in the hiveserver - all sessions share it\n"
            + "  metastore: runtime statistics are persisted in the metastore as well"),
 
+    HIVE_TXN_MAX_RETRYSNAPSHOT_COUNT("hive.txn.retrysnapshot.max.count", 5, new RangeValidator(1, 20),
+        "Maximum number of snapshot invalidate attempts per request."),
 
     HIVE_QUERY_MAX_REEXECUTION_COUNT("hive.query.reexecution.max.count", 1,
-        "Maximum number of re-executions for a single query."
-            + " The maximum re-execution retry is limited at 10"),
-    HIVE_QUERY_MAX_REEXECUTION_RETRYLOCK_COUNT("hive.query.reexecution.retrylock.max.count", 5,
-        "Maximum number of re-executions with retrylock strategy for a single query."
-            + " The maximum re-execution retry is limited at 10"),
+        "Maximum number of re-executions for a single query."),
    HIVE_QUERY_REEXECUTION_ALWAYS_COLLECT_OPERATOR_STATS("hive.query.reexecution.always.collect.operator.stats", false,
        "If sessionstats are enabled; this option can be used to collect statistics all the time"),
    HIVE_QUERY_REEXECUTION_STATS_CACHE_BATCH_SIZE("hive.query.reexecution.stats.cache.batch.size", -1,
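A note on the new knob: hive.txn.retrysnapshot.max.count (default 5, validated to the range 1-20) bounds how many
times the driver recompiles after lock acquisition invalidates its snapshot, replacing the removed retrylock
re-execution counter. A minimal sketch of reading it, mirroring the Driver change further below; the HiveConf
construction here is illustrative only:

    // Sketch: read the retry bound the same way the new Driver code does.
    HiveConf conf = new HiveConf();  // illustrative setup
    int maxRetrySnapshotCnt = HiveConf.getIntVar(conf,
        HiveConf.ConfVars.HIVE_TXN_MAX_RETRYSNAPSHOT_COUNT);  // 5 unless overridden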
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Compiler.java b/ql/src/java/org/apache/hadoop/hive/ql/Compiler.java
index c51a146..b31d8c9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Compiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Compiler.java
@@ -189,6 +189,7 @@ public class Compiler {
     // because at that point we need access to the objects.
     Hive.get().getMSC().flushCache();
 
+    driverContext.setBackupContext(new Context(context));
     boolean executeHooks = driverContext.getHookRunner().hasPreAnalyzeHooks();
 
    HiveSemanticAnalyzerHookContext hookCtx = new HiveSemanticAnalyzerHookContextImpl();
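Note: the backup Context captured here, before semantic analysis mutates the original, is what the Driver's new retry
loop (below) restores when it must recompile against a fresh snapshot. A condensed paraphrase of how the backup is
consumed on retry, with the surrounding Driver fields assumed:

    // Paraphrase of the retry path in Driver.java below, not literal code.
    driverContext.getBackupContext().addSubContext(context);               // keep the old ctx for cleanup
    driverContext.getBackupContext().setHiveLocks(context.getHiveLocks()); // carry over acquired locks
    context = driverContext.getBackupContext();                            // recompile from the clean copy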
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 4d2d869..c4517fa 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -23,6 +23,7 @@ import java.nio.charset.StandardCharsets;
 import java.util.List;
 
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.common.metrics.common.Metrics;
 import org.apache.hadoop.hive.common.metrics.common.MetricsConstant;
@@ -47,6 +48,7 @@ import 
org.apache.hadoop.hive.ql.metadata.formatting.JsonMetaDataFormatter;
 import org.apache.hadoop.hive.ql.metadata.formatting.MetaDataFormatUtils;
 import org.apache.hadoop.hive.ql.metadata.formatting.MetaDataFormatter;
 import org.apache.hadoop.hive.ql.parse.ExplainConfiguration.AnalyzeState;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.mapper.PlanMapper;
 import org.apache.hadoop.hive.ql.plan.mapper.StatsSource;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorException;
@@ -72,8 +74,7 @@ public class Driver implements IDriver {
   private static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
   private static final LogHelper CONSOLE = new LogHelper(LOG);
 
-  // Exception message that ReExecutionRetryLockPlugin will recognize
-  public static final String SNAPSHOT_WAS_OUTDATED_WHEN_LOCKS_WERE_ACQUIRED =
+  private static final String SNAPSHOT_WAS_OUTDATED_WHEN_LOCKS_WERE_ACQUIRED =
       "snapshot was outdated when locks were acquired";
 
   private int maxRows = 100;
@@ -137,13 +138,13 @@ public class Driver implements IDriver {
   public Driver(QueryState queryState, QueryInfo queryInfo, HiveTxnManager 
txnManager) {
     driverContext = new DriverContext(queryState, queryInfo, new 
HookRunner(queryState.getConf(), CONSOLE),
         txnManager);
-    driverTxnHandler = new DriverTxnHandler(this, driverContext, driverState);
+    driverTxnHandler = new DriverTxnHandler(driverContext, driverState);
   }
 
   /**
    * Compiles a new HQL command, but potentially resets taskID counter. Not 
resetting task counter is useful for
    * generating re-entrant QL queries.
-   * 
+   *
    * @param command  The HiveQL query to compile
    * @param resetTaskIds Resets taskID counter if true.
    * @return 0 for ok
@@ -159,7 +160,7 @@ public class Driver implements IDriver {
 
   /**
    * Compiles an HQL command, creates an execution plan for it.
-   * 
+   *
    * @param deferClose indicates if the close/destroy should be deferred when 
the process has been interrupted, it
    *        should be set to true if the compile is called within another 
method like runInternal, which defers the
    *        close to the called in that method.
@@ -490,7 +491,63 @@ public class Driver implements IDriver {
      DriverUtils.checkInterrupted(driverState, driverContext, "at acquiring the lock.", null, null);
 
      lockAndRespond();
-      driverTxnHandler.validateTxnListState();
+
+      int retrySnapshotCnt = 0;
+      int maxRetrySnapshotCnt = HiveConf.getIntVar(driverContext.getConf(),
+        HiveConf.ConfVars.HIVE_TXN_MAX_RETRYSNAPSHOT_COUNT);
+
+      try {
+        while (!driverTxnHandler.isValidTxnListState() && ++retrySnapshotCnt <= maxRetrySnapshotCnt) {
+          LOG.info("Re-compiling after acquiring locks, attempt #" + retrySnapshotCnt);
+          // Snapshot was outdated when locks were acquired, hence regenerate context, txn list and retry.
+          // TODO: Lock acquisition should be moved before analyze, this is a bit hackish.
+          // Currently, we acquire a snapshot, compile the query with that snapshot, and then acquire locks.
+          // If the snapshot is still valid, we continue as usual; if not, we recompile the query.
+          if (driverContext.isOutdatedTxn()) {
+            LOG.info("Snapshot is outdated, re-initiating transaction ...");
+            driverContext.getTxnManager().rollbackTxn();
+
+            String userFromUGI = DriverUtils.getUserFromUGI(driverContext);
+            driverContext.getTxnManager().openTxn(context, userFromUGI, driverContext.getTxnType());
+            lockAndRespond();
+          }
+
+          driverContext.setRetrial(true);
+          driverContext.getBackupContext().addSubContext(context);
+          driverContext.getBackupContext().setHiveLocks(context.getHiveLocks());
+          context = driverContext.getBackupContext();
+
+          driverContext.getConf().set(ValidTxnList.VALID_TXNS_KEY,
+            driverContext.getTxnManager().getValidTxns().toString());
+
+          if (driverContext.getPlan().hasAcidResourcesInQuery()) {
+            compileInternal(context.getCmd(), true);
+            driverTxnHandler.recordValidWriteIds();
+            driverTxnHandler.setWriteIdForAcidFileSinks();
+          }
+          // Since we're reusing the compiled plan, we need to update its start time for the current run
+          driverContext.getPlan().setQueryStartTime(driverContext.getQueryDisplay().getQueryStartTime());
+        }
+
+        if (retrySnapshotCnt > maxRetrySnapshotCnt) {
+          HiveException e = new HiveException(
+              "Operation could not be executed, " + SNAPSHOT_WAS_OUTDATED_WHEN_LOCKS_WERE_ACQUIRED + ".");
+          DriverUtils.handleHiveException(driverContext, e, 14, null);
+
+        } else if (retrySnapshotCnt != 0) {
+          // Reset the PerfLogger
+          perfLogger = SessionState.getPerfLogger(true);
+
+          // Each query has its own ctx object, while the txn manager is shared across the same Driver
+          // instance (which can run multiple queries), so propagate it to the recompiled context here.
+          context.setHiveTxnManager(driverContext.getTxnManager());
+        }
+      } catch (LockException | SemanticException e) {
+        DriverUtils.handleHiveException(driverContext, e, 13, null);
+      }
 
       try {
        taskQueue = new TaskQueue(context); // for canceling the query (should be bound to session?)
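In short, the new post-lock control flow reads as the sketch below. This is a paraphrase for readability, not the
literal code; rollbackAndReopenTxn(), refreshSnapshot() and recompile() are shorthand for the inlined bodies above:

    lockAndRespond();
    int cnt = 0;
    while (!driverTxnHandler.isValidTxnListState() && ++cnt <= maxRetrySnapshotCnt) {
      if (driverContext.isOutdatedTxn()) {  // a conflicting transaction committed in between
        rollbackAndReopenTxn();             // fresh txn, then lockAndRespond() again
      }
      refreshSnapshot();                    // put a new ValidTxnList into the conf
      recompile();                          // re-plan acid queries against the new snapshot
    }
    if (cnt > maxRetrySnapshotCnt) {
      // give up: "snapshot was outdated when locks were acquired"
    }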
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java b/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java
index ce551cc..0afa657 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java
@@ -59,6 +59,7 @@ public class DriverContext {
   // either initTxnMgr or from the SessionState, in that order.
   private HiveTxnManager txnManager;
   private TxnType txnType = TxnType.DEFAULT;
+  private boolean outdatedTxn;
   private StatsSource statsSource;
 
   // Boolean to store information about whether valid txn list was generated
@@ -70,6 +71,7 @@ public class DriverContext {
   private ValidWriteIdList compactionWriteIds = null;
   private long compactorTxnId = 0;
 
+  private Context backupContext = null;
   private boolean retrial = false;
 
   private DataInput resStream;
@@ -154,6 +156,14 @@ public class DriverContext {
     return txnType;
   }
 
+  public void setOutdatedTxn(boolean outdated) {
+    this.outdatedTxn = outdated;
+  }
+
+  public boolean isOutdatedTxn() {
+    return outdatedTxn;
+  }
+
   public void setTxnType(TxnType txnType) {
     this.txnType = txnType;
   }
@@ -206,6 +216,14 @@ public class DriverContext {
     this.compactorTxnId = compactorTxnId;
   }
 
+  public Context getBackupContext() {
+    return backupContext;
+  }
+
+  public void setBackupContext(Context backupContext) {
+    this.backupContext = backupContext;
+  }
+
   public boolean isRetrial() {
     return retrial;
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/DriverFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/DriverFactory.java
index f2be56a..8817e42 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/DriverFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/DriverFactory.java
@@ -24,7 +24,6 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.reexec.IReExecutionPlugin;
 import org.apache.hadoop.hive.ql.reexec.ReExecDriver;
-import org.apache.hadoop.hive.ql.reexec.ReExecutionRetryLockPlugin;
 import org.apache.hadoop.hive.ql.reexec.ReExecuteLostAMQueryPlugin;
 import org.apache.hadoop.hive.ql.reexec.ReExecutionOverlayPlugin;
 import org.apache.hadoop.hive.ql.reexec.ReExecutionDagSubmitPlugin;
@@ -60,8 +59,6 @@ public final class DriverFactory {
       }
       plugins.add(buildReExecPlugin(string));
     }
-    // The retrylock plugin is always enabled
-    plugins.add(new ReExecutionRetryLockPlugin());
 
     return new ReExecDriver(queryState, queryInfo, plugins);
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/DriverTxnHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/DriverTxnHandler.java
index 00fb75d..a5e13c1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/DriverTxnHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/DriverTxnHandler.java
@@ -94,7 +94,7 @@ class DriverTxnHandler {
   private Context context;
   private Runnable txnRollbackRunner;
 
-  DriverTxnHandler(Driver driver, DriverContext driverContext, DriverState driverState) {
+  DriverTxnHandler(DriverContext driverContext, DriverState driverState) {
     this.driverContext = driverContext;
     this.driverState = driverState;
   }
@@ -249,7 +249,7 @@ class DriverTxnHandler {
     }
   }
 
-  private void setWriteIdForAcidFileSinks() throws SemanticException, LockException {
+  void setWriteIdForAcidFileSinks() throws SemanticException, LockException {
     if (!driverContext.getPlan().getAcidSinks().isEmpty()) {
       List<FileSinkDesc> acidSinks = new 
ArrayList<>(driverContext.getPlan().getAcidSinks());
       //sorting makes tests easier to write since file names and ROW__IDs 
depend on statementId
@@ -318,7 +318,7 @@ class DriverTxnHandler {
    *  Write the current set of valid write ids for the operated acid tables 
into the configuration so
    *  that it can be read by the input format.
    */
-  private ValidTxnWriteIdList recordValidWriteIds() throws LockException {
+  ValidTxnWriteIdList recordValidWriteIds() throws LockException {
    String txnString = driverContext.getConf().get(ValidTxnList.VALID_TXNS_KEY);
    if (Strings.isNullOrEmpty(txnString)) {
      throw new IllegalStateException("calling recordValidWriteIds() without initializing ValidTxnList " +
@@ -380,32 +380,12 @@ class DriverTxnHandler {
     }
   }
 
-  void validateTxnListState() throws CommandProcessorException {
-    try {
-      if (!isValidTxnListState()) {
-        LOG.warn("Reexecuting after acquiring locks, since snapshot was outdated.");
-        // Snapshot was outdated when locks were acquired, hence regenerate context,
-        // txn list and retry (see ReExecutionRetryLockPlugin)
-        try {
-          endTransactionAndCleanup(false);
-        } catch (LockException e) {
-          DriverUtils.handleHiveException(driverContext, e, 12, null);
-        }
-        HiveException e = new HiveException(
-            "Operation could not be executed, " + Driver.SNAPSHOT_WAS_OUTDATED_WHEN_LOCKS_WERE_ACQUIRED + ".");
-        DriverUtils.handleHiveException(driverContext, e, 14, null);
-      }
-    } catch (LockException e) {
-      DriverUtils.handleHiveException(driverContext, e, 13, null);
-    }
-  }
-
   /**
    * Checks whether txn list has been invalidated while planning the query.
    * This would happen if query requires exclusive/semi-shared lock, and there has been a committed transaction
    * on the table over which the lock is required.
    */
-  private boolean isValidTxnListState() throws LockException {
+  boolean isValidTxnListState() throws LockException {
     // 1) Get valid txn list.
     String txnString = driverContext.getConf().get(ValidTxnList.VALID_TXNS_KEY);
     if (txnString == null) {
@@ -493,6 +473,7 @@ class DriverTxnHandler {
          // Check if there was a conflicting write between current SNAPSHOT generation and locking.
          if (currentWriteIdList.isWriteIdRangeValid(writeIdList.getHighWatermark() + 1,
              currentWriteIdList.getHighWatermark()) != ValidWriteIdList.RangeResponse.NONE) {
+            driverContext.setOutdatedTxn(true);
            return false;
          }
           // Check that write id is still valid
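Note: the high-watermark range check above is what flags a snapshot as outdated. A worked example with assumed
numbers: if the query was compiled when the table's write-id high watermark was 10, and by lock time the current
high watermark is 12, then any valid write id in [11, 12] means a conflicting transaction committed in between:

    // Illustration only; the write-id values are assumed.
    ValidWriteIdList.RangeResponse r =
        currentWriteIdList.isWriteIdRangeValid(10 + 1, 12);  // ids committed after the snapshot
    boolean outdated = (r != ValidWriteIdList.RangeResponse.NONE);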
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index effbcf6..abadb43 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@ -505,7 +505,7 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
         // For transaction started internally by repl load command, heartbeat 
needs to be stopped.
         clearLocksAndHB();
       }
-      getMS().replCommitTxn(rqst);
+      getMS().commitTxn(rqst);
     } catch (NoSuchTxnException e) {
       LOG.error("Metastore could not find " + 
JavaUtils.txnIdToString(rqst.getTxnid()));
       throw new LockException(e, ErrorMsg.TXN_NO_SUCH_TRANSACTION, 
JavaUtils.txnIdToString(rqst.getTxnid()));
@@ -533,7 +533,9 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
      // do all new clear in clearLocksAndHB method to make sure that same code is there for replCommitTxn flow.
      clearLocksAndHB();
      LOG.debug("Committing txn " + JavaUtils.txnIdToString(txnId));
-      getMS().commitTxn(txnId);
+      CommitTxnRequest commitTxnRequest = new CommitTxnRequest(txnId);
+      commitTxnRequest.setExclWriteEnabled(conf.getBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK));
+      getMS().commitTxn(commitTxnRequest);
    } catch (NoSuchTxnException e) {
      LOG.error("Metastore could not find " + JavaUtils.txnIdToString(txnId));
      throw new LockException(e, ErrorMsg.TXN_NO_SUCH_TRANSACTION, JavaUtils.txnIdToString(txnId));
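Note: commitTxn() now always sends a CommitTxnRequest, and exclWriteEnabled tells the metastore whether exclusive
write locking (ConfVars.TXN_WRITE_X_LOCK) is in effect. When it is, concurrent writers were already serialized by
the lock, so TxnHandler can skip the WRITE_SET conflict check; with shared-write locking the check still runs. A
minimal caller-side sketch, with txnId assumed to be an open transaction:

    // Sketch of the new commit path, mirroring the hunk above.
    CommitTxnRequest rqst = new CommitTxnRequest(txnId);
    rqst.setExclWriteEnabled(conf.getBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK));
    getMS().commitTxn(rqst);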
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/reexec/IReExecutionPlugin.java b/ql/src/java/org/apache/hadoop/hive/ql/reexec/IReExecutionPlugin.java
index c6d848c..be62fc0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/reexec/IReExecutionPlugin.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/reexec/IReExecutionPlugin.java
@@ -22,7 +22,6 @@ import 
org.apache.hadoop.hive.common.classification.InterfaceAudience;
 import org.apache.hadoop.hive.common.classification.InterfaceStability;
 import org.apache.hadoop.hive.ql.Driver;
 import org.apache.hadoop.hive.ql.plan.mapper.PlanMapper;
-import org.apache.hadoop.hive.ql.processors.CommandProcessorException;
 
 /**
  * Defines an interface for re-execution logics.
@@ -48,7 +47,7 @@ public interface IReExecutionPlugin {
   /**
   * The query has failed; does this plugin advise re-executing it again?
    */
-  boolean shouldReExecute(int executionNum, CommandProcessorException ex);
+  boolean shouldReExecute(int executionNum);
 
   /**
   * The plugin should prepare for the re-compilation of the query.
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecDriver.java b/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecDriver.java
index 42dac46..c307085 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecDriver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecDriver.java
@@ -57,10 +57,6 @@ import com.google.common.annotations.VisibleForTesting;
  */
 public class ReExecDriver implements IDriver {
 
-  // Every plugin should check for execution limit in shouldReexecute
-  // But just in case, we don't want an infinite loop
-  private final static int MAX_EXECUTION = 10;
-
   private class HandleReOptimizationExplain implements 
HiveSemanticAnalyzerHook {
 
     @Override
@@ -152,6 +148,8 @@ public class ReExecDriver implements IDriver {
   @Override
   public CommandProcessorResponse run() throws CommandProcessorException {
     executionIndex = 0;
+    int maxExecutions = 1 + coreDriver.getConf().getIntVar(ConfVars.HIVE_QUERY_MAX_REEXECUTION_COUNT);
+
 
     while (true) {
       executionIndex++;
@@ -172,9 +170,9 @@ public class ReExecDriver implements IDriver {
       afterExecute(oldPlanMapper, cpr != null);
 
       boolean shouldReExecute = explainReOptimization && executionIndex==1;
-      shouldReExecute |= cpr == null && shouldReExecute(cpe);
+      shouldReExecute |= cpr == null && shouldReExecute();
 
-      if (executionIndex >= MAX_EXECUTION || !shouldReExecute) {
+      if (executionIndex >= maxExecutions || !shouldReExecute) {
         if (cpr != null) {
           return cpr;
         } else {
@@ -216,10 +214,10 @@ public class ReExecDriver implements IDriver {
     return ret;
   }
 
-  private boolean shouldReExecute(CommandProcessorException ex) {
+  private boolean shouldReExecute() {
     boolean ret = false;
     for (IReExecutionPlugin p : plugins) {
-      boolean shouldReExecute = p.shouldReExecute(executionIndex, ex);
+      boolean shouldReExecute = p.shouldReExecute(executionIndex);
       LOG.debug("{}.shouldReExecute = {}", p, shouldReExecute);
       ret |= shouldReExecute;
     }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecuteLostAMQueryPlugin.java b/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecuteLostAMQueryPlugin.java
index 6ada8f3..e779ad2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecuteLostAMQueryPlugin.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecuteLostAMQueryPlugin.java
@@ -18,12 +18,10 @@
 
 package org.apache.hadoop.hive.ql.reexec;
 
-import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.Driver;
 import org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext;
 import org.apache.hadoop.hive.ql.hooks.HookContext;
 import org.apache.hadoop.hive.ql.plan.mapper.PlanMapper;
-import org.apache.hadoop.hive.ql.processors.CommandProcessorException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,7 +33,6 @@ import java.util.regex.Pattern;
 public class ReExecuteLostAMQueryPlugin implements IReExecutionPlugin {
   private static final Logger LOG = 
LoggerFactory.getLogger(ReExecuteLostAMQueryPlugin.class);
   private boolean retryPossible;
-  private int maxExecutions = 1;
 
  // Lost AM containers have exit code -100, due to node failures. This pattern of exception is thrown when the AM
  // is managed by HS2.
@@ -64,7 +61,6 @@ public class ReExecuteLostAMQueryPlugin implements 
IReExecutionPlugin {
   @Override
   public void initialize(Driver driver) {
     driver.getHookRunner().addOnFailureHook(new LocalHook());
-    maxExecutions = 1 + 
driver.getConf().getIntVar(HiveConf.ConfVars.HIVE_QUERY_MAX_REEXECUTION_COUNT);
   }
 
   @Override
@@ -72,8 +68,8 @@ public class ReExecuteLostAMQueryPlugin implements 
IReExecutionPlugin {
   }
 
   @Override
-  public boolean shouldReExecute(int executionNum, CommandProcessorException 
ex) {
-    return (executionNum < maxExecutions) && retryPossible;
+  public boolean shouldReExecute(int executionNum) {
+    return retryPossible;
   }
 
   @Override
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecutionDagSubmitPlugin.java b/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecutionDagSubmitPlugin.java
index b6e76da..b019a00 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecutionDagSubmitPlugin.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecutionDagSubmitPlugin.java
@@ -18,13 +18,11 @@
 
 package org.apache.hadoop.hive.ql.reexec;
 
-import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.Driver;
 import org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext;
 import org.apache.hadoop.hive.ql.hooks.HookContext;
 import org.apache.hadoop.hive.ql.hooks.HookContext.HookType;
 import org.apache.hadoop.hive.ql.plan.mapper.PlanMapper;
-import org.apache.hadoop.hive.ql.processors.CommandProcessorException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -37,7 +35,7 @@ import org.slf4j.LoggerFactory;
 public class ReExecutionDagSubmitPlugin implements IReExecutionPlugin {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(ReExecutionDagSubmitPlugin.class);
-  private int maxExecutions = 1;
+
   class LocalHook implements ExecuteWithHookContext {
 
     @Override
@@ -57,7 +55,6 @@ public class ReExecutionDagSubmitPlugin implements 
IReExecutionPlugin {
   @Override
   public void initialize(Driver driver) {
     driver.getHookRunner().addOnFailureHook(new LocalHook());
-    maxExecutions = 1 + 
driver.getConf().getIntVar(HiveConf.ConfVars.HIVE_QUERY_MAX_REEXECUTION_COUNT);
   }
 
   private boolean retryPossible;
@@ -68,7 +65,7 @@ public class ReExecutionDagSubmitPlugin implements 
IReExecutionPlugin {
 
   @Override
   public boolean shouldReExecute(int executionNum, PlanMapper pm1, PlanMapper 
pm2) {
-    return (executionNum < maxExecutions) && retryPossible;
+    return retryPossible;
   }
 
   @Override
@@ -76,8 +73,8 @@ public class ReExecutionDagSubmitPlugin implements 
IReExecutionPlugin {
   }
 
   @Override
-  public boolean shouldReExecute(int executionNum, CommandProcessorException 
ex) {
-    return (executionNum < maxExecutions) && retryPossible;
+  public boolean shouldReExecute(int executionNum) {
+    return retryPossible;
   }
 
   @Override
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecutionOverlayPlugin.java b/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecutionOverlayPlugin.java
index 84b5960..83df334 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecutionOverlayPlugin.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecutionOverlayPlugin.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext;
 import org.apache.hadoop.hive.ql.hooks.HookContext;
 import org.apache.hadoop.hive.ql.hooks.HookContext.HookType;
 import org.apache.hadoop.hive.ql.plan.mapper.PlanMapper;
-import org.apache.hadoop.hive.ql.processors.CommandProcessorException;
 import org.apache.tez.dag.api.TezConfiguration;
 
 /**
@@ -75,8 +74,7 @@ public class ReExecutionOverlayPlugin implements 
IReExecutionPlugin {
   }
 
   @Override
-  public boolean shouldReExecute(int executionNum, CommandProcessorException 
ex) {
-
+  public boolean shouldReExecute(int executionNum) {
     return executionNum == 1 && !subtree.isEmpty() && retryPossible;
   }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecutionRetryLockPlugin.java b/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecutionRetryLockPlugin.java
deleted file mode 100644
index 6366c4e..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecutionRetryLockPlugin.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.reexec;
-
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.Driver;
-import org.apache.hadoop.hive.ql.plan.mapper.PlanMapper;
-import org.apache.hadoop.hive.ql.processors.CommandProcessorException;
-
-public class ReExecutionRetryLockPlugin implements IReExecutionPlugin {
-
-  private Driver coreDriver;
-  private int maxRetryLockExecutions = 1;
-
-  @Override
-  public void initialize(Driver driver) {
-    coreDriver = driver;
-    maxRetryLockExecutions = 1 + 
coreDriver.getConf().getIntVar(HiveConf.ConfVars.HIVE_QUERY_MAX_REEXECUTION_RETRYLOCK_COUNT);
-  }
-
-  @Override
-  public void beforeExecute(int executionIndex, boolean explainReOptimization) 
{
-  }
-
-  @Override
-  public boolean shouldReExecute(int executionNum, CommandProcessorException 
ex) {
-    return executionNum < maxRetryLockExecutions && ex != null &&
-        
ex.getMessage().contains(Driver.SNAPSHOT_WAS_OUTDATED_WHEN_LOCKS_WERE_ACQUIRED);
-  }
-
-  @Override
-  public void prepareToReExecute() {
-  }
-
-  @Override
-  public boolean shouldReExecute(int executionNum, PlanMapper oldPlanMapper, 
PlanMapper newPlanMapper) {
-    return executionNum < maxRetryLockExecutions;
-  }
-
-  @Override
-  public void afterExecute(PlanMapper planMapper, boolean successfull) {
-  }
-}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReOptimizePlugin.java b/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReOptimizePlugin.java
index ecef303..09045af 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReOptimizePlugin.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReOptimizePlugin.java
@@ -30,7 +30,6 @@ import org.apache.hadoop.hive.ql.hooks.HookContext;
 import org.apache.hadoop.hive.ql.hooks.HookContext.HookType;
 import org.apache.hadoop.hive.ql.plan.mapper.PlanMapper;
 import org.apache.hadoop.hive.ql.plan.mapper.StatsSources;
-import org.apache.hadoop.hive.ql.processors.CommandProcessorException;
 import org.apache.hadoop.hive.ql.stats.OperatorStatsReaderHook;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,8 +49,6 @@ public class ReOptimizePlugin implements IReExecutionPlugin {
 
   private boolean alwaysCollectStats;
 
-  private int maxExecutions = 1;
-
   class LocalHook implements ExecuteWithHookContext {
 
     @Override
@@ -83,15 +80,14 @@ public class ReOptimizePlugin implements IReExecutionPlugin 
{
     coreDriver.getHookRunner().addOnFailureHook(statsReaderHook);
     coreDriver.getHookRunner().addPostHook(statsReaderHook);
     alwaysCollectStats = 
driver.getConf().getBoolVar(ConfVars.HIVE_QUERY_REEXECUTION_ALWAYS_COLLECT_OPERATOR_STATS);
-    maxExecutions = 1 + 
coreDriver.getConf().getIntVar(ConfVars.HIVE_QUERY_MAX_REEXECUTION_COUNT);
     statsReaderHook.setCollectOnSuccess(alwaysCollectStats);
 
     coreDriver.setStatsSource(StatsSources.getStatsSource(driver.getConf()));
   }
 
   @Override
-  public boolean shouldReExecute(int executionNum, CommandProcessorException 
ex) {
-    return (executionNum < maxExecutions) && retryPossible;
+  public boolean shouldReExecute(int executionNum) {
+    return retryPossible;
   }
 
   @Override
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
index e1185de..1725a14 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
@@ -275,7 +275,8 @@ public class TestTxnCommands extends 
TxnCommandsBaseForTests {
       } catch (HiveException e) {
         throw new RuntimeException(e);
       }
-      try (IDriver d = DriverFactory.newDriver(hiveConf)) {
+      QueryState qs = new QueryState.Builder().withHiveConf(hiveConf).nonIsolated().build();
+      try (Driver d = new Driver(qs)) {
         LOG.info("Ready to run the query: " + query);
         syncThreadStart(cdlIn, cdlOut);
         try {
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java
index de3d296..ba50a2b 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java
@@ -60,6 +60,8 @@ public class TestTxnCommands3 extends TxnCommandsBaseForTests 
{
   @Test
   public void testRenameTable() throws Exception {
     MetastoreConf.setBoolVar(hiveConf, 
MetastoreConf.ConfVars.CREATE_TABLES_AS_ACID, true);
+    hiveConf.setBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK, false);
+
     runStatementOnDriver("drop database if exists mydb1 cascade");
     runStatementOnDriver("drop database if exists mydb2 cascade");
     runStatementOnDriver("create database mydb1");
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/DbTxnManagerEndToEndTestBase.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/DbTxnManagerEndToEndTestBase.java
index 651969b..b256415 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/DbTxnManagerEndToEndTestBase.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/DbTxnManagerEndToEndTestBase.java
@@ -24,8 +24,6 @@ import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.apache.hadoop.hive.metastore.txn.TxnUtils;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.Driver;
-import org.apache.hadoop.hive.ql.DriverFactory;
-import org.apache.hadoop.hive.ql.IDriver;
 import org.apache.hadoop.hive.ql.QueryState;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.junit.After;
@@ -41,8 +39,7 @@ public abstract class DbTxnManagerEndToEndTestBase {
   protected static HiveConf conf = new HiveConf(Driver.class);
   protected HiveTxnManager txnMgr;
   protected Context ctx;
-  protected Driver driver;
-  protected IDriver driver2;
+  protected Driver driver, driver2;
   protected TxnStore txnHandler;
 
   public DbTxnManagerEndToEndTestBase() {
@@ -66,7 +63,7 @@ public abstract class DbTxnManagerEndToEndTestBase {
     SessionState.start(conf);
     ctx = new Context(conf);
     driver = new Driver(new QueryState.Builder().withHiveConf(conf).nonIsolated().build());
-    driver2 = DriverFactory.newDriver(conf);
+    driver2 = new Driver(new QueryState.Builder().withHiveConf(conf).build());
     conf.setBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK, false);
     TxnDbUtil.cleanDb(conf);
     SessionState ss = SessionState.get();
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
index b18f98b..235d756 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
@@ -34,10 +34,9 @@ import 
org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
 import org.apache.hadoop.hive.metastore.api.TxnType;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.txn.AcidHouseKeeperService;
-import org.apache.hadoop.hive.ql.DriverFactory;
-import org.apache.hadoop.hive.ql.IDriver;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.QueryState;
 import org.apache.hadoop.hive.ql.TestTxnCommands2;
-import org.apache.hadoop.hive.ql.reexec.ReExecDriver;
 import org.junit.Assert;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -2007,8 +2006,8 @@ public class TestDbTxnManager2 extends 
DbTxnManagerEndToEndTestBase{
     catch (LockException e) {
       expectedException = e;
     }
-    if (causeConflict) {
-      Assert.assertNotNull("didn't get exception", expectedException);
+    if (causeConflict && sharedWrite) {
+      Assert.assertNotNull("Didn't get exception", expectedException);
       try {
         Assert.assertEquals("Transaction manager has aborted the transaction 
txnid:11.  Reason: " +
             "Aborting [txnid:11,11] due to a write conflict on 
default/target/p=1/q=2 " +
@@ -2041,7 +2040,7 @@ public class TestDbTxnManager2 extends 
DbTxnManagerEndToEndTestBase{
       Assert.assertEquals(
           "COMPLETED_TXN_COMPONENTS mismatch(" + 
JavaUtils.txnIdToString(txnId2) + "): " +
           TxnDbUtil.queryToString(conf, "select * from 
\"COMPLETED_TXN_COMPONENTS\""),
-          4,
+        causeConflict ? 6 : 4,
           TxnDbUtil.countQueryAgent(conf, "select count(*) from 
\"COMPLETED_TXN_COMPONENTS\" where \"CTC_TXNID\"=" + txnId2));
       Assert.assertEquals(
           "WRITE_SET mismatch(" + JavaUtils.txnIdToString(txnId2) + "): " +
@@ -2052,7 +2051,7 @@ public class TestDbTxnManager2 extends 
DbTxnManagerEndToEndTestBase{
       Assert.assertEquals(
           "WRITE_SET mismatch(" + JavaUtils.txnIdToString(txnId2) + "): " +
           TxnDbUtil.queryToString(conf, "select * from \"WRITE_SET\""),
-          0,
+        causeConflict ? 2 : 0,
           TxnDbUtil.countQueryAgent(conf, "select count(*) from \"WRITE_SET\" 
where \"WS_TXNID\"=" + txnId2 +
               " and \"WS_OPERATION_TYPE\"='d'"));
     }
@@ -2167,7 +2166,7 @@ public class TestDbTxnManager2 extends 
DbTxnManagerEndToEndTestBase{
     catch (LockException e) {
       expectedException = e;
     }
-    if (causeConflict) {
+    if (causeConflict && sharedWrite) {
       Assert.assertNotNull("Didn't get exception", expectedException);
       Assert.assertEquals("Got wrong message code", ErrorMsg.TXN_ABORTED, 
expectedException.getCanonicalErrorMsg());
       Assert.assertEquals("Exception msg didn't match",
@@ -2258,7 +2257,6 @@ public class TestDbTxnManager2 extends 
DbTxnManagerEndToEndTestBase{
 
     swapTxnManager(txnMgr);
     driver.run(query);
-    driver.run("select * from target");
 
     swapTxnManager(txnMgr2);
     try {
@@ -2276,6 +2274,68 @@ public class TestDbTxnManager2 extends 
DbTxnManagerEndToEndTestBase{
   }
 
   @Test
+  public void testUpdateMergeUpdateConcurrentSnapshotInvalidate() throws Exception {
+    dropTable(new String[]{"target", "source"});
+    driver2.getConf().setBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK, true);
+
+    driver.run("create table target (a int, b int) stored as orc TBLPROPERTIES ('transactional'='true')");
+    driver.run("insert into target values (1,2), (3,4)");
+    driver.run("create table source (a int, b int)");
+    driver.run("insert into source values (5,6), (7,8)");
+
+    driver.compileAndRespond("update target set a=5 where a=1");
+
+    DbTxnManager txnMgr2 = (DbTxnManager) TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+    swapTxnManager(txnMgr2);
+    driver2.compileAndRespond("merge into target t using source s on t.a = s.a " +
+      "when matched then update set b=8");
+
+    swapTxnManager(txnMgr);
+    driver.run();
+
+    swapTxnManager(txnMgr2);
+    driver2.run();
+
+    swapTxnManager(txnMgr);
+    driver.run("select * from target");
+    List res = new ArrayList();
+    driver.getFetchTask().fetch(res);
+    Assert.assertEquals(2, res.size());
+    Assert.assertEquals("Lost Update", "5\t8", res.get(1));
+    dropTable(new String[]{"target", "source"});
+  }
+
+  @Test
+  public void testUpdateMergeUpdateConcurrentSnapshotInvalidateNewTxn() throws Exception {
+    dropTable(new String[]{"target", "source"});
+    driver2.getConf().setBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK, true);
+
+    driver.run("create table target (a int, b int) stored as orc TBLPROPERTIES ('transactional'='true')");
+    driver.run("insert into target values (1,2), (3,4)");
+    driver.run("create table source (a int, b int)");
+    driver.run("insert into source values (5,6), (7,8)");
+
+    DbTxnManager txnMgr2 = (DbTxnManager) TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+    swapTxnManager(txnMgr2);
+    driver2.compileAndRespond("merge into target t using source s on t.a = s.a " +
+      "when matched then update set b=8");
+
+    swapTxnManager(txnMgr);
+    driver.run("update target set a=5 where a=1");
+
+    swapTxnManager(txnMgr2);
+    driver2.run();
+
+    swapTxnManager(txnMgr);
+    driver.run("select * from target");
+    List res = new ArrayList();
+    driver.getFetchTask().fetch(res);
+    Assert.assertEquals(2, res.size());
+    Assert.assertEquals("Lost Update", "5\t8", res.get(1));
+    dropTable(new String[]{"target", "source"});
+  }
+
+  @Test
   public void test2MergeInsertsConcurrentNoDuplicates() throws Exception {
     testConcurrentMergeInsertNoDuplicates("merge into target t using source s 
on t.a = s.a " +
         "when not matched then insert values (s.a, s.b)", false);
@@ -2335,7 +2395,6 @@ public class TestDbTxnManager2 extends 
DbTxnManagerEndToEndTestBase{
    */
   @Test
   public void testMergeInsertDynamicPartitioningSequential() throws Exception {
-
     dropTable(new String[]{"target", "source"});
     conf.setBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK, false);
 
@@ -2354,7 +2413,7 @@ public class TestDbTxnManager2 extends 
DbTxnManagerEndToEndTestBase{
 
     // txn3 merge
     driver.run("merge into target t using source s on t.a = s.a " +
-        "when not matched then insert values (s.a, s.b, s.c)");
+      "when not matched then insert values (s.a, s.b, s.c)");
     driver.run("select * from target");
     List res = new ArrayList();
     driver.getFetchTask().fetch(res);
@@ -2366,10 +2425,7 @@ public class TestDbTxnManager2 extends 
DbTxnManagerEndToEndTestBase{
 
   @Test
   public void 
testMergeInsertDynamicPartitioningSnapshotInvalidatedWithOldCommit() throws 
Exception {
-
-    // By creating the driver with the factory, we should have a ReExecDriver
-    IDriver driver3 = DriverFactory.newDriver(conf);
-    Assert.assertTrue("ReExecDriver was expected", driver3 instanceof 
ReExecDriver);
+    Driver driver3 = new Driver(new 
QueryState.Builder().withHiveConf(conf).build());
 
     dropTable(new String[]{"target", "source"});
     conf.setBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK, false);
@@ -2395,7 +2451,7 @@ public class TestDbTxnManager2 extends 
DbTxnManagerEndToEndTestBase{
 
     // Compile txn 3 with only 1 known partition
     driver3.compileAndRespond("merge into target t using source s on t.a = s.a 
" +
-        "when not matched then insert values (s.a, s.b, s.c)");
+      "when not matched then insert values (s.a, s.b, s.c)");
 
     swapTxnManager(txnMgr);
     driver.run();
@@ -2414,15 +2470,13 @@ public class TestDbTxnManager2 extends 
DbTxnManagerEndToEndTestBase{
     // The merge should see all three partition and not create duplicates
     Assert.assertEquals("Duplicate records found", 6, res.size());
     Assert.assertTrue("Partition 3 was skipped", res.contains("6\t6\t3"));
+    driver3.close();
     dropTable(new String[]{"target", "source"});
   }
 
-
   @Test
   public void 
testMergeInsertDynamicPartitioningSnapshotInvalidatedWithNewCommit() throws 
Exception {
-    // By creating the driver with the factory, we should have a ReExecDriver
-    IDriver driver3 = DriverFactory.newDriver(conf);
-    Assert.assertTrue("ReExecDriver was expected", driver3 instanceof 
ReExecDriver);
+    Driver driver3 = new Driver(new 
QueryState.Builder().withHiveConf(conf).build());
 
     dropTable(new String[]{"target", "source"});
     conf.setBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK, false);
@@ -2438,7 +2492,7 @@ public class TestDbTxnManager2 extends 
DbTxnManagerEndToEndTestBase{
     swapTxnManager(txnMgr3);
     // Compile txn 1 merge with only 1 known partition
     driver3.compileAndRespond("merge into target t using source s on t.a = s.a 
" +
-        "when not matched then insert values (s.a, s.b, s.c)");
+      "when not matched then insert values (s.a, s.b, s.c)");
 
     swapTxnManager(txnMgr);
     // txn 2 insert data to an old and a new partition
@@ -2451,7 +2505,6 @@ public class TestDbTxnManager2 extends 
DbTxnManagerEndToEndTestBase{
     swapTxnManager(txnMgr3);
     driver3.run();
 
-
     swapTxnManager(txnMgr);
     driver.run("select * from target");
     List res = new ArrayList();
@@ -2459,6 +2512,7 @@ public class TestDbTxnManager2 extends 
DbTxnManagerEndToEndTestBase{
     // The merge should see all three partition and not create duplicates
     Assert.assertEquals("Duplicate records found", 6, res.size());
     Assert.assertTrue("Partition 3 was skipped", res.contains("6\t6\t3"));
+    driver3.close();
     dropTable(new String[]{"target", "source"});
   }
 
@@ -2691,7 +2745,7 @@ public class TestDbTxnManager2 extends 
DbTxnManagerEndToEndTestBase{
     catch (LockException e) {
       expectedException = e;
     }
-    if (causeConflict) {
+    if (causeConflict && sharedWrite) {
       Assert.assertNotNull("Didn't get exception", expectedException);
       Assert.assertEquals("Got wrong message code", ErrorMsg.TXN_ABORTED, 
expectedException.getCanonicalErrorMsg());
       Assert.assertEquals("Exception msg didn't match",
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.cpp b/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.cpp
index 1341c65..68ae4cd 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.cpp
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.cpp
@@ -20642,6 +20642,11 @@ void CommitTxnRequest::__set_replLastIdInfo(const 
ReplLastIdInfo& val) {
 __isset.replLastIdInfo = true;
 }
 
+void CommitTxnRequest::__set_exclWriteEnabled(const bool val) {
+  this->exclWriteEnabled = val;
+__isset.exclWriteEnabled = true;
+}
+
 void CommitTxnRequest::__set_keyValue(const CommitTxnKeyValue& val) {
   this->keyValue = val;
 __isset.keyValue = true;
@@ -20720,6 +20725,14 @@ uint32_t 
CommitTxnRequest::read(::apache::thrift::protocol::TProtocol* iprot) {
         }
         break;
       case 5:
+        if (ftype == ::apache::thrift::protocol::T_BOOL) {
+          xfer += iprot->readBool(this->exclWriteEnabled);
+          this->__isset.exclWriteEnabled = true;
+        } else {
+          xfer += iprot->skip(ftype);
+        }
+        break;
+      case 6:
         if (ftype == ::apache::thrift::protocol::T_STRUCT) {
           xfer += this->keyValue.read(iprot);
           this->__isset.keyValue = true;
@@ -20773,8 +20786,13 @@ uint32_t 
CommitTxnRequest::write(::apache::thrift::protocol::TProtocol* oprot) c
     xfer += this->replLastIdInfo.write(oprot);
     xfer += oprot->writeFieldEnd();
   }
+  if (this->__isset.exclWriteEnabled) {
+    xfer += oprot->writeFieldBegin("exclWriteEnabled", 
::apache::thrift::protocol::T_BOOL, 5);
+    xfer += oprot->writeBool(this->exclWriteEnabled);
+    xfer += oprot->writeFieldEnd();
+  }
   if (this->__isset.keyValue) {
-    xfer += oprot->writeFieldBegin("keyValue", 
::apache::thrift::protocol::T_STRUCT, 5);
+    xfer += oprot->writeFieldBegin("keyValue", 
::apache::thrift::protocol::T_STRUCT, 6);
     xfer += this->keyValue.write(oprot);
     xfer += oprot->writeFieldEnd();
   }
@@ -20789,6 +20807,7 @@ void swap(CommitTxnRequest &a, CommitTxnRequest &b) {
   swap(a.replPolicy, b.replPolicy);
   swap(a.writeEventInfos, b.writeEventInfos);
   swap(a.replLastIdInfo, b.replLastIdInfo);
+  swap(a.exclWriteEnabled, b.exclWriteEnabled);
   swap(a.keyValue, b.keyValue);
   swap(a.__isset, b.__isset);
 }
@@ -20798,6 +20817,7 @@ CommitTxnRequest::CommitTxnRequest(const 
CommitTxnRequest& other759) {
   replPolicy = other759.replPolicy;
   writeEventInfos = other759.writeEventInfos;
   replLastIdInfo = other759.replLastIdInfo;
+  exclWriteEnabled = other759.exclWriteEnabled;
   keyValue = other759.keyValue;
   __isset = other759.__isset;
 }
@@ -20806,6 +20826,7 @@ CommitTxnRequest& CommitTxnRequest::operator=(const 
CommitTxnRequest& other760)
   replPolicy = other760.replPolicy;
   writeEventInfos = other760.writeEventInfos;
   replLastIdInfo = other760.replLastIdInfo;
+  exclWriteEnabled = other760.exclWriteEnabled;
   keyValue = other760.keyValue;
   __isset = other760.__isset;
   return *this;
@@ -20817,6 +20838,7 @@ void CommitTxnRequest::printTo(std::ostream& out) const 
{
   out << ", " << "replPolicy="; (__isset.replPolicy ? (out << 
to_string(replPolicy)) : (out << "<null>"));
   out << ", " << "writeEventInfos="; (__isset.writeEventInfos ? (out << 
to_string(writeEventInfos)) : (out << "<null>"));
   out << ", " << "replLastIdInfo="; (__isset.replLastIdInfo ? (out << 
to_string(replLastIdInfo)) : (out << "<null>"));
+  out << ", " << "exclWriteEnabled="; (__isset.exclWriteEnabled ? (out << 
to_string(exclWriteEnabled)) : (out << "<null>"));
   out << ", " << "keyValue="; (__isset.keyValue ? (out << to_string(keyValue)) 
: (out << "<null>"));
   out << ")";
 }
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.h b/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.h
index d805068..c1c186f 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.h
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.h
@@ -7853,10 +7853,11 @@ void swap(ReplLastIdInfo &a, ReplLastIdInfo &b);
 std::ostream& operator<<(std::ostream& out, const ReplLastIdInfo& obj);
 
 typedef struct _CommitTxnRequest__isset {
-  _CommitTxnRequest__isset() : replPolicy(false), writeEventInfos(false), 
replLastIdInfo(false), keyValue(false) {}
+  _CommitTxnRequest__isset() : replPolicy(false), writeEventInfos(false), 
replLastIdInfo(false), exclWriteEnabled(true), keyValue(false) {}
   bool replPolicy :1;
   bool writeEventInfos :1;
   bool replLastIdInfo :1;
+  bool exclWriteEnabled :1;
   bool keyValue :1;
 } _CommitTxnRequest__isset;
 
@@ -7865,7 +7866,7 @@ class CommitTxnRequest : public virtual 
::apache::thrift::TBase {
 
   CommitTxnRequest(const CommitTxnRequest&);
   CommitTxnRequest& operator=(const CommitTxnRequest&);
-  CommitTxnRequest() : txnid(0), replPolicy() {
+  CommitTxnRequest() : txnid(0), replPolicy(), exclWriteEnabled(true) {
   }
 
   virtual ~CommitTxnRequest() noexcept;
@@ -7873,6 +7874,7 @@ class CommitTxnRequest : public virtual 
::apache::thrift::TBase {
   std::string replPolicy;
   std::vector<WriteEventInfo>  writeEventInfos;
   ReplLastIdInfo replLastIdInfo;
+  bool exclWriteEnabled;
   CommitTxnKeyValue keyValue;
 
   _CommitTxnRequest__isset __isset;
@@ -7885,6 +7887,8 @@ class CommitTxnRequest : public virtual 
::apache::thrift::TBase {
 
   void __set_replLastIdInfo(const ReplLastIdInfo& val);
 
+  void __set_exclWriteEnabled(const bool val);
+
   void __set_keyValue(const CommitTxnKeyValue& val);
 
   bool operator == (const CommitTxnRequest & rhs) const
@@ -7903,6 +7907,10 @@ class CommitTxnRequest : public virtual 
::apache::thrift::TBase {
       return false;
     else if (__isset.replLastIdInfo && !(replLastIdInfo == rhs.replLastIdInfo))
       return false;
+    if (__isset.exclWriteEnabled != rhs.__isset.exclWriteEnabled)
+      return false;
+    else if (__isset.exclWriteEnabled && !(exclWriteEnabled == 
rhs.exclWriteEnabled))
+      return false;
     if (__isset.keyValue != rhs.__isset.keyValue)
       return false;
     else if (__isset.keyValue && !(keyValue == rhs.keyValue))
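Note: the generated C++ changes above and the Java ones below (plus the PHP/Python/Ruby files in the diffstat) all
derive from a single new optional field in hive_metastore.thrift. The IDL hunk itself is not reproduced in this
mail; the reconstruction below is inferred from the generated field ids, types, and default, so treat the exact
formatting as an assumption:

    struct CommitTxnRequest {
        1: required i64 txnid,
        2: optional string replPolicy,
        3: optional list<WriteEventInfo> writeEventInfos,
        4: optional ReplLastIdInfo replLastIdInfo,
        5: optional bool exclWriteEnabled = true,  // the new flag: id 5, BOOL, defaults to true
        6: optional CommitTxnKeyValue keyValue,
    }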
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/CommitTxnRequest.java b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/CommitTxnRequest.java
index 9704544..1e7cd1d 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/CommitTxnRequest.java
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/CommitTxnRequest.java
@@ -15,7 +15,8 @@ package org.apache.hadoop.hive.metastore.api;
   private static final org.apache.thrift.protocol.TField 
REPL_POLICY_FIELD_DESC = new org.apache.thrift.protocol.TField("replPolicy", 
org.apache.thrift.protocol.TType.STRING, (short)2);
   private static final org.apache.thrift.protocol.TField 
WRITE_EVENT_INFOS_FIELD_DESC = new 
org.apache.thrift.protocol.TField("writeEventInfos", 
org.apache.thrift.protocol.TType.LIST, (short)3);
   private static final org.apache.thrift.protocol.TField 
REPL_LAST_ID_INFO_FIELD_DESC = new 
org.apache.thrift.protocol.TField("replLastIdInfo", 
org.apache.thrift.protocol.TType.STRUCT, (short)4);
-  private static final org.apache.thrift.protocol.TField KEY_VALUE_FIELD_DESC 
= new org.apache.thrift.protocol.TField("keyValue", 
org.apache.thrift.protocol.TType.STRUCT, (short)5);
+  private static final org.apache.thrift.protocol.TField 
EXCL_WRITE_ENABLED_FIELD_DESC = new 
org.apache.thrift.protocol.TField("exclWriteEnabled", 
org.apache.thrift.protocol.TType.BOOL, (short)5);
+  private static final org.apache.thrift.protocol.TField KEY_VALUE_FIELD_DESC 
= new org.apache.thrift.protocol.TField("keyValue", 
org.apache.thrift.protocol.TType.STRUCT, (short)6);
 
   private static final org.apache.thrift.scheme.SchemeFactory 
STANDARD_SCHEME_FACTORY = new CommitTxnRequestStandardSchemeFactory();
   private static final org.apache.thrift.scheme.SchemeFactory 
TUPLE_SCHEME_FACTORY = new CommitTxnRequestTupleSchemeFactory();
@@ -24,6 +25,7 @@ package org.apache.hadoop.hive.metastore.api;
   private @org.apache.thrift.annotation.Nullable java.lang.String replPolicy; 
// optional
   private @org.apache.thrift.annotation.Nullable 
java.util.List<WriteEventInfo> writeEventInfos; // optional
   private @org.apache.thrift.annotation.Nullable ReplLastIdInfo 
replLastIdInfo; // optional
+  private boolean exclWriteEnabled; // optional
   private @org.apache.thrift.annotation.Nullable CommitTxnKeyValue keyValue; 
// optional
 
   /** The set of fields this struct contains, along with convenience methods 
for finding and manipulating them. */
@@ -32,7 +34,8 @@ package org.apache.hadoop.hive.metastore.api;
     REPL_POLICY((short)2, "replPolicy"),
     WRITE_EVENT_INFOS((short)3, "writeEventInfos"),
     REPL_LAST_ID_INFO((short)4, "replLastIdInfo"),
-    KEY_VALUE((short)5, "keyValue");
+    EXCL_WRITE_ENABLED((short)5, "exclWriteEnabled"),
+    KEY_VALUE((short)6, "keyValue");
 
     private static final java.util.Map<java.lang.String, _Fields> byName = new 
java.util.HashMap<java.lang.String, _Fields>();
 
@@ -56,7 +59,9 @@ package org.apache.hadoop.hive.metastore.api;
           return WRITE_EVENT_INFOS;
         case 4: // REPL_LAST_ID_INFO
           return REPL_LAST_ID_INFO;
-        case 5: // KEY_VALUE
+        case 5: // EXCL_WRITE_ENABLED
+          return EXCL_WRITE_ENABLED;
+        case 6: // KEY_VALUE
           return KEY_VALUE;
         default:
           return null;
@@ -100,8 +105,9 @@ package org.apache.hadoop.hive.metastore.api;
 
   // isset id assignments
   private static final int __TXNID_ISSET_ID = 0;
+  private static final int __EXCLWRITEENABLED_ISSET_ID = 1;
   private byte __isset_bitfield = 0;
-  private static final _Fields optionals[] = {_Fields.REPL_POLICY,_Fields.WRITE_EVENT_INFOS,_Fields.REPL_LAST_ID_INFO,_Fields.KEY_VALUE};
+  private static final _Fields optionals[] = {_Fields.REPL_POLICY,_Fields.WRITE_EVENT_INFOS,_Fields.REPL_LAST_ID_INFO,_Fields.EXCL_WRITE_ENABLED,_Fields.KEY_VALUE};
   public static final java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
   static {
     java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new java.util.EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
@@ -114,6 +120,8 @@ package org.apache.hadoop.hive.metastore.api;
             new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, WriteEventInfo.class))));
     tmpMap.put(_Fields.REPL_LAST_ID_INFO, new org.apache.thrift.meta_data.FieldMetaData("replLastIdInfo", org.apache.thrift.TFieldRequirementType.OPTIONAL,
         new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, ReplLastIdInfo.class)));
+    tmpMap.put(_Fields.EXCL_WRITE_ENABLED, new org.apache.thrift.meta_data.FieldMetaData("exclWriteEnabled", org.apache.thrift.TFieldRequirementType.OPTIONAL,
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.BOOL)));
     tmpMap.put(_Fields.KEY_VALUE, new org.apache.thrift.meta_data.FieldMetaData("keyValue", org.apache.thrift.TFieldRequirementType.OPTIONAL,
         new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, CommitTxnKeyValue.class)));
     metaDataMap = java.util.Collections.unmodifiableMap(tmpMap);
     metaDataMap = java.util.Collections.unmodifiableMap(tmpMap);
@@ -121,6 +129,8 @@ package org.apache.hadoop.hive.metastore.api;
   }
 
   public CommitTxnRequest() {
+    this.exclWriteEnabled = true;
+
   }
 
   public CommitTxnRequest(
@@ -150,6 +160,7 @@ package org.apache.hadoop.hive.metastore.api;
     if (other.isSetReplLastIdInfo()) {
       this.replLastIdInfo = new ReplLastIdInfo(other.replLastIdInfo);
     }
+    this.exclWriteEnabled = other.exclWriteEnabled;
     if (other.isSetKeyValue()) {
       this.keyValue = new CommitTxnKeyValue(other.keyValue);
     }
@@ -166,6 +177,8 @@ package org.apache.hadoop.hive.metastore.api;
     this.replPolicy = null;
     this.writeEventInfos = null;
     this.replLastIdInfo = null;
+    this.exclWriteEnabled = true;
+
     this.keyValue = null;
   }
 
@@ -279,6 +292,28 @@ package org.apache.hadoop.hive.metastore.api;
     }
   }
 
+  public boolean isExclWriteEnabled() {
+    return this.exclWriteEnabled;
+  }
+
+  public void setExclWriteEnabled(boolean exclWriteEnabled) {
+    this.exclWriteEnabled = exclWriteEnabled;
+    setExclWriteEnabledIsSet(true);
+  }
+
+  public void unsetExclWriteEnabled() {
+    __isset_bitfield = org.apache.thrift.EncodingUtils.clearBit(__isset_bitfield, __EXCLWRITEENABLED_ISSET_ID);
+  }
+
+  /** Returns true if field exclWriteEnabled is set (has been assigned a value) and false otherwise */
+  public boolean isSetExclWriteEnabled() {
+    return org.apache.thrift.EncodingUtils.testBit(__isset_bitfield, __EXCLWRITEENABLED_ISSET_ID);
+  }
+
+  public void setExclWriteEnabledIsSet(boolean value) {
+    __isset_bitfield = org.apache.thrift.EncodingUtils.setBit(__isset_bitfield, __EXCLWRITEENABLED_ISSET_ID, value);
+  }
+
   @org.apache.thrift.annotation.Nullable
   public CommitTxnKeyValue getKeyValue() {
     return this.keyValue;
@@ -337,6 +372,14 @@ package org.apache.hadoop.hive.metastore.api;
       }
       break;
 
+    case EXCL_WRITE_ENABLED:
+      if (value == null) {
+        unsetExclWriteEnabled();
+      } else {
+        setExclWriteEnabled((java.lang.Boolean)value);
+      }
+      break;
+
     case KEY_VALUE:
       if (value == null) {
         unsetKeyValue();
@@ -363,6 +406,9 @@ package org.apache.hadoop.hive.metastore.api;
     case REPL_LAST_ID_INFO:
       return getReplLastIdInfo();
 
+    case EXCL_WRITE_ENABLED:
+      return isExclWriteEnabled();
+
     case KEY_VALUE:
       return getKeyValue();
 
@@ -385,6 +431,8 @@ package org.apache.hadoop.hive.metastore.api;
       return isSetWriteEventInfos();
     case REPL_LAST_ID_INFO:
       return isSetReplLastIdInfo();
+    case EXCL_WRITE_ENABLED:
+      return isSetExclWriteEnabled();
     case KEY_VALUE:
       return isSetKeyValue();
     }
@@ -442,6 +490,15 @@ package org.apache.hadoop.hive.metastore.api;
         return false;
     }
 
+    boolean this_present_exclWriteEnabled = true && this.isSetExclWriteEnabled();
+    boolean that_present_exclWriteEnabled = true && that.isSetExclWriteEnabled();
+    if (this_present_exclWriteEnabled || that_present_exclWriteEnabled) {
+      if (!(this_present_exclWriteEnabled && that_present_exclWriteEnabled))
+        return false;
+      if (this.exclWriteEnabled != that.exclWriteEnabled)
+        return false;
+    }
+
     boolean this_present_keyValue = true && this.isSetKeyValue();
     boolean that_present_keyValue = true && that.isSetKeyValue();
     if (this_present_keyValue || that_present_keyValue) {
@@ -472,6 +529,10 @@ package org.apache.hadoop.hive.metastore.api;
     if (isSetReplLastIdInfo())
       hashCode = hashCode * 8191 + replLastIdInfo.hashCode();
 
+    hashCode = hashCode * 8191 + ((isSetExclWriteEnabled()) ? 131071 : 524287);
+    if (isSetExclWriteEnabled())
+      hashCode = hashCode * 8191 + ((exclWriteEnabled) ? 131071 : 524287);
+
     hashCode = hashCode * 8191 + ((isSetKeyValue()) ? 131071 : 524287);
     if (isSetKeyValue())
       hashCode = hashCode * 8191 + keyValue.hashCode();
@@ -527,6 +588,16 @@ package org.apache.hadoop.hive.metastore.api;
         return lastComparison;
       }
     }
+    lastComparison = java.lang.Boolean.valueOf(isSetExclWriteEnabled()).compareTo(other.isSetExclWriteEnabled());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetExclWriteEnabled()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.exclWriteEnabled, other.exclWriteEnabled);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
     lastComparison = java.lang.Boolean.valueOf(isSetKeyValue()).compareTo(other.isSetKeyValue());
     if (lastComparison != 0) {
       return lastComparison;
@@ -591,6 +662,12 @@ package org.apache.hadoop.hive.metastore.api;
       }
       first = false;
     }
+    if (isSetExclWriteEnabled()) {
+      if (!first) sb.append(", ");
+      sb.append("exclWriteEnabled:");
+      sb.append(this.exclWriteEnabled);
+      first = false;
+    }
     if (isSetKeyValue()) {
       if (!first) sb.append(", ");
       sb.append("keyValue:");
@@ -700,7 +777,15 @@ package org.apache.hadoop.hive.metastore.api;
               org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
             }
             break;
-          case 5: // KEY_VALUE
+          case 5: // EXCL_WRITE_ENABLED
+            if (schemeField.type == org.apache.thrift.protocol.TType.BOOL) {
+              struct.exclWriteEnabled = iprot.readBool();
+              struct.setExclWriteEnabledIsSet(true);
+            } else {
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
+          case 6: // KEY_VALUE
             if (schemeField.type == org.apache.thrift.protocol.TType.STRUCT) {
               struct.keyValue = new CommitTxnKeyValue();
               struct.keyValue.read(iprot);
@@ -753,6 +838,11 @@ package org.apache.hadoop.hive.metastore.api;
           oprot.writeFieldEnd();
         }
       }
+      if (struct.isSetExclWriteEnabled()) {
+        oprot.writeFieldBegin(EXCL_WRITE_ENABLED_FIELD_DESC);
+        oprot.writeBool(struct.exclWriteEnabled);
+        oprot.writeFieldEnd();
+      }
       if (struct.keyValue != null) {
         if (struct.isSetKeyValue()) {
           oprot.writeFieldBegin(KEY_VALUE_FIELD_DESC);
@@ -788,10 +878,13 @@ package org.apache.hadoop.hive.metastore.api;
       if (struct.isSetReplLastIdInfo()) {
         optionals.set(2);
       }
-      if (struct.isSetKeyValue()) {
+      if (struct.isSetExclWriteEnabled()) {
         optionals.set(3);
       }
-      oprot.writeBitSet(optionals, 4);
+      if (struct.isSetKeyValue()) {
+        optionals.set(4);
+      }
+      oprot.writeBitSet(optionals, 5);
       if (struct.isSetReplPolicy()) {
         oprot.writeString(struct.replPolicy);
       }
@@ -807,6 +900,9 @@ package org.apache.hadoop.hive.metastore.api;
       if (struct.isSetReplLastIdInfo()) {
         struct.replLastIdInfo.write(oprot);
       }
+      if (struct.isSetExclWriteEnabled()) {
+        oprot.writeBool(struct.exclWriteEnabled);
+      }
       if (struct.isSetKeyValue()) {
         struct.keyValue.write(oprot);
       }
@@ -817,7 +913,7 @@ package org.apache.hadoop.hive.metastore.api;
       org.apache.thrift.protocol.TTupleProtocol iprot = (org.apache.thrift.protocol.TTupleProtocol) prot;
       struct.txnid = iprot.readI64();
       struct.setTxnidIsSet(true);
-      java.util.BitSet incoming = iprot.readBitSet(4);
+      java.util.BitSet incoming = iprot.readBitSet(5);
       if (incoming.get(0)) {
         struct.replPolicy = iprot.readString();
         struct.setReplPolicyIsSet(true);
@@ -842,6 +938,10 @@ package org.apache.hadoop.hive.metastore.api;
         struct.setReplLastIdInfoIsSet(true);
       }
       if (incoming.get(3)) {
+        struct.exclWriteEnabled = iprot.readBool();
+        struct.setExclWriteEnabledIsSet(true);
+      }
+      if (incoming.get(4)) {
         struct.keyValue = new CommitTxnKeyValue();
         struct.keyValue.read(iprot);
         struct.setKeyValueIsSet(true);
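
For readers tracing the generated Java API above: exclWriteEnabled is an optional field with a default of true, tracked through the isset bitfield, so it is only serialized once a caller assigns it explicitly. A minimal usage sketch (assuming a connected IMetaStoreClient named "client" and an open transaction id "txnId", both illustrative names rather than part of this patch):

    // Sketch only: opt a commit back into the write-set conflict check.
    // The generated default of true means the check is skipped unless disabled.
    CommitTxnRequest rqst = new CommitTxnRequest(txnId);
    rqst.setExclWriteEnabled(false);      // also flips the isset bit via setExclWriteEnabledIsSet(true)
    assert rqst.isSetExclWriteEnabled();  // true only after an explicit assignment
    client.commitTxn(rqst);               // request-based commit entry point (renamed below)
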
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/CommitTxnRequest.php b/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/CommitTxnRequest.php
index 94cdc6a..287376f 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/CommitTxnRequest.php
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/CommitTxnRequest.php
@@ -48,6 +48,11 @@ class CommitTxnRequest
             'class' => '\metastore\ReplLastIdInfo',
         ),
         5 => array(
+            'var' => 'exclWriteEnabled',
+            'isRequired' => false,
+            'type' => TType::BOOL,
+        ),
+        6 => array(
             'var' => 'keyValue',
             'isRequired' => false,
             'type' => TType::STRUCT,
@@ -72,6 +77,10 @@ class CommitTxnRequest
      */
     public $replLastIdInfo = null;
     /**
+     * @var bool
+     */
+    public $exclWriteEnabled = true;
+    /**
      * @var \metastore\CommitTxnKeyValue
      */
     public $keyValue = null;
@@ -91,6 +100,9 @@ class CommitTxnRequest
             if (isset($vals['replLastIdInfo'])) {
                 $this->replLastIdInfo = $vals['replLastIdInfo'];
             }
+            if (isset($vals['exclWriteEnabled'])) {
+                $this->exclWriteEnabled = $vals['exclWriteEnabled'];
+            }
             if (isset($vals['keyValue'])) {
                 $this->keyValue = $vals['keyValue'];
             }
@@ -156,6 +168,13 @@ class CommitTxnRequest
                     }
                     break;
                 case 5:
+                    if ($ftype == TType::BOOL) {
+                        $xfer += $input->readBool($this->exclWriteEnabled);
+                    } else {
+                        $xfer += $input->skip($ftype);
+                    }
+                    break;
+                case 6:
                     if ($ftype == TType::STRUCT) {
                         $this->keyValue = new \metastore\CommitTxnKeyValue();
                         $xfer += $this->keyValue->read($input);
@@ -207,11 +226,16 @@ class CommitTxnRequest
             $xfer += $this->replLastIdInfo->write($output);
             $xfer += $output->writeFieldEnd();
         }
+        if ($this->exclWriteEnabled !== null) {
+            $xfer += $output->writeFieldBegin('exclWriteEnabled', TType::BOOL, 5);
+            $xfer += $output->writeBool($this->exclWriteEnabled);
+            $xfer += $output->writeFieldEnd();
+        }
         if ($this->keyValue !== null) {
             if (!is_object($this->keyValue)) {
                 throw new TProtocolException('Bad type in structure.', TProtocolException::INVALID_DATA);
             }
-            $xfer += $output->writeFieldBegin('keyValue', TType::STRUCT, 5);
+            $xfer += $output->writeFieldBegin('keyValue', TType::STRUCT, 6);
             $xfer += $this->keyValue->write($output);
             $xfer += $output->writeFieldEnd();
         }
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py b/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py
index 52e88cd..ee8ce4b 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py
@@ -11869,16 +11869,18 @@ class CommitTxnRequest(object):
      - replPolicy
      - writeEventInfos
      - replLastIdInfo
+     - exclWriteEnabled
      - keyValue
 
     """
 
 
-    def __init__(self, txnid=None, replPolicy=None, writeEventInfos=None, replLastIdInfo=None, keyValue=None,):
+    def __init__(self, txnid=None, replPolicy=None, writeEventInfos=None, replLastIdInfo=None, exclWriteEnabled=True, keyValue=None,):
         self.txnid = txnid
         self.replPolicy = replPolicy
         self.writeEventInfos = writeEventInfos
         self.replLastIdInfo = replLastIdInfo
+        self.exclWriteEnabled = exclWriteEnabled
         self.keyValue = keyValue
 
     def read(self, iprot):
@@ -11918,6 +11920,11 @@ class CommitTxnRequest(object):
                 else:
                     iprot.skip(ftype)
             elif fid == 5:
+                if ftype == TType.BOOL:
+                    self.exclWriteEnabled = iprot.readBool()
+                else:
+                    iprot.skip(ftype)
+            elif fid == 6:
                 if ftype == TType.STRUCT:
                     self.keyValue = CommitTxnKeyValue()
                     self.keyValue.read(iprot)
@@ -11952,8 +11959,12 @@ class CommitTxnRequest(object):
             oprot.writeFieldBegin('replLastIdInfo', TType.STRUCT, 4)
             self.replLastIdInfo.write(oprot)
             oprot.writeFieldEnd()
+        if self.exclWriteEnabled is not None:
+            oprot.writeFieldBegin('exclWriteEnabled', TType.BOOL, 5)
+            oprot.writeBool(self.exclWriteEnabled)
+            oprot.writeFieldEnd()
         if self.keyValue is not None:
-            oprot.writeFieldBegin('keyValue', TType.STRUCT, 5)
+            oprot.writeFieldBegin('keyValue', TType.STRUCT, 6)
             self.keyValue.write(oprot)
             oprot.writeFieldEnd()
         oprot.writeFieldStop()
@@ -27169,7 +27180,8 @@ CommitTxnRequest.thrift_spec = (
     (2, TType.STRING, 'replPolicy', 'UTF8', None, ),  # 2
     (3, TType.LIST, 'writeEventInfos', (TType.STRUCT, [WriteEventInfo, None], False), None, ),  # 3
     (4, TType.STRUCT, 'replLastIdInfo', [ReplLastIdInfo, None], None, ),  # 4
-    (5, TType.STRUCT, 'keyValue', [CommitTxnKeyValue, None], None, ),  # 5
+    (5, TType.BOOL, 'exclWriteEnabled', None, True, ),  # 5
+    (6, TType.STRUCT, 'keyValue', [CommitTxnKeyValue, None], None, ),  # 6
 )
 all_structs.append(ReplTblWriteIdStateRequest)
 ReplTblWriteIdStateRequest.thrift_spec = (
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb b/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb
index cddea02..3b7a0d6 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb
@@ -3560,13 +3560,15 @@ class CommitTxnRequest
   REPLPOLICY = 2
   WRITEEVENTINFOS = 3
   REPLLASTIDINFO = 4
-  KEYVALUE = 5
+  EXCLWRITEENABLED = 5
+  KEYVALUE = 6
 
   FIELDS = {
     TXNID => {:type => ::Thrift::Types::I64, :name => 'txnid'},
     REPLPOLICY => {:type => ::Thrift::Types::STRING, :name => 'replPolicy', :optional => true},
     WRITEEVENTINFOS => {:type => ::Thrift::Types::LIST, :name => 'writeEventInfos', :element => {:type => ::Thrift::Types::STRUCT, :class => ::WriteEventInfo}, :optional => true},
     REPLLASTIDINFO => {:type => ::Thrift::Types::STRUCT, :name => 'replLastIdInfo', :class => ::ReplLastIdInfo, :optional => true},
+    EXCLWRITEENABLED => {:type => ::Thrift::Types::BOOL, :name => 'exclWriteEnabled', :default => true, :optional => true},
     KEYVALUE => {:type => ::Thrift::Types::STRUCT, :name => 'keyValue', :class => ::CommitTxnKeyValue, :optional => true}
   }
 
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
index 95fcee0..3c3385f 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
@@ -3558,7 +3558,7 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable {
   }
 
   @Override
-  public void replCommitTxn(CommitTxnRequest rqst)
+  public void commitTxn(CommitTxnRequest rqst)
           throws NoSuchTxnException, TxnAbortedException, TException {
     client.commit_txn(rqst);
   }
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
index 8e0e664..36c5ca3 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
@@ -3119,7 +3119,7 @@ public interface IMetaStoreClient {
    * aborted.  This can result from the transaction timing out.
    * @throws TException
    */
-  void replCommitTxn(CommitTxnRequest rqst)
+  void commitTxn(CommitTxnRequest rqst)
           throws NoSuchTxnException, TxnAbortedException, TException;
 
   /**
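
A hedged sketch of what this rename means for callers (the method names come from the two hunks above; "msClient" and "rqst" are illustrative variable names):

    // Before this patch, the CommitTxnRequest overload was replication-flavored:
    //   msClient.replCommitTxn(rqst);
    // After it, the same request-based call is the general commit entry point:
    msClient.commitTxn(rqst);  // may throw NoSuchTxnException or TxnAbortedException, e.g. after a timeout
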
diff --git a/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift b/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
index 6cee06a..a6f153a 100644
--- a/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
+++ b/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
@@ -1012,8 +1012,9 @@ struct CommitTxnRequest {
     3: optional list<WriteEventInfo> writeEventInfos,
     // Information to update the last repl id of table/partition along with commit txn (replication from 2.6 to 3.0)
     4: optional ReplLastIdInfo replLastIdInfo,
+    5: optional bool exclWriteEnabled = true,
     // An optional key/value to store atomically with the transaction
-    5: optional CommitTxnKeyValue keyValue,
+    6: optional CommitTxnKeyValue keyValue,
 }
 
 struct ReplTblWriteIdStateRequest {
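
Because field 5 is optional with a schema default of true, the change stays wire-compatible in the common direction: an older client never writes field 5, and a new server deserializing such a request still sees exclWriteEnabled as true, so the write-set conflict check below is skipped unless a client explicitly sends false. A small sketch of that default behavior against the generated Java bean (the literal 42L is an arbitrary transaction id used only for illustration):

    CommitTxnRequest rqst = new CommitTxnRequest(42L);
    rqst.isExclWriteEnabled();     // true: the schema default applies immediately
    rqst.isSetExclWriteEnabled();  // false: never assigned, so the field is not serialized
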
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index d93e24b..fef2a13 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -913,8 +913,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
           txnid = targetTxnIds.get(0);
         }
 
-        TxnRecord txnRecord = lockTransactionRecord(stmt, txnid, TxnStatus.OPEN);
-        if (txnRecord == null) {
+        Optional<TxnType> txnType = getOpenTxnTypeAndLock(stmt, txnid);
+        if (!txnType.isPresent()) {
           TxnStatus status = findTxnState(txnid, stmt);
           if (status == TxnStatus.ABORTED) {
             if (rqst.isSetReplPolicy()) {
@@ -936,7 +936,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
 
         if (transactionalListeners != null) {
           MetaStoreListenerNotifier.notifyEventWithDirectSql(transactionalListeners,
-                  EventMessage.EventType.ABORT_TXN, new AbortTxnEvent(txnid, txnRecord.type), dbConn, sqlGenerator);
+                  EventMessage.EventType.ABORT_TXN, new AbortTxnEvent(txnid, txnType.get()), dbConn, sqlGenerator);
         }
 
         LOG.debug("Going to commit");
@@ -1244,8 +1244,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
          * should not normally run concurrently (for same txn) but could due to bugs in the client
          * which could then corrupt internal transaction manager state.  Also competes with abortTxn().
          */
-        TxnRecord txnRecord = lockTransactionRecord(stmt, txnid, TxnStatus.OPEN);
-        if (txnRecord == null) {
+        Optional<TxnType> txnType = getOpenTxnTypeAndLock(stmt, txnid);
+        if (!txnType.isPresent()) {
          //if here, txn was not found (in expected state)
          TxnStatus actualTxnStatus = findTxnState(txnid, stmt);
          if (actualTxnStatus == TxnStatus.COMMITTED) {
@@ -1267,7 +1267,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
                 OperationType.UPDATE + "," + OperationType.DELETE + ")";
 
         long tempCommitId = generateTemporaryId();
-        if (txnRecord.type != TxnType.READ_ONLY
+        if (txnType.get() != TxnType.READ_ONLY
                 && !rqst.isSetReplPolicy()
                 && isUpdateOrDelete(stmt, conflictSQLSuffix)) {
 
@@ -1298,35 +1298,38 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
           */
          acquireTxnLock(stmt, false);
          commitId = getHighWaterMark(stmt);
-          /**
-           * see if there are any overlapping txns that wrote the same element, i.e. have a conflict
-           * Since entire commit operation is mutexed wrt other start/commit ops,
-           * committed.ws_commit_id <= current.ws_commit_id for all txns
-           * thus if committed.ws_commit_id < current.ws_txnid, transactions do NOT overlap
-           * For example, [17,20] is committed, [6,80] is being committed right now - these overlap
-           * [17,20] committed and [21,21] committing now - these do not overlap.
-           * [17,18] committed and [18,19] committing now - these overlap  (here 18 started while 17 was still running)
-           */
-          try (ResultSet rs = checkForWriteConflict(stmt, txnid)) {
-            if (rs.next()) {
-              //found a conflict, so let's abort the txn
-              String committedTxn = "[" + JavaUtils.txnIdToString(rs.getLong(1)) + "," + rs.getLong(2) + "]";
-              StringBuilder resource = new StringBuilder(rs.getString(3)).append("/").append(rs.getString(4));
-              String partitionName = rs.getString(5);
-              if (partitionName != null) {
-                resource.append('/').append(partitionName);
-              }
-              String msg = "Aborting [" + JavaUtils.txnIdToString(txnid) + "," + commitId + "]" + " due to a write conflict on " + resource +
-                      " committed by " + committedTxn + " " + rs.getString(7) + "/" + rs.getString(8);
-              //remove WRITE_SET info for current txn since it's about to abort
-              dbConn.rollback(undoWriteSetForCurrentTxn);
-              LOG.info(msg);
-              //todo: should make abortTxns() write something into TXNS.TXN_META_INFO about this
-              if (abortTxns(dbConn, Collections.singletonList(txnid), false) != 1) {
-                throw new IllegalStateException(msg + " FAILED!");
+
+          if (!rqst.isExclWriteEnabled()) {
+            /**
+             * see if there are any overlapping txns that wrote the same element, i.e. have a conflict
+             * Since entire commit operation is mutexed wrt other start/commit ops,
+             * committed.ws_commit_id <= current.ws_commit_id for all txns
+             * thus if committed.ws_commit_id < current.ws_txnid, transactions do NOT overlap
+             * For example, [17,20] is committed, [6,80] is being committed right now - these overlap
+             * [17,20] committed and [21,21] committing now - these do not overlap.
+             * [17,18] committed and [18,19] committing now - these overlap  (here 18 started while 17 was still running)
+             */
+            try (ResultSet rs = checkForWriteConflict(stmt, txnid)) {
+              if (rs.next()) {
+                //found a conflict, so let's abort the txn
+                String committedTxn = "[" + JavaUtils.txnIdToString(rs.getLong(1)) + "," + rs.getLong(2) + "]";
+                StringBuilder resource = new StringBuilder(rs.getString(3)).append("/").append(rs.getString(4));
+                String partitionName = rs.getString(5);
+                if (partitionName != null) {
+                  resource.append('/').append(partitionName);
+                }
+                String msg = "Aborting [" + JavaUtils.txnIdToString(txnid) + "," + commitId + "]" + " due to a write conflict on " + resource +
+                        " committed by " + committedTxn + " " + rs.getString(7) + "/" + rs.getString(8);
+                //remove WRITE_SET info for current txn since it's about to abort
+                dbConn.rollback(undoWriteSetForCurrentTxn);
+                LOG.info(msg);
+                //todo: should make abortTxns() write something into TXNS.TXN_META_INFO about this
+                if (abortTxns(dbConn, Collections.singletonList(txnid), false) != 1) {
+                  throw new IllegalStateException(msg + " FAILED!");
+                }
+                dbConn.commit();
+                throw new TxnAbortedException(msg);
               }
-              dbConn.commit();
-              throw new TxnAbortedException(msg);
             }
           }
         } else {
@@ -1343,7 +1346,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
           assert true;
         }
 
-        if (txnRecord.type != TxnType.READ_ONLY && !rqst.isSetReplPolicy()) {
+        if (txnType.get() != TxnType.READ_ONLY && !rqst.isSetReplPolicy()) {
           moveTxnComponentsToCompleted(stmt, txnid, isUpdateDelete);
         } else if (rqst.isSetReplPolicy()) {
           if (rqst.isSetWriteEventInfos()) {
@@ -1371,14 +1374,14 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
          }
          deleteReplTxnMapEntry(dbConn, sourceTxnId, rqst.getReplPolicy());
        }
-        updateWSCommitIdAndCleanUpMetadata(stmt, txnid, txnRecord.type, commitId, tempCommitId);
+        updateWSCommitIdAndCleanUpMetadata(stmt, txnid, txnType.get(), commitId, tempCommitId);
        if (rqst.isSetKeyValue()) {
          updateKeyValueAssociatedWithTxn(rqst, stmt);
        }
 
        if (transactionalListeners != null) {
          MetaStoreListenerNotifier.notifyEventWithDirectSql(transactionalListeners,
-                  EventMessage.EventType.COMMIT_TXN, new CommitTxnEvent(txnid, txnRecord.type), dbConn, sqlGenerator);
+                  EventMessage.EventType.COMMIT_TXN, new CommitTxnEvent(txnid, txnType.get()), dbConn, sqlGenerator);
         }
 
         LOG.debug("Going to commit");
@@ -2515,25 +2518,16 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
    *
    * SELECT ... FOR UPDATE locks the row until the transaction commits or rolls back.
    * Second connection using `SELECT ... FOR UPDATE` will suspend until the lock is released.
-   * @param txnState the state this txn is expected to be in.  may be null
-   * @return null if no row was found
+   * @return the txnType wrapped in an {@link Optional}
    * @throws SQLException
    * @throws MetaException
    */
-  private TxnRecord lockTransactionRecord(Statement stmt, long txnId, TxnStatus txnState)
-      throws SQLException, MetaException {
+  private Optional<TxnType> getOpenTxnTypeAndLock(Statement stmt, long txnId) throws SQLException, MetaException {
     String query = "SELECT \"TXN_TYPE\" FROM \"TXNS\" WHERE \"TXN_ID\" = " + txnId
-        + (txnState != null ? " AND \"TXN_STATE\" = " + txnState : "");
+        + " AND \"TXN_STATE\" = " + TxnStatus.OPEN;
     try (ResultSet rs = stmt.executeQuery(sqlGenerator.addForUpdateClause(query))) {
-      return rs.next() ? new TxnRecord(rs.getInt(1)) : null;
-    }
-  }
-
-  private static final class TxnRecord {
-    private final TxnType type;
-
-    private TxnRecord(int txnType) {
-      this.type = TxnType.findByValue(txnType);
+      return rs.next() ? Optional.ofNullable(
+          TxnType.findByValue(rs.getInt(1))) : Optional.empty();
     }
   }
 
@@ -2559,8 +2553,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         stmt = dbConn.createStatement();
         if (isValidTxn(txnid)) {
           //this also ensures that txn is still there in expected state
-          TxnRecord txnRecord = lockTransactionRecord(stmt, txnid, TxnStatus.OPEN);
-          if (txnRecord == null) {
+          Optional<TxnType> txnType = getOpenTxnTypeAndLock(stmt, txnid);
+          if (!txnType.isPresent()) {
            ensureValidTxn(dbConn, txnid, stmt);
            shouldNeverHappen(txnid);
          }
@@ -3472,8 +3466,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
        lockInternal();
        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
        stmt = dbConn.createStatement();
-        TxnRecord txnRecord = lockTransactionRecord(stmt, rqst.getTxnid(), TxnStatus.OPEN);
-        if (txnRecord == null) {
+        Optional<TxnType> txnType = getOpenTxnTypeAndLock(stmt, rqst.getTxnid());
+        if (!txnType.isPresent()) {
          //ensures txn is still there and in expected state
          ensureValidTxn(dbConn, rqst.getTxnid(), stmt);
          shouldNeverHappen(rqst.getTxnid());
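
The javadoc restored inside commitTxn above encodes the conflict rule as interval overlap: a previously committed writer [startTxnId, ws_commit_id] can conflict with the committing transaction [txnid, commitId] only when ws_commit_id >= txnid. A standalone sketch of that predicate (an illustrative helper, not part of TxnHandler, which evaluates this in SQL over the WRITE_SET data):

    // committed.ws_commit_id < current.ws_txnid  =>  snapshots do not overlap, so no conflict is possible
    static boolean mayConflict(long committedWsCommitId, long committingTxnId) {
      return committedWsCommitId >= committingTxnId;
    }
    // From the comment: [17,20] vs [6,80] overlap (20 >= 6); [17,20] vs [21,21] do not (20 < 21);
    // [17,18] vs [18,19] overlap (18 >= 18: txn 18 started while 17 was still running).
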
diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientPreCatalog.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientPreCatalog.java
index de41fd2..0fe3aba 100644
--- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientPreCatalog.java
+++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientPreCatalog.java
@@ -2429,7 +2429,7 @@ public class HiveMetaStoreClientPreCatalog implements IMetaStoreClient, AutoClos
   }
 
   @Override
-  public void replCommitTxn(CommitTxnRequest rqst)
+  public void commitTxn(CommitTxnRequest rqst)
           throws NoSuchTxnException, TxnAbortedException, TException {
     client.commit_txn(rqst);
   }
