This is an automated email from the ASF dual-hosted git repository. dkuzmenko pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push: new a67485b HIVE-24125: Incorrect transaction snapshot invalidation with unnecessary writeset check for exclusive operations (Denys Kuzmenko, reviewed by Peter Varga and Peter Vary) a67485b is described below commit a67485bf9d0e1bbc9660226dba3c6ee89ab832fd Author: Denys Kuzmenko <dkuzme...@cloudera.com> AuthorDate: Wed Sep 9 12:01:51 2020 +0200 HIVE-24125: Incorrect transaction snapshot invalidation with unnecessary writeset check for exclusive operations (Denys Kuzmenko, reviewed by Peter Varga and Peter Vary) Revert "HIVE-23725: ValidTxnManager snapshot outdating causing partial reads" (reviewed by Zoltan Haindrich and Peter Varga) This reverts commit e2a02f1b43cba657d4d1c16ead091072be5fe834. HIVE-24125: Incorrect transaction snapshot invalidation with unnecessary writeset check for exclusive operations. Closes (#1474) --- .../java/org/apache/hadoop/hive/conf/HiveConf.java | 11 +- .../java/org/apache/hadoop/hive/ql/Compiler.java | 1 + ql/src/java/org/apache/hadoop/hive/ql/Driver.java | 69 ++++++++++-- .../org/apache/hadoop/hive/ql/DriverContext.java | 18 ++++ .../org/apache/hadoop/hive/ql/DriverFactory.java | 3 - .../apache/hadoop/hive/ql/DriverTxnHandler.java | 29 +----- .../hadoop/hive/ql/lockmgr/DbTxnManager.java | 6 +- .../hadoop/hive/ql/reexec/IReExecutionPlugin.java | 3 +- .../apache/hadoop/hive/ql/reexec/ReExecDriver.java | 14 ++- .../hive/ql/reexec/ReExecuteLostAMQueryPlugin.java | 8 +- .../hive/ql/reexec/ReExecutionDagSubmitPlugin.java | 11 +- .../hive/ql/reexec/ReExecutionOverlayPlugin.java | 4 +- .../hive/ql/reexec/ReExecutionRetryLockPlugin.java | 59 ----------- .../hadoop/hive/ql/reexec/ReOptimizePlugin.java | 8 +- .../org/apache/hadoop/hive/ql/TestTxnCommands.java | 3 +- .../apache/hadoop/hive/ql/TestTxnCommands3.java | 2 + .../ql/lockmgr/DbTxnManagerEndToEndTestBase.java | 7 +- .../hadoop/hive/ql/lockmgr/TestDbTxnManager2.java | 100 ++++++++++++++---- .../gen/thrift/gen-cpp/hive_metastore_types.cpp | 24 ++++- .../src/gen/thrift/gen-cpp/hive_metastore_types.h | 12 ++- .../hive/metastore/api/CommitTxnRequest.java | 116 +++++++++++++++++++-- .../thrift/gen-php/metastore/CommitTxnRequest.php | 26 ++++- .../src/gen/thrift/gen-py/hive_metastore/ttypes.py | 18 +++- .../src/gen/thrift/gen-rb/hive_metastore_types.rb | 4 +- .../hadoop/hive/metastore/HiveMetaStoreClient.java | 2 +- .../hadoop/hive/metastore/IMetaStoreClient.java | 2 +- .../src/main/thrift/hive_metastore.thrift | 3 +- .../hadoop/hive/metastore/txn/TxnHandler.java | 104 +++++++++--------- .../metastore/HiveMetaStoreClientPreCatalog.java | 2 +- 29 files changed, 432 insertions(+), 237 deletions(-) diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index f4712d3..06fff9d 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -5163,8 +5163,7 @@ public class HiveConf extends Configuration { "comma separated list of plugin can be used:\n" + " overlay: hiveconf subtree 'reexec.overlay' is used as an overlay in case of an execution errors out\n" + " reoptimize: collects operator statistics during execution and recompile the query after a failure\n" - + " reexecute_lost_am: reexecutes query if it failed due to tez am node gets decommissioned\n" - + " The retrylock strategy is always enabled: recompiles the query if snapshot becomes outdated before lock acquisition"), + + " reexecute_lost_am: reexecutes query if it failed due to tez am node gets decommissioned"), HIVE_QUERY_REEXECUTION_STATS_PERSISTENCE("hive.query.reexecution.stats.persist.scope", "metastore", new StringSet("query", "hiveserver", "metastore"), "Sets the persistence scope of runtime statistics\n" @@ -5172,13 +5171,11 @@ public class HiveConf extends Configuration { + " hiveserver: runtime statistics are persisted in the hiveserver - all sessions share it\n" + " metastore: runtime statistics are persisted in the metastore as well"), + HIVE_TXN_MAX_RETRYSNAPSHOT_COUNT("hive.txn.retrysnapshot.max.count", 5, new RangeValidator(1, 20), + "Maximum number of snapshot invalidate attempts per request."), HIVE_QUERY_MAX_REEXECUTION_COUNT("hive.query.reexecution.max.count", 1, - "Maximum number of re-executions for a single query." - + " The maximum re-execution retry is limited at 10"), - HIVE_QUERY_MAX_REEXECUTION_RETRYLOCK_COUNT("hive.query.reexecution.retrylock.max.count", 5, - "Maximum number of re-executions with retrylock strategy for a single query." - + " The maximum re-execution retry is limited at 10"), + "Maximum number of re-executions for a single query."), HIVE_QUERY_REEXECUTION_ALWAYS_COLLECT_OPERATOR_STATS("hive.query.reexecution.always.collect.operator.stats", false, "If sessionstats are enabled; this option can be used to collect statistics all the time"), HIVE_QUERY_REEXECUTION_STATS_CACHE_BATCH_SIZE("hive.query.reexecution.stats.cache.batch.size", -1, diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Compiler.java b/ql/src/java/org/apache/hadoop/hive/ql/Compiler.java index c51a146..b31d8c9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Compiler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Compiler.java @@ -189,6 +189,7 @@ public class Compiler { // because at that point we need access to the objects. Hive.get().getMSC().flushCache(); + driverContext.setBackupContext(new Context(context)); boolean executeHooks = driverContext.getHookRunner().hasPreAnalyzeHooks(); HiveSemanticAnalyzerHookContext hookCtx = new HiveSemanticAnalyzerHookContextImpl(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java index 4d2d869..c4517fa 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -23,6 +23,7 @@ import java.nio.charset.StandardCharsets; import java.util.List; import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.common.metrics.common.Metrics; import org.apache.hadoop.hive.common.metrics.common.MetricsConstant; @@ -47,6 +48,7 @@ import org.apache.hadoop.hive.ql.metadata.formatting.JsonMetaDataFormatter; import org.apache.hadoop.hive.ql.metadata.formatting.MetaDataFormatUtils; import org.apache.hadoop.hive.ql.metadata.formatting.MetaDataFormatter; import org.apache.hadoop.hive.ql.parse.ExplainConfiguration.AnalyzeState; +import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.mapper.PlanMapper; import org.apache.hadoop.hive.ql.plan.mapper.StatsSource; import org.apache.hadoop.hive.ql.processors.CommandProcessorException; @@ -72,8 +74,7 @@ public class Driver implements IDriver { private static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME); private static final LogHelper CONSOLE = new LogHelper(LOG); - // Exception message that ReExecutionRetryLockPlugin will recognize - public static final String SNAPSHOT_WAS_OUTDATED_WHEN_LOCKS_WERE_ACQUIRED = + private static final String SNAPSHOT_WAS_OUTDATED_WHEN_LOCKS_WERE_ACQUIRED = "snapshot was outdated when locks were acquired"; private int maxRows = 100; @@ -137,13 +138,13 @@ public class Driver implements IDriver { public Driver(QueryState queryState, QueryInfo queryInfo, HiveTxnManager txnManager) { driverContext = new DriverContext(queryState, queryInfo, new HookRunner(queryState.getConf(), CONSOLE), txnManager); - driverTxnHandler = new DriverTxnHandler(this, driverContext, driverState); + driverTxnHandler = new DriverTxnHandler(driverContext, driverState); } /** * Compiles a new HQL command, but potentially resets taskID counter. Not resetting task counter is useful for * generating re-entrant QL queries. - * + * * @param command The HiveQL query to compile * @param resetTaskIds Resets taskID counter if true. * @return 0 for ok @@ -159,7 +160,7 @@ public class Driver implements IDriver { /** * Compiles an HQL command, creates an execution plan for it. - * + * * @param deferClose indicates if the close/destroy should be deferred when the process has been interrupted, it * should be set to true if the compile is called within another method like runInternal, which defers the * close to the called in that method. @@ -490,7 +491,63 @@ public class Driver implements IDriver { DriverUtils.checkInterrupted(driverState, driverContext, "at acquiring the lock.", null, null); lockAndRespond(); - driverTxnHandler.validateTxnListState(); + + int retryShapshotCnt = 0; + int maxRetrySnapshotCnt = HiveConf.getIntVar(driverContext.getConf(), + HiveConf.ConfVars.HIVE_TXN_MAX_RETRYSNAPSHOT_COUNT); + + try { + while (!driverTxnHandler.isValidTxnListState() && ++retryShapshotCnt <= maxRetrySnapshotCnt) { + LOG.info("Re-compiling after acquiring locks, attempt #" + retryShapshotCnt); + // Snapshot was outdated when locks were acquired, hence regenerate context, txn list and retry. + // TODO: Lock acquisition should be moved before analyze, this is a bit hackish. + // Currently, we acquire a snapshot, compile the query with that snapshot, and then - acquire locks. + // If snapshot is still valid, we continue as usual. + // But if snapshot is not valid, we recompile the query. + if (driverContext.isOutdatedTxn()) { + LOG.info("Snapshot is outdated, re-initiating transaction ..."); + driverContext.getTxnManager().rollbackTxn(); + + String userFromUGI = DriverUtils.getUserFromUGI(driverContext); + driverContext.getTxnManager().openTxn(context, userFromUGI, driverContext.getTxnType()); + lockAndRespond(); + } + + driverContext.setRetrial(true); + driverContext.getBackupContext().addSubContext(context); + driverContext.getBackupContext().setHiveLocks(context.getHiveLocks()); + context = driverContext.getBackupContext(); + + driverContext.getConf().set(ValidTxnList.VALID_TXNS_KEY, + driverContext.getTxnManager().getValidTxns().toString()); + + if (driverContext.getPlan().hasAcidResourcesInQuery()) { + compileInternal(context.getCmd(), true); + driverTxnHandler.recordValidWriteIds(); + driverTxnHandler.setWriteIdForAcidFileSinks(); + } + // Since we're reusing the compiled plan, we need to update its start time for current run + driverContext.getPlan().setQueryStartTime(driverContext.getQueryDisplay().getQueryStartTime()); + } + + if (retryShapshotCnt > maxRetrySnapshotCnt) { + // Throw exception + HiveException e = new HiveException( + "Operation could not be executed, " + SNAPSHOT_WAS_OUTDATED_WHEN_LOCKS_WERE_ACQUIRED + "."); + DriverUtils.handleHiveException(driverContext, e, 14, null); + + } else if (retryShapshotCnt != 0) { + //Reset the PerfLogger + perfLogger = SessionState.getPerfLogger(true); + + // the reason that we set the txn manager for the cxt here is because each + // query has its own ctx object. The txn mgr is shared across the + // same instance of Driver, which can run multiple queries. + context.setHiveTxnManager(driverContext.getTxnManager()); + } + } catch (LockException | SemanticException e) { + DriverUtils.handleHiveException(driverContext, e, 13, null); + } try { taskQueue = new TaskQueue(context); // for canceling the query (should be bound to session?) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java b/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java index ce551cc..0afa657 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java @@ -59,6 +59,7 @@ public class DriverContext { // either initTxnMgr or from the SessionState, in that order. private HiveTxnManager txnManager; private TxnType txnType = TxnType.DEFAULT; + private boolean outdatedTxn; private StatsSource statsSource; // Boolean to store information about whether valid txn list was generated @@ -70,6 +71,7 @@ public class DriverContext { private ValidWriteIdList compactionWriteIds = null; private long compactorTxnId = 0; + private Context backupContext = null; private boolean retrial = false; private DataInput resStream; @@ -154,6 +156,14 @@ public class DriverContext { return txnType; } + public void setOutdatedTxn(boolean outdated) { + this.outdatedTxn = outdated; + } + + public boolean isOutdatedTxn() { + return outdatedTxn; + } + public void setTxnType(TxnType txnType) { this.txnType = txnType; } @@ -206,6 +216,14 @@ public class DriverContext { this.compactorTxnId = compactorTxnId; } + public Context getBackupContext() { + return backupContext; + } + + public void setBackupContext(Context backupContext) { + this.backupContext = backupContext; + } + public boolean isRetrial() { return retrial; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/DriverFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/DriverFactory.java index f2be56a..8817e42 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/DriverFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/DriverFactory.java @@ -24,7 +24,6 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.ql.reexec.IReExecutionPlugin; import org.apache.hadoop.hive.ql.reexec.ReExecDriver; -import org.apache.hadoop.hive.ql.reexec.ReExecutionRetryLockPlugin; import org.apache.hadoop.hive.ql.reexec.ReExecuteLostAMQueryPlugin; import org.apache.hadoop.hive.ql.reexec.ReExecutionOverlayPlugin; import org.apache.hadoop.hive.ql.reexec.ReExecutionDagSubmitPlugin; @@ -60,8 +59,6 @@ public final class DriverFactory { } plugins.add(buildReExecPlugin(string)); } - // The retrylock plugin is always enabled - plugins.add(new ReExecutionRetryLockPlugin()); return new ReExecDriver(queryState, queryInfo, plugins); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/DriverTxnHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/DriverTxnHandler.java index 00fb75d..a5e13c1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/DriverTxnHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/DriverTxnHandler.java @@ -94,7 +94,7 @@ class DriverTxnHandler { private Context context; private Runnable txnRollbackRunner; - DriverTxnHandler(Driver driver, DriverContext driverContext, DriverState driverState) { + DriverTxnHandler(DriverContext driverContext, DriverState driverState) { this.driverContext = driverContext; this.driverState = driverState; } @@ -249,7 +249,7 @@ class DriverTxnHandler { } } - private void setWriteIdForAcidFileSinks() throws SemanticException, LockException { + void setWriteIdForAcidFileSinks() throws SemanticException, LockException { if (!driverContext.getPlan().getAcidSinks().isEmpty()) { List<FileSinkDesc> acidSinks = new ArrayList<>(driverContext.getPlan().getAcidSinks()); //sorting makes tests easier to write since file names and ROW__IDs depend on statementId @@ -318,7 +318,7 @@ class DriverTxnHandler { * Write the current set of valid write ids for the operated acid tables into the configuration so * that it can be read by the input format. */ - private ValidTxnWriteIdList recordValidWriteIds() throws LockException { + ValidTxnWriteIdList recordValidWriteIds() throws LockException { String txnString = driverContext.getConf().get(ValidTxnList.VALID_TXNS_KEY); if (Strings.isNullOrEmpty(txnString)) { throw new IllegalStateException("calling recordValidWritsIdss() without initializing ValidTxnList " + @@ -380,32 +380,12 @@ class DriverTxnHandler { } } - void validateTxnListState() throws CommandProcessorException { - try { - if (!isValidTxnListState()) { - LOG.warn("Reexecuting after acquiring locks, since snapshot was outdated."); - // Snapshot was outdated when locks were acquired, hence regenerate context, - // txn list and retry (see ReExecutionRetryLockPlugin) - try { - endTransactionAndCleanup(false); - } catch (LockException e) { - DriverUtils.handleHiveException(driverContext, e, 12, null); - } - HiveException e = new HiveException( - "Operation could not be executed, " + Driver.SNAPSHOT_WAS_OUTDATED_WHEN_LOCKS_WERE_ACQUIRED + "."); - DriverUtils.handleHiveException(driverContext, e, 14, null); - } - } catch (LockException e) { - DriverUtils.handleHiveException(driverContext, e, 13, null); - } - } - /** * Checks whether txn list has been invalidated while planning the query. * This would happen if query requires exclusive/semi-shared lock, and there has been a committed transaction * on the table over which the lock is required. */ - private boolean isValidTxnListState() throws LockException { + boolean isValidTxnListState() throws LockException { // 1) Get valid txn list. String txnString = driverContext.getConf().get(ValidTxnList.VALID_TXNS_KEY); if (txnString == null) { @@ -493,6 +473,7 @@ class DriverTxnHandler { // Check if there was a conflicting write between current SNAPSHOT generation and locking. if (currentWriteIdList.isWriteIdRangeValid(writeIdList.getHighWatermark() + 1, currentWriteIdList.getHighWatermark()) != ValidWriteIdList.RangeResponse.NONE) { + driverContext.setOutdatedTxn(true); return false; } // Check that write id is still valid diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java index effbcf6..abadb43 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java @@ -505,7 +505,7 @@ public final class DbTxnManager extends HiveTxnManagerImpl { // For transaction started internally by repl load command, heartbeat needs to be stopped. clearLocksAndHB(); } - getMS().replCommitTxn(rqst); + getMS().commitTxn(rqst); } catch (NoSuchTxnException e) { LOG.error("Metastore could not find " + JavaUtils.txnIdToString(rqst.getTxnid())); throw new LockException(e, ErrorMsg.TXN_NO_SUCH_TRANSACTION, JavaUtils.txnIdToString(rqst.getTxnid())); @@ -533,7 +533,9 @@ public final class DbTxnManager extends HiveTxnManagerImpl { // do all new clear in clearLocksAndHB method to make sure that same code is there for replCommitTxn flow. clearLocksAndHB(); LOG.debug("Committing txn " + JavaUtils.txnIdToString(txnId)); - getMS().commitTxn(txnId); + CommitTxnRequest commitTxnRequest = new CommitTxnRequest(txnId); + commitTxnRequest.setExclWriteEnabled(conf.getBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK)); + getMS().commitTxn(commitTxnRequest); } catch (NoSuchTxnException e) { LOG.error("Metastore could not find " + JavaUtils.txnIdToString(txnId)); throw new LockException(e, ErrorMsg.TXN_NO_SUCH_TRANSACTION, JavaUtils.txnIdToString(txnId)); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/reexec/IReExecutionPlugin.java b/ql/src/java/org/apache/hadoop/hive/ql/reexec/IReExecutionPlugin.java index c6d848c..be62fc0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/reexec/IReExecutionPlugin.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/reexec/IReExecutionPlugin.java @@ -22,7 +22,6 @@ import org.apache.hadoop.hive.common.classification.InterfaceAudience; import org.apache.hadoop.hive.common.classification.InterfaceStability; import org.apache.hadoop.hive.ql.Driver; import org.apache.hadoop.hive.ql.plan.mapper.PlanMapper; -import org.apache.hadoop.hive.ql.processors.CommandProcessorException; /** * Defines an interface for re-execution logics. @@ -48,7 +47,7 @@ public interface IReExecutionPlugin { /** * The query have failed, does this plugin advises to re-execute it again? */ - boolean shouldReExecute(int executionNum, CommandProcessorException ex); + boolean shouldReExecute(int executionNum); /** * The plugin should prepare for the re-compilaton of the query. diff --git a/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecDriver.java b/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecDriver.java index 42dac46..c307085 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecDriver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecDriver.java @@ -57,10 +57,6 @@ import com.google.common.annotations.VisibleForTesting; */ public class ReExecDriver implements IDriver { - // Every plugin should check for execution limit in shouldReexecute - // But just in case, we don't want an infinite loop - private final static int MAX_EXECUTION = 10; - private class HandleReOptimizationExplain implements HiveSemanticAnalyzerHook { @Override @@ -152,6 +148,8 @@ public class ReExecDriver implements IDriver { @Override public CommandProcessorResponse run() throws CommandProcessorException { executionIndex = 0; + int maxExecutuions = 1 + coreDriver.getConf().getIntVar(ConfVars.HIVE_QUERY_MAX_REEXECUTION_COUNT); + while (true) { executionIndex++; @@ -172,9 +170,9 @@ public class ReExecDriver implements IDriver { afterExecute(oldPlanMapper, cpr != null); boolean shouldReExecute = explainReOptimization && executionIndex==1; - shouldReExecute |= cpr == null && shouldReExecute(cpe); + shouldReExecute |= cpr == null && shouldReExecute(); - if (executionIndex >= MAX_EXECUTION || !shouldReExecute) { + if (executionIndex >= maxExecutuions || !shouldReExecute) { if (cpr != null) { return cpr; } else { @@ -216,10 +214,10 @@ public class ReExecDriver implements IDriver { return ret; } - private boolean shouldReExecute(CommandProcessorException ex) { + private boolean shouldReExecute() { boolean ret = false; for (IReExecutionPlugin p : plugins) { - boolean shouldReExecute = p.shouldReExecute(executionIndex, ex); + boolean shouldReExecute = p.shouldReExecute(executionIndex); LOG.debug("{}.shouldReExecute = {}", p, shouldReExecute); ret |= shouldReExecute; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecuteLostAMQueryPlugin.java b/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecuteLostAMQueryPlugin.java index 6ada8f3..e779ad2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecuteLostAMQueryPlugin.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecuteLostAMQueryPlugin.java @@ -18,12 +18,10 @@ package org.apache.hadoop.hive.ql.reexec; -import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.Driver; import org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext; import org.apache.hadoop.hive.ql.hooks.HookContext; import org.apache.hadoop.hive.ql.plan.mapper.PlanMapper; -import org.apache.hadoop.hive.ql.processors.CommandProcessorException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,7 +33,6 @@ import java.util.regex.Pattern; public class ReExecuteLostAMQueryPlugin implements IReExecutionPlugin { private static final Logger LOG = LoggerFactory.getLogger(ReExecuteLostAMQueryPlugin.class); private boolean retryPossible; - private int maxExecutions = 1; // Lost am container have exit code -100, due to node failures. This pattern of exception is thrown when AM is managed // by HS2. @@ -64,7 +61,6 @@ public class ReExecuteLostAMQueryPlugin implements IReExecutionPlugin { @Override public void initialize(Driver driver) { driver.getHookRunner().addOnFailureHook(new LocalHook()); - maxExecutions = 1 + driver.getConf().getIntVar(HiveConf.ConfVars.HIVE_QUERY_MAX_REEXECUTION_COUNT); } @Override @@ -72,8 +68,8 @@ public class ReExecuteLostAMQueryPlugin implements IReExecutionPlugin { } @Override - public boolean shouldReExecute(int executionNum, CommandProcessorException ex) { - return (executionNum < maxExecutions) && retryPossible; + public boolean shouldReExecute(int executionNum) { + return retryPossible; } @Override diff --git a/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecutionDagSubmitPlugin.java b/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecutionDagSubmitPlugin.java index b6e76da..b019a00 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecutionDagSubmitPlugin.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecutionDagSubmitPlugin.java @@ -18,13 +18,11 @@ package org.apache.hadoop.hive.ql.reexec; -import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.Driver; import org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext; import org.apache.hadoop.hive.ql.hooks.HookContext; import org.apache.hadoop.hive.ql.hooks.HookContext.HookType; import org.apache.hadoop.hive.ql.plan.mapper.PlanMapper; -import org.apache.hadoop.hive.ql.processors.CommandProcessorException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,7 +35,7 @@ import org.slf4j.LoggerFactory; public class ReExecutionDagSubmitPlugin implements IReExecutionPlugin { private static final Logger LOG = LoggerFactory.getLogger(ReExecutionDagSubmitPlugin.class); - private int maxExecutions = 1; + class LocalHook implements ExecuteWithHookContext { @Override @@ -57,7 +55,6 @@ public class ReExecutionDagSubmitPlugin implements IReExecutionPlugin { @Override public void initialize(Driver driver) { driver.getHookRunner().addOnFailureHook(new LocalHook()); - maxExecutions = 1 + driver.getConf().getIntVar(HiveConf.ConfVars.HIVE_QUERY_MAX_REEXECUTION_COUNT); } private boolean retryPossible; @@ -68,7 +65,7 @@ public class ReExecutionDagSubmitPlugin implements IReExecutionPlugin { @Override public boolean shouldReExecute(int executionNum, PlanMapper pm1, PlanMapper pm2) { - return (executionNum < maxExecutions) && retryPossible; + return retryPossible; } @Override @@ -76,8 +73,8 @@ public class ReExecutionDagSubmitPlugin implements IReExecutionPlugin { } @Override - public boolean shouldReExecute(int executionNum, CommandProcessorException ex) { - return (executionNum < maxExecutions) && retryPossible; + public boolean shouldReExecute(int executionNum) { + return retryPossible; } @Override diff --git a/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecutionOverlayPlugin.java b/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecutionOverlayPlugin.java index 84b5960..83df334 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecutionOverlayPlugin.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecutionOverlayPlugin.java @@ -26,7 +26,6 @@ import org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext; import org.apache.hadoop.hive.ql.hooks.HookContext; import org.apache.hadoop.hive.ql.hooks.HookContext.HookType; import org.apache.hadoop.hive.ql.plan.mapper.PlanMapper; -import org.apache.hadoop.hive.ql.processors.CommandProcessorException; import org.apache.tez.dag.api.TezConfiguration; /** @@ -75,8 +74,7 @@ public class ReExecutionOverlayPlugin implements IReExecutionPlugin { } @Override - public boolean shouldReExecute(int executionNum, CommandProcessorException ex) { - + public boolean shouldReExecute(int executionNum) { return executionNum == 1 && !subtree.isEmpty() && retryPossible; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecutionRetryLockPlugin.java b/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecutionRetryLockPlugin.java deleted file mode 100644 index 6366c4e..0000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecutionRetryLockPlugin.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hive.ql.reexec; - -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.ql.Driver; -import org.apache.hadoop.hive.ql.plan.mapper.PlanMapper; -import org.apache.hadoop.hive.ql.processors.CommandProcessorException; - -public class ReExecutionRetryLockPlugin implements IReExecutionPlugin { - - private Driver coreDriver; - private int maxRetryLockExecutions = 1; - - @Override - public void initialize(Driver driver) { - coreDriver = driver; - maxRetryLockExecutions = 1 + coreDriver.getConf().getIntVar(HiveConf.ConfVars.HIVE_QUERY_MAX_REEXECUTION_RETRYLOCK_COUNT); - } - - @Override - public void beforeExecute(int executionIndex, boolean explainReOptimization) { - } - - @Override - public boolean shouldReExecute(int executionNum, CommandProcessorException ex) { - return executionNum < maxRetryLockExecutions && ex != null && - ex.getMessage().contains(Driver.SNAPSHOT_WAS_OUTDATED_WHEN_LOCKS_WERE_ACQUIRED); - } - - @Override - public void prepareToReExecute() { - } - - @Override - public boolean shouldReExecute(int executionNum, PlanMapper oldPlanMapper, PlanMapper newPlanMapper) { - return executionNum < maxRetryLockExecutions; - } - - @Override - public void afterExecute(PlanMapper planMapper, boolean successfull) { - } -} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReOptimizePlugin.java b/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReOptimizePlugin.java index ecef303..09045af 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReOptimizePlugin.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReOptimizePlugin.java @@ -30,7 +30,6 @@ import org.apache.hadoop.hive.ql.hooks.HookContext; import org.apache.hadoop.hive.ql.hooks.HookContext.HookType; import org.apache.hadoop.hive.ql.plan.mapper.PlanMapper; import org.apache.hadoop.hive.ql.plan.mapper.StatsSources; -import org.apache.hadoop.hive.ql.processors.CommandProcessorException; import org.apache.hadoop.hive.ql.stats.OperatorStatsReaderHook; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,8 +49,6 @@ public class ReOptimizePlugin implements IReExecutionPlugin { private boolean alwaysCollectStats; - private int maxExecutions = 1; - class LocalHook implements ExecuteWithHookContext { @Override @@ -83,15 +80,14 @@ public class ReOptimizePlugin implements IReExecutionPlugin { coreDriver.getHookRunner().addOnFailureHook(statsReaderHook); coreDriver.getHookRunner().addPostHook(statsReaderHook); alwaysCollectStats = driver.getConf().getBoolVar(ConfVars.HIVE_QUERY_REEXECUTION_ALWAYS_COLLECT_OPERATOR_STATS); - maxExecutions = 1 + coreDriver.getConf().getIntVar(ConfVars.HIVE_QUERY_MAX_REEXECUTION_COUNT); statsReaderHook.setCollectOnSuccess(alwaysCollectStats); coreDriver.setStatsSource(StatsSources.getStatsSource(driver.getConf())); } @Override - public boolean shouldReExecute(int executionNum, CommandProcessorException ex) { - return (executionNum < maxExecutions) && retryPossible; + public boolean shouldReExecute(int executionNum) { + return retryPossible; } @Override diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java index e1185de..1725a14 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java @@ -275,7 +275,8 @@ public class TestTxnCommands extends TxnCommandsBaseForTests { } catch (HiveException e) { throw new RuntimeException(e); } - try (IDriver d = DriverFactory.newDriver(hiveConf)) { + QueryState qs = new QueryState.Builder().withHiveConf(hiveConf).nonIsolated().build(); + try (Driver d = new Driver(qs)) { LOG.info("Ready to run the query: " + query); syncThreadStart(cdlIn, cdlOut); try { diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java index de3d296..ba50a2b 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java @@ -60,6 +60,8 @@ public class TestTxnCommands3 extends TxnCommandsBaseForTests { @Test public void testRenameTable() throws Exception { MetastoreConf.setBoolVar(hiveConf, MetastoreConf.ConfVars.CREATE_TABLES_AS_ACID, true); + hiveConf.setBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK, false); + runStatementOnDriver("drop database if exists mydb1 cascade"); runStatementOnDriver("drop database if exists mydb2 cascade"); runStatementOnDriver("create database mydb1"); diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/DbTxnManagerEndToEndTestBase.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/DbTxnManagerEndToEndTestBase.java index 651969b..b256415 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/DbTxnManagerEndToEndTestBase.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/DbTxnManagerEndToEndTestBase.java @@ -24,8 +24,6 @@ import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.apache.hadoop.hive.metastore.txn.TxnUtils; import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.Driver; -import org.apache.hadoop.hive.ql.DriverFactory; -import org.apache.hadoop.hive.ql.IDriver; import org.apache.hadoop.hive.ql.QueryState; import org.apache.hadoop.hive.ql.session.SessionState; import org.junit.After; @@ -41,8 +39,7 @@ public abstract class DbTxnManagerEndToEndTestBase { protected static HiveConf conf = new HiveConf(Driver.class); protected HiveTxnManager txnMgr; protected Context ctx; - protected Driver driver; - protected IDriver driver2; + protected Driver driver, driver2; protected TxnStore txnHandler; public DbTxnManagerEndToEndTestBase() { @@ -66,7 +63,7 @@ public abstract class DbTxnManagerEndToEndTestBase { SessionState.start(conf); ctx = new Context(conf); driver = new Driver(new QueryState.Builder().withHiveConf(conf).nonIsolated().build()); - driver2 = DriverFactory.newDriver(conf); + driver2 = new Driver(new QueryState.Builder().withHiveConf(conf).build()); conf.setBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK, false); TxnDbUtil.cleanDb(conf); SessionState ss = SessionState.get(); diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java index b18f98b..235d756 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java @@ -34,10 +34,9 @@ import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement; import org.apache.hadoop.hive.metastore.api.TxnType; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.txn.AcidHouseKeeperService; -import org.apache.hadoop.hive.ql.DriverFactory; -import org.apache.hadoop.hive.ql.IDriver; +import org.apache.hadoop.hive.ql.Driver; +import org.apache.hadoop.hive.ql.QueryState; import org.apache.hadoop.hive.ql.TestTxnCommands2; -import org.apache.hadoop.hive.ql.reexec.ReExecDriver; import org.junit.Assert; import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.conf.HiveConf; @@ -2007,8 +2006,8 @@ public class TestDbTxnManager2 extends DbTxnManagerEndToEndTestBase{ catch (LockException e) { expectedException = e; } - if (causeConflict) { - Assert.assertNotNull("didn't get exception", expectedException); + if (causeConflict && sharedWrite) { + Assert.assertNotNull("Didn't get exception", expectedException); try { Assert.assertEquals("Transaction manager has aborted the transaction txnid:11. Reason: " + "Aborting [txnid:11,11] due to a write conflict on default/target/p=1/q=2 " + @@ -2041,7 +2040,7 @@ public class TestDbTxnManager2 extends DbTxnManagerEndToEndTestBase{ Assert.assertEquals( "COMPLETED_TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnId2) + "): " + TxnDbUtil.queryToString(conf, "select * from \"COMPLETED_TXN_COMPONENTS\""), - 4, + causeConflict ? 6 : 4, TxnDbUtil.countQueryAgent(conf, "select count(*) from \"COMPLETED_TXN_COMPONENTS\" where \"CTC_TXNID\"=" + txnId2)); Assert.assertEquals( "WRITE_SET mismatch(" + JavaUtils.txnIdToString(txnId2) + "): " + @@ -2052,7 +2051,7 @@ public class TestDbTxnManager2 extends DbTxnManagerEndToEndTestBase{ Assert.assertEquals( "WRITE_SET mismatch(" + JavaUtils.txnIdToString(txnId2) + "): " + TxnDbUtil.queryToString(conf, "select * from \"WRITE_SET\""), - 0, + causeConflict ? 2 : 0, TxnDbUtil.countQueryAgent(conf, "select count(*) from \"WRITE_SET\" where \"WS_TXNID\"=" + txnId2 + " and \"WS_OPERATION_TYPE\"='d'")); } @@ -2167,7 +2166,7 @@ public class TestDbTxnManager2 extends DbTxnManagerEndToEndTestBase{ catch (LockException e) { expectedException = e; } - if (causeConflict) { + if (causeConflict && sharedWrite) { Assert.assertNotNull("Didn't get exception", expectedException); Assert.assertEquals("Got wrong message code", ErrorMsg.TXN_ABORTED, expectedException.getCanonicalErrorMsg()); Assert.assertEquals("Exception msg didn't match", @@ -2258,7 +2257,6 @@ public class TestDbTxnManager2 extends DbTxnManagerEndToEndTestBase{ swapTxnManager(txnMgr); driver.run(query); - driver.run("select * from target"); swapTxnManager(txnMgr2); try { @@ -2276,6 +2274,68 @@ public class TestDbTxnManager2 extends DbTxnManagerEndToEndTestBase{ } @Test + public void testUpdateMergeUpdateConcurrentSnapshotInvalidate() throws Exception { + dropTable(new String[]{"target", "source"}); + driver2.getConf().setBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK, true); + + driver.run("create table target (a int, b int) stored as orc TBLPROPERTIES ('transactional'='true')"); + driver.run("insert into target values (1,2), (3,4)"); + driver.run("create table source (a int, b int)"); + driver.run("insert into source values (5,6), (7,8)"); + + driver.compileAndRespond("update target set a=5 where a=1"); + + DbTxnManager txnMgr2 = (DbTxnManager) TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); + swapTxnManager(txnMgr2); + driver2.compileAndRespond("merge into target t using source s on t.a = s.a " + + "when matched then update set b=8"); + + swapTxnManager(txnMgr); + driver.run(); + + swapTxnManager(txnMgr2); + driver2.run(); + + swapTxnManager(txnMgr); + driver.run("select * from target"); + List res = new ArrayList(); + driver.getFetchTask().fetch(res); + Assert.assertEquals(2, res.size()); + Assert.assertEquals("Lost Update", "5\t8", res.get(1)); + dropTable(new String[]{"target", "source"}); + } + + @Test + public void testUpdateMergeUpdateConcurrentSnapshotInvalidateNewTxn() throws Exception { + dropTable(new String[]{"target", "source"}); + driver2.getConf().setBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK, true); + + driver.run("create table target (a int, b int) stored as orc TBLPROPERTIES ('transactional'='true')"); + driver.run("insert into target values (1,2), (3,4)"); + driver.run("create table source (a int, b int)"); + driver.run("insert into source values (5,6), (7,8)"); + + DbTxnManager txnMgr2 = (DbTxnManager) TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); + swapTxnManager(txnMgr2); + driver2.compileAndRespond("merge into target t using source s on t.a = s.a " + + "when matched then update set b=8"); + + swapTxnManager(txnMgr); + driver.run("update target set a=5 where a=1"); + + swapTxnManager(txnMgr2); + driver2.run(); + + swapTxnManager(txnMgr); + driver.run("select * from target"); + List res = new ArrayList(); + driver.getFetchTask().fetch(res); + Assert.assertEquals(2, res.size()); + Assert.assertEquals("Lost Update", "5\t8", res.get(1)); + dropTable(new String[]{"target", "source"}); + } + + @Test public void test2MergeInsertsConcurrentNoDuplicates() throws Exception { testConcurrentMergeInsertNoDuplicates("merge into target t using source s on t.a = s.a " + "when not matched then insert values (s.a, s.b)", false); @@ -2335,7 +2395,6 @@ public class TestDbTxnManager2 extends DbTxnManagerEndToEndTestBase{ */ @Test public void testMergeInsertDynamicPartitioningSequential() throws Exception { - dropTable(new String[]{"target", "source"}); conf.setBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK, false); @@ -2354,7 +2413,7 @@ public class TestDbTxnManager2 extends DbTxnManagerEndToEndTestBase{ // txn3 merge driver.run("merge into target t using source s on t.a = s.a " + - "when not matched then insert values (s.a, s.b, s.c)"); + "when not matched then insert values (s.a, s.b, s.c)"); driver.run("select * from target"); List res = new ArrayList(); driver.getFetchTask().fetch(res); @@ -2366,10 +2425,7 @@ public class TestDbTxnManager2 extends DbTxnManagerEndToEndTestBase{ @Test public void testMergeInsertDynamicPartitioningSnapshotInvalidatedWithOldCommit() throws Exception { - - // By creating the driver with the factory, we should have a ReExecDriver - IDriver driver3 = DriverFactory.newDriver(conf); - Assert.assertTrue("ReExecDriver was expected", driver3 instanceof ReExecDriver); + Driver driver3 = new Driver(new QueryState.Builder().withHiveConf(conf).build()); dropTable(new String[]{"target", "source"}); conf.setBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK, false); @@ -2395,7 +2451,7 @@ public class TestDbTxnManager2 extends DbTxnManagerEndToEndTestBase{ // Compile txn 3 with only 1 known partition driver3.compileAndRespond("merge into target t using source s on t.a = s.a " + - "when not matched then insert values (s.a, s.b, s.c)"); + "when not matched then insert values (s.a, s.b, s.c)"); swapTxnManager(txnMgr); driver.run(); @@ -2414,15 +2470,13 @@ public class TestDbTxnManager2 extends DbTxnManagerEndToEndTestBase{ // The merge should see all three partition and not create duplicates Assert.assertEquals("Duplicate records found", 6, res.size()); Assert.assertTrue("Partition 3 was skipped", res.contains("6\t6\t3")); + driver3.close(); dropTable(new String[]{"target", "source"}); } - @Test public void testMergeInsertDynamicPartitioningSnapshotInvalidatedWithNewCommit() throws Exception { - // By creating the driver with the factory, we should have a ReExecDriver - IDriver driver3 = DriverFactory.newDriver(conf); - Assert.assertTrue("ReExecDriver was expected", driver3 instanceof ReExecDriver); + Driver driver3 = new Driver(new QueryState.Builder().withHiveConf(conf).build()); dropTable(new String[]{"target", "source"}); conf.setBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK, false); @@ -2438,7 +2492,7 @@ public class TestDbTxnManager2 extends DbTxnManagerEndToEndTestBase{ swapTxnManager(txnMgr3); // Compile txn 1 merge with only 1 known partition driver3.compileAndRespond("merge into target t using source s on t.a = s.a " + - "when not matched then insert values (s.a, s.b, s.c)"); + "when not matched then insert values (s.a, s.b, s.c)"); swapTxnManager(txnMgr); // txn 2 insert data to an old and a new partition @@ -2451,7 +2505,6 @@ public class TestDbTxnManager2 extends DbTxnManagerEndToEndTestBase{ swapTxnManager(txnMgr3); driver3.run(); - swapTxnManager(txnMgr); driver.run("select * from target"); List res = new ArrayList(); @@ -2459,6 +2512,7 @@ public class TestDbTxnManager2 extends DbTxnManagerEndToEndTestBase{ // The merge should see all three partition and not create duplicates Assert.assertEquals("Duplicate records found", 6, res.size()); Assert.assertTrue("Partition 3 was skipped", res.contains("6\t6\t3")); + driver3.close(); dropTable(new String[]{"target", "source"}); } @@ -2691,7 +2745,7 @@ public class TestDbTxnManager2 extends DbTxnManagerEndToEndTestBase{ catch (LockException e) { expectedException = e; } - if (causeConflict) { + if (causeConflict && sharedWrite) { Assert.assertNotNull("Didn't get exception", expectedException); Assert.assertEquals("Got wrong message code", ErrorMsg.TXN_ABORTED, expectedException.getCanonicalErrorMsg()); Assert.assertEquals("Exception msg didn't match", diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.cpp b/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.cpp index 1341c65..68ae4cd 100644 --- a/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.cpp +++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.cpp @@ -20642,6 +20642,11 @@ void CommitTxnRequest::__set_replLastIdInfo(const ReplLastIdInfo& val) { __isset.replLastIdInfo = true; } +void CommitTxnRequest::__set_exclWriteEnabled(const bool val) { + this->exclWriteEnabled = val; +__isset.exclWriteEnabled = true; +} + void CommitTxnRequest::__set_keyValue(const CommitTxnKeyValue& val) { this->keyValue = val; __isset.keyValue = true; @@ -20720,6 +20725,14 @@ uint32_t CommitTxnRequest::read(::apache::thrift::protocol::TProtocol* iprot) { } break; case 5: + if (ftype == ::apache::thrift::protocol::T_BOOL) { + xfer += iprot->readBool(this->exclWriteEnabled); + this->__isset.exclWriteEnabled = true; + } else { + xfer += iprot->skip(ftype); + } + break; + case 6: if (ftype == ::apache::thrift::protocol::T_STRUCT) { xfer += this->keyValue.read(iprot); this->__isset.keyValue = true; @@ -20773,8 +20786,13 @@ uint32_t CommitTxnRequest::write(::apache::thrift::protocol::TProtocol* oprot) c xfer += this->replLastIdInfo.write(oprot); xfer += oprot->writeFieldEnd(); } + if (this->__isset.exclWriteEnabled) { + xfer += oprot->writeFieldBegin("exclWriteEnabled", ::apache::thrift::protocol::T_BOOL, 5); + xfer += oprot->writeBool(this->exclWriteEnabled); + xfer += oprot->writeFieldEnd(); + } if (this->__isset.keyValue) { - xfer += oprot->writeFieldBegin("keyValue", ::apache::thrift::protocol::T_STRUCT, 5); + xfer += oprot->writeFieldBegin("keyValue", ::apache::thrift::protocol::T_STRUCT, 6); xfer += this->keyValue.write(oprot); xfer += oprot->writeFieldEnd(); } @@ -20789,6 +20807,7 @@ void swap(CommitTxnRequest &a, CommitTxnRequest &b) { swap(a.replPolicy, b.replPolicy); swap(a.writeEventInfos, b.writeEventInfos); swap(a.replLastIdInfo, b.replLastIdInfo); + swap(a.exclWriteEnabled, b.exclWriteEnabled); swap(a.keyValue, b.keyValue); swap(a.__isset, b.__isset); } @@ -20798,6 +20817,7 @@ CommitTxnRequest::CommitTxnRequest(const CommitTxnRequest& other759) { replPolicy = other759.replPolicy; writeEventInfos = other759.writeEventInfos; replLastIdInfo = other759.replLastIdInfo; + exclWriteEnabled = other759.exclWriteEnabled; keyValue = other759.keyValue; __isset = other759.__isset; } @@ -20806,6 +20826,7 @@ CommitTxnRequest& CommitTxnRequest::operator=(const CommitTxnRequest& other760) replPolicy = other760.replPolicy; writeEventInfos = other760.writeEventInfos; replLastIdInfo = other760.replLastIdInfo; + exclWriteEnabled = other760.exclWriteEnabled; keyValue = other760.keyValue; __isset = other760.__isset; return *this; @@ -20817,6 +20838,7 @@ void CommitTxnRequest::printTo(std::ostream& out) const { out << ", " << "replPolicy="; (__isset.replPolicy ? (out << to_string(replPolicy)) : (out << "<null>")); out << ", " << "writeEventInfos="; (__isset.writeEventInfos ? (out << to_string(writeEventInfos)) : (out << "<null>")); out << ", " << "replLastIdInfo="; (__isset.replLastIdInfo ? (out << to_string(replLastIdInfo)) : (out << "<null>")); + out << ", " << "exclWriteEnabled="; (__isset.exclWriteEnabled ? (out << to_string(exclWriteEnabled)) : (out << "<null>")); out << ", " << "keyValue="; (__isset.keyValue ? (out << to_string(keyValue)) : (out << "<null>")); out << ")"; } diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.h b/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.h index d805068..c1c186f 100644 --- a/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.h +++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.h @@ -7853,10 +7853,11 @@ void swap(ReplLastIdInfo &a, ReplLastIdInfo &b); std::ostream& operator<<(std::ostream& out, const ReplLastIdInfo& obj); typedef struct _CommitTxnRequest__isset { - _CommitTxnRequest__isset() : replPolicy(false), writeEventInfos(false), replLastIdInfo(false), keyValue(false) {} + _CommitTxnRequest__isset() : replPolicy(false), writeEventInfos(false), replLastIdInfo(false), exclWriteEnabled(true), keyValue(false) {} bool replPolicy :1; bool writeEventInfos :1; bool replLastIdInfo :1; + bool exclWriteEnabled :1; bool keyValue :1; } _CommitTxnRequest__isset; @@ -7865,7 +7866,7 @@ class CommitTxnRequest : public virtual ::apache::thrift::TBase { CommitTxnRequest(const CommitTxnRequest&); CommitTxnRequest& operator=(const CommitTxnRequest&); - CommitTxnRequest() : txnid(0), replPolicy() { + CommitTxnRequest() : txnid(0), replPolicy(), exclWriteEnabled(true) { } virtual ~CommitTxnRequest() noexcept; @@ -7873,6 +7874,7 @@ class CommitTxnRequest : public virtual ::apache::thrift::TBase { std::string replPolicy; std::vector<WriteEventInfo> writeEventInfos; ReplLastIdInfo replLastIdInfo; + bool exclWriteEnabled; CommitTxnKeyValue keyValue; _CommitTxnRequest__isset __isset; @@ -7885,6 +7887,8 @@ class CommitTxnRequest : public virtual ::apache::thrift::TBase { void __set_replLastIdInfo(const ReplLastIdInfo& val); + void __set_exclWriteEnabled(const bool val); + void __set_keyValue(const CommitTxnKeyValue& val); bool operator == (const CommitTxnRequest & rhs) const @@ -7903,6 +7907,10 @@ class CommitTxnRequest : public virtual ::apache::thrift::TBase { return false; else if (__isset.replLastIdInfo && !(replLastIdInfo == rhs.replLastIdInfo)) return false; + if (__isset.exclWriteEnabled != rhs.__isset.exclWriteEnabled) + return false; + else if (__isset.exclWriteEnabled && !(exclWriteEnabled == rhs.exclWriteEnabled)) + return false; if (__isset.keyValue != rhs.__isset.keyValue) return false; else if (__isset.keyValue && !(keyValue == rhs.keyValue)) diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/CommitTxnRequest.java b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/CommitTxnRequest.java index 9704544..1e7cd1d 100644 --- a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/CommitTxnRequest.java +++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/CommitTxnRequest.java @@ -15,7 +15,8 @@ package org.apache.hadoop.hive.metastore.api; private static final org.apache.thrift.protocol.TField REPL_POLICY_FIELD_DESC = new org.apache.thrift.protocol.TField("replPolicy", org.apache.thrift.protocol.TType.STRING, (short)2); private static final org.apache.thrift.protocol.TField WRITE_EVENT_INFOS_FIELD_DESC = new org.apache.thrift.protocol.TField("writeEventInfos", org.apache.thrift.protocol.TType.LIST, (short)3); private static final org.apache.thrift.protocol.TField REPL_LAST_ID_INFO_FIELD_DESC = new org.apache.thrift.protocol.TField("replLastIdInfo", org.apache.thrift.protocol.TType.STRUCT, (short)4); - private static final org.apache.thrift.protocol.TField KEY_VALUE_FIELD_DESC = new org.apache.thrift.protocol.TField("keyValue", org.apache.thrift.protocol.TType.STRUCT, (short)5); + private static final org.apache.thrift.protocol.TField EXCL_WRITE_ENABLED_FIELD_DESC = new org.apache.thrift.protocol.TField("exclWriteEnabled", org.apache.thrift.protocol.TType.BOOL, (short)5); + private static final org.apache.thrift.protocol.TField KEY_VALUE_FIELD_DESC = new org.apache.thrift.protocol.TField("keyValue", org.apache.thrift.protocol.TType.STRUCT, (short)6); private static final org.apache.thrift.scheme.SchemeFactory STANDARD_SCHEME_FACTORY = new CommitTxnRequestStandardSchemeFactory(); private static final org.apache.thrift.scheme.SchemeFactory TUPLE_SCHEME_FACTORY = new CommitTxnRequestTupleSchemeFactory(); @@ -24,6 +25,7 @@ package org.apache.hadoop.hive.metastore.api; private @org.apache.thrift.annotation.Nullable java.lang.String replPolicy; // optional private @org.apache.thrift.annotation.Nullable java.util.List<WriteEventInfo> writeEventInfos; // optional private @org.apache.thrift.annotation.Nullable ReplLastIdInfo replLastIdInfo; // optional + private boolean exclWriteEnabled; // optional private @org.apache.thrift.annotation.Nullable CommitTxnKeyValue keyValue; // optional /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ @@ -32,7 +34,8 @@ package org.apache.hadoop.hive.metastore.api; REPL_POLICY((short)2, "replPolicy"), WRITE_EVENT_INFOS((short)3, "writeEventInfos"), REPL_LAST_ID_INFO((short)4, "replLastIdInfo"), - KEY_VALUE((short)5, "keyValue"); + EXCL_WRITE_ENABLED((short)5, "exclWriteEnabled"), + KEY_VALUE((short)6, "keyValue"); private static final java.util.Map<java.lang.String, _Fields> byName = new java.util.HashMap<java.lang.String, _Fields>(); @@ -56,7 +59,9 @@ package org.apache.hadoop.hive.metastore.api; return WRITE_EVENT_INFOS; case 4: // REPL_LAST_ID_INFO return REPL_LAST_ID_INFO; - case 5: // KEY_VALUE + case 5: // EXCL_WRITE_ENABLED + return EXCL_WRITE_ENABLED; + case 6: // KEY_VALUE return KEY_VALUE; default: return null; @@ -100,8 +105,9 @@ package org.apache.hadoop.hive.metastore.api; // isset id assignments private static final int __TXNID_ISSET_ID = 0; + private static final int __EXCLWRITEENABLED_ISSET_ID = 1; private byte __isset_bitfield = 0; - private static final _Fields optionals[] = {_Fields.REPL_POLICY,_Fields.WRITE_EVENT_INFOS,_Fields.REPL_LAST_ID_INFO,_Fields.KEY_VALUE}; + private static final _Fields optionals[] = {_Fields.REPL_POLICY,_Fields.WRITE_EVENT_INFOS,_Fields.REPL_LAST_ID_INFO,_Fields.EXCL_WRITE_ENABLED,_Fields.KEY_VALUE}; public static final java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; static { java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new java.util.EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); @@ -114,6 +120,8 @@ package org.apache.hadoop.hive.metastore.api; new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, WriteEventInfo.class)))); tmpMap.put(_Fields.REPL_LAST_ID_INFO, new org.apache.thrift.meta_data.FieldMetaData("replLastIdInfo", org.apache.thrift.TFieldRequirementType.OPTIONAL, new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, ReplLastIdInfo.class))); + tmpMap.put(_Fields.EXCL_WRITE_ENABLED, new org.apache.thrift.meta_data.FieldMetaData("exclWriteEnabled", org.apache.thrift.TFieldRequirementType.OPTIONAL, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.BOOL))); tmpMap.put(_Fields.KEY_VALUE, new org.apache.thrift.meta_data.FieldMetaData("keyValue", org.apache.thrift.TFieldRequirementType.OPTIONAL, new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, CommitTxnKeyValue.class))); metaDataMap = java.util.Collections.unmodifiableMap(tmpMap); @@ -121,6 +129,8 @@ package org.apache.hadoop.hive.metastore.api; } public CommitTxnRequest() { + this.exclWriteEnabled = true; + } public CommitTxnRequest( @@ -150,6 +160,7 @@ package org.apache.hadoop.hive.metastore.api; if (other.isSetReplLastIdInfo()) { this.replLastIdInfo = new ReplLastIdInfo(other.replLastIdInfo); } + this.exclWriteEnabled = other.exclWriteEnabled; if (other.isSetKeyValue()) { this.keyValue = new CommitTxnKeyValue(other.keyValue); } @@ -166,6 +177,8 @@ package org.apache.hadoop.hive.metastore.api; this.replPolicy = null; this.writeEventInfos = null; this.replLastIdInfo = null; + this.exclWriteEnabled = true; + this.keyValue = null; } @@ -279,6 +292,28 @@ package org.apache.hadoop.hive.metastore.api; } } + public boolean isExclWriteEnabled() { + return this.exclWriteEnabled; + } + + public void setExclWriteEnabled(boolean exclWriteEnabled) { + this.exclWriteEnabled = exclWriteEnabled; + setExclWriteEnabledIsSet(true); + } + + public void unsetExclWriteEnabled() { + __isset_bitfield = org.apache.thrift.EncodingUtils.clearBit(__isset_bitfield, __EXCLWRITEENABLED_ISSET_ID); + } + + /** Returns true if field exclWriteEnabled is set (has been assigned a value) and false otherwise */ + public boolean isSetExclWriteEnabled() { + return org.apache.thrift.EncodingUtils.testBit(__isset_bitfield, __EXCLWRITEENABLED_ISSET_ID); + } + + public void setExclWriteEnabledIsSet(boolean value) { + __isset_bitfield = org.apache.thrift.EncodingUtils.setBit(__isset_bitfield, __EXCLWRITEENABLED_ISSET_ID, value); + } + @org.apache.thrift.annotation.Nullable public CommitTxnKeyValue getKeyValue() { return this.keyValue; @@ -337,6 +372,14 @@ package org.apache.hadoop.hive.metastore.api; } break; + case EXCL_WRITE_ENABLED: + if (value == null) { + unsetExclWriteEnabled(); + } else { + setExclWriteEnabled((java.lang.Boolean)value); + } + break; + case KEY_VALUE: if (value == null) { unsetKeyValue(); @@ -363,6 +406,9 @@ package org.apache.hadoop.hive.metastore.api; case REPL_LAST_ID_INFO: return getReplLastIdInfo(); + case EXCL_WRITE_ENABLED: + return isExclWriteEnabled(); + case KEY_VALUE: return getKeyValue(); @@ -385,6 +431,8 @@ package org.apache.hadoop.hive.metastore.api; return isSetWriteEventInfos(); case REPL_LAST_ID_INFO: return isSetReplLastIdInfo(); + case EXCL_WRITE_ENABLED: + return isSetExclWriteEnabled(); case KEY_VALUE: return isSetKeyValue(); } @@ -442,6 +490,15 @@ package org.apache.hadoop.hive.metastore.api; return false; } + boolean this_present_exclWriteEnabled = true && this.isSetExclWriteEnabled(); + boolean that_present_exclWriteEnabled = true && that.isSetExclWriteEnabled(); + if (this_present_exclWriteEnabled || that_present_exclWriteEnabled) { + if (!(this_present_exclWriteEnabled && that_present_exclWriteEnabled)) + return false; + if (this.exclWriteEnabled != that.exclWriteEnabled) + return false; + } + boolean this_present_keyValue = true && this.isSetKeyValue(); boolean that_present_keyValue = true && that.isSetKeyValue(); if (this_present_keyValue || that_present_keyValue) { @@ -472,6 +529,10 @@ package org.apache.hadoop.hive.metastore.api; if (isSetReplLastIdInfo()) hashCode = hashCode * 8191 + replLastIdInfo.hashCode(); + hashCode = hashCode * 8191 + ((isSetExclWriteEnabled()) ? 131071 : 524287); + if (isSetExclWriteEnabled()) + hashCode = hashCode * 8191 + ((exclWriteEnabled) ? 131071 : 524287); + hashCode = hashCode * 8191 + ((isSetKeyValue()) ? 131071 : 524287); if (isSetKeyValue()) hashCode = hashCode * 8191 + keyValue.hashCode(); @@ -527,6 +588,16 @@ package org.apache.hadoop.hive.metastore.api; return lastComparison; } } + lastComparison = java.lang.Boolean.valueOf(isSetExclWriteEnabled()).compareTo(other.isSetExclWriteEnabled()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetExclWriteEnabled()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.exclWriteEnabled, other.exclWriteEnabled); + if (lastComparison != 0) { + return lastComparison; + } + } lastComparison = java.lang.Boolean.valueOf(isSetKeyValue()).compareTo(other.isSetKeyValue()); if (lastComparison != 0) { return lastComparison; @@ -591,6 +662,12 @@ package org.apache.hadoop.hive.metastore.api; } first = false; } + if (isSetExclWriteEnabled()) { + if (!first) sb.append(", "); + sb.append("exclWriteEnabled:"); + sb.append(this.exclWriteEnabled); + first = false; + } if (isSetKeyValue()) { if (!first) sb.append(", "); sb.append("keyValue:"); @@ -700,7 +777,15 @@ package org.apache.hadoop.hive.metastore.api; org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; - case 5: // KEY_VALUE + case 5: // EXCL_WRITE_ENABLED + if (schemeField.type == org.apache.thrift.protocol.TType.BOOL) { + struct.exclWriteEnabled = iprot.readBool(); + struct.setExclWriteEnabledIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; + case 6: // KEY_VALUE if (schemeField.type == org.apache.thrift.protocol.TType.STRUCT) { struct.keyValue = new CommitTxnKeyValue(); struct.keyValue.read(iprot); @@ -753,6 +838,11 @@ package org.apache.hadoop.hive.metastore.api; oprot.writeFieldEnd(); } } + if (struct.isSetExclWriteEnabled()) { + oprot.writeFieldBegin(EXCL_WRITE_ENABLED_FIELD_DESC); + oprot.writeBool(struct.exclWriteEnabled); + oprot.writeFieldEnd(); + } if (struct.keyValue != null) { if (struct.isSetKeyValue()) { oprot.writeFieldBegin(KEY_VALUE_FIELD_DESC); @@ -788,10 +878,13 @@ package org.apache.hadoop.hive.metastore.api; if (struct.isSetReplLastIdInfo()) { optionals.set(2); } - if (struct.isSetKeyValue()) { + if (struct.isSetExclWriteEnabled()) { optionals.set(3); } - oprot.writeBitSet(optionals, 4); + if (struct.isSetKeyValue()) { + optionals.set(4); + } + oprot.writeBitSet(optionals, 5); if (struct.isSetReplPolicy()) { oprot.writeString(struct.replPolicy); } @@ -807,6 +900,9 @@ package org.apache.hadoop.hive.metastore.api; if (struct.isSetReplLastIdInfo()) { struct.replLastIdInfo.write(oprot); } + if (struct.isSetExclWriteEnabled()) { + oprot.writeBool(struct.exclWriteEnabled); + } if (struct.isSetKeyValue()) { struct.keyValue.write(oprot); } @@ -817,7 +913,7 @@ package org.apache.hadoop.hive.metastore.api; org.apache.thrift.protocol.TTupleProtocol iprot = (org.apache.thrift.protocol.TTupleProtocol) prot; struct.txnid = iprot.readI64(); struct.setTxnidIsSet(true); - java.util.BitSet incoming = iprot.readBitSet(4); + java.util.BitSet incoming = iprot.readBitSet(5); if (incoming.get(0)) { struct.replPolicy = iprot.readString(); struct.setReplPolicyIsSet(true); @@ -842,6 +938,10 @@ package org.apache.hadoop.hive.metastore.api; struct.setReplLastIdInfoIsSet(true); } if (incoming.get(3)) { + struct.exclWriteEnabled = iprot.readBool(); + struct.setExclWriteEnabledIsSet(true); + } + if (incoming.get(4)) { struct.keyValue = new CommitTxnKeyValue(); struct.keyValue.read(iprot); struct.setKeyValueIsSet(true); diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/CommitTxnRequest.php b/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/CommitTxnRequest.php index 94cdc6a..287376f 100644 --- a/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/CommitTxnRequest.php +++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/CommitTxnRequest.php @@ -48,6 +48,11 @@ class CommitTxnRequest 'class' => '\metastore\ReplLastIdInfo', ), 5 => array( + 'var' => 'exclWriteEnabled', + 'isRequired' => false, + 'type' => TType::BOOL, + ), + 6 => array( 'var' => 'keyValue', 'isRequired' => false, 'type' => TType::STRUCT, @@ -72,6 +77,10 @@ class CommitTxnRequest */ public $replLastIdInfo = null; /** + * @var bool + */ + public $exclWriteEnabled = true; + /** * @var \metastore\CommitTxnKeyValue */ public $keyValue = null; @@ -91,6 +100,9 @@ class CommitTxnRequest if (isset($vals['replLastIdInfo'])) { $this->replLastIdInfo = $vals['replLastIdInfo']; } + if (isset($vals['exclWriteEnabled'])) { + $this->exclWriteEnabled = $vals['exclWriteEnabled']; + } if (isset($vals['keyValue'])) { $this->keyValue = $vals['keyValue']; } @@ -156,6 +168,13 @@ class CommitTxnRequest } break; case 5: + if ($ftype == TType::BOOL) { + $xfer += $input->readBool($this->exclWriteEnabled); + } else { + $xfer += $input->skip($ftype); + } + break; + case 6: if ($ftype == TType::STRUCT) { $this->keyValue = new \metastore\CommitTxnKeyValue(); $xfer += $this->keyValue->read($input); @@ -207,11 +226,16 @@ class CommitTxnRequest $xfer += $this->replLastIdInfo->write($output); $xfer += $output->writeFieldEnd(); } + if ($this->exclWriteEnabled !== null) { + $xfer += $output->writeFieldBegin('exclWriteEnabled', TType::BOOL, 5); + $xfer += $output->writeBool($this->exclWriteEnabled); + $xfer += $output->writeFieldEnd(); + } if ($this->keyValue !== null) { if (!is_object($this->keyValue)) { throw new TProtocolException('Bad type in structure.', TProtocolException::INVALID_DATA); } - $xfer += $output->writeFieldBegin('keyValue', TType::STRUCT, 5); + $xfer += $output->writeFieldBegin('keyValue', TType::STRUCT, 6); $xfer += $this->keyValue->write($output); $xfer += $output->writeFieldEnd(); } diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py b/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py index 52e88cd..ee8ce4b 100644 --- a/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py +++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py @@ -11869,16 +11869,18 @@ class CommitTxnRequest(object): - replPolicy - writeEventInfos - replLastIdInfo + - exclWriteEnabled - keyValue """ - def __init__(self, txnid=None, replPolicy=None, writeEventInfos=None, replLastIdInfo=None, keyValue=None,): + def __init__(self, txnid=None, replPolicy=None, writeEventInfos=None, replLastIdInfo=None, exclWriteEnabled=True, keyValue=None,): self.txnid = txnid self.replPolicy = replPolicy self.writeEventInfos = writeEventInfos self.replLastIdInfo = replLastIdInfo + self.exclWriteEnabled = exclWriteEnabled self.keyValue = keyValue def read(self, iprot): @@ -11918,6 +11920,11 @@ class CommitTxnRequest(object): else: iprot.skip(ftype) elif fid == 5: + if ftype == TType.BOOL: + self.exclWriteEnabled = iprot.readBool() + else: + iprot.skip(ftype) + elif fid == 6: if ftype == TType.STRUCT: self.keyValue = CommitTxnKeyValue() self.keyValue.read(iprot) @@ -11952,8 +11959,12 @@ class CommitTxnRequest(object): oprot.writeFieldBegin('replLastIdInfo', TType.STRUCT, 4) self.replLastIdInfo.write(oprot) oprot.writeFieldEnd() + if self.exclWriteEnabled is not None: + oprot.writeFieldBegin('exclWriteEnabled', TType.BOOL, 5) + oprot.writeBool(self.exclWriteEnabled) + oprot.writeFieldEnd() if self.keyValue is not None: - oprot.writeFieldBegin('keyValue', TType.STRUCT, 5) + oprot.writeFieldBegin('keyValue', TType.STRUCT, 6) self.keyValue.write(oprot) oprot.writeFieldEnd() oprot.writeFieldStop() @@ -27169,7 +27180,8 @@ CommitTxnRequest.thrift_spec = ( (2, TType.STRING, 'replPolicy', 'UTF8', None, ), # 2 (3, TType.LIST, 'writeEventInfos', (TType.STRUCT, [WriteEventInfo, None], False), None, ), # 3 (4, TType.STRUCT, 'replLastIdInfo', [ReplLastIdInfo, None], None, ), # 4 - (5, TType.STRUCT, 'keyValue', [CommitTxnKeyValue, None], None, ), # 5 + (5, TType.BOOL, 'exclWriteEnabled', None, True, ), # 5 + (6, TType.STRUCT, 'keyValue', [CommitTxnKeyValue, None], None, ), # 6 ) all_structs.append(ReplTblWriteIdStateRequest) ReplTblWriteIdStateRequest.thrift_spec = ( diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb b/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb index cddea02..3b7a0d6 100644 --- a/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb +++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb @@ -3560,13 +3560,15 @@ class CommitTxnRequest REPLPOLICY = 2 WRITEEVENTINFOS = 3 REPLLASTIDINFO = 4 - KEYVALUE = 5 + EXCLWRITEENABLED = 5 + KEYVALUE = 6 FIELDS = { TXNID => {:type => ::Thrift::Types::I64, :name => 'txnid'}, REPLPOLICY => {:type => ::Thrift::Types::STRING, :name => 'replPolicy', :optional => true}, WRITEEVENTINFOS => {:type => ::Thrift::Types::LIST, :name => 'writeEventInfos', :element => {:type => ::Thrift::Types::STRUCT, :class => ::WriteEventInfo}, :optional => true}, REPLLASTIDINFO => {:type => ::Thrift::Types::STRUCT, :name => 'replLastIdInfo', :class => ::ReplLastIdInfo, :optional => true}, + EXCLWRITEENABLED => {:type => ::Thrift::Types::BOOL, :name => 'exclWriteEnabled', :default => true, :optional => true}, KEYVALUE => {:type => ::Thrift::Types::STRUCT, :name => 'keyValue', :class => ::CommitTxnKeyValue, :optional => true} } diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java index 95fcee0..3c3385f 100644 --- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java @@ -3558,7 +3558,7 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable { } @Override - public void replCommitTxn(CommitTxnRequest rqst) + public void commitTxn(CommitTxnRequest rqst) throws NoSuchTxnException, TxnAbortedException, TException { client.commit_txn(rqst); } diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java index 8e0e664..36c5ca3 100644 --- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java @@ -3119,7 +3119,7 @@ public interface IMetaStoreClient { * aborted. This can result from the transaction timing out. * @throws TException */ - void replCommitTxn(CommitTxnRequest rqst) + void commitTxn(CommitTxnRequest rqst) throws NoSuchTxnException, TxnAbortedException, TException; /** diff --git a/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift b/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift index 6cee06a..a6f153a 100644 --- a/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift +++ b/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift @@ -1012,8 +1012,9 @@ struct CommitTxnRequest { 3: optional list<WriteEventInfo> writeEventInfos, // Information to update the last repl id of table/partition along with commit txn (replication from 2.6 to 3.0) 4: optional ReplLastIdInfo replLastIdInfo, + 5: optional bool exclWriteEnabled = true, // An optional key/value to store atomically with the transaction - 5: optional CommitTxnKeyValue keyValue, + 6: optional CommitTxnKeyValue keyValue, } struct ReplTblWriteIdStateRequest { diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index d93e24b..fef2a13 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -913,8 +913,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { txnid = targetTxnIds.get(0); } - TxnRecord txnRecord = lockTransactionRecord(stmt, txnid, TxnStatus.OPEN); - if (txnRecord == null) { + Optional<TxnType> txnType = getOpenTxnTypeAndLock(stmt, txnid); + if (!txnType.isPresent()) { TxnStatus status = findTxnState(txnid, stmt); if (status == TxnStatus.ABORTED) { if (rqst.isSetReplPolicy()) { @@ -936,7 +936,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { if (transactionalListeners != null) { MetaStoreListenerNotifier.notifyEventWithDirectSql(transactionalListeners, - EventMessage.EventType.ABORT_TXN, new AbortTxnEvent(txnid, txnRecord.type), dbConn, sqlGenerator); + EventMessage.EventType.ABORT_TXN, new AbortTxnEvent(txnid, txnType.get()), dbConn, sqlGenerator); } LOG.debug("Going to commit"); @@ -1244,8 +1244,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { * should not normally run concurrently (for same txn) but could due to bugs in the client * which could then corrupt internal transaction manager state. Also competes with abortTxn(). */ - TxnRecord txnRecord = lockTransactionRecord(stmt, txnid, TxnStatus.OPEN); - if (txnRecord == null) { + Optional<TxnType> txnType = getOpenTxnTypeAndLock(stmt, txnid); + if (!txnType.isPresent()) { //if here, txn was not found (in expected state) TxnStatus actualTxnStatus = findTxnState(txnid, stmt); if (actualTxnStatus == TxnStatus.COMMITTED) { @@ -1267,7 +1267,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { OperationType.UPDATE + "," + OperationType.DELETE + ")"; long tempCommitId = generateTemporaryId(); - if (txnRecord.type != TxnType.READ_ONLY + if (txnType.get() != TxnType.READ_ONLY && !rqst.isSetReplPolicy() && isUpdateOrDelete(stmt, conflictSQLSuffix)) { @@ -1298,35 +1298,38 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { */ acquireTxnLock(stmt, false); commitId = getHighWaterMark(stmt); - /** - * see if there are any overlapping txns that wrote the same element, i.e. have a conflict - * Since entire commit operation is mutexed wrt other start/commit ops, - * committed.ws_commit_id <= current.ws_commit_id for all txns - * thus if committed.ws_commit_id < current.ws_txnid, transactions do NOT overlap - * For example, [17,20] is committed, [6,80] is being committed right now - these overlap - * [17,20] committed and [21,21] committing now - these do not overlap. - * [17,18] committed and [18,19] committing now - these overlap (here 18 started while 17 was still running) - */ - try (ResultSet rs = checkForWriteConflict(stmt, txnid)) { - if (rs.next()) { - //found a conflict, so let's abort the txn - String committedTxn = "[" + JavaUtils.txnIdToString(rs.getLong(1)) + "," + rs.getLong(2) + "]"; - StringBuilder resource = new StringBuilder(rs.getString(3)).append("/").append(rs.getString(4)); - String partitionName = rs.getString(5); - if (partitionName != null) { - resource.append('/').append(partitionName); - } - String msg = "Aborting [" + JavaUtils.txnIdToString(txnid) + "," + commitId + "]" + " due to a write conflict on " + resource + - " committed by " + committedTxn + " " + rs.getString(7) + "/" + rs.getString(8); - //remove WRITE_SET info for current txn since it's about to abort - dbConn.rollback(undoWriteSetForCurrentTxn); - LOG.info(msg); - //todo: should make abortTxns() write something into TXNS.TXN_META_INFO about this - if (abortTxns(dbConn, Collections.singletonList(txnid), false) != 1) { - throw new IllegalStateException(msg + " FAILED!"); + + if (!rqst.isExclWriteEnabled()) { + /** + * see if there are any overlapping txns that wrote the same element, i.e. have a conflict + * Since entire commit operation is mutexed wrt other start/commit ops, + * committed.ws_commit_id <= current.ws_commit_id for all txns + * thus if committed.ws_commit_id < current.ws_txnid, transactions do NOT overlap + * For example, [17,20] is committed, [6,80] is being committed right now - these overlap + * [17,20] committed and [21,21] committing now - these do not overlap. + * [17,18] committed and [18,19] committing now - these overlap (here 18 started while 17 was still running) + */ + try (ResultSet rs = checkForWriteConflict(stmt, txnid)) { + if (rs.next()) { + //found a conflict, so let's abort the txn + String committedTxn = "[" + JavaUtils.txnIdToString(rs.getLong(1)) + "," + rs.getLong(2) + "]"; + StringBuilder resource = new StringBuilder(rs.getString(3)).append("/").append(rs.getString(4)); + String partitionName = rs.getString(5); + if (partitionName != null) { + resource.append('/').append(partitionName); + } + String msg = "Aborting [" + JavaUtils.txnIdToString(txnid) + "," + commitId + "]" + " due to a write conflict on " + resource + + " committed by " + committedTxn + " " + rs.getString(7) + "/" + rs.getString(8); + //remove WRITE_SET info for current txn since it's about to abort + dbConn.rollback(undoWriteSetForCurrentTxn); + LOG.info(msg); + //todo: should make abortTxns() write something into TXNS.TXN_META_INFO about this + if (abortTxns(dbConn, Collections.singletonList(txnid), false) != 1) { + throw new IllegalStateException(msg + " FAILED!"); + } + dbConn.commit(); + throw new TxnAbortedException(msg); } - dbConn.commit(); - throw new TxnAbortedException(msg); } } } else { @@ -1343,7 +1346,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { assert true; } - if (txnRecord.type != TxnType.READ_ONLY && !rqst.isSetReplPolicy()) { + if (txnType.get() != TxnType.READ_ONLY && !rqst.isSetReplPolicy()) { moveTxnComponentsToCompleted(stmt, txnid, isUpdateDelete); } else if (rqst.isSetReplPolicy()) { if (rqst.isSetWriteEventInfos()) { @@ -1371,14 +1374,14 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { } deleteReplTxnMapEntry(dbConn, sourceTxnId, rqst.getReplPolicy()); } - updateWSCommitIdAndCleanUpMetadata(stmt, txnid, txnRecord.type, commitId, tempCommitId); + updateWSCommitIdAndCleanUpMetadata(stmt, txnid, txnType.get(), commitId, tempCommitId); if (rqst.isSetKeyValue()) { updateKeyValueAssociatedWithTxn(rqst, stmt); } if (transactionalListeners != null) { MetaStoreListenerNotifier.notifyEventWithDirectSql(transactionalListeners, - EventMessage.EventType.COMMIT_TXN, new CommitTxnEvent(txnid, txnRecord.type), dbConn, sqlGenerator); + EventMessage.EventType.COMMIT_TXN, new CommitTxnEvent(txnid, txnType.get()), dbConn, sqlGenerator); } LOG.debug("Going to commit"); @@ -2515,25 +2518,16 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { * * SELECT ... FOR UPDATE locks the row until the transaction commits or rolls back. * Second connection using `SELECT ... FOR UPDATE` will suspend until the lock is released. - * @param txnState the state this txn is expected to be in. may be null - * @return null if no row was found + * @return the txnType wrapped in an {@link Optional} * @throws SQLException * @throws MetaException */ - private TxnRecord lockTransactionRecord(Statement stmt, long txnId, TxnStatus txnState) - throws SQLException, MetaException { + private Optional<TxnType> getOpenTxnTypeAndLock(Statement stmt, long txnId) throws SQLException, MetaException { String query = "SELECT \"TXN_TYPE\" FROM \"TXNS\" WHERE \"TXN_ID\" = " + txnId - + (txnState != null ? " AND \"TXN_STATE\" = " + txnState : ""); + + " AND \"TXN_STATE\" = " + TxnStatus.OPEN; try (ResultSet rs = stmt.executeQuery(sqlGenerator.addForUpdateClause(query))) { - return rs.next() ? new TxnRecord(rs.getInt(1)) : null; - } - } - - private static final class TxnRecord { - private final TxnType type; - - private TxnRecord(int txnType) { - this.type = TxnType.findByValue(txnType); + return rs.next() ? Optional.ofNullable( + TxnType.findByValue(rs.getInt(1))) : Optional.empty(); } } @@ -2559,8 +2553,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { stmt = dbConn.createStatement(); if (isValidTxn(txnid)) { //this also ensures that txn is still there in expected state - TxnRecord txnRecord = lockTransactionRecord(stmt, txnid, TxnStatus.OPEN); - if (txnRecord == null) { + Optional<TxnType> txnType = getOpenTxnTypeAndLock(stmt, txnid); + if (!txnType.isPresent()) { ensureValidTxn(dbConn, txnid, stmt); shouldNeverHappen(txnid); } @@ -3472,8 +3466,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { lockInternal(); dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); - TxnRecord txnRecord = lockTransactionRecord(stmt, rqst.getTxnid(), TxnStatus.OPEN); - if (txnRecord == null) { + Optional<TxnType> txnType = getOpenTxnTypeAndLock(stmt, rqst.getTxnid()); + if (!txnType.isPresent()) { //ensures txn is still there and in expected state ensureValidTxn(dbConn, rqst.getTxnid(), stmt); shouldNeverHappen(rqst.getTxnid()); diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientPreCatalog.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientPreCatalog.java index de41fd2..0fe3aba 100644 --- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientPreCatalog.java +++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientPreCatalog.java @@ -2429,7 +2429,7 @@ public class HiveMetaStoreClientPreCatalog implements IMetaStoreClient, AutoClos } @Override - public void replCommitTxn(CommitTxnRequest rqst) + public void commitTxn(CommitTxnRequest rqst) throws NoSuchTxnException, TxnAbortedException, TException { client.commit_txn(rqst); }