This is an automated email from the ASF dual-hosted git repository.

dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 256a52a  HIVE-23201: Improve logging in locking (Marton Bod via Denys 
Kuzmenko)
256a52a is described below

commit 256a52a6137dfd1bfc02de34691ac4cda2f86ea2
Author: Marton Bod <m...@cloudera.com>
AuthorDate: Tue Apr 28 10:59:03 2020 +0200

    HIVE-23201: Improve logging in locking (Marton Bod via Denys Kuzmenko)
---
 .../hadoop/hive/ql/lockmgr/DbLockManager.java      |   4 +
 .../hadoop/hive/metastore/txn/TxnHandler.java      | 294 ++++++++++-----------
 .../apache/hadoop/hive/metastore/txn/TxnUtils.java |  21 +-
 3 files changed, 152 insertions(+), 167 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java 
b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java
index 4b6bc3e..fb5a306 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java
@@ -112,6 +112,8 @@ public final class DbLockManager implements HiveLockManager{
       long startRetry = System.currentTimeMillis();
       while (res.getState() == LockState.WAITING && numRetries++ < 
maxNumWaits) {
         backoff();
+        LOG.debug("Starting retry attempt:#{} to acquire locks for lockId={}. 
QueryId={}",
+                numRetries, res.getLockid(), queryId);
         res = txnManager.getMS().checkLock(res.getLockid());
       }
       long retryDuration = System.currentTimeMillis() - startRetry;
@@ -139,6 +141,8 @@ public final class DbLockManager implements HiveLockManager{
       locks.add(hl);
       if (res.getState() != LockState.ACQUIRED) {
         if(res.getState() == LockState.WAITING) {
+          LOG.error("Unable to acquire locks for lockId={} after {} retries 
(retries took {} ms). QueryId={}",
+                  res.getLockid(), numRetries, retryDuration, queryId);
           /**
            * the {@link #unlock(HiveLock)} here is more about future proofing 
when support for
            * multi-statement txns is added.  In that case it's reasonable for 
the client
diff --git 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index b43ff39..fe39b0b 100644
--- 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -43,6 +43,7 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
+import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.ThreadLocalRandom;
@@ -50,6 +51,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 
 import javax.sql.DataSource;
 
@@ -202,6 +204,12 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
   private static final String COMPL_TXN_COMPONENTS_INSERT_QUERY = "INSERT INTO 
\"COMPLETED_TXN_COMPONENTS\" " +
           "(\"CTC_TXNID\"," + " \"CTC_DATABASE\", \"CTC_TABLE\", 
\"CTC_PARTITION\", \"CTC_WRITEID\", \"CTC_UPDATE_DELETE\")" +
           " VALUES (%s, ?, ?, ?, ?, %s)";
+  private static final String SELECT_LOCKS_FOR_LOCK_ID_QUERY = "SELECT 
\"HL_LOCK_EXT_ID\", \"HL_LOCK_INT_ID\", " +
+          "\"HL_DB\", \"HL_TABLE\", \"HL_PARTITION\", \"HL_LOCK_STATE\", 
\"HL_LOCK_TYPE\", \"HL_TXNID\" " +
+          "FROM \"HIVE_LOCKS\" WHERE \"HL_LOCK_EXT_ID\" = ?";
+  private static final String SELECT_TIMED_OUT_LOCKS_QUERY = "SELECT DISTINCT 
\"HL_LOCK_EXT_ID\" FROM \"HIVE_LOCKS\" " +
+          "WHERE \"HL_LAST_HEARTBEAT\" < %s - ? AND \"HL_TXNID\" = 0";
+
 
   private List<TransactionalMetaStoreEventListener> transactionalListeners;
 
@@ -2409,7 +2417,7 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
         success = true;
         return new ConnectionLockIdPair(dbConn, extLockId);
       } catch (SQLException e) {
-        LOG.debug("Going to rollback");
+        LOG.error("enqueueLock failed for request: {}. Exception msg: {}", 
rqst, getMessage(e));
         rollbackDBConn(dbConn);
         checkRetryable(dbConn, e, "enqueueLockWithRetry(" + rqst + ")");
         throw new MetaException("Unable to update transaction database " +
@@ -2425,6 +2433,8 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
       }
     }
     catch(RetryException e) {
+      LOG.debug("Going to retry enqueueLock for request: {}, after catching 
RetryException with message: {}",
+              rqst, e.getMessage());
       return enqueueLockWithRetry(rqst);
     }
   }
@@ -2434,7 +2444,7 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
     LOG.debug("Going to execute query <" + s + ">");
     try (ResultSet rs = stmt.executeQuery(s)) {
       if (!rs.next()) {
-        LOG.debug("Going to rollback");
+        LOG.error("Failure to get next lock ID for update! SELECT query 
returned empty ResultSet.");
         dbConn.rollback();
         throw new MetaException("Transaction tables not properly " +
                 "initialized, no record found in next_lock_id");
@@ -2604,12 +2614,12 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
 
         pstmt.addBatch();
         if (intLockId % maxBatchSize == 0) {
-          LOG.debug("Executing HIVE_LOCKS inserts in batch. Batch size: " + 
maxBatchSize);
+          LOG.debug("Executing a batch of <" + insertLocksQuery + "> queries. 
Batch size: " + maxBatchSize);
           pstmt.executeBatch();
         }
       }
       if (intLockId % maxBatchSize != 0) {
-        LOG.debug("Executing HIVE_LOCKS inserts in batch. Batch size: " + 
intLockId % maxBatchSize);
+        LOG.debug("Executing a batch of <" + insertLocksQuery + "> queries. 
Batch size: " + intLockId % maxBatchSize);
         pstmt.executeBatch();
       }
     }
@@ -2635,7 +2645,7 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
         }
         return checkLock(dbConn, extLockId, txnId);
       } catch (SQLException e) {
-        LOG.debug("Going to rollback");
+        LOG.error("checkLock failed for extLockId={}/txnId={}. Exception msg: 
{}", extLockId, txnId, getMessage(e));
         rollbackDBConn(dbConn);
         checkRetryable(dbConn, e, "checkLockWithRetry(" + extLockId + "," + 
txnId + ")");
         throw new MetaException("Unable to update transaction database " +
@@ -2646,6 +2656,8 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
       }
     }
     catch(RetryException e) {
+      LOG.debug("Going to retry checkLock for extLockId={}/txnId={} after 
catching RetryException with message: {}",
+              extLockId, txnId, e.getMessage());
       return checkLockWithRetry(dbConn, extLockId, txnId);
     }
   }
@@ -2680,12 +2692,10 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
         // Heartbeat on the lockid first, to assure that our lock is still 
valid.
         // Then look up the lock info (hopefully in the cache).  If these locks
         // are associated with a transaction then heartbeat on that as well.
-        LockInfo info = getTxnIdFromLockId(dbConn, extLockId);
-        if(info == null) {
-          throw new NoSuchLockException("No such lock " + 
JavaUtils.lockIdToString(extLockId));
-        }
-        if (info.txnId > 0) {
-          heartbeatTxn(dbConn, info.txnId);
+        LockInfo lockInfo = getLockFromLockId(dbConn, extLockId)
+                .orElseThrow(() -> new NoSuchLockException("No such lock " + 
JavaUtils.lockIdToString(extLockId)));
+        if (lockInfo.txnId > 0) {
+          heartbeatTxn(dbConn, lockInfo.txnId);
         }
         else {
           heartbeatLock(dbConn, extLockId);
@@ -2693,9 +2703,9 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
         //todo: strictly speaking there is a bug here.  heartbeat*() commits 
but both heartbeat and
         //checkLock() are in the same retry block, so if checkLock() throws, 
heartbeat is also retired
         //extra heartbeat is logically harmless, but ...
-        return checkLock(dbConn, extLockId, info.txnId);
+        return checkLock(dbConn, extLockId, lockInfo.txnId);
       } catch (SQLException e) {
-        LOG.debug("Going to rollback");
+        LOG.error("checkLock failed for request={}. Exception msg: {}", rqst, 
getMessage(e));
         rollbackDBConn(dbConn);
         checkRetryable(dbConn, e, "checkLock(" + rqst + " )");
         throw new MetaException("Unable to update transaction database " +
@@ -2705,6 +2715,8 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
         unlockInternal();
       }
     } catch (RetryException e) {
+      LOG.debug("Going to retry checkLock for request={} after catching 
RetryException with message: {}",
+              rqst, e.getMessage());
       return checkLock(rqst);
     }
 
@@ -2718,8 +2730,7 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
    * since this only removes from HIVE_LOCKS at worst some lock acquire is 
delayed
    */
   @RetrySemantics.Idempotent
-  public void unlock(UnlockRequest rqst)
-    throws NoSuchLockException, TxnOpenException, MetaException {
+  public void unlock(UnlockRequest rqst) throws TxnOpenException, 
MetaException {
     try {
       Connection dbConn = null;
       Statement stmt = null;
@@ -2746,10 +2757,10 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
         LOG.debug("Going to execute update <" + s + ">");
         int rc = stmt.executeUpdate(s);
         if (rc < 1) {
-          LOG.debug("Going to rollback");
+          LOG.info("Failure to unlock any locks with extLockId={}.", 
extLockId);
           dbConn.rollback();
-          LockInfo info = getTxnIdFromLockId(dbConn, extLockId);
-          if(info == null) {
+          Optional<LockInfo> optLockInfo = getLockFromLockId(dbConn, 
extLockId);
+          if (!optLockInfo.isPresent()) {
             //didn't find any lock with extLockId but at ReadCommitted there 
is a possibility that
             //it existed when above delete ran but it didn't have the expected 
state.
             LOG.info("No lock in " + LOCK_WAITING + " mode found for unlock(" +
@@ -2757,25 +2768,25 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
             //bail here to make the operation idempotent
             return;
           }
-          if(info.txnId != 0) {
-            String msg = "Unlocking locks associated with transaction not 
permitted.  " + info;
+          LockInfo lockInfo = optLockInfo.get();
+          if (isValidTxn(lockInfo.txnId)) {
+            String msg = "Unlocking locks associated with transaction not 
permitted.  " + lockInfo;
             //if a lock is associated with a txn we can only "unlock" if if 
it's in WAITING state
             // which really means that the caller wants to give up waiting for 
the lock
             LOG.error(msg);
             throw new TxnOpenException(msg);
-          }
-          if(info.txnId == 0) {
+          } else {
             //we didn't see this lock when running DELETE stmt above but now 
it showed up
             //so should "should never happen" happened...
-            String msg = "Found lock in unexpected state " + info;
+            String msg = "Found lock in unexpected state " + lockInfo;
             LOG.error(msg);
             throw new MetaException(msg);
           }
         }
-        LOG.debug("Going to commit");
+        LOG.debug("Successfully unlocked at least 1 lock with extLockId={}", 
extLockId);
         dbConn.commit();
       } catch (SQLException e) {
-        LOG.debug("Going to rollback");
+        LOG.error("Unlock failed for request={}. Exception msg: {}", rqst, 
getMessage(e));
         rollbackDBConn(dbConn);
         checkRetryable(dbConn, e, "unlock(" + rqst + ")");
         throw new MetaException("Unable to update transaction database " +
@@ -4051,8 +4062,9 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
       char lockChar = rs.getString("HL_LOCK_TYPE").charAt(0);
       type = LockTypeUtil.getLockTypeFromEncoding(lockChar)
               .orElseThrow(() -> new MetaException("Unknown lock type: " + 
lockChar));
-      txnId = rs.getLong("HL_TXNID");//returns 0 if value is NULL
+      txnId = rs.getLong("HL_TXNID"); //returns 0 if value is NULL
     }
+
     LockInfo(ShowLocksResponseElement e) {
       extLockId = e.getLockid();
       intLockId = e.getLockIdInternal();
@@ -4225,8 +4237,8 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
     return txnId != 0;
   }
   /**
-   * hl_lock_ext_id by only checking earlier locks.
    * Lock acquisition is meant to be fair, so every lock can only block on 
some lock with smaller
+   * hl_lock_ext_id by only checking earlier locks.
    *
    * For any given SQL statement all locks required by it are grouped under 
single extLockId and are
    * granted all at once or all locks wait.
@@ -4255,10 +4267,10 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
      */
     boolean isPartOfDynamicPartitionInsert = true;
     try {
-      List<LockInfo> locksBeingChecked = getLockInfoFromLockId(dbConn, 
extLockId); //being acquired now
+      List<LockInfo> locksBeingChecked = getLocksFromLockId(dbConn, 
extLockId); //being acquired now
       response.setLockid(extLockId);
 
-      //This the set of entities that the statement represented by extLockId 
wants to update
+      //This is the set of entities that the statement represented by 
extLockId wants to update
       List<LockInfo> writeSet = new ArrayList<>();
 
       for (LockInfo info : locksBeingChecked) {
@@ -4374,23 +4386,24 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
         // We acquire all locks for a given query atomically; if 1 blocks, all 
remain in Waiting state.
         LockInfo blockedBy = new LockInfo(rs);
         long intLockId = rs.getLong("REQ_LOCK_INT_ID");
+        LOG.debug("Failure to acquire lock({} intLockId:{} {}), blocked by 
({}})", JavaUtils.lockIdToString(extLockId),
+                intLockId, JavaUtils.txnIdToString(txnId), blockedBy);
 
-        String sqlText = "UPDATE \"HIVE_LOCKS\"" +
-          " SET \"HL_BLOCKEDBY_EXT_ID\" = " + blockedBy.extLockId +
-              ", \"HL_BLOCKEDBY_INT_ID\" = " + blockedBy.intLockId +
-          " WHERE \"HL_LOCK_EXT_ID\" = " + extLockId + " AND 
\"HL_LOCK_INT_ID\" = " + intLockId;
+        String updateBlockedByQuery = "UPDATE \"HIVE_LOCKS\"" +
+                " SET \"HL_BLOCKEDBY_EXT_ID\" = " + blockedBy.extLockId +
+                ", \"HL_BLOCKEDBY_INT_ID\" = " + blockedBy.intLockId +
+                " WHERE \"HL_LOCK_EXT_ID\" = " + extLockId + " AND 
\"HL_LOCK_INT_ID\" = " + intLockId;
 
-        LOG.debug("Executing sql: " + sqlText);
-        int updCnt = stmt.executeUpdate(sqlText);
+        LOG.debug("Going to execute query: <" + updateBlockedByQuery + ">");
+        int updCnt = stmt.executeUpdate(updateBlockedByQuery);
 
         if (updCnt != 1) {
+          LOG.error("Failure to update blocked lock (extLockId={}, 
intLockId={}) with the blocking lock's IDs (extLockId={}, intLockId={})",
+                  extLockId, intLockId, blockedBy.extLockId, 
blockedBy.intLockId);
           shouldNeverHappen(txnId, extLockId, intLockId);
         }
-        LOG.debug("Going to commit");
         dbConn.commit();
 
-        LOG.debug("Lock({} intLockId:{} {}) is blocked by Lock({}})", 
JavaUtils.lockIdToString(extLockId),
-          intLockId, JavaUtils.txnIdToString(txnId), blockedBy);
         response.setState(LockState.WAITING);
         return response;
       }
@@ -4398,7 +4411,7 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
       acquire(dbConn, stmt, locksBeingChecked);
 
       // We acquired all the locks, so commit and return acquired.
-      LOG.debug("Going to commit");
+      LOG.debug("Successfully acquired locks: " + locksBeingChecked);
       dbConn.commit();
       response.setState(LockState.ACQUIRED);
     } finally {
@@ -4409,7 +4422,7 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
 
   private void acquire(Connection dbConn, Statement stmt, List<LockInfo> 
locksBeingChecked)
     throws SQLException, NoSuchLockException, MetaException {
-    if(locksBeingChecked == null || locksBeingChecked.isEmpty()) {
+    if (locksBeingChecked == null || locksBeingChecked.isEmpty()) {
       return;
     }
     long txnId = locksBeingChecked.get(0).txnId;
@@ -4423,30 +4436,23 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
     LOG.debug("Going to execute update <" + s + ">");
     int rc = stmt.executeUpdate(s);
     if (rc < locksBeingChecked.size()) {
-      LOG.debug("Going to rollback acquire(Connection dbConn, Statement stmt, 
List<LockInfo> locksBeingChecked)");
+      LOG.error("Failure to acquire all locks (acquired: {}, total needed: 
{}).", rc, locksBeingChecked.size());
       dbConn.rollback();
       /*select all locks for this ext ID and see which ones are missing*/
-      StringBuilder sb = new StringBuilder("No such lock(s): (" + 
JavaUtils.lockIdToString(extLockId) + ":");
-      ResultSet rs = stmt.executeQuery("SELECT \"HL_LOCK_INT_ID\" FROM 
\"HIVE_LOCKS\" WHERE \"HL_LOCK_EXT_ID\" = " + extLockId);
-      while(rs.next()) {
-        int intLockId = rs.getInt(1);
-        int idx = 0;
-        for(; idx < locksBeingChecked.size(); idx++) {
-          LockInfo expectedLock = locksBeingChecked.get(idx);
-          if(expectedLock != null && expectedLock.intLockId == intLockId) {
-            locksBeingChecked.set(idx, null);
-            break;
-          }
-        }
-      }
-      for(LockInfo expectedLock : locksBeingChecked) {
-        if(expectedLock != null) {
-          sb.append(expectedLock.intLockId).append(",");
+      String errorMsgTemplate = "No such lock(s): (%s: %s) %s";
+      Set<String> notFoundIds = locksBeingChecked.stream()
+              .map(lockInfo -> Long.toString(lockInfo.intLockId))
+              .collect(Collectors.toSet());
+      String getIntIdsQuery = "SELECT \"HL_LOCK_INT_ID\" FROM \"HIVE_LOCKS\" 
WHERE \"HL_LOCK_EXT_ID\" = " + extLockId;
+      LOG.debug("Going to execute query: <" + getIntIdsQuery + ">");
+      try (ResultSet rs = stmt.executeQuery(getIntIdsQuery)) {
+        while (rs.next()) {
+          notFoundIds.remove(rs.getString(1));
         }
       }
-      sb.append(") ").append(JavaUtils.txnIdToString(txnId));
-      close(rs);
-      throw new NoSuchLockException(sb.toString());
+      String errorMsg = String.format(errorMsgTemplate,
+              JavaUtils.lockIdToString(extLockId), String.join(", ", 
notFoundIds), JavaUtils.txnIdToString(txnId));
+      throw new NoSuchLockException(errorMsg);
     }
   }
 
@@ -4457,24 +4463,21 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
   private void heartbeatLock(Connection dbConn, long extLockId)
     throws NoSuchLockException, SQLException, MetaException {
     // If the lock id is 0, then there are no locks in this heartbeat
-    if (extLockId == 0) return;
-    Statement stmt = null;
-    try {
-      stmt = dbConn.createStatement();
-
-      String s = "UPDATE \"HIVE_LOCKS\" SET \"HL_LAST_HEARTBEAT\" = " +
+    if (extLockId == 0) {
+      return;
+    }
+    try (Statement stmt = dbConn.createStatement()) {
+      String updateHeartbeatQuery = "UPDATE \"HIVE_LOCKS\" SET 
\"HL_LAST_HEARTBEAT\" = " +
           TxnDbUtil.getEpochFn(dbProduct) + " WHERE \"HL_LOCK_EXT_ID\" = " + 
extLockId;
-      LOG.debug("Going to execute update <" + s + ">");
-      int rc = stmt.executeUpdate(s);
+      LOG.debug("Going to execute update <" + updateHeartbeatQuery + ">");
+      int rc = stmt.executeUpdate(updateHeartbeatQuery);
       if (rc < 1) {
-        LOG.debug("Going to rollback");
+        LOG.error("Failure to update last heartbeat for extLockId={}.", 
extLockId);
         dbConn.rollback();
         throw new NoSuchLockException("No such lock: " + 
JavaUtils.lockIdToString(extLockId));
       }
-      LOG.debug("Going to commit");
+      LOG.debug("Successfully heartbeated for extLockId={}", extLockId);
       dbConn.commit();
-    } finally {
-      closeStmt(stmt);
     }
   }
 
@@ -4482,24 +4485,22 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
   private void heartbeatTxn(Connection dbConn, long txnid)
     throws NoSuchTxnException, TxnAbortedException, SQLException, 
MetaException {
     // If the txnid is 0, then there are no transactions in this heartbeat
-    if (txnid == 0) return;
-    Statement stmt = null;
-    try {
-      stmt = dbConn.createStatement();
+    if (txnid == 0) {
+      return;
+    }
+    try (Statement stmt = dbConn.createStatement()) {
       String s = "UPDATE \"TXNS\" SET \"TXN_LAST_HEARTBEAT\" = " + 
TxnDbUtil.getEpochFn(dbProduct) +
         " WHERE \"TXN_ID\" = " + txnid + " AND \"TXN_STATE\" = '" + TXN_OPEN + 
"'";
       LOG.debug("Going to execute update <" + s + ">");
       int rc = stmt.executeUpdate(s);
       if (rc < 1) {
         ensureValidTxn(dbConn, txnid, stmt); // This should now throw some 
useful exception.
-        LOG.warn("Can neither heartbeat txn nor confirm it as invalid.");
+        LOG.error("Can neither heartbeat txn (txnId={}) nor confirm it as 
invalid.", txnid);
         dbConn.rollback();
         throw new NoSuchTxnException("No such txn: " + txnid);
       }
-      LOG.debug("Going to commit");
+      LOG.debug("Successfully heartbeated for txnId={}", txnid);
       dbConn.commit();
-    } finally {
-      closeStmt(stmt);
     }
   }
 
@@ -4681,53 +4682,38 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
     }
   }
 
-  private LockInfo getTxnIdFromLockId(Connection dbConn, long extLockId)
-    throws NoSuchLockException, MetaException, SQLException {
-    Statement stmt = null;
-    ResultSet rs = null;
-    try {
-      stmt = dbConn.createStatement();
-      String s = "SELECT \"HL_LOCK_EXT_ID\", \"HL_LOCK_INT_ID\", \"HL_DB\", 
\"HL_TABLE\", " +
-        "\"HL_PARTITION\", \"HL_LOCK_STATE\", \"HL_LOCK_TYPE\", \"HL_TXNID\" 
FROM \"HIVE_LOCKS\" WHERE " +
-        "\"HL_LOCK_EXT_ID\" = " + extLockId;
-      LOG.debug("Going to execute query <" + s + ">");
-      rs = stmt.executeQuery(s);
-      if (!rs.next()) {
-        return null;
+  private Optional<LockInfo> getLockFromLockId(Connection dbConn, long 
extLockId) throws MetaException, SQLException {
+    try (PreparedStatement pstmt = 
dbConn.prepareStatement(SELECT_LOCKS_FOR_LOCK_ID_QUERY)) {
+      pstmt.setLong(1, extLockId);
+      LOG.debug("Going to execute query <" + SELECT_LOCKS_FOR_LOCK_ID_QUERY + 
"> for extLockId={}", extLockId);
+      try (ResultSet rs = pstmt.executeQuery()) {
+        if (!rs.next()) {
+          return Optional.empty();
+        }
+        LockInfo info = new LockInfo(rs);
+        LOG.debug("getTxnIdFromLockId(" + extLockId + ") Return " + 
JavaUtils.txnIdToString(info.txnId));
+        return Optional.of(info);
       }
-      LockInfo info = new LockInfo(rs);
-      LOG.debug("getTxnIdFromLockId(" + extLockId + ") Return " + 
JavaUtils.txnIdToString(info.txnId));
-      return info;
-    } finally {
-      close(rs);
-      closeStmt(stmt);
     }
   }
 
   // NEVER call this function without first calling heartbeat(long, long)
-  private List<LockInfo> getLockInfoFromLockId(Connection dbConn, long 
extLockId)
-    throws NoSuchLockException, MetaException, SQLException {
-    Statement stmt = null;
-    try {
-      stmt = dbConn.createStatement();
-      String s = "SELECT \"HL_LOCK_EXT_ID\", \"HL_LOCK_INT_ID\", \"HL_DB\", 
\"HL_TABLE\", " +
-        "\"HL_PARTITION\", \"HL_LOCK_STATE\", \"HL_LOCK_TYPE\", \"HL_TXNID\" 
FROM \"HIVE_LOCKS\" WHERE " +
-        "\"HL_LOCK_EXT_ID\" = " + extLockId;
-      LOG.debug("Going to execute query <" + s + ">");
-      ResultSet rs = stmt.executeQuery(s);
-      boolean sawAtLeastOne = false;
-      List<LockInfo> ourLockInfo = new ArrayList<>();
-      while (rs.next()) {
-        ourLockInfo.add(new LockInfo(rs));
-        sawAtLeastOne = true;
+  private List<LockInfo> getLocksFromLockId(Connection dbConn, long extLockId) 
throws MetaException, SQLException {
+    try (PreparedStatement pstmt = 
dbConn.prepareStatement(SELECT_LOCKS_FOR_LOCK_ID_QUERY)) {
+      List<LockInfo> locks = new ArrayList<>();
+      pstmt.setLong(1, extLockId);
+      LOG.debug("Going to execute query <" + SELECT_LOCKS_FOR_LOCK_ID_QUERY + 
"> for extLockId={}", extLockId);
+      try (ResultSet rs = pstmt.executeQuery()) {
+        while (rs.next()) {
+          locks.add(new LockInfo(rs));
+        }
       }
-      if (!sawAtLeastOne) {
+      if (locks.isEmpty()) {
         throw new MetaException("This should never happen!  We already " +
           "checked the lock(" + JavaUtils.lockIdToString(extLockId) + ") 
existed but now we can't find it!");
       }
-      return ourLockInfo;
-    } finally {
-      closeStmt(stmt);
+      LOG.debug("Found {} locks for extLockId={}. Locks: {}", locks.size(), 
extLockId, locks);
+      return locks;
     }
   }
 
@@ -4736,28 +4722,26 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
   // and thus should be done before any calls to heartbeat that will leave
   // open transactions.
   private void timeOutLocks(Connection dbConn) {
-    Statement stmt = null;
-    ResultSet rs = null;
-    try {
-      stmt = dbConn.createStatement();
-      //doing a SELECT first is less efficient but makes it easier to debug 
things
-      String s = "SELECT DISTINCT \"HL_LOCK_EXT_ID\" FROM \"HIVE_LOCKS\" WHERE 
\"HL_LAST_HEARTBEAT\" < " +
-          TxnDbUtil.getEpochFn(dbProduct) + "-" + timeout + " AND \"HL_TXNID\" 
= 0";
-      //when txnid is <> 0, the lock is associated with a txn and is handled 
by performTimeOuts()
-      //want to avoid expiring locks for a txn w/o expiring the txn itself
-      List<Long> extLockIDs = new ArrayList<>();
-      rs = stmt.executeQuery(s);
-      while(rs.next()) {
-        extLockIDs.add(rs.getLong(1));
-      }
-      rs.close();
-      dbConn.commit();
-      if(extLockIDs.size() <= 0) {
-        return;
+    Set<Long> timedOutLockIds = new TreeSet<>();
+    //doing a SELECT first is less efficient but makes it easier to debug 
things
+    //when txnid is <> 0, the lock is associated with a txn and is handled by 
performTimeOuts()
+    //want to avoid expiring locks for a txn w/o expiring the txn itself
+    try (PreparedStatement pstmt = dbConn.prepareStatement(
+            String.format(SELECT_TIMED_OUT_LOCKS_QUERY, 
TxnDbUtil.getEpochFn(dbProduct)))) {
+      pstmt.setLong(1, timeout);
+      LOG.debug("Going to execute query: <" + SELECT_TIMED_OUT_LOCKS_QUERY + 
">");
+      try (ResultSet rs = pstmt.executeQuery()) {
+        while (rs.next()) {
+          timedOutLockIds.add(rs.getLong(1));
+        }
+        dbConn.commit();
+        if (timedOutLockIds.isEmpty()) {
+          LOG.debug("Did not find any timed-out locks, therefore retuning.");
+          return;
+        }
       }
 
       List<String> queries = new ArrayList<>();
-
       StringBuilder prefix = new StringBuilder();
       StringBuilder suffix = new StringBuilder();
 
@@ -4766,29 +4750,25 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
       
prefix.append(TxnDbUtil.getEpochFn(dbProduct)).append("-").append(timeout);
       prefix.append(" AND \"HL_TXNID\" = 0 AND ");
 
-      TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, 
extLockIDs, "\"HL_LOCK_EXT_ID\"", true, false);
-
-      int deletedLocks = 0;
-      for (String query : queries) {
-        LOG.debug("Removing expired locks via: " + query);
-        deletedLocks += stmt.executeUpdate(query);
-      }
-      if(deletedLocks > 0) {
-        Collections.sort(extLockIDs);//easier to read logs
-        LOG.info("Deleted " + deletedLocks + " int locks from HIVE_LOCKS due 
to timeout (" +
-          "HL_LOCK_EXT_ID list:  " + extLockIDs + ")");
+      TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, 
timedOutLockIds,
+              "\"HL_LOCK_EXT_ID\"", true, false);
+      try (Statement stmt = dbConn.createStatement()) {
+        int deletedLocks = 0;
+        for (String query : queries) {
+          LOG.debug("Going to execute update: <" + query + ">");
+          deletedLocks += stmt.executeUpdate(query);
+        }
+        if (deletedLocks > 0) {
+          LOG.info("Deleted {} locks due to timed-out. Lock ids: {}", 
deletedLocks, timedOutLockIds);
+        }
+        dbConn.commit();
       }
-      LOG.debug("Going to commit");
-      dbConn.commit();
     }
-    catch(SQLException ex) {
-      LOG.error("Failed to purge timedout locks due to: " + getMessage(ex), 
ex);
+    catch (SQLException ex) {
+      LOG.error("Failed to purge timed-out locks: " + getMessage(ex), ex);
     }
-    catch(Exception ex) {
-      LOG.error("Failed to purge timedout locks due to: " + ex.getMessage(), 
ex);
-    } finally {
-      close(rs);
-      closeStmt(stmt);
+    catch (Exception ex) {
+      LOG.error("Failed to purge timed-out locks: " + ex.getMessage(), ex);
     }
   }
 
diff --git 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
index b3a1f82..4ee1a45 100644
--- 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
+++ 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
@@ -35,8 +35,10 @@ import org.slf4j.LoggerFactory;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.BitSet;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 public class TxnUtils {
   private static final Logger LOG = LoggerFactory.getLogger(TxnUtils.class);
@@ -172,9 +174,9 @@ public class TxnUtils {
    * Build a query (or queries if one query is too big but only for the case 
of 'IN'
    * composite clause. For the case of 'NOT IN' clauses, multiple queries 
change
    * the semantics of the intended query.
-   * E.g., Let's assume that input "inList" parameter has [5, 6] and that
+   * E.g., Let's assume that input "inValues" parameter has [5, 6] and that
    * _DIRECT_SQL_MAX_QUERY_LENGTH_ configuration parameter only allows one 
value in a 'NOT IN' clause,
-   * Then having two delete statements changes the semantics of the inteneded 
SQL statement.
+   * Then having two delete statements changes the semantics of the intended 
SQL statement.
    * I.e. 'delete from T where a not in (5)' and 'delete from T where a not in 
(6)' sequence
    * is not equal to 'delete from T where a not in (5, 6)'.)
    * with one or multiple 'IN' or 'NOT IN' clauses with the given input 
parameters.
@@ -192,7 +194,7 @@ public class TxnUtils {
    * @param queries   OUT: Array of query strings
    * @param prefix    IN:  Part of the query that comes before IN list
    * @param suffix    IN:  Part of the query that comes after IN list
-   * @param inList    IN:  the list with IN list values
+   * @param inValues  IN:  Collection containing IN clause values
    * @param inColumn  IN:  single column name of IN list operator
    * @param addParens IN:  add a pair of parenthesis outside the IN lists
    *                       e.g. "(id in (1,2,3) OR id in (4,5,6))"
@@ -203,17 +205,16 @@ public class TxnUtils {
                                             List<String> queries,
                                             StringBuilder prefix,
                                             StringBuilder suffix,
-                                            List<Long> inList,
+                                            Collection<Long> inValues,
                                             String inColumn,
                                             boolean addParens,
                                             boolean notIn) {
-    List<String> inListStrings = new ArrayList<>(inList.size());
-    for (Long aLong : inList) {
-      inListStrings.add(aLong.toString());
-    }
-    return buildQueryWithINClauseStrings(conf, queries, prefix, suffix,
-        inListStrings, inColumn, addParens, notIn);
+    List<String> inValueStrings = inValues.stream()
+            .map(Object::toString)
+            .collect(Collectors.toList());
 
+    return buildQueryWithINClauseStrings(conf, queries, prefix, suffix,
+            inValueStrings, inColumn, addParens, notIn);
   }
   /**
    * Build a query (or queries if one query is too big but only for the case 
of 'IN'

Reply via email to