This is an automated email from the ASF dual-hosted git repository.

dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push: new 256a52a HIVE-23201: Improve logging in locking (Marton Bod via Denys Kuzmenko) 256a52a is described below commit 256a52a6137dfd1bfc02de34691ac4cda2f86ea2 Author: Marton Bod <m...@cloudera.com> AuthorDate: Tue Apr 28 10:59:03 2020 +0200 HIVE-23201: Improve logging in locking (Marton Bod via Denys Kuzmenko) --- .../hadoop/hive/ql/lockmgr/DbLockManager.java | 4 + .../hadoop/hive/metastore/txn/TxnHandler.java | 294 ++++++++++----------- .../apache/hadoop/hive/metastore/txn/TxnUtils.java | 21 +- 3 files changed, 152 insertions(+), 167 deletions(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java index 4b6bc3e..fb5a306 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java @@ -112,6 +112,8 @@ public final class DbLockManager implements HiveLockManager{ long startRetry = System.currentTimeMillis(); while (res.getState() == LockState.WAITING && numRetries++ < maxNumWaits) { backoff(); + LOG.debug("Starting retry attempt:#{} to acquire locks for lockId={}. QueryId={}", + numRetries, res.getLockid(), queryId); res = txnManager.getMS().checkLock(res.getLockid()); } long retryDuration = System.currentTimeMillis() - startRetry; @@ -139,6 +141,8 @@ public final class DbLockManager implements HiveLockManager{ locks.add(hl); if (res.getState() != LockState.ACQUIRED) { if(res.getState() == LockState.WAITING) { + LOG.error("Unable to acquire locks for lockId={} after {} retries (retries took {} ms). QueryId={}", + res.getLockid(), numRetries, retryDuration, queryId); /** * the {@link #unlock(HiveLock)} here is more about future proofing when support for * multi-statement txns is added. 
In that case it's reasonable for the client diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index b43ff39..fe39b0b 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -43,6 +43,7 @@ import java.util.Objects; import java.util.Optional; import java.util.Properties; import java.util.Set; +import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadLocalRandom; @@ -50,6 +51,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantLock; import java.util.regex.Pattern; +import java.util.stream.Collectors; import javax.sql.DataSource; @@ -202,6 +204,12 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { private static final String COMPL_TXN_COMPONENTS_INSERT_QUERY = "INSERT INTO \"COMPLETED_TXN_COMPONENTS\" " + "(\"CTC_TXNID\"," + " \"CTC_DATABASE\", \"CTC_TABLE\", \"CTC_PARTITION\", \"CTC_WRITEID\", \"CTC_UPDATE_DELETE\")" + " VALUES (%s, ?, ?, ?, ?, %s)"; + private static final String SELECT_LOCKS_FOR_LOCK_ID_QUERY = "SELECT \"HL_LOCK_EXT_ID\", \"HL_LOCK_INT_ID\", " + + "\"HL_DB\", \"HL_TABLE\", \"HL_PARTITION\", \"HL_LOCK_STATE\", \"HL_LOCK_TYPE\", \"HL_TXNID\" " + + "FROM \"HIVE_LOCKS\" WHERE \"HL_LOCK_EXT_ID\" = ?"; + private static final String SELECT_TIMED_OUT_LOCKS_QUERY = "SELECT DISTINCT \"HL_LOCK_EXT_ID\" FROM \"HIVE_LOCKS\" " + + "WHERE \"HL_LAST_HEARTBEAT\" < %s - ? 
AND \"HL_TXNID\" = 0"; + private List<TransactionalMetaStoreEventListener> transactionalListeners; @@ -2409,7 +2417,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { success = true; return new ConnectionLockIdPair(dbConn, extLockId); } catch (SQLException e) { - LOG.debug("Going to rollback"); + LOG.error("enqueueLock failed for request: {}. Exception msg: {}", rqst, getMessage(e)); rollbackDBConn(dbConn); checkRetryable(dbConn, e, "enqueueLockWithRetry(" + rqst + ")"); throw new MetaException("Unable to update transaction database " + @@ -2425,6 +2433,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { } } catch(RetryException e) { + LOG.debug("Going to retry enqueueLock for request: {}, after catching RetryException with message: {}", + rqst, e.getMessage()); return enqueueLockWithRetry(rqst); } } @@ -2434,7 +2444,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { LOG.debug("Going to execute query <" + s + ">"); try (ResultSet rs = stmt.executeQuery(s)) { if (!rs.next()) { - LOG.debug("Going to rollback"); + LOG.error("Failure to get next lock ID for update! SELECT query returned empty ResultSet."); dbConn.rollback(); throw new MetaException("Transaction tables not properly " + "initialized, no record found in next_lock_id"); @@ -2604,12 +2614,12 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { pstmt.addBatch(); if (intLockId % maxBatchSize == 0) { - LOG.debug("Executing HIVE_LOCKS inserts in batch. Batch size: " + maxBatchSize); + LOG.debug("Executing a batch of <" + insertLocksQuery + "> queries. Batch size: " + maxBatchSize); pstmt.executeBatch(); } } if (intLockId % maxBatchSize != 0) { - LOG.debug("Executing HIVE_LOCKS inserts in batch. Batch size: " + intLockId % maxBatchSize); + LOG.debug("Executing a batch of <" + insertLocksQuery + "> queries. 
Batch size: " + intLockId % maxBatchSize); pstmt.executeBatch(); } } @@ -2635,7 +2645,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { } return checkLock(dbConn, extLockId, txnId); } catch (SQLException e) { - LOG.debug("Going to rollback"); + LOG.error("checkLock failed for extLockId={}/txnId={}. Exception msg: {}", extLockId, txnId, getMessage(e)); rollbackDBConn(dbConn); checkRetryable(dbConn, e, "checkLockWithRetry(" + extLockId + "," + txnId + ")"); throw new MetaException("Unable to update transaction database " + @@ -2646,6 +2656,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { } } catch(RetryException e) { + LOG.debug("Going to retry checkLock for extLockId={}/txnId={} after catching RetryException with message: {}", + extLockId, txnId, e.getMessage()); return checkLockWithRetry(dbConn, extLockId, txnId); } } @@ -2680,12 +2692,10 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { // Heartbeat on the lockid first, to assure that our lock is still valid. // Then look up the lock info (hopefully in the cache). If these locks // are associated with a transaction then heartbeat on that as well. - LockInfo info = getTxnIdFromLockId(dbConn, extLockId); - if(info == null) { - throw new NoSuchLockException("No such lock " + JavaUtils.lockIdToString(extLockId)); - } - if (info.txnId > 0) { - heartbeatTxn(dbConn, info.txnId); + LockInfo lockInfo = getLockFromLockId(dbConn, extLockId) + .orElseThrow(() -> new NoSuchLockException("No such lock " + JavaUtils.lockIdToString(extLockId))); + if (lockInfo.txnId > 0) { + heartbeatTxn(dbConn, lockInfo.txnId); } else { heartbeatLock(dbConn, extLockId); @@ -2693,9 +2703,9 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { //todo: strictly speaking there is a bug here. 
heartbeat*() commits but both heartbeat and //checkLock() are in the same retry block, so if checkLock() throws, heartbeat is also retired //extra heartbeat is logically harmless, but ... - return checkLock(dbConn, extLockId, info.txnId); + return checkLock(dbConn, extLockId, lockInfo.txnId); } catch (SQLException e) { - LOG.debug("Going to rollback"); + LOG.error("checkLock failed for request={}. Exception msg: {}", rqst, getMessage(e)); rollbackDBConn(dbConn); checkRetryable(dbConn, e, "checkLock(" + rqst + " )"); throw new MetaException("Unable to update transaction database " + @@ -2705,6 +2715,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { unlockInternal(); } } catch (RetryException e) { + LOG.debug("Going to retry checkLock for request={} after catching RetryException with message: {}", + rqst, e.getMessage()); return checkLock(rqst); } @@ -2718,8 +2730,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { * since this only removes from HIVE_LOCKS at worst some lock acquire is delayed */ @RetrySemantics.Idempotent - public void unlock(UnlockRequest rqst) - throws NoSuchLockException, TxnOpenException, MetaException { + public void unlock(UnlockRequest rqst) throws TxnOpenException, MetaException { try { Connection dbConn = null; Statement stmt = null; @@ -2746,10 +2757,10 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { LOG.debug("Going to execute update <" + s + ">"); int rc = stmt.executeUpdate(s); if (rc < 1) { - LOG.debug("Going to rollback"); + LOG.info("Failure to unlock any locks with extLockId={}.", extLockId); dbConn.rollback(); - LockInfo info = getTxnIdFromLockId(dbConn, extLockId); - if(info == null) { + Optional<LockInfo> optLockInfo = getLockFromLockId(dbConn, extLockId); + if (!optLockInfo.isPresent()) { //didn't find any lock with extLockId but at ReadCommitted there is a possibility that //it existed when above delete ran but it didn't have the expected state. 
LOG.info("No lock in " + LOCK_WAITING + " mode found for unlock(" + @@ -2757,25 +2768,25 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { //bail here to make the operation idempotent return; } - if(info.txnId != 0) { - String msg = "Unlocking locks associated with transaction not permitted. " + info; + LockInfo lockInfo = optLockInfo.get(); + if (isValidTxn(lockInfo.txnId)) { + String msg = "Unlocking locks associated with transaction not permitted. " + lockInfo; //if a lock is associated with a txn we can only "unlock" if if it's in WAITING state // which really means that the caller wants to give up waiting for the lock LOG.error(msg); throw new TxnOpenException(msg); - } - if(info.txnId == 0) { + } else { //we didn't see this lock when running DELETE stmt above but now it showed up //so should "should never happen" happened... - String msg = "Found lock in unexpected state " + info; + String msg = "Found lock in unexpected state " + lockInfo; LOG.error(msg); throw new MetaException(msg); } } - LOG.debug("Going to commit"); + LOG.debug("Successfully unlocked at least 1 lock with extLockId={}", extLockId); dbConn.commit(); } catch (SQLException e) { - LOG.debug("Going to rollback"); + LOG.error("Unlock failed for request={}. 
Exception msg: {}", rqst, getMessage(e)); rollbackDBConn(dbConn); checkRetryable(dbConn, e, "unlock(" + rqst + ")"); throw new MetaException("Unable to update transaction database " + @@ -4051,8 +4062,9 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { char lockChar = rs.getString("HL_LOCK_TYPE").charAt(0); type = LockTypeUtil.getLockTypeFromEncoding(lockChar) .orElseThrow(() -> new MetaException("Unknown lock type: " + lockChar)); - txnId = rs.getLong("HL_TXNID");//returns 0 if value is NULL + txnId = rs.getLong("HL_TXNID"); //returns 0 if value is NULL } + LockInfo(ShowLocksResponseElement e) { extLockId = e.getLockid(); intLockId = e.getLockIdInternal(); @@ -4225,8 +4237,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { return txnId != 0; } /** - * hl_lock_ext_id by only checking earlier locks. * Lock acquisition is meant to be fair, so every lock can only block on some lock with smaller + * hl_lock_ext_id by only checking earlier locks. * * For any given SQL statement all locks required by it are grouped under single extLockId and are * granted all at once or all locks wait. @@ -4255,10 +4267,10 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { */ boolean isPartOfDynamicPartitionInsert = true; try { - List<LockInfo> locksBeingChecked = getLockInfoFromLockId(dbConn, extLockId); //being acquired now + List<LockInfo> locksBeingChecked = getLocksFromLockId(dbConn, extLockId); //being acquired now response.setLockid(extLockId); - //This the set of entities that the statement represented by extLockId wants to update + //This is the set of entities that the statement represented by extLockId wants to update List<LockInfo> writeSet = new ArrayList<>(); for (LockInfo info : locksBeingChecked) { @@ -4374,23 +4386,24 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { // We acquire all locks for a given query atomically; if 1 blocks, all remain in Waiting state. 
LockInfo blockedBy = new LockInfo(rs); long intLockId = rs.getLong("REQ_LOCK_INT_ID"); + LOG.debug("Failure to acquire lock({} intLockId:{} {}), blocked by ({}})", JavaUtils.lockIdToString(extLockId), + intLockId, JavaUtils.txnIdToString(txnId), blockedBy); - String sqlText = "UPDATE \"HIVE_LOCKS\"" + - " SET \"HL_BLOCKEDBY_EXT_ID\" = " + blockedBy.extLockId + - ", \"HL_BLOCKEDBY_INT_ID\" = " + blockedBy.intLockId + - " WHERE \"HL_LOCK_EXT_ID\" = " + extLockId + " AND \"HL_LOCK_INT_ID\" = " + intLockId; + String updateBlockedByQuery = "UPDATE \"HIVE_LOCKS\"" + + " SET \"HL_BLOCKEDBY_EXT_ID\" = " + blockedBy.extLockId + + ", \"HL_BLOCKEDBY_INT_ID\" = " + blockedBy.intLockId + + " WHERE \"HL_LOCK_EXT_ID\" = " + extLockId + " AND \"HL_LOCK_INT_ID\" = " + intLockId; - LOG.debug("Executing sql: " + sqlText); - int updCnt = stmt.executeUpdate(sqlText); + LOG.debug("Going to execute query: <" + updateBlockedByQuery + ">"); + int updCnt = stmt.executeUpdate(updateBlockedByQuery); if (updCnt != 1) { + LOG.error("Failure to update blocked lock (extLockId={}, intLockId={}) with the blocking lock's IDs (extLockId={}, intLockId={})", + extLockId, intLockId, blockedBy.extLockId, blockedBy.intLockId); shouldNeverHappen(txnId, extLockId, intLockId); } - LOG.debug("Going to commit"); dbConn.commit(); - LOG.debug("Lock({} intLockId:{} {}) is blocked by Lock({}})", JavaUtils.lockIdToString(extLockId), - intLockId, JavaUtils.txnIdToString(txnId), blockedBy); response.setState(LockState.WAITING); return response; } @@ -4398,7 +4411,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { acquire(dbConn, stmt, locksBeingChecked); // We acquired all the locks, so commit and return acquired. 
- LOG.debug("Going to commit"); + LOG.debug("Successfully acquired locks: " + locksBeingChecked); dbConn.commit(); response.setState(LockState.ACQUIRED); } finally { @@ -4409,7 +4422,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { private void acquire(Connection dbConn, Statement stmt, List<LockInfo> locksBeingChecked) throws SQLException, NoSuchLockException, MetaException { - if(locksBeingChecked == null || locksBeingChecked.isEmpty()) { + if (locksBeingChecked == null || locksBeingChecked.isEmpty()) { return; } long txnId = locksBeingChecked.get(0).txnId; @@ -4423,30 +4436,23 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { LOG.debug("Going to execute update <" + s + ">"); int rc = stmt.executeUpdate(s); if (rc < locksBeingChecked.size()) { - LOG.debug("Going to rollback acquire(Connection dbConn, Statement stmt, List<LockInfo> locksBeingChecked)"); + LOG.error("Failure to acquire all locks (acquired: {}, total needed: {}).", rc, locksBeingChecked.size()); dbConn.rollback(); /*select all locks for this ext ID and see which ones are missing*/ - StringBuilder sb = new StringBuilder("No such lock(s): (" + JavaUtils.lockIdToString(extLockId) + ":"); - ResultSet rs = stmt.executeQuery("SELECT \"HL_LOCK_INT_ID\" FROM \"HIVE_LOCKS\" WHERE \"HL_LOCK_EXT_ID\" = " + extLockId); - while(rs.next()) { - int intLockId = rs.getInt(1); - int idx = 0; - for(; idx < locksBeingChecked.size(); idx++) { - LockInfo expectedLock = locksBeingChecked.get(idx); - if(expectedLock != null && expectedLock.intLockId == intLockId) { - locksBeingChecked.set(idx, null); - break; - } - } - } - for(LockInfo expectedLock : locksBeingChecked) { - if(expectedLock != null) { - sb.append(expectedLock.intLockId).append(","); + String errorMsgTemplate = "No such lock(s): (%s: %s) %s"; + Set<String> notFoundIds = locksBeingChecked.stream() + .map(lockInfo -> Long.toString(lockInfo.intLockId)) + .collect(Collectors.toSet()); + String getIntIdsQuery = "SELECT 
\"HL_LOCK_INT_ID\" FROM \"HIVE_LOCKS\" WHERE \"HL_LOCK_EXT_ID\" = " + extLockId; + LOG.debug("Going to execute query: <" + getIntIdsQuery + ">"); + try (ResultSet rs = stmt.executeQuery(getIntIdsQuery)) { + while (rs.next()) { + notFoundIds.remove(rs.getString(1)); } } - sb.append(") ").append(JavaUtils.txnIdToString(txnId)); - close(rs); - throw new NoSuchLockException(sb.toString()); + String errorMsg = String.format(errorMsgTemplate, + JavaUtils.lockIdToString(extLockId), String.join(", ", notFoundIds), JavaUtils.txnIdToString(txnId)); + throw new NoSuchLockException(errorMsg); } } @@ -4457,24 +4463,21 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { private void heartbeatLock(Connection dbConn, long extLockId) throws NoSuchLockException, SQLException, MetaException { // If the lock id is 0, then there are no locks in this heartbeat - if (extLockId == 0) return; - Statement stmt = null; - try { - stmt = dbConn.createStatement(); - - String s = "UPDATE \"HIVE_LOCKS\" SET \"HL_LAST_HEARTBEAT\" = " + + if (extLockId == 0) { + return; + } + try (Statement stmt = dbConn.createStatement()) { + String updateHeartbeatQuery = "UPDATE \"HIVE_LOCKS\" SET \"HL_LAST_HEARTBEAT\" = " + TxnDbUtil.getEpochFn(dbProduct) + " WHERE \"HL_LOCK_EXT_ID\" = " + extLockId; - LOG.debug("Going to execute update <" + s + ">"); - int rc = stmt.executeUpdate(s); + LOG.debug("Going to execute update <" + updateHeartbeatQuery + ">"); + int rc = stmt.executeUpdate(updateHeartbeatQuery); if (rc < 1) { - LOG.debug("Going to rollback"); + LOG.error("Failure to update last heartbeat for extLockId={}.", extLockId); dbConn.rollback(); throw new NoSuchLockException("No such lock: " + JavaUtils.lockIdToString(extLockId)); } - LOG.debug("Going to commit"); + LOG.debug("Successfully heartbeated for extLockId={}", extLockId); dbConn.commit(); - } finally { - closeStmt(stmt); } } @@ -4482,24 +4485,22 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { private void 
heartbeatTxn(Connection dbConn, long txnid) throws NoSuchTxnException, TxnAbortedException, SQLException, MetaException { // If the txnid is 0, then there are no transactions in this heartbeat - if (txnid == 0) return; - Statement stmt = null; - try { - stmt = dbConn.createStatement(); + if (txnid == 0) { + return; + } + try (Statement stmt = dbConn.createStatement()) { String s = "UPDATE \"TXNS\" SET \"TXN_LAST_HEARTBEAT\" = " + TxnDbUtil.getEpochFn(dbProduct) + " WHERE \"TXN_ID\" = " + txnid + " AND \"TXN_STATE\" = '" + TXN_OPEN + "'"; LOG.debug("Going to execute update <" + s + ">"); int rc = stmt.executeUpdate(s); if (rc < 1) { ensureValidTxn(dbConn, txnid, stmt); // This should now throw some useful exception. - LOG.warn("Can neither heartbeat txn nor confirm it as invalid."); + LOG.error("Can neither heartbeat txn (txnId={}) nor confirm it as invalid.", txnid); dbConn.rollback(); throw new NoSuchTxnException("No such txn: " + txnid); } - LOG.debug("Going to commit"); + LOG.debug("Successfully heartbeated for txnId={}", txnid); dbConn.commit(); - } finally { - closeStmt(stmt); } } @@ -4681,53 +4682,38 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { } } - private LockInfo getTxnIdFromLockId(Connection dbConn, long extLockId) - throws NoSuchLockException, MetaException, SQLException { - Statement stmt = null; - ResultSet rs = null; - try { - stmt = dbConn.createStatement(); - String s = "SELECT \"HL_LOCK_EXT_ID\", \"HL_LOCK_INT_ID\", \"HL_DB\", \"HL_TABLE\", " + - "\"HL_PARTITION\", \"HL_LOCK_STATE\", \"HL_LOCK_TYPE\", \"HL_TXNID\" FROM \"HIVE_LOCKS\" WHERE " + - "\"HL_LOCK_EXT_ID\" = " + extLockId; - LOG.debug("Going to execute query <" + s + ">"); - rs = stmt.executeQuery(s); - if (!rs.next()) { - return null; + private Optional<LockInfo> getLockFromLockId(Connection dbConn, long extLockId) throws MetaException, SQLException { + try (PreparedStatement pstmt = dbConn.prepareStatement(SELECT_LOCKS_FOR_LOCK_ID_QUERY)) { + pstmt.setLong(1, 
extLockId); + LOG.debug("Going to execute query <" + SELECT_LOCKS_FOR_LOCK_ID_QUERY + "> for extLockId={}", extLockId); + try (ResultSet rs = pstmt.executeQuery()) { + if (!rs.next()) { + return Optional.empty(); + } + LockInfo info = new LockInfo(rs); + LOG.debug("getTxnIdFromLockId(" + extLockId + ") Return " + JavaUtils.txnIdToString(info.txnId)); + return Optional.of(info); } - LockInfo info = new LockInfo(rs); - LOG.debug("getTxnIdFromLockId(" + extLockId + ") Return " + JavaUtils.txnIdToString(info.txnId)); - return info; - } finally { - close(rs); - closeStmt(stmt); } } // NEVER call this function without first calling heartbeat(long, long) - private List<LockInfo> getLockInfoFromLockId(Connection dbConn, long extLockId) - throws NoSuchLockException, MetaException, SQLException { - Statement stmt = null; - try { - stmt = dbConn.createStatement(); - String s = "SELECT \"HL_LOCK_EXT_ID\", \"HL_LOCK_INT_ID\", \"HL_DB\", \"HL_TABLE\", " + - "\"HL_PARTITION\", \"HL_LOCK_STATE\", \"HL_LOCK_TYPE\", \"HL_TXNID\" FROM \"HIVE_LOCKS\" WHERE " + - "\"HL_LOCK_EXT_ID\" = " + extLockId; - LOG.debug("Going to execute query <" + s + ">"); - ResultSet rs = stmt.executeQuery(s); - boolean sawAtLeastOne = false; - List<LockInfo> ourLockInfo = new ArrayList<>(); - while (rs.next()) { - ourLockInfo.add(new LockInfo(rs)); - sawAtLeastOne = true; + private List<LockInfo> getLocksFromLockId(Connection dbConn, long extLockId) throws MetaException, SQLException { + try (PreparedStatement pstmt = dbConn.prepareStatement(SELECT_LOCKS_FOR_LOCK_ID_QUERY)) { + List<LockInfo> locks = new ArrayList<>(); + pstmt.setLong(1, extLockId); + LOG.debug("Going to execute query <" + SELECT_LOCKS_FOR_LOCK_ID_QUERY + "> for extLockId={}", extLockId); + try (ResultSet rs = pstmt.executeQuery()) { + while (rs.next()) { + locks.add(new LockInfo(rs)); + } } - if (!sawAtLeastOne) { + if (locks.isEmpty()) { throw new MetaException("This should never happen! 
We already " + "checked the lock(" + JavaUtils.lockIdToString(extLockId) + ") existed but now we can't find it!"); } - return ourLockInfo; - } finally { - closeStmt(stmt); + LOG.debug("Found {} locks for extLockId={}. Locks: {}", locks.size(), extLockId, locks); + return locks; } } @@ -4736,28 +4722,26 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { // and thus should be done before any calls to heartbeat that will leave // open transactions. private void timeOutLocks(Connection dbConn) { - Statement stmt = null; - ResultSet rs = null; - try { - stmt = dbConn.createStatement(); - //doing a SELECT first is less efficient but makes it easier to debug things - String s = "SELECT DISTINCT \"HL_LOCK_EXT_ID\" FROM \"HIVE_LOCKS\" WHERE \"HL_LAST_HEARTBEAT\" < " + - TxnDbUtil.getEpochFn(dbProduct) + "-" + timeout + " AND \"HL_TXNID\" = 0"; - //when txnid is <> 0, the lock is associated with a txn and is handled by performTimeOuts() - //want to avoid expiring locks for a txn w/o expiring the txn itself - List<Long> extLockIDs = new ArrayList<>(); - rs = stmt.executeQuery(s); - while(rs.next()) { - extLockIDs.add(rs.getLong(1)); - } - rs.close(); - dbConn.commit(); - if(extLockIDs.size() <= 0) { - return; + Set<Long> timedOutLockIds = new TreeSet<>(); + //doing a SELECT first is less efficient but makes it easier to debug things + //when txnid is <> 0, the lock is associated with a txn and is handled by performTimeOuts() + //want to avoid expiring locks for a txn w/o expiring the txn itself + try (PreparedStatement pstmt = dbConn.prepareStatement( + String.format(SELECT_TIMED_OUT_LOCKS_QUERY, TxnDbUtil.getEpochFn(dbProduct)))) { + pstmt.setLong(1, timeout); + LOG.debug("Going to execute query: <" + SELECT_TIMED_OUT_LOCKS_QUERY + ">"); + try (ResultSet rs = pstmt.executeQuery()) { + while (rs.next()) { + timedOutLockIds.add(rs.getLong(1)); + } + dbConn.commit(); + if (timedOutLockIds.isEmpty()) { + LOG.debug("Did not find any timed-out locks, therefore 
retuning."); + return; + } } List<String> queries = new ArrayList<>(); - StringBuilder prefix = new StringBuilder(); StringBuilder suffix = new StringBuilder(); @@ -4766,29 +4750,25 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { prefix.append(TxnDbUtil.getEpochFn(dbProduct)).append("-").append(timeout); prefix.append(" AND \"HL_TXNID\" = 0 AND "); - TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, extLockIDs, "\"HL_LOCK_EXT_ID\"", true, false); - - int deletedLocks = 0; - for (String query : queries) { - LOG.debug("Removing expired locks via: " + query); - deletedLocks += stmt.executeUpdate(query); - } - if(deletedLocks > 0) { - Collections.sort(extLockIDs);//easier to read logs - LOG.info("Deleted " + deletedLocks + " int locks from HIVE_LOCKS due to timeout (" + - "HL_LOCK_EXT_ID list: " + extLockIDs + ")"); + TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, timedOutLockIds, + "\"HL_LOCK_EXT_ID\"", true, false); + try (Statement stmt = dbConn.createStatement()) { + int deletedLocks = 0; + for (String query : queries) { + LOG.debug("Going to execute update: <" + query + ">"); + deletedLocks += stmt.executeUpdate(query); + } + if (deletedLocks > 0) { + LOG.info("Deleted {} locks due to timed-out. 
Lock ids: {}", deletedLocks, timedOutLockIds); + } + dbConn.commit(); } - LOG.debug("Going to commit"); - dbConn.commit(); } - catch(SQLException ex) { - LOG.error("Failed to purge timedout locks due to: " + getMessage(ex), ex); + catch (SQLException ex) { + LOG.error("Failed to purge timed-out locks: " + getMessage(ex), ex); } - catch(Exception ex) { - LOG.error("Failed to purge timedout locks due to: " + ex.getMessage(), ex); - } finally { - close(rs); - closeStmt(stmt); + catch (Exception ex) { + LOG.error("Failed to purge timed-out locks: " + ex.getMessage(), ex); } } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java index b3a1f82..4ee1a45 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java @@ -35,8 +35,10 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Arrays; import java.util.BitSet; +import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; public class TxnUtils { private static final Logger LOG = LoggerFactory.getLogger(TxnUtils.class); @@ -172,9 +174,9 @@ public class TxnUtils { * Build a query (or queries if one query is too big but only for the case of 'IN' * composite clause. For the case of 'NOT IN' clauses, multiple queries change * the semantics of the intended query. - * E.g., Let's assume that input "inList" parameter has [5, 6] and that + * E.g., Let's assume that input "inValues" parameter has [5, 6] and that * _DIRECT_SQL_MAX_QUERY_LENGTH_ configuration parameter only allows one value in a 'NOT IN' clause, - * Then having two delete statements changes the semantics of the inteneded SQL statement. 
+ * Then having two delete statements changes the semantics of the intended SQL statement. * I.e. 'delete from T where a not in (5)' and 'delete from T where a not in (6)' sequence * is not equal to 'delete from T where a not in (5, 6)'.) * with one or multiple 'IN' or 'NOT IN' clauses with the given input parameters. @@ -192,7 +194,7 @@ public class TxnUtils { * @param queries OUT: Array of query strings * @param prefix IN: Part of the query that comes before IN list * @param suffix IN: Part of the query that comes after IN list - * @param inList IN: the list with IN list values + * @param inValues IN: Collection containing IN clause values * @param inColumn IN: single column name of IN list operator * @param addParens IN: add a pair of parenthesis outside the IN lists * e.g. "(id in (1,2,3) OR id in (4,5,6))" @@ -203,17 +205,16 @@ public class TxnUtils { List<String> queries, StringBuilder prefix, StringBuilder suffix, - List<Long> inList, + Collection<Long> inValues, String inColumn, boolean addParens, boolean notIn) { - List<String> inListStrings = new ArrayList<>(inList.size()); - for (Long aLong : inList) { - inListStrings.add(aLong.toString()); - } - return buildQueryWithINClauseStrings(conf, queries, prefix, suffix, - inListStrings, inColumn, addParens, notIn); + List<String> inValueStrings = inValues.stream() + .map(Object::toString) + .collect(Collectors.toList()); + return buildQueryWithINClauseStrings(conf, queries, prefix, suffix, + inValueStrings, inColumn, addParens, notIn); } /** * Build a query (or queries if one query is too big but only for the case of 'IN'