http://git-wip-us.apache.org/repos/asf/hive/blob/93b9cdd6/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java ---------------------------------------------------------------------- diff --cc standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index 0000000,3785f89..4ef6786 mode 000000,100644..100644 --- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@@ -1,0 -1,4906 +1,4949 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.hadoop.hive.metastore.txn; + + import java.io.PrintWriter; + import java.nio.ByteBuffer; + import java.sql.Connection; + import java.sql.Driver; + import java.sql.ResultSet; + import java.sql.SQLException; + import java.sql.SQLFeatureNotSupportedException; + import java.sql.Savepoint; + import java.sql.Statement; + import java.util.ArrayList; + import java.util.Arrays; + import java.util.BitSet; + import java.util.Calendar; + import java.util.Collections; + import java.util.Comparator; + import java.util.HashMap; + import java.util.HashSet; + import java.util.Iterator; + import java.util.List; + import java.util.Map; + import java.util.Properties; + import java.util.Set; + import java.util.SortedSet; + import java.util.TimeZone; + import java.util.TreeSet; + import java.util.concurrent.ConcurrentHashMap; + import java.util.concurrent.Semaphore; + import java.util.concurrent.TimeUnit; + import java.util.concurrent.atomic.AtomicInteger; + import java.util.concurrent.locks.ReentrantLock; + import java.util.regex.Pattern; + + import javax.sql.DataSource; + + import org.apache.commons.lang.ArrayUtils; + import org.apache.commons.lang.NotImplementedException; + import org.apache.hadoop.classification.InterfaceAudience; + import org.apache.hadoop.classification.InterfaceStability; + import org.apache.hadoop.conf.Configuration; + import org.apache.hadoop.hive.common.ValidReadTxnList; + import org.apache.hadoop.hive.common.ValidReaderWriteIdList; + import org.apache.hadoop.hive.common.ValidTxnList; + import org.apache.hadoop.hive.common.ValidWriteIdList; + import org.apache.hadoop.hive.common.classification.RetrySemantics; + import org.apache.hadoop.hive.metastore.DatabaseProduct; + import org.apache.hadoop.hive.metastore.MaterializationsInvalidationCache; + import org.apache.hadoop.hive.metastore.MaterializationsRebuildLockHandler; + import org.apache.hadoop.hive.metastore.Warehouse; + import 
org.apache.hadoop.hive.metastore.MetaStoreListenerNotifier; + import org.apache.hadoop.hive.metastore.TransactionalMetaStoreEventListener; + import org.apache.hadoop.hive.metastore.api.*; + import org.apache.hadoop.hive.metastore.conf.MetastoreConf; + import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars; + import org.apache.hadoop.hive.metastore.datasource.DataSourceProvider; + import org.apache.hadoop.hive.metastore.datasource.DataSourceProviderFactory; + import org.apache.hadoop.hive.metastore.events.AbortTxnEvent; + import org.apache.hadoop.hive.metastore.events.AllocWriteIdEvent; + import org.apache.hadoop.hive.metastore.events.CommitTxnEvent; + import org.apache.hadoop.hive.metastore.events.OpenTxnEvent; + import org.apache.hadoop.hive.metastore.events.AcidWriteEvent; + import org.apache.hadoop.hive.metastore.messaging.EventMessage; + import org.apache.hadoop.hive.metastore.metrics.Metrics; + import org.apache.hadoop.hive.metastore.metrics.MetricsConstants; + import org.apache.hadoop.hive.metastore.tools.SQLGenerator; + import org.apache.hadoop.hive.metastore.utils.JavaUtils; + import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; + import org.apache.hadoop.hive.metastore.utils.StringableMap; + import org.apache.hadoop.util.StringUtils; + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; ++ + import com.google.common.annotations.VisibleForTesting; + + /** + * A handler to answer transaction related calls that come into the metastore + * server. + * + * Note on log messages: Please include txnid:X and lockid info using + * {@link JavaUtils#txnIdToString(long)} + * and {@link JavaUtils#lockIdToString(long)} in all messages. + * The txnid:X and lockid:Y matches how Thrift object toString() methods are generated, + * so keeping the format consistent makes grep'ing the logs much easier. + * + * Note on HIVE_LOCKS.hl_last_heartbeat. 
+ * For locks that are part of a transaction, we set this to 0 (we would rather set it to NULL, but + * currently the DB schema has this NOT NULL) and only update/read the heartbeat from the corresponding + * transaction in TXNS. + * + * In general there can be multiple metastores where this logic can execute, thus the DB is + * used to ensure proper mutexing of operations. + * Select ... For Update (or equivalent: either MsSql with(updlock) or actual Update stmt) is + * used to properly sequence operations. Most notably: + * 1. various sequence IDs are generated with the aid of this mutex + * 2. each (Hive) transaction's state is transitioned atomically. Transaction state + * includes its actual state (Open, Aborted) as well as its lock list/component list. Thus all + * per-transaction ops either start by an update/delete of the relevant TXNS row or do S4U on that row. + * This allows almost all operations to run at READ_COMMITTED and minimizes DB deadlocks. + * 3. checkLock() - this is mutexed entirely since we must ensure that while we check if some lock + * can be granted, no other (strictly speaking "earlier") lock can change state. + * + * The exception to this is Derby, which doesn't support proper S4U. Derby is always running embedded + * (this is the only supported configuration for Derby) + * in the same JVM as HiveMetaStoreHandler, thus we use a JVM-wide lock to properly sequence the operations. + * + * {@link #derbyLock} + + * If we ever decide to run a remote Derby server, according to + * https://db.apache.org/derby/docs/10.0/manuals/develop/develop78.html all transactions will be + * serialized, so that would also work, though it has not been tested. + * + * General design note: + * It's imperative that any operation on a txn (e.g. commit) ensures (atomically) that this txn is + * still valid and active. In the code this is usually achieved at the same time the txn record + * is locked for some operation.
+ * + * Note on retry logic: + * Metastore has retry logic in both {@link org.apache.hadoop.hive.metastore.RetryingMetaStoreClient} + * and {@link org.apache.hadoop.hive.metastore.RetryingHMSHandler}. The retry logic there is very + * generic and is not aware whether the operations are idempotent or not. (This is separate from + * retry logic here in TxnHander which can/does retry DB errors intelligently). The worst case is + * when an op here issues a successful commit against the RDBMS but the calling stack doesn't + * receive the ack and retries. (If an op fails before commit, it's trivially idempotent) + * Thus the ops here need to be made idempotent as much as possible or + * the metstore call stack should have logic not to retry. There are {@link RetrySemantics} + * annotations to document the behavior. + */ + @InterfaceAudience.Private + @InterfaceStability.Evolving + abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { + + static final protected char INITIATED_STATE = 'i'; + static final protected char WORKING_STATE = 'w'; + static final protected char READY_FOR_CLEANING = 'r'; + static final char FAILED_STATE = 'f'; + static final char SUCCEEDED_STATE = 's'; + static final char ATTEMPTED_STATE = 'a'; + + // Compactor types + static final protected char MAJOR_TYPE = 'a'; + static final protected char MINOR_TYPE = 'i'; + + // Transaction states + static final protected char TXN_ABORTED = 'a'; + static final protected char TXN_OPEN = 'o'; + //todo: make these like OperationType and remove above char constatns + enum TxnStatus {OPEN, ABORTED, COMMITTED, UNKNOWN} + + public enum TxnType { + DEFAULT(0), REPL_CREATED(1), READ_ONLY(2); + + private final int value; + TxnType(int value) { + this.value = value; + } + + public int getValue() { + return value; + } + } + + // Lock states + static final protected char LOCK_ACQUIRED = 'a'; + static final protected char LOCK_WAITING = 'w'; + + // Lock types + static final protected char LOCK_EXCLUSIVE = 'e'; 
+ static final protected char LOCK_SHARED = 'r'; + static final protected char LOCK_SEMI_SHARED = 'w'; + + static final private int ALLOWED_REPEATED_DEADLOCKS = 10; + static final private Logger LOG = LoggerFactory.getLogger(TxnHandler.class.getName()); + + static private DataSource connPool; + private static DataSource connPoolMutex; + static private boolean doRetryOnConnPool = false; + + private List<TransactionalMetaStoreEventListener> transactionalListeners; + + private enum OpertaionType { + SELECT('s'), INSERT('i'), UPDATE('u'), DELETE('d'); + private final char sqlConst; + OpertaionType(char sqlConst) { + this.sqlConst = sqlConst; + } + public String toString() { + return Character.toString(sqlConst); + } + public static OpertaionType fromString(char sqlConst) { + switch (sqlConst) { + case 's': + return SELECT; + case 'i': + return INSERT; + case 'u': + return UPDATE; + case 'd': + return DELETE; + default: + throw new IllegalArgumentException(quoteChar(sqlConst)); + } + } + public static OpertaionType fromDataOperationType(DataOperationType dop) { + switch (dop) { + case SELECT: + return OpertaionType.SELECT; + case INSERT: + return OpertaionType.INSERT; + case UPDATE: + return OpertaionType.UPDATE; + case DELETE: + return OpertaionType.DELETE; + default: + throw new IllegalArgumentException("Unexpected value: " + dop); + } + } + } + + // Maximum number of open transactions that's allowed + private static volatile int maxOpenTxns = 0; + // Whether number of open transactions reaches the threshold + private static volatile boolean tooManyOpenTxns = false; + + /** + * Number of consecutive deadlocks we have seen + */ + private int deadlockCnt; + private long deadlockRetryInterval; + protected Configuration conf; + private static DatabaseProduct dbProduct; + private static SQLGenerator sqlGenerator; + + // (End user) Transaction timeout, in milliseconds. 
+ private long timeout; + + private String identifierQuoteString; // quotes to use for quoting tables, where necessary + private long retryInterval; + private int retryLimit; + private int retryNum; + // Current number of open txns + private AtomicInteger numOpenTxns; + + /** + * Derby specific concurrency control + */ + private static final ReentrantLock derbyLock = new ReentrantLock(true); + /** + * must be static since even in UT there may be > 1 instance of TxnHandler + * (e.g. via Compactor services) + */ + private final static ConcurrentHashMap<String, Semaphore> derbyKey2Lock = new ConcurrentHashMap<>(); + private static final String hostname = JavaUtils.hostname(); + + // Private methods should never catch SQLException and then throw MetaException. The public + // methods depend on SQLException coming back so they can detect and handle deadlocks. Private + // methods should only throw MetaException when they explicitly know there's a logic error and + // they want to throw past the public methods. + // + // All public methods that write to the database have to check for deadlocks when a SQLException + // comes back and handle it if they see one. This has to be done with the connection pooling + // in mind. To do this they should call checkRetryable() AFTER rolling back the db transaction, + // and then they should catch RetryException and call themselves recursively. See commitTxn for an example. + + public TxnHandler() { + } + + /** + * This is logically part of c'tor and must be called prior to any other method. 
+ * Not physically part of c'tor due to use of reflection + */ + public void setConf(Configuration conf) { + this.conf = conf; + + checkQFileTestHack(); + + synchronized (TxnHandler.class) { + if (connPool == null) { + Connection dbConn = null; + // Set up the JDBC connection pool + try { + int maxPoolSize = MetastoreConf.getIntVar(conf, ConfVars.CONNECTION_POOLING_MAX_CONNECTIONS); + long getConnectionTimeoutMs = 30000; + connPool = setupJdbcConnectionPool(conf, maxPoolSize, getConnectionTimeoutMs); + /*the mutex pools should ideally be somewhat larger since some operations require 1 + connection from each pool and we want to avoid taking a connection from primary pool + and then blocking because mutex pool is empty. There is only 1 thread in any HMS trying + to mutex on each MUTEX_KEY except MUTEX_KEY.CheckLock. The CheckLock operation gets a + connection from connPool first, then connPoolMutex. All others, go in the opposite + order (not very elegant...). So number of connection requests for connPoolMutex cannot + exceed (size of connPool + MUTEX_KEY.values().length - 1).*/ + connPoolMutex = setupJdbcConnectionPool(conf, maxPoolSize + MUTEX_KEY.values().length, getConnectionTimeoutMs); + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + determineDatabaseProduct(dbConn); + sqlGenerator = new SQLGenerator(dbProduct, conf); + } catch (SQLException e) { + String msg = "Unable to instantiate JDBC connection pooling, " + e.getMessage(); + LOG.error(msg); + throw new RuntimeException(e); + } finally { + closeDbConn(dbConn); + } + } + } + + numOpenTxns = Metrics.getOrCreateGauge(MetricsConstants.NUM_OPEN_TXNS); + + timeout = MetastoreConf.getTimeVar(conf, ConfVars.TXN_TIMEOUT, TimeUnit.MILLISECONDS); + buildJumpTable(); + retryInterval = MetastoreConf.getTimeVar(conf, ConfVars.HMS_HANDLER_INTERVAL, + TimeUnit.MILLISECONDS); + retryLimit = MetastoreConf.getIntVar(conf, ConfVars.HMS_HANDLER_ATTEMPTS); + deadlockRetryInterval = retryInterval / 10; + maxOpenTxns 
= MetastoreConf.getIntVar(conf, ConfVars.MAX_OPEN_TXNS); + + try { + transactionalListeners = MetaStoreUtils.getMetaStoreListeners( + TransactionalMetaStoreEventListener.class, + conf, MetastoreConf.getVar(conf, ConfVars.TRANSACTIONAL_EVENT_LISTENERS)); + } catch(MetaException e) { + String msg = "Unable to get transaction listeners, " + e.getMessage(); + LOG.error(msg); + throw new RuntimeException(e); + } + } + + @Override + public Configuration getConf() { + return conf; + } + + @Override + @RetrySemantics.ReadOnly + public GetOpenTxnsInfoResponse getOpenTxnsInfo() throws MetaException { + try { + // We need to figure out the current transaction number and the list of + // open transactions. To avoid needing a transaction on the underlying + // database we'll look at the current transaction number first. If it + // subsequently shows up in the open list that's ok. + Connection dbConn = null; + Statement stmt = null; + ResultSet rs = null; + try { + /** + * This method can run at READ_COMMITTED as long as long as + * {@link #openTxns(org.apache.hadoop.hive.metastore.api.OpenTxnRequest)} is atomic. + * More specifically, as long as advancing TransactionID in NEXT_TXN_ID is atomic with + * adding corresponding entries into TXNS. The reason is that any txnid below HWM + * is either in TXNS and thus considered open (Open/Aborted) or it's considered Committed. 
+ */ + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + stmt = dbConn.createStatement(); + String s = "select ntxn_next - 1 from NEXT_TXN_ID"; + LOG.debug("Going to execute query <" + s + ">"); + rs = stmt.executeQuery(s); + if (!rs.next()) { + throw new MetaException("Transaction tables not properly " + + "initialized, no record found in next_txn_id"); + } + long hwm = rs.getLong(1); + if (rs.wasNull()) { + throw new MetaException("Transaction tables not properly " + + "initialized, null record found in next_txn_id"); + } + close(rs); + List<TxnInfo> txnInfos = new ArrayList<>(); + //need the WHERE clause below to ensure consistent results with READ_COMMITTED + s = "select txn_id, txn_state, txn_user, txn_host, txn_started, txn_last_heartbeat from " + + "TXNS where txn_id <= " + hwm; + LOG.debug("Going to execute query<" + s + ">"); + rs = stmt.executeQuery(s); + while (rs.next()) { + char c = rs.getString(2).charAt(0); + TxnState state; + switch (c) { + case TXN_ABORTED: + state = TxnState.ABORTED; + break; + + case TXN_OPEN: + state = TxnState.OPEN; + break; + + default: + throw new MetaException("Unexpected transaction state " + c + + " found in txns table"); + } + TxnInfo txnInfo = new TxnInfo(rs.getLong(1), state, rs.getString(3), rs.getString(4)); + txnInfo.setStartedTime(rs.getLong(5)); + txnInfo.setLastHeartbeatTime(rs.getLong(6)); + txnInfos.add(txnInfo); + } + LOG.debug("Going to rollback"); + dbConn.rollback(); + return new GetOpenTxnsInfoResponse(hwm, txnInfos); + } catch (SQLException e) { + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "getOpenTxnsInfo"); + throw new MetaException("Unable to select from transaction database: " + getMessage(e) + + StringUtils.stringifyException(e)); + } finally { + close(rs, stmt, dbConn); + } + } catch (RetryException e) { + return getOpenTxnsInfo(); + } + } ++ + @Override + @RetrySemantics.ReadOnly + public GetOpenTxnsResponse getOpenTxns() throws MetaException { + 
try { + // We need to figure out the current transaction number and the list of + // open transactions. To avoid needing a transaction on the underlying + // database we'll look at the current transaction number first. If it + // subsequently shows up in the open list that's ok. + Connection dbConn = null; + Statement stmt = null; + ResultSet rs = null; + try { + /** + * This runs at READ_COMMITTED for exactly the same reason as {@link #getOpenTxnsInfo()} + */ + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + stmt = dbConn.createStatement(); + String s = "select ntxn_next - 1 from NEXT_TXN_ID"; + LOG.debug("Going to execute query <" + s + ">"); + rs = stmt.executeQuery(s); + if (!rs.next()) { + throw new MetaException("Transaction tables not properly " + + "initialized, no record found in next_txn_id"); + } + long hwm = rs.getLong(1); + if (rs.wasNull()) { + throw new MetaException("Transaction tables not properly " + + "initialized, null record found in next_txn_id"); + } + close(rs); + List<Long> openList = new ArrayList<>(); + //need the WHERE clause below to ensure consistent results with READ_COMMITTED + s = "select txn_id, txn_state from TXNS where txn_id <= " + hwm + " order by txn_id"; + LOG.debug("Going to execute query<" + s + ">"); + rs = stmt.executeQuery(s); + long minOpenTxn = Long.MAX_VALUE; + BitSet abortedBits = new BitSet(); + while (rs.next()) { + long txnId = rs.getLong(1); + openList.add(txnId); + char c = rs.getString(2).charAt(0); + if(c == TXN_OPEN) { + minOpenTxn = Math.min(minOpenTxn, txnId); + } else if (c == TXN_ABORTED) { + abortedBits.set(openList.size() - 1); + } + } + LOG.debug("Going to rollback"); + dbConn.rollback(); + ByteBuffer byteBuffer = ByteBuffer.wrap(abortedBits.toByteArray()); + GetOpenTxnsResponse otr = new GetOpenTxnsResponse(hwm, openList, byteBuffer); + if(minOpenTxn < Long.MAX_VALUE) { + otr.setMin_open_txn(minOpenTxn); + } + return otr; + } catch (SQLException e) { + LOG.debug("Going to rollback"); + 
rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "getOpenTxns"); + throw new MetaException("Unable to select from transaction database, " + + StringUtils.stringifyException(e)); + } finally { + close(rs, stmt, dbConn); + } + } catch (RetryException e) { + return getOpenTxns(); + } + } + + /** + * Retry-by-caller note: + * Worst case, it will leave an open txn which will timeout. + */ + @Override + @RetrySemantics.Idempotent + public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException { + if (!tooManyOpenTxns && numOpenTxns.get() >= maxOpenTxns) { + tooManyOpenTxns = true; + } + if (tooManyOpenTxns) { + if (numOpenTxns.get() < maxOpenTxns * 0.9) { + tooManyOpenTxns = false; + } else { + LOG.warn("Maximum allowed number of open transactions (" + maxOpenTxns + ") has been " + + "reached. Current number of open transactions: " + numOpenTxns); + throw new MetaException("Maximum allowed number of open transactions has been reached. " + + "See hive.max.open.txns."); + } + } + + int numTxns = rqst.getNum_txns(); + if (numTxns <= 0) { + throw new MetaException("Invalid input for number of txns: " + numTxns); + } + + try { + Connection dbConn = null; + Statement stmt = null; + try { + lockInternal(); + /** + * To make {@link #getOpenTxns()}/{@link #getOpenTxnsInfo()} work correctly, this operation must ensure + * that advancing the counter in NEXT_TXN_ID and adding appropriate entries to TXNS is atomic. + * Also, advancing the counter must work when multiple metastores are running. + * SELECT ... FOR UPDATE is used to prevent + * concurrent DB transactions being rolled back due to Write-Write conflict on NEXT_TXN_ID. + * + * In the current design, there can be several metastore instances running in a given Warehouse. + * This makes ideas like reserving a range of IDs to save trips to DB impossible. For example, + * a client may go to MS1 and start a transaction with ID 500 to update a particular row. 
+ * Now the same client will start another transaction, except it ends up on MS2 and may get + * transaction ID 400 and update the same row. Now the merge that happens to materialize the snapshot + * on read will thing the version of the row from transaction ID 500 is the latest one. + * + * Longer term we can consider running Active-Passive MS (at least wrt to ACID operations). This + * set could support a write-through cache for added performance. + */ + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + // Make sure the user has not requested an insane amount of txns. + int maxTxns = MetastoreConf.getIntVar(conf, ConfVars.TXN_MAX_OPEN_BATCH); + if (numTxns > maxTxns) numTxns = maxTxns; + + stmt = dbConn.createStatement(); + List<Long> txnIds = openTxns(dbConn, stmt, rqst); + + LOG.debug("Going to commit"); + dbConn.commit(); + return new OpenTxnsResponse(txnIds); + } catch (SQLException e) { + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "openTxns(" + rqst + ")"); + throw new MetaException("Unable to select from transaction database " + + StringUtils.stringifyException(e)); + } finally { + close(null, stmt, dbConn); + unlockInternal(); + } + } catch (RetryException e) { + return openTxns(rqst); + } + } + + private List<Long> openTxns(Connection dbConn, Statement stmt, OpenTxnRequest rqst) + throws SQLException, MetaException { + int numTxns = rqst.getNum_txns(); + ResultSet rs = null; + TxnType txnType = TxnType.DEFAULT; + try { + if (rqst.isSetReplPolicy()) { + List<Long> targetTxnIdList = getTargetTxnIdList(rqst.getReplPolicy(), rqst.getReplSrcTxnIds(), stmt); + + if (!targetTxnIdList.isEmpty()) { + if (targetTxnIdList.size() != rqst.getReplSrcTxnIds().size()) { + LOG.warn("target txn id number " + targetTxnIdList.toString() + + " is not matching with source txn id number " + rqst.getReplSrcTxnIds().toString()); + } + LOG.info("Target transactions " + targetTxnIdList.toString() + " are present for repl policy 
:" + + rqst.getReplPolicy() + " and Source transaction id : " + rqst.getReplSrcTxnIds().toString()); + return targetTxnIdList; + } + txnType = TxnType.REPL_CREATED; + } + + String s = sqlGenerator.addForUpdateClause("select ntxn_next from NEXT_TXN_ID"); + LOG.debug("Going to execute query <" + s + ">"); + rs = stmt.executeQuery(s); + if (!rs.next()) { + throw new MetaException("Transaction database not properly " + + "configured, can't find next transaction id."); + } + long first = rs.getLong(1); + s = "update NEXT_TXN_ID set ntxn_next = " + (first + numTxns); + LOG.debug("Going to execute update <" + s + ">"); + stmt.executeUpdate(s); + + long now = getDbTime(dbConn); + List<Long> txnIds = new ArrayList<>(numTxns); + + List<String> rows = new ArrayList<>(); + for (long i = first; i < first + numTxns; i++) { + txnIds.add(i); + rows.add(i + "," + quoteChar(TXN_OPEN) + "," + now + "," + now + "," + + quoteString(rqst.getUser()) + "," + quoteString(rqst.getHostname()) + "," + txnType.getValue()); + } + List<String> queries = sqlGenerator.createInsertValuesStmt( + "TXNS (txn_id, txn_state, txn_started, txn_last_heartbeat, txn_user, txn_host, txn_type)", rows); + for (String q : queries) { + LOG.debug("Going to execute update <" + q + ">"); + stmt.execute(q); + } + + // Need to register minimum open txnid for current transactions into MIN_HISTORY table. + s = "select min(txn_id) from TXNS where txn_state = " + quoteChar(TXN_OPEN); + LOG.debug("Going to execute query <" + s + ">"); + rs = stmt.executeQuery(s); + if (!rs.next()) { + throw new IllegalStateException("Scalar query returned no rows?!?!!"); + } + + // TXNS table should have atleast one entry because we just inserted the newly opened txns. + // So, min(txn_id) would be a non-zero txnid. 
+ long minOpenTxnId = rs.getLong(1); + assert (minOpenTxnId > 0); + rows.clear(); + for (long txnId = first; txnId < first + numTxns; txnId++) { + rows.add(txnId + ", " + minOpenTxnId); + } + + // Insert transaction entries into MIN_HISTORY_LEVEL. + List<String> inserts = sqlGenerator.createInsertValuesStmt( + "MIN_HISTORY_LEVEL (mhl_txnid, mhl_min_open_txnid)", rows); + for (String insert : inserts) { + LOG.debug("Going to execute insert <" + insert + ">"); + stmt.execute(insert); + } + LOG.info("Added entries to MIN_HISTORY_LEVEL for current txns: (" + txnIds + + ") with min_open_txn: " + minOpenTxnId); + + if (rqst.isSetReplPolicy()) { + List<String> rowsRepl = new ArrayList<>(); + + for (int i = 0; i < numTxns; i++) { + rowsRepl.add( + quoteString(rqst.getReplPolicy()) + "," + rqst.getReplSrcTxnIds().get(i) + "," + txnIds.get(i)); + } + + List<String> queriesRepl = sqlGenerator.createInsertValuesStmt( + "REPL_TXN_MAP (RTM_REPL_POLICY, RTM_SRC_TXN_ID, RTM_TARGET_TXN_ID)", rowsRepl); + + for (String query : queriesRepl) { + LOG.info("Going to execute insert <" + query + ">"); + stmt.execute(query); + } + } + + if (transactionalListeners != null) { + MetaStoreListenerNotifier.notifyEventWithDirectSql(transactionalListeners, + EventMessage.EventType.OPEN_TXN, new OpenTxnEvent(txnIds, null), dbConn, sqlGenerator); + } + return txnIds; + } finally { + close(rs); + } + } + + private List<Long> getTargetTxnIdList(String replPolicy, List<Long> sourceTxnIdList, Statement stmt) + throws SQLException { + ResultSet rs = null; + try { + List<String> inQueries = new ArrayList<>(); + StringBuilder prefix = new StringBuilder(); + StringBuilder suffix = new StringBuilder(); + List<Long> targetTxnIdList = new ArrayList<>(); + prefix.append("select RTM_TARGET_TXN_ID from REPL_TXN_MAP where "); + suffix.append(" and RTM_REPL_POLICY = " + quoteString(replPolicy)); + TxnUtils.buildQueryWithINClause(conf, inQueries, prefix, suffix, sourceTxnIdList, + "RTM_SRC_TXN_ID", false, false); + 
for (String query : inQueries) { + LOG.debug("Going to execute select <" + query + ">"); + rs = stmt.executeQuery(query); + while (rs.next()) { + targetTxnIdList.add(rs.getLong(1)); + } + } + LOG.debug("targetTxnid for srcTxnId " + sourceTxnIdList.toString() + " is " + targetTxnIdList.toString()); + return targetTxnIdList; + } catch (SQLException e) { + LOG.warn("failed to get target txn ids " + e.getMessage()); + throw e; + } finally { + close(rs); + } + } + + @Override + @RetrySemantics.Idempotent + public long getTargetTxnId(String replPolicy, long sourceTxnId) throws MetaException { + try { + Connection dbConn = null; + Statement stmt = null; + try { + lockInternal(); + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + stmt = dbConn.createStatement(); + List<Long> targetTxnIds = getTargetTxnIdList(replPolicy, Collections.singletonList(sourceTxnId), stmt); + if (targetTxnIds.isEmpty()) { + LOG.info("Txn {} not present for repl policy {}", sourceTxnId, replPolicy); + return -1; + } + assert (targetTxnIds.size() == 1); + return targetTxnIds.get(0); + } catch (SQLException e) { + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "getTargetTxnId(" + replPolicy + sourceTxnId + ")"); + throw new MetaException("Unable to get target transaction id " + + StringUtils.stringifyException(e)); + } finally { + close(null, stmt, dbConn); + unlockInternal(); + } + } catch (RetryException e) { + return getTargetTxnId(replPolicy, sourceTxnId); + } + } + + @Override + @RetrySemantics.Idempotent + public void abortTxn(AbortTxnRequest rqst) throws NoSuchTxnException, MetaException, TxnAbortedException { + long txnid = rqst.getTxnid(); + long sourceTxnId = -1; + try { + Connection dbConn = null; + Statement stmt = null; + try { + lockInternal(); + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + stmt = dbConn.createStatement(); + + if (rqst.isSetReplPolicy()) { + sourceTxnId = rqst.getTxnid(); + List<Long> targetTxnIds = 
getTargetTxnIdList(rqst.getReplPolicy(), + Collections.singletonList(sourceTxnId), stmt); + if (targetTxnIds.isEmpty()) { + LOG.info("Target txn id is missing for source txn id : " + sourceTxnId + + " and repl policy " + rqst.getReplPolicy()); + return; + } + assert targetTxnIds.size() == 1; + txnid = targetTxnIds.get(0); + } + + if (abortTxns(dbConn, Collections.singletonList(txnid), true) != 1) { + TxnStatus status = findTxnState(txnid,stmt); + if(status == TxnStatus.ABORTED) { + if (rqst.isSetReplPolicy()) { + // in case of replication, idempotent is taken care by getTargetTxnId + LOG.warn("Invalid state ABORTED for transactions started using replication replay task"); + String s = "delete from REPL_TXN_MAP where RTM_SRC_TXN_ID = " + sourceTxnId + + " and RTM_REPL_POLICY = " + quoteString(rqst.getReplPolicy()); + LOG.info("Going to execute <" + s + ">"); + stmt.executeUpdate(s); + } + LOG.info("abortTxn(" + JavaUtils.txnIdToString(txnid) + + ") requested by it is already " + TxnStatus.ABORTED); + return; + } + raiseTxnUnexpectedState(status, txnid); + } + + if (rqst.isSetReplPolicy()) { + String s = "delete from REPL_TXN_MAP where RTM_SRC_TXN_ID = " + sourceTxnId + + " and RTM_REPL_POLICY = " + quoteString(rqst.getReplPolicy()); + LOG.info("Going to execute <" + s + ">"); + stmt.executeUpdate(s); + } + + if (transactionalListeners != null) { + MetaStoreListenerNotifier.notifyEventWithDirectSql(transactionalListeners, + EventMessage.EventType.ABORT_TXN, new AbortTxnEvent(txnid, null), dbConn, sqlGenerator); + } + + LOG.debug("Going to commit"); + dbConn.commit(); + } catch (SQLException e) { + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "abortTxn(" + rqst + ")"); + throw new MetaException("Unable to update transaction database " + + StringUtils.stringifyException(e)); + } finally { + close(null, stmt, dbConn); + unlockInternal(); + } + } catch (RetryException e) { + abortTxn(rqst); + } + } + + @Override + 
@RetrySemantics.Idempotent + public void abortTxns(AbortTxnsRequest rqst) throws NoSuchTxnException, MetaException { + List<Long> txnids = rqst.getTxn_ids(); + try { + Connection dbConn = null; + try { + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + int numAborted = abortTxns(dbConn, txnids, false); + if (numAborted != txnids.size()) { + LOG.warn("Abort Transactions command only aborted " + numAborted + " out of " + + txnids.size() + " transactions. It's possible that the other " + + (txnids.size() - numAborted) + + " transactions have been aborted or committed, or the transaction ids are invalid."); + } + + for (Long txnId : txnids) { + if (transactionalListeners != null) { + MetaStoreListenerNotifier.notifyEventWithDirectSql(transactionalListeners, + EventMessage.EventType.ABORT_TXN, new AbortTxnEvent(txnId, null), dbConn, sqlGenerator); + } + } + LOG.debug("Going to commit"); + dbConn.commit(); + } catch (SQLException e) { + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "abortTxns(" + rqst + ")"); + throw new MetaException("Unable to update transaction database " + + StringUtils.stringifyException(e)); + } finally { + closeDbConn(dbConn); + } + } catch (RetryException e) { + abortTxns(rqst); + } + } + + /** + * Concurrency/isolation notes: + * This is mutexed with {@link #openTxns(OpenTxnRequest)} and other {@link #commitTxn(CommitTxnRequest)} + * operations using select4update on NEXT_TXN_ID. Also, mutexes on TXNX table for specific txnid:X + * see more notes below. + * In order to prevent lost updates, we need to determine if any 2 transactions overlap. Each txn + * is viewed as an interval [M,N]. M is the txnid and N is taken from the same NEXT_TXN_ID sequence + * so that we can compare commit time of txn T with start time of txn S. This sequence can be thought of + * as a logical time counter. If S.commitTime < T.startTime, T and S do NOT overlap. 
+ * + * Motivating example: + * Suppose we have multi-statment transactions T and S both of which are attempting x = x + 1 + * In order to prevent lost update problem, the the non-overlapping txns must lock in the snapshot + * that they read appropriately. In particular, if txns do not overlap, then one follows the other + * (assumig they write the same entity), and thus the 2nd must see changes of the 1st. We ensure + * this by locking in snapshot after + * {@link #openTxns(OpenTxnRequest)} call is made (see org.apache.hadoop.hive.ql.Driver.acquireLocksAndOpenTxn) + * and mutexing openTxn() with commit(). In other words, once a S.commit() starts we must ensure + * that txn T which will be considered a later txn, locks in a snapshot that includes the result + * of S's commit (assuming no other txns). + * As a counter example, suppose we have S[3,3] and T[4,4] (commitId=txnid means no other transactions + * were running in parallel). If T and S both locked in the same snapshot (for example commit of + * txnid:2, which is possible if commitTxn() and openTxnx() is not mutexed) + * 'x' would be updated to the same value by both, i.e. lost update. 
+ */ + @Override + @RetrySemantics.Idempotent("No-op if already committed") + public void commitTxn(CommitTxnRequest rqst) + throws NoSuchTxnException, TxnAbortedException, MetaException { + MaterializationsRebuildLockHandler materializationsRebuildLockHandler = + MaterializationsRebuildLockHandler.get(); + List<TransactionRegistryInfo> txnComponents = new ArrayList<>(); + boolean isUpdateDelete = false; + long txnid = rqst.getTxnid(); + long sourceTxnId = -1; + + try { + Connection dbConn = null; + Statement stmt = null; + ResultSet lockHandle = null; + ResultSet commitIdRs = null, rs; + try { + lockInternal(); + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + stmt = dbConn.createStatement(); + + if (rqst.isSetReplPolicy()) { + sourceTxnId = rqst.getTxnid(); + List<Long> targetTxnIds = getTargetTxnIdList(rqst.getReplPolicy(), + Collections.singletonList(sourceTxnId), stmt); + if (targetTxnIds.isEmpty()) { + LOG.info("Target txn id is missing for source txn id : " + sourceTxnId + + " and repl policy " + rqst.getReplPolicy()); + return; + } + assert targetTxnIds.size() == 1; + txnid = targetTxnIds.get(0); + } + + /** + * Runs at READ_COMMITTED with S4U on TXNS row for "txnid". S4U ensures that no other + * operation can change this txn (such acquiring locks). While lock() and commitTxn() + * should not normally run concurrently (for same txn) but could due to bugs in the client + * which could then corrupt internal transaction manager state. Also competes with abortTxn(). 
+ */ + lockHandle = lockTransactionRecord(stmt, txnid, TXN_OPEN); + if (lockHandle == null) { + //if here, txn was not found (in expected state) + TxnStatus actualTxnStatus = findTxnState(txnid, stmt); + if(actualTxnStatus == TxnStatus.COMMITTED) { + if (rqst.isSetReplPolicy()) { + // in case of replication, idempotent is taken care by getTargetTxnId + LOG.warn("Invalid state COMMITTED for transactions started using replication replay task"); + } + /** + * This makes the operation idempotent + * (assume that this is most likely due to retry logic) + */ + LOG.info("Nth commitTxn(" + JavaUtils.txnIdToString(txnid) + ") msg"); + return; + } + raiseTxnUnexpectedState(actualTxnStatus, txnid); + shouldNeverHappen(txnid); + //dbConn is rolled back in finally{} + } + + String conflictSQLSuffix = null; + if (rqst.isSetReplPolicy()) { + rs = null; + } else { + conflictSQLSuffix = "from TXN_COMPONENTS where tc_txnid=" + txnid + " and tc_operation_type IN(" + + quoteChar(OpertaionType.UPDATE.sqlConst) + "," + quoteChar(OpertaionType.DELETE.sqlConst) + ")"; + rs = stmt.executeQuery(sqlGenerator.addLimitClause(1, + "tc_operation_type " + conflictSQLSuffix)); + } + if (rs != null && rs.next()) { + isUpdateDelete = true; + close(rs); + //if here it means currently committing txn performed update/delete and we should check WW conflict + /** + * This S4U will mutex with other commitTxn() and openTxns(). + * -1 below makes txn intervals look like [3,3] [4,4] if all txns are serial + * Note: it's possible to have several txns have the same commit id. Suppose 3 txns start + * at the same time and no new txns start until all 3 commit. + * We could've incremented the sequence for commitId is well but it doesn't add anything functionally. 
+ */ + commitIdRs = stmt.executeQuery(sqlGenerator.addForUpdateClause("select ntxn_next - 1 from NEXT_TXN_ID")); + if (!commitIdRs.next()) { + throw new IllegalStateException("No rows found in NEXT_TXN_ID"); + } + long commitId = commitIdRs.getLong(1); + Savepoint undoWriteSetForCurrentTxn = dbConn.setSavepoint(); + /** + * "select distinct" is used below because + * 1. once we get to multi-statement txns, we only care to record that something was updated once + * 2. if {@link #addDynamicPartitions(AddDynamicPartitions)} is retried by caller it my create + * duplicate entries in TXN_COMPONENTS + * but we want to add a PK on WRITE_SET which won't have unique rows w/o this distinct + * even if it includes all of it's columns + */ + int numCompsWritten = stmt.executeUpdate( + "insert into WRITE_SET (ws_database, ws_table, ws_partition, ws_txnid, ws_commit_id, ws_operation_type)" + + " select distinct tc_database, tc_table, tc_partition, tc_txnid, " + commitId + ", tc_operation_type " + conflictSQLSuffix); + /** + * see if there are any overlapping txns wrote the same element, i.e. have a conflict + * Since entire commit operation is mutexed wrt other start/commit ops, + * committed.ws_commit_id <= current.ws_commit_id for all txns + * thus if committed.ws_commit_id < current.ws_txnid, transactions do NOT overlap + * For example, [17,20] is committed, [6,80] is being committed right now - these overlap + * [17,20] committed and [21,21] committing now - these do not overlap. 
+ * [17,18] committed and [18,19] committing now - these overlap (here 18 started while 17 was still running) + */ + rs = stmt.executeQuery + (sqlGenerator.addLimitClause(1, "committed.ws_txnid, committed.ws_commit_id, committed.ws_database," + + "committed.ws_table, committed.ws_partition, cur.ws_commit_id cur_ws_commit_id, " + + "cur.ws_operation_type cur_op, committed.ws_operation_type committed_op " + + "from WRITE_SET committed INNER JOIN WRITE_SET cur " + + "ON committed.ws_database=cur.ws_database and committed.ws_table=cur.ws_table " + + //For partitioned table we always track writes at partition level (never at table) + //and for non partitioned - always at table level, thus the same table should never + //have entries with partition key and w/o + "and (committed.ws_partition=cur.ws_partition or (committed.ws_partition is null and cur.ws_partition is null)) " + + "where cur.ws_txnid <= committed.ws_commit_id" + //txns overlap; could replace ws_txnid + // with txnid, though any decent DB should infer this + " and cur.ws_txnid=" + txnid + //make sure RHS of join only has rows we just inserted as + // part of this commitTxn() op + " and committed.ws_txnid <> " + txnid + //and LHS only has committed txns + //U+U and U+D is a conflict but D+D is not and we don't currently track I in WRITE_SET at all + " and (committed.ws_operation_type=" + quoteChar(OpertaionType.UPDATE.sqlConst) + + " OR cur.ws_operation_type=" + quoteChar(OpertaionType.UPDATE.sqlConst) + ")")); + if (rs.next()) { + //found a conflict + String committedTxn = "[" + JavaUtils.txnIdToString(rs.getLong(1)) + "," + rs.getLong(2) + "]"; + StringBuilder resource = new StringBuilder(rs.getString(3)).append("/").append(rs.getString(4)); + String partitionName = rs.getString(5); + if (partitionName != null) { + resource.append('/').append(partitionName); + } + String msg = "Aborting [" + JavaUtils.txnIdToString(txnid) + "," + rs.getLong(6) + "]" + " due to a write conflict on " + resource + + " 
committed by " + committedTxn + " " + rs.getString(7) + "/" + rs.getString(8); + close(rs); + //remove WRITE_SET info for current txn since it's about to abort + dbConn.rollback(undoWriteSetForCurrentTxn); + LOG.info(msg); + //todo: should make abortTxns() write something into TXNS.TXN_META_INFO about this + if (abortTxns(dbConn, Collections.singletonList(txnid), true) != 1) { + throw new IllegalStateException(msg + " FAILED!"); + } + dbConn.commit(); + close(null, stmt, dbConn); + throw new TxnAbortedException(msg); + } else { + //no conflicting operations, proceed with the rest of commit sequence + } + } + else { + /** + * current txn didn't update/delete anything (may have inserted), so just proceed with commit + * + * We only care about commit id for write txns, so for RO (when supported) txns we don't + * have to mutex on NEXT_TXN_ID. + * Consider: if RO txn is after a W txn, then RO's openTxns() will be mutexed with W's + * commitTxn() because both do S4U on NEXT_TXN_ID and thus RO will see result of W txn. + * If RO < W, then there is no reads-from relationship. + * In replication flow we don't expect any write write conflict as it should have been handled at source. + */ + } + + String s; + if (!rqst.isSetReplPolicy()) { + // Move the record from txn_components into completed_txn_components so that the compactor + // knows where to look to compact. + s = "insert into COMPLETED_TXN_COMPONENTS (ctc_txnid, ctc_database, " + + "ctc_table, ctc_partition, ctc_writeid) select tc_txnid, tc_database, tc_table, " + + "tc_partition, tc_writeid from TXN_COMPONENTS where tc_txnid = " + txnid; + LOG.debug("Going to execute insert <" + s + ">"); + + if ((stmt.executeUpdate(s)) < 1) { + //this can be reasonable for an empty txn START/COMMIT or read-only txn + //also an IUD with DP that didn't match any rows. + LOG.info("Expected to move at least one record from txn_components to " + + "completed_txn_components when committing txn! 
" + JavaUtils.txnIdToString(txnid)); + } + } else { + if (rqst.isSetWriteEventInfos()) { + List<String> rows = new ArrayList<>(); + for (WriteEventInfo writeEventInfo : rqst.getWriteEventInfos()) { + rows.add(txnid + "," + quoteString(writeEventInfo.getDatabase()) + "," + + quoteString(writeEventInfo.getTable()) + "," + + quoteString(writeEventInfo.getPartition()) + "," + + writeEventInfo.getWriteId()); + } + List<String> queries = sqlGenerator.createInsertValuesStmt("COMPLETED_TXN_COMPONENTS " + + "(ctc_txnid," + " ctc_database, ctc_table, ctc_partition, ctc_writeid)", rows); + for (String q : queries) { + LOG.debug("Going to execute insert <" + q + "> "); + stmt.execute(q); + } + } + + s = "delete from REPL_TXN_MAP where RTM_SRC_TXN_ID = " + sourceTxnId + + " and RTM_REPL_POLICY = " + quoteString(rqst.getReplPolicy()); + LOG.info("Repl going to execute <" + s + ">"); + stmt.executeUpdate(s); + } + + // Obtain information that we need to update registry + s = "select ctc_database, ctc_table, ctc_writeid, ctc_timestamp from COMPLETED_TXN_COMPONENTS" + + " where ctc_txnid = " + txnid; + + LOG.debug("Going to extract table modification information for invalidation cache <" + s + ">"); + rs = stmt.executeQuery(s); + while (rs.next()) { + // We only enter in this loop if the transaction actually affected any table + txnComponents.add(new TransactionRegistryInfo(rs.getString(1), rs.getString(2), + rs.getLong(3), rs.getTimestamp(4, Calendar.getInstance(TimeZone.getTimeZone("UTC"))).getTime())); + } + + // cleanup all txn related metadata + s = "delete from TXN_COMPONENTS where tc_txnid = " + txnid; + LOG.debug("Going to execute update <" + s + ">"); + stmt.executeUpdate(s); + s = "delete from HIVE_LOCKS where hl_txnid = " + txnid; + LOG.debug("Going to execute update <" + s + ">"); + stmt.executeUpdate(s); + s = "delete from TXNS where txn_id = " + txnid; + LOG.debug("Going to execute update <" + s + ">"); + stmt.executeUpdate(s); + s = "delete from MIN_HISTORY_LEVEL 
where mhl_txnid = " + txnid; + LOG.debug("Going to execute update <" + s + ">"); + stmt.executeUpdate(s); + LOG.info("Removed committed transaction: (" + txnid + ") from MIN_HISTORY_LEVEL"); + if (transactionalListeners != null) { + MetaStoreListenerNotifier.notifyEventWithDirectSql(transactionalListeners, + EventMessage.EventType.COMMIT_TXN, new CommitTxnEvent(txnid, null), dbConn, sqlGenerator); + } + + MaterializationsInvalidationCache materializationsInvalidationCache = + MaterializationsInvalidationCache.get(); + for (TransactionRegistryInfo info : txnComponents) { + if (materializationsInvalidationCache.containsMaterialization(info.dbName, info.tblName) && + !materializationsRebuildLockHandler.readyToCommitResource(info.dbName, info.tblName, txnid)) { + throw new MetaException( + "Another process is rebuilding the materialized view " + info.fullyQualifiedName); + } + } + LOG.debug("Going to commit"); + close(rs); + dbConn.commit(); + + // Update registry with modifications + for (TransactionRegistryInfo info : txnComponents) { + materializationsInvalidationCache.notifyTableModification( + info.dbName, info.tblName, info.writeId, info.timestamp, isUpdateDelete); + } + } catch (SQLException e) { + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "commitTxn(" + rqst + ")"); + throw new MetaException("Unable to update transaction database " + + StringUtils.stringifyException(e)); + } finally { + close(commitIdRs); + close(lockHandle, stmt, dbConn); + unlockInternal(); + for (TransactionRegistryInfo info : txnComponents) { + materializationsRebuildLockHandler.unlockResource(info.dbName, info.tblName, txnid); + } + } + } catch (RetryException e) { + commitTxn(rqst); + } + } + + /** + * Replicate Table Write Ids state to mark aborted write ids and writeid high water mark. + * @param rqst info on table/partitions and writeid snapshot to replicate. 
+ * @throws MetaException + */ + @Override + @RetrySemantics.Idempotent("No-op if already replicated the writeid state") + public void replTableWriteIdState(ReplTblWriteIdStateRequest rqst) throws MetaException { + String dbName = rqst.getDbName().toLowerCase(); + String tblName = rqst.getTableName().toLowerCase(); + ValidWriteIdList validWriteIdList = new ValidReaderWriteIdList(rqst.getValidWriteIdlist()); + + // Get the abortedWriteIds which are already sorted in ascending order. + List<Long> abortedWriteIds = getAbortedWriteIds(validWriteIdList); + int numAbortedWrites = abortedWriteIds.size(); + try { + Connection dbConn = null; + Statement stmt = null; + ResultSet rs = null; + TxnStore.MutexAPI.LockHandle handle = null; + try { + lockInternal(); + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + stmt = dbConn.createStatement(); + + // Check if this txn state is already replicated for this given table. If yes, then it is + // idempotent case and just return. + String sql = "select nwi_next from NEXT_WRITE_ID where nwi_database = " + quoteString(dbName) + + " and nwi_table = " + quoteString(tblName); + LOG.debug("Going to execute query <" + sql + ">"); + + rs = stmt.executeQuery(sql); + if (rs.next()) { + LOG.info("Idempotent flow: WriteId state <" + validWriteIdList + "> is already applied for the table: " + + dbName + "." + tblName); + rollbackDBConn(dbConn); + return; + } + + if (numAbortedWrites > 0) { + // Allocate/Map one txn per aborted writeId and abort the txn to mark writeid as aborted. + List<Long> txnIds = openTxns(dbConn, stmt, + new OpenTxnRequest(numAbortedWrites, rqst.getUser(), rqst.getHostName())); + assert(numAbortedWrites == txnIds.size()); + + // Map each aborted write id with each allocated txn. 
+ List<String> rows = new ArrayList<>(); + int i = 0; + for (long txn : txnIds) { + long writeId = abortedWriteIds.get(i++); + rows.add(txn + ", " + quoteString(dbName) + ", " + quoteString(tblName) + ", " + writeId); + LOG.info("Allocated writeID: " + writeId + " for txnId: " + txn); + } + + // Insert entries to TXN_TO_WRITE_ID for aborted write ids + List<String> inserts = sqlGenerator.createInsertValuesStmt( + "TXN_TO_WRITE_ID (t2w_txnid, t2w_database, t2w_table, t2w_writeid)", rows); + for (String insert : inserts) { + LOG.debug("Going to execute insert <" + insert + ">"); + stmt.execute(insert); + } + + // Abort all the allocated txns so that the mapped write ids are referred as aborted ones. + int numAborts = abortTxns(dbConn, txnIds, true); + assert(numAborts == numAbortedWrites); + } + handle = getMutexAPI().acquireLock(MUTEX_KEY.WriteIdAllocator.name()); + + // There are some txns in the list which has no write id allocated and hence go ahead and do it. + // Get the next write id for the given table and update it with new next write id. + // It is expected NEXT_WRITE_ID doesn't have entry for this table and hence directly insert it. + long nextWriteId = validWriteIdList.getHighWatermark() + 1; + + // First allocation of write id (hwm+1) should add the table to the next_write_id meta table. + sql = "insert into NEXT_WRITE_ID (nwi_database, nwi_table, nwi_next) values (" + + quoteString(dbName) + "," + quoteString(tblName) + "," + + Long.toString(nextWriteId) + ")"; + LOG.debug("Going to execute insert <" + sql + ">"); + stmt.execute(sql); + + LOG.info("WriteId state <" + validWriteIdList + "> is applied for the table: " + dbName + "." 
+ tblName); + LOG.debug("Going to commit"); + dbConn.commit(); + } catch (SQLException e) { + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "replTableWriteIdState(" + rqst + ")"); + throw new MetaException("Unable to update transaction database " + + StringUtils.stringifyException(e)); + } finally { + close(rs, stmt, dbConn); + if(handle != null) { + handle.releaseLocks(); + } + unlockInternal(); + } + } catch (RetryException e) { + replTableWriteIdState(rqst); + } + + // Schedule Major compaction on all the partitions/table to clean aborted data + if (numAbortedWrites > 0) { + CompactionRequest compactRqst = new CompactionRequest(rqst.getDbName(), rqst.getTableName(), + CompactionType.MAJOR); + if (rqst.isSetPartNames()) { + for (String partName : rqst.getPartNames()) { + compactRqst.setPartitionname(partName); + compact(compactRqst); + } + } else { + compact(compactRqst); + } + } + } + + private List<Long> getAbortedWriteIds(ValidWriteIdList validWriteIdList) { + List<Long> abortedWriteIds = new ArrayList<>(); + for (long writeId : validWriteIdList.getInvalidWriteIds()) { + if (validWriteIdList.isWriteIdAborted(writeId)) { + abortedWriteIds.add(writeId); + } + } + return abortedWriteIds; + } + + @Override + @RetrySemantics.ReadOnly + public GetValidWriteIdsResponse getValidWriteIds(GetValidWriteIdsRequest rqst) + throws NoSuchTxnException, MetaException { + try { + Connection dbConn = null; + Statement stmt = null; + ValidTxnList validTxnList; + + // We should prepare the valid write ids list based on validTxnList of current txn. 
+ // If no txn exists in the caller, then they would pass null for validTxnList and so it is + // required to get the current state of txns to make validTxnList + if (rqst.isSetValidTxnList()) { + validTxnList = new ValidReadTxnList(rqst.getValidTxnList()); + } else { + // Passing 0 for currentTxn means, this validTxnList is not wrt to any txn + validTxnList = TxnUtils.createValidReadTxnList(getOpenTxns(), 0); + } + try { + /** + * This runs at READ_COMMITTED for exactly the same reason as {@link #getOpenTxnsInfo()} + */ + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + stmt = dbConn.createStatement(); + + // Get the valid write id list for all the tables read by the current txn + List<TableValidWriteIds> tblValidWriteIdsList = new ArrayList<>(); + for (String fullTableName : rqst.getFullTableNames()) { + tblValidWriteIdsList.add(getValidWriteIdsForTable(stmt, fullTableName, validTxnList)); + } + + LOG.debug("Going to rollback"); + dbConn.rollback(); + GetValidWriteIdsResponse owr = new GetValidWriteIdsResponse(tblValidWriteIdsList); + return owr; + } catch (SQLException e) { + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "getValidWriteIds"); + throw new MetaException("Unable to select from transaction database, " + + StringUtils.stringifyException(e)); + } finally { + close(null, stmt, dbConn); + } + } catch (RetryException e) { + return getValidWriteIds(rqst); + } + } + + // Method to get the Valid write ids list for the given table + // Input fullTableName is expected to be of format <db_name>.<table_name> + private TableValidWriteIds getValidWriteIdsForTable(Statement stmt, String fullTableName, + ValidTxnList validTxnList) throws SQLException { + ResultSet rs = null; + String[] names = TxnUtils.getDbTableName(fullTableName); + try { + // Need to initialize to 0 to make sure if nobody modified this table, then current txn + // shouldn't read any data. 
+ // If there is a conversion from non-acid to acid table, then by default 0 would be assigned as + // writeId for data from non-acid table and so writeIdHwm=0 would ensure those data are readable by any txns. + long writeIdHwm = 0; + List<Long> invalidWriteIdList = new ArrayList<>(); + long minOpenWriteId = Long.MAX_VALUE; + BitSet abortedBits = new BitSet(); + long txnHwm = validTxnList.getHighWatermark(); + + // Find the writeId high water mark based upon txnId high water mark. If found, then, need to + // traverse through all write Ids less than writeId HWM to make exceptions list. + // The writeHWM = min(NEXT_WRITE_ID.nwi_next-1, max(TXN_TO_WRITE_ID.t2w_writeid under txnHwm)) + String s = "select max(t2w_writeid) from TXN_TO_WRITE_ID where t2w_txnid <= " + txnHwm + + " and t2w_database = " + quoteString(names[0]) + + " and t2w_table = " + quoteString(names[1]); + LOG.debug("Going to execute query<" + s + ">"); + rs = stmt.executeQuery(s); + if (rs.next()) { + writeIdHwm = rs.getLong(1); + } + + // If no writeIds allocated by txns under txnHwm, then find writeHwm from NEXT_WRITE_ID. + if (writeIdHwm <= 0) { + // Need to subtract 1 as nwi_next would be the next write id to be allocated but we need highest + // allocated write id. + s = "select nwi_next-1 from NEXT_WRITE_ID where nwi_database = " + quoteString(names[0]) + + " and nwi_table = " + quoteString(names[1]); + LOG.debug("Going to execute query<" + s + ">"); + rs = stmt.executeQuery(s); + if (rs.next()) { + long maxWriteId = rs.getLong(1); + if (maxWriteId > 0) { + writeIdHwm = (writeIdHwm > 0) ? Math.min(maxWriteId, writeIdHwm) : maxWriteId; + } + } + } + + // As writeIdHwm is known, query all writeIds under the writeId HWM. + // If any writeId under HWM is allocated by txn > txnId HWM or belongs to open/aborted txns, + // then will be added to invalid list. The results should be sorted in ascending order based + // on write id. 
The sorting is needed as exceptions list in ValidWriteIdList would be looked-up + // using binary search. + s = "select t2w_txnid, t2w_writeid from TXN_TO_WRITE_ID where t2w_writeid <= " + writeIdHwm + + " and t2w_database = " + quoteString(names[0]) + + " and t2w_table = " + quoteString(names[1]) + + " order by t2w_writeid asc"; + + LOG.debug("Going to execute query<" + s + ">"); + rs = stmt.executeQuery(s); + while (rs.next()) { + long txnId = rs.getLong(1); + long writeId = rs.getLong(2); + if (validTxnList.isTxnValid(txnId)) { + // Skip if the transaction under evaluation is already committed. + continue; + } + + // The current txn is either in open or aborted state. + // Mark the write ids state as per the txn state. + invalidWriteIdList.add(writeId); + if (validTxnList.isTxnAborted(txnId)) { + abortedBits.set(invalidWriteIdList.size() - 1); + } else { + minOpenWriteId = Math.min(minOpenWriteId, writeId); + } + } + + ByteBuffer byteBuffer = ByteBuffer.wrap(abortedBits.toByteArray()); + TableValidWriteIds owi = new TableValidWriteIds(fullTableName, writeIdHwm, invalidWriteIdList, byteBuffer); + if (minOpenWriteId < Long.MAX_VALUE) { + owi.setMinOpenWriteId(minOpenWriteId); + } + return owi; + } finally { + close(rs); + } + } + + @Override + @RetrySemantics.Idempotent + public AllocateTableWriteIdsResponse allocateTableWriteIds(AllocateTableWriteIdsRequest rqst) + throws NoSuchTxnException, TxnAbortedException, MetaException { + List<Long> txnIds; + String dbName = rqst.getDbName().toLowerCase(); + String tblName = rqst.getTableName().toLowerCase(); + try { + Connection dbConn = null; + Statement stmt = null; + ResultSet rs = null; + TxnStore.MutexAPI.LockHandle handle = null; + List<TxnToWriteId> txnToWriteIds = new ArrayList<>(); + List<TxnToWriteId> srcTxnToWriteIds = null; + try { + lockInternal(); + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + stmt = dbConn.createStatement(); + + if (rqst.isSetReplPolicy()) { + srcTxnToWriteIds = 
rqst.getSrcTxnToWriteIdList(); + List<Long> srcTxnIds = new ArrayList<>(); + assert (rqst.isSetSrcTxnToWriteIdList()); + assert (!rqst.isSetTxnIds()); + assert (!srcTxnToWriteIds.isEmpty()); + + for (TxnToWriteId txnToWriteId : srcTxnToWriteIds) { + srcTxnIds.add(txnToWriteId.getTxnId()); + } + txnIds = getTargetTxnIdList(rqst.getReplPolicy(), srcTxnIds, stmt); + if (srcTxnIds.size() != txnIds.size()) { + LOG.warn("Target txn id is missing for source txn id : " + srcTxnIds.toString() + + " and repl policy " + rqst.getReplPolicy()); + throw new RuntimeException("This should never happen for txnIds: " + txnIds); + } + } else { + assert (!rqst.isSetSrcTxnToWriteIdList()); + assert (rqst.isSetTxnIds()); + txnIds = rqst.getTxnIds(); + } + + Collections.sort(txnIds); //easier to read logs and for assumption done in replication flow + + // Check if all the input txns are in open state. Write ID should be allocated only for open transactions. + if (!isTxnsInOpenState(txnIds, stmt)) { + ensureAllTxnsValid(dbName, tblName, txnIds, stmt); + throw new RuntimeException("This should never happen for txnIds: " + txnIds); + } + + long writeId; + String s; + long allocatedTxnsCount = 0; + long txnId; + List<String> queries = new ArrayList<>(); + StringBuilder prefix = new StringBuilder(); + StringBuilder suffix = new StringBuilder(); + + // Traverse the TXN_TO_WRITE_ID to see if any of the input txns already have allocated a + // write id for the same db.table. If yes, then need to reuse it else have to allocate new one + // The write id would have been already allocated in case of multi-statement txns where + // first write on a table will allocate write id and rest of the writes should re-use it. 
+ prefix.append("select t2w_txnid, t2w_writeid from TXN_TO_WRITE_ID where" + + " t2w_database = " + quoteString(dbName) + + " and t2w_table = " + quoteString(tblName) + " and "); + suffix.append(""); + TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, + txnIds, "t2w_txnid", false, false); + for (String query : queries) { + LOG.debug("Going to execute query <" + query + ">"); + rs = stmt.executeQuery(query); + while (rs.next()) { + // If table write ID is already allocated for the given transaction, then just use it + txnId = rs.getLong(1); + writeId = rs.getLong(2); + txnToWriteIds.add(new TxnToWriteId(txnId, writeId)); + allocatedTxnsCount++; + LOG.info("Reused already allocated writeID: " + writeId + " for txnId: " + txnId); + } + } + + // Batch allocation should always happen atomically. Either write ids for all txns is allocated or none. + long numOfWriteIds = txnIds.size(); + assert ((allocatedTxnsCount == 0) || (numOfWriteIds == allocatedTxnsCount)); + if (allocatedTxnsCount == numOfWriteIds) { + // If all the txns in the list have pre-allocated write ids for the given table, then just return. + // This is for idempotent case. + return new AllocateTableWriteIdsResponse(txnToWriteIds); + } + + handle = getMutexAPI().acquireLock(MUTEX_KEY.WriteIdAllocator.name()); + + // There are some txns in the list which does not have write id allocated and hence go ahead and do it. + // Get the next write id for the given table and update it with new next write id. 
+ // This is select for update query which takes a lock if the table entry is already there in NEXT_WRITE_ID + s = sqlGenerator.addForUpdateClause( + "select nwi_next from NEXT_WRITE_ID where nwi_database = " + quoteString(dbName) + + " and nwi_table = " + quoteString(tblName)); + LOG.debug("Going to execute query <" + s + ">"); + rs = stmt.executeQuery(s); + if (!rs.next()) { + // First allocation of write id should add the table to the next_write_id meta table + // The initial value for write id should be 1 and hence we add 1 with number of write ids allocated here + writeId = 1; + s = "insert into NEXT_WRITE_ID (nwi_database, nwi_table, nwi_next) values (" + + quoteString(dbName) + "," + quoteString(tblName) + "," + Long.toString(numOfWriteIds + 1) + ")"; + LOG.debug("Going to execute insert <" + s + ">"); + stmt.execute(s); + } else { + writeId = rs.getLong(1); + // Update the NEXT_WRITE_ID for the given table after incrementing by number of write ids allocated + s = "update NEXT_WRITE_ID set nwi_next = " + (writeId + numOfWriteIds) + + " where nwi_database = " + quoteString(dbName) + + " and nwi_table = " + quoteString(tblName); + LOG.debug("Going to execute update <" + s + ">"); + stmt.executeUpdate(s); + } + + // Map the newly allocated write ids against the list of txns which doesn't have pre-allocated + // write ids + List<String> rows = new ArrayList<>(); + for (long txn : txnIds) { + rows.add(txn + ", " + quoteString(dbName) + ", " + quoteString(tblName) + ", " + writeId); + txnToWriteIds.add(new TxnToWriteId(txn, writeId)); + LOG.info("Allocated writeID: " + writeId + " for txnId: " + txn); + writeId++; + } + + if (rqst.isSetReplPolicy()) { + int lastIdx = txnToWriteIds.size()-1; + if ((txnToWriteIds.get(0).getWriteId() != srcTxnToWriteIds.get(0).getWriteId()) || + (txnToWriteIds.get(lastIdx).getWriteId() != srcTxnToWriteIds.get(lastIdx).getWriteId())) { + LOG.error("Allocated write id range {} is not matching with the input write id range {}.", + 
txnToWriteIds, srcTxnToWriteIds); + throw new IllegalStateException("Write id allocation failed for: " + srcTxnToWriteIds); + } + } + + // Insert entries to TXN_TO_WRITE_ID for newly allocated write ids + List<String> inserts = sqlGenerator.createInsertValuesStmt( + "TXN_TO_WRITE_ID (t2w_txnid, t2w_database, t2w_table, t2w_writeid)", rows); + for (String insert : inserts) { + LOG.debug("Going to execute insert <" + insert + ">"); + stmt.execute(insert); + } + + if (transactionalListeners != null) { + MetaStoreListenerNotifier.notifyEventWithDirectSql(transactionalListeners, + EventMessage.EventType.ALLOC_WRITE_ID, + new AllocWriteIdEvent(txnToWriteIds, rqst.getDbName(), rqst.getTableName(), null), + dbConn, sqlGenerator); + } + + LOG.debug("Going to commit"); + dbConn.commit(); + return new AllocateTableWriteIdsResponse(txnToWriteIds); + } catch (SQLException e) { + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "allocateTableWriteIds(" + rqst + ")"); + throw new MetaException("Unable to update transaction database " + + StringUtils.stringifyException(e)); + } finally { + close(rs, stmt, dbConn); + if(handle != null) { + handle.releaseLocks(); + } + unlockInternal(); + } + } catch (RetryException e) { + return allocateTableWriteIds(rqst); + } + } + @Override + public void seedWriteIdOnAcidConversion(InitializeTableWriteIdsRequest rqst) + throws MetaException { + try { + Connection dbConn = null; + Statement stmt = null; + TxnStore.MutexAPI.LockHandle handle = null; + try { + lockInternal(); + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + stmt = dbConn.createStatement(); + + handle = getMutexAPI().acquireLock(MUTEX_KEY.WriteIdAllocator.name()); + //since this is on conversion from non-acid to acid, NEXT_WRITE_ID should not have an entry + //for this table. 
It also has a unique index in case 'should not' is violated + + // First allocation of write id should add the table to the next_write_id meta table + // The initial value for write id should be 1 and hence we add 1 with number of write ids + // allocated here + String s = "insert into NEXT_WRITE_ID (nwi_database, nwi_table, nwi_next) values (" + + quoteString(rqst.getDbName()) + "," + quoteString(rqst.getTblName()) + "," + + Long.toString(rqst.getSeeWriteId() + 1) + ")"; + LOG.debug("Going to execute insert <" + s + ">"); + stmt.execute(s); + LOG.debug("Going to commit"); + dbConn.commit(); + } catch (SQLException e) { + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "seedWriteIdOnAcidConversion(" + rqst + ")"); + throw new MetaException("Unable to update transaction database " + + StringUtils.stringifyException(e)); + } finally { + close(null, stmt, dbConn); + if(handle != null) { + handle.releaseLocks(); + } + unlockInternal(); + } + } catch (RetryException e) { + seedWriteIdOnAcidConversion(rqst); + } + + } + @Override + @RetrySemantics.Idempotent + public void addWriteNotificationLog(AcidWriteEvent acidWriteEvent) + throws MetaException { + Connection dbConn = null; + try { + try { + //Idempotent case is handled by notify Event + lockInternal(); + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + MetaStoreListenerNotifier.notifyEventWithDirectSql(transactionalListeners, + EventMessage.EventType.ACID_WRITE, acidWriteEvent, dbConn, sqlGenerator); + LOG.debug("Going to commit"); + dbConn.commit(); + return; + } catch (SQLException e) { + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + if (isDuplicateKeyError(e)) { + // in case of key duplicate error, retry as it might be because of race condition + if (waitForRetry("addWriteNotificationLog(" + acidWriteEvent + ")", e.getMessage())) { + throw new RetryException(); + } + retryNum = 0; + throw new MetaException(e.getMessage()); + } + 
checkRetryable(dbConn, e, "addWriteNotificationLog(" + acidWriteEvent + ")"); + throw new MetaException("Unable to add write notification event " + StringUtils.stringifyException(e)); + } finally{ + closeDbConn(dbConn); + unlockInternal(); + } + } catch (RetryException e) { + addWriteNotificationLog(acidWriteEvent); + } + } + + @Override + @RetrySemantics.SafeToRetry + public void performWriteSetGC() { + Connection dbConn = null; + Statement stmt = null; + ResultSet rs = null; + try { + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + stmt = dbConn.createStatement(); + rs = stmt.executeQuery("select ntxn_next - 1 from NEXT_TXN_ID"); + if(!rs.next()) { + throw new IllegalStateException("NEXT_TXN_ID is empty: DB is corrupted"); + } + long highestAllocatedTxnId = rs.getLong(1); + close(rs); + rs = stmt.executeQuery("select min(txn_id) from TXNS where txn_state=" + quoteChar(TXN_OPEN)); + if(!rs.next()) { + throw new IllegalStateException("Scalar query returned no rows?!?!!"); + } + long commitHighWaterMark;//all currently open txns (if any) have txnid >= than commitHighWaterMark + long lowestOpenTxnId = rs.getLong(1); + if(rs.wasNull()) { + //if here then there are no Open txns and highestAllocatedTxnId must be + //resolved (i.e. committed or aborted), either way + //there are no open txns with id <= highestAllocatedTxnId + //the +1 is there because "delete ..." below has < (which is correct for the case when + //there is an open txn + //Concurrency: even if new txn starts (or starts + commits) it is still true that + //there are no currently open txns that overlap with any committed txn with + //commitId <= commitHighWaterMark (as set on next line). So plain READ_COMMITTED is enough. 
+ commitHighWaterMark = highestAllocatedTxnId + 1; + } + else { + commitHighWaterMark = lowestOpenTxnId; + } + int delCnt = stmt.executeUpdate("delete from WRITE_SET where ws_commit_id < " + commitHighWaterMark); + LOG.info("Deleted " + delCnt + " obsolete rows from WRTIE_SET"); + dbConn.commit(); + } catch (SQLException ex) { + LOG.warn("WriteSet GC failed due to " + getMessage(ex), ex); + } + finally { + close(rs, stmt, dbConn); + } + } + + /** + * Gets the information of the first transaction for the given table + * after the transaction with the input id was committed (if any). + */ + @Override + @RetrySemantics.ReadOnly + public BasicTxnInfo getFirstCompletedTransactionForTableAfterCommit( + String inputDbName, String inputTableName, ValidWriteIdList txnList) + throws MetaException { + final List<Long> openTxns = Arrays.asList(ArrayUtils.toObject(txnList.getInvalidWriteIds())); + + Connection dbConn = null; + Statement stmt = null; + ResultSet rs = null; + try { + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + stmt = dbConn.createStatement(); + stmt.setMaxRows(1); + String s = "select ctc_timestamp, ctc_writeid, ctc_database, ctc_table " + + "from COMPLETED_TXN_COMPONENTS " + + "where ctc_database=" + quoteString(inputDbName) + " and ctc_table=" + quoteString(inputTableName) + + " and ctc_writeid > " + txnList.getHighWatermark() + + (txnList.getInvalidWriteIds().length == 0 ? 
+ " " : " or ctc_writeid IN(" + StringUtils.join(",", openTxns) + ") ") + + "order by ctc_timestamp asc"; + if (LOG.isDebugEnabled()) { + LOG.debug("Going to execute query <" + s + ">"); + } + rs = stmt.executeQuery(s); + + if(!rs.next()) { + return new BasicTxnInfo(true); + } + final BasicTxnInfo txnInfo = new BasicTxnInfo(false); + txnInfo.setTime(rs.getTimestamp(1, Calendar.getInstance(TimeZone.getTimeZone("UTC"))).getTime()); + txnInfo.setTxnid(rs.getLong(2)); + txnInfo.setDbname(rs.getString(3)); + txnInfo.setTablename(rs.getString(4)); + return txnInfo; + } catch (SQLException ex) { + LOG.warn("getLastCompletedTransactionForTable failed due to " + getMessage(ex), ex); + throw new MetaException("Unable to retrieve commits information due to " + StringUtils.stringifyException(ex)); + } finally { + close(rs, stmt, dbConn); + } + } + + /** + * As much as possible (i.e. in absence of retries) we want both operations to be done on the same + * connection (but separate transactions). This avoid some flakiness in BONECP where if you + * perform an operation on 1 connection and immediately get another from the pool, the 2nd one + * doesn't see results of the first. + * + * Retry-by-caller note: If the call to lock is from a transaction, then in the worst case + * there will be a duplicate set of locks but both sets will belong to the same txn so they + * will not conflict with each other. For locks w/o txn context (i.e. read-only query), this + * may lead to deadlock (at least a long wait). (e.g. 1st call creates locks in {@code LOCK_WAITING} + * mode and response gets lost. Then {@link org.apache.hadoop.hive.metastore.RetryingMetaStoreClient} + * retries, and enqueues another set of locks in LOCK_WAITING. The 2nd LockResponse is delivered + * to the DbLockManager, which will keep dong {@link #checkLock(CheckLockRequest)} until the 1st + * set of locks times out. 
+ */ + @RetrySemantics.CannotRetry + public LockResponse lock(LockRequest rqst) throws NoSuchTxnException, TxnAbortedException, MetaException { + ConnectionLockIdPair connAndLockId = enqueueLockWithRetry(rqst); + try { + return checkLockWithRetry(connAndLockId.dbConn, connAndLockId.extLockId, rqst.getTxnid()); + } + catch(NoSuchLockException e) { + // This should never happen, as we just added the lock id + throw new MetaException("Couldn't find a lock we just created! " + e.getMessage()); + } + } + private static final class ConnectionLockIdPair { + private final Connection dbConn; + private final long extLockId; + private ConnectionLockIdPair(Connection dbConn, long extLockId) { + this.dbConn = dbConn; + this.extLockId = extLockId; + } + } + + /** + * Note that by definition select for update is divorced from update, i.e. you executeQuery() to read + * and then executeUpdate(). One other alternative would be to actually update the row in TXNS but + * to the same value as before thus forcing db to acquire write lock for duration of the transaction. + * + * There is no real reason to return the ResultSet here other than to make sure the reference to it + * is retained for duration of intended lock scope and is not GC'd thus (unlikely) causing lock + * to be released. + * @param txnState the state this txn is expected to be in. may be null + * @return null if no row was found + * @throws SQLException + * @throws MetaException + */ + private ResultSet lockTransactionRecord(Statement stmt, long txnId, Character txnState) throws SQLException, MetaException { + String query = "select TXN_STATE from TXNS where TXN_ID = " + txnId + (txnState != null ? " AND TXN_STATE=" + quoteChar(txnState) : ""); + ResultSet rs = stmt.executeQuery(sqlGenerator.addForUpdateClause(query)); + if(rs.next()) { + return rs; + } + close(rs); + return null; + } + + /** + * This enters locks into the queue in {@link #LOCK_WAITING} mode. + * + * Isolation Level Notes: + * 1. 
We use S4U (withe read_committed) to generate the next (ext) lock id. This serializes + * any 2 {@code enqueueLockWithRetry()} calls. + * 2. We use S4U on the relevant TXNS row to block any concurrent abort/commit/etc operations + * @see #checkLockWithRetry(Connection, long, long) + */ + private ConnectionLockIdPair enqueueLockWithRetry(LockRequest rqst) throws NoSuchTxnException, TxnAbortedException, MetaException { + boolean success = false; + Connection dbConn = null; + try { + Statement stmt = null; + ResultSet rs = null; + ResultSet lockHandle = null; + try { + lockInternal(); + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + long txnid = rqst.getTxnid(); + stmt = dbConn.createStatement(); + if (isValidTxn(txnid)) { + //this also ensures that txn is still there in expected state + lockHandle = lockTransactionRecord(stmt, txnid, TXN_OPEN); + if(lockHandle == null) { + ensureValidTxn(dbConn, txnid, stmt); + shouldNeverHappen(txnid); + } + } + /** Get the next lock id. + * This has to be atomic with adding entries to HIVE_LOCK entries (1st add in W state) to prevent a race. + * Suppose ID gen is a separate txn and 2 concurrent lock() methods are running. 1st one generates nl_next=7, + * 2nd nl_next=8. Then 8 goes first to insert into HIVE_LOCKS and acquires the locks. 
Then 7 unblocks, + * and add it's W locks but it won't see locks from 8 since to be 'fair' {@link #checkLock(java.sql.Connection, long)} + * doesn't block on locks acquired later than one it's checking*/ + String s = sqlGenerator.addForUpdateClause("select nl_next from NEXT_LOCK_ID"); + LOG.debug("Going to execute query <" + s + ">"); + rs = stmt.executeQuery(s); + if (!rs.next()) { + LOG.debug("Going to rollback"); + dbConn.rollback(); + throw new MetaException("Transaction tables not properly " + + "initialized, no record found in next_lock_id"); + } + long extLockId = rs.getLong(1); + s = "update NEXT_LOCK_ID set nl_next = " + (extLockId + 1); + LOG.debug("Going to execute update <" + s + ">"); + stmt.executeUpdate(s); + + if (txnid > 0) { + List<String> rows = new ArrayList<>(); + // For each component in this lock request, + // add an entry to the txn_components table + for (LockComponent lc : rqst.getComponent()) { + if(lc.isSetIsTransactional() && !lc.isIsTransactional()) { + //we don't prevent using non-acid resources in a txn but we do lock them + continue; + } + boolean updateTxnComponents; + if(!lc.isSetOperationType()) { + //request came from old version of the client + updateTxnComponents = true;//this matches old behavior + } + else {
<TRUNCATED>