Author: ekoifman Date: Thu Apr 16 17:58:38 2015 New Revision: 1674119 URL: http://svn.apache.org/r1674119 Log: HIVE-10242 ACID: insert overwrite prevents create table command (Eugene Koifman, reviewed by Alan Gates)
Modified: hive/trunk/hcatalog/src/test/e2e/templeton/deployers/env.sh hive/trunk/hcatalog/src/test/e2e/templeton/deployers/start_hive_services.sh hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java Modified: hive/trunk/hcatalog/src/test/e2e/templeton/deployers/env.sh URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/src/test/e2e/templeton/deployers/env.sh?rev=1674119&r1=1674118&r2=1674119&view=diff ============================================================================== --- hive/trunk/hcatalog/src/test/e2e/templeton/deployers/env.sh (original) +++ hive/trunk/hcatalog/src/test/e2e/templeton/deployers/env.sh Thu Apr 16 17:58:38 2015 @@ -50,6 +50,11 @@ if [ -z ${HADOOP_HOME} ]; then export HADOOP_HOME=/Users/${USER}/dev/hwxhadoop/hadoop-dist/target/hadoop-${HADOOP_VERSION} fi +if [ -z ${MYSQL_CLIENT_JAR} ]; then + #if using MySQL backed metastore + export MYSQL_CLIENT_JAR=/Users/${USER}/dev/mysql-connector-java-5.1.30/mysql-connector-java-5.1.30-bin.jar +fi + export TEZ_CLIENT_HOME=/Users/ekoifman/dev/apache-tez-client-${TEZ_VERSION} #Make sure Pig is built for the Hadoop version you are running export PIG_TAR_PATH=/Users/${USER}/dev/pig-${PIG_VERSION}-src/build Modified: hive/trunk/hcatalog/src/test/e2e/templeton/deployers/start_hive_services.sh URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/src/test/e2e/templeton/deployers/start_hive_services.sh?rev=1674119&r1=1674118&r2=1674119&view=diff ============================================================================== --- hive/trunk/hcatalog/src/test/e2e/templeton/deployers/start_hive_services.sh (original) +++ hive/trunk/hcatalog/src/test/e2e/templeton/deployers/start_hive_services.sh 
Thu Apr 16 17:58:38 2015 @@ -25,10 +25,17 @@ source ./env.sh #decide which DB to run against +#Derby cp ${PROJ_HOME}/hcatalog/src/test/e2e/templeton/deployers/config/hive/hive-site.xml ${HIVE_HOME}/conf/hive-site.xml +#cp ${PROJ_HOME}/hcatalog/src/test/e2e/templeton/deployers/config/hive/hive-site.mysql.xml ${HIVE_HOME}/conf/hive-site.xml #cp ${PROJ_HOME}/hcatalog/src/test/e2e/templeton/deployers/config/hive/hive-site.mssql.xml ${HIVE_HOME}/conf/hive-site.xml cp ${PROJ_HOME}/hcatalog/src/test/e2e/templeton/deployers/config/webhcat/webhcat-site.xml ${HIVE_HOME}/hcatalog/etc/webhcat/webhcat-site.xml +cp ${PROJ_HOME}/hcatalog/src/test/e2e/templeton/deployers/config/hive/hive-log4j.properties ${HIVE_HOME}/conf/hive-log4j.properties + +if [ -f ${MYSQL_CLIENT_JAR} ]; then + cp ${MYSQL_CLIENT_JAR} ${HIVE_HOME}/lib +fi if [ -d ${WEBHCAT_LOG_DIR} ]; then rm -Rf ${WEBHCAT_LOG_DIR}; Modified: hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java URL: http://svn.apache.org/viewvc/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java?rev=1674119&r1=1674118&r2=1674119&view=diff ============================================================================== --- hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java (original) +++ hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java Thu Apr 16 17:58:38 2015 @@ -533,18 +533,29 @@ public class TxnHandler { } } + /** + * used to sort entries in {@link org.apache.hadoop.hive.metastore.api.ShowLocksResponse} + */ + private static class LockInfoExt extends LockInfo { + private final ShowLocksResponseElement e; + LockInfoExt(ShowLocksResponseElement e, long intLockId) { + super(e, intLockId); + this.e = e; + } + } public ShowLocksResponse showLocks(ShowLocksRequest rqst) throws MetaException { try { Connection dbConn = null; ShowLocksResponse rsp = new ShowLocksResponse(); List<ShowLocksResponseElement> elems = new 
ArrayList<ShowLocksResponseElement>(); + List<LockInfoExt> sortedList = new ArrayList<LockInfoExt>(); Statement stmt = null; try { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); String s = "select hl_lock_ext_id, hl_txnid, hl_db, hl_table, hl_partition, hl_lock_state, " + - "hl_lock_type, hl_last_heartbeat, hl_acquired_at, hl_user, hl_host from HIVE_LOCKS"; + "hl_lock_type, hl_last_heartbeat, hl_acquired_at, hl_user, hl_host, hl_lock_int_id from HIVE_LOCKS"; LOG.debug("Doing to execute query <" + s + ">"); ResultSet rs = stmt.executeQuery(s); while (rs.next()) { @@ -572,7 +583,7 @@ public class TxnHandler { if (!rs.wasNull()) e.setAcquiredat(acquiredAt); e.setUser(rs.getString(10)); e.setHostname(rs.getString(11)); - elems.add(e); + sortedList.add(new LockInfoExt(e, rs.getLong(12))); } LOG.debug("Going to rollback"); dbConn.rollback(); @@ -584,6 +595,12 @@ public class TxnHandler { closeStmt(stmt); closeDbConn(dbConn); } + //this ensures that "SHOW LOCKS" prints the locks in the same order as they are examined + //by checkLock() - makes diagnostics easier. 
+ Collections.sort(sortedList, new LockInfoComparator()); + for(LockInfoExt lockInfoExt : sortedList) { + elems.add(lockInfoExt.e); + } rsp.setLocks(elems); return rsp; } catch (RetryException e) { @@ -1086,17 +1103,17 @@ public class TxnHandler { } private static class LockInfo { - long extLockId; - long intLockId; - long txnId; - String db; - String table; - String partition; - LockState state; - LockType type; + private final long extLockId; + private final long intLockId; + private final long txnId; + private final String db; + private final String table; + private final String partition; + private final LockState state; + private final LockType type; // Assumes the result set is set to a valid row - LockInfo(ResultSet rs) throws SQLException { + LockInfo(ResultSet rs) throws SQLException, MetaException { extLockId = rs.getLong("hl_lock_ext_id"); // can't be null intLockId = rs.getLong("hl_lock_int_id"); // can't be null db = rs.getString("hl_db"); // can't be null @@ -1107,12 +1124,27 @@ public class TxnHandler { switch (rs.getString("hl_lock_state").charAt(0)) { case LOCK_WAITING: state = LockState.WAITING; break; case LOCK_ACQUIRED: state = LockState.ACQUIRED; break; + default: + throw new MetaException("Unknown lock state " + rs.getString("hl_lock_state").charAt(0)); } switch (rs.getString("hl_lock_type").charAt(0)) { case LOCK_EXCLUSIVE: type = LockType.EXCLUSIVE; break; case LOCK_SHARED: type = LockType.SHARED_READ; break; case LOCK_SEMI_SHARED: type = LockType.SHARED_WRITE; break; + default: + throw new MetaException("Unknown lock type " + rs.getString("hl_lock_type").charAt(0)); } + txnId = rs.getLong("hl_txnid"); + } + LockInfo(ShowLocksResponseElement e, long intLockId) { + extLockId = e.getLockid(); + this.intLockId = intLockId; + db = e.getDbname(); + table = e.getTablename(); + partition = e.getPartname(); + state = e.getState(); + type = e.getType(); + txnId = e.getTxnid(); } public boolean equals(Object other) { @@ -1130,15 +1162,22 @@ public 
class TxnHandler { partition + " state:" + (state == null ? "null" : state.toString()) + " type:" + (type == null ? "null" : type.toString()); } + private boolean isDbLock() { + return db != null && table == null && partition == null; + } + private boolean isTableLock() { + return db != null && table != null && partition == null; + } } private static class LockInfoComparator implements Comparator<LockInfo> { + private static final LockTypeComparator lockTypeComparator = new LockTypeComparator(); public boolean equals(Object other) { return this == other; } public int compare(LockInfo info1, LockInfo info2) { - // We sort by state (acquired vs waiting) and then by extLockId. + // We sort by state (acquired vs waiting) and then by LockType, then by id if (info1.state == LockState.ACQUIRED && info2.state != LockState .ACQUIRED) { return -1; @@ -1147,6 +1186,11 @@ public class TxnHandler { info2.state == LockState .ACQUIRED) { return 1; } + + int sortByType = lockTypeComparator.compare(info1.type, info2.type); + if(sortByType != 0) { + return sortByType; + } if (info1.extLockId < info2.extLockId) { return -1; } else if (info1.extLockId > info2.extLockId) { @@ -1163,6 +1207,41 @@ public class TxnHandler { } } + /** + * Sort more restrictive locks after less restrictive ones + */ + private final static class LockTypeComparator implements Comparator<LockType> { + public boolean equals(Object other) { + return this == other; + } + public int compare(LockType t1, LockType t2) { + switch (t1) { + case EXCLUSIVE: + if(t2 == LockType.EXCLUSIVE) { + return 0; + } + return 1; + case SHARED_WRITE: + switch (t2) { + case EXCLUSIVE: + return -1; + case SHARED_WRITE: + return 0; + case SHARED_READ: + return 1; + default: + throw new RuntimeException("Unexpected LockType: " + t2); + } + case SHARED_READ: + if(t2 == LockType.SHARED_READ) { + return 0; + } + return -1; + default: + throw new RuntimeException("Unexpected LockType: " + t1); + } + } + } private enum LockAction {ACQUIRE,
WAIT, KEEP_LOOKING} // A jump table to figure out whether to wait, acquire, @@ -1362,11 +1441,11 @@ public class TxnHandler { LockResponse response = new LockResponse(); response.setLockid(extLockId); - LOG.debug("Setting savepoint"); + LOG.debug("checkLock(): Setting savepoint. extLockId=" + extLockId); Savepoint save = dbConn.setSavepoint(); StringBuilder query = new StringBuilder("select hl_lock_ext_id, " + "hl_lock_int_id, hl_db, hl_table, hl_partition, hl_lock_state, " + - "hl_lock_type from HIVE_LOCKS where hl_db in ("); + "hl_lock_type, hl_txnid from HIVE_LOCKS where hl_db in ("); Set<String> strings = new HashSet<String>(locksBeingChecked.size()); for (LockInfo info : locksBeingChecked) { @@ -1431,19 +1510,26 @@ public class TxnHandler { query.append("))"); } } + query.append(" and hl_lock_ext_id <= ").append(extLockId); LOG.debug("Going to execute query <" + query.toString() + ">"); Statement stmt = null; try { stmt = dbConn.createStatement(); ResultSet rs = stmt.executeQuery(query.toString()); - SortedSet lockSet = new TreeSet(new LockInfoComparator()); + SortedSet<LockInfo> lockSet = new TreeSet<LockInfo>(new LockInfoComparator()); while (rs.next()) { lockSet.add(new LockInfo(rs)); } // Turn the tree set into an array so we can move back and forth easily // in it. - LockInfo[] locks = (LockInfo[])lockSet.toArray(new LockInfo[1]); + LockInfo[] locks = lockSet.toArray(new LockInfo[lockSet.size()]); + if(LOG.isDebugEnabled()) { + LOG.debug("Locks to check(full): "); + for(LockInfo info : locks) { + LOG.debug(" " + info); + } + } for (LockInfo info : locksBeingChecked) { // Find the lock record we're checking @@ -1496,22 +1582,27 @@ public class TxnHandler { // We've found something that matches what we're trying to lock, // so figure out if we can lock it too. 
- switch (jumpTable.get(locks[index].type).get(locks[i].type).get - (locks[i].state)) { + LockAction lockAction = jumpTable.get(locks[index].type).get(locks[i].type).get(locks[i].state); + LOG.debug("desired Lock: " + info + " checked Lock: " + locks[i] + " action: " + lockAction); + switch (lockAction) { + case WAIT: + if(!ignoreConflict(info, locks[i])) { + wait(dbConn, save); + if (alwaysCommit) { + // In the case where lockNoWait has been called we don't want to commit because + // it's going to roll everything back. In every other case we want to commit here. + LOG.debug("Going to commit"); + dbConn.commit(); + } + response.setState(LockState.WAITING); + LOG.debug("Lock(" + info + ") waiting for Lock(" + locks[i] + ")"); + return response; + } + //fall through to ACQUIRE case ACQUIRE: acquire(dbConn, stmt, extLockId, info.intLockId); acquired = true; break; - case WAIT: - wait(dbConn, save); - if (alwaysCommit) { - // In the case where lockNoWait has been called we don't want to commit because - // it's going to roll everything back. In every other case we want to commit here. - LOG.debug("Going to commit"); - dbConn.commit(); - } - response.setState(LockState.WAITING); - return response; case KEEP_LOOKING: continue; } @@ -1534,6 +1625,19 @@ public class TxnHandler { return response; } + /** + * the {@link #jumpTable} only deals with LockState/LockType. In some cases it's not + * sufficient. For example, an EXCLUSIVE lock on partition should prevent SHARED_READ + * on the table, but there is no reason for EXCLUSIVE on a table to prevent SHARED_READ + * on a database. 
+ */ + private boolean ignoreConflict(LockInfo desiredLock, LockInfo existingLock) { + return (desiredLock.isDbLock() && desiredLock.type == LockType.SHARED_READ && + existingLock.isTableLock() && existingLock.type == LockType.EXCLUSIVE) || + (existingLock.isDbLock() && existingLock.type == LockType.SHARED_READ && + desiredLock.isTableLock() && desiredLock.type == LockType.EXCLUSIVE); + } + private void wait(Connection dbConn, Savepoint save) throws SQLException { // Need to rollback because we did a select that acquired locks but we didn't // actually update anything. Also, we may have locked some locks as @@ -1654,7 +1758,7 @@ public class TxnHandler { try { stmt = dbConn.createStatement(); String s = "select hl_lock_ext_id, hl_lock_int_id, hl_db, hl_table, " + - "hl_partition, hl_lock_state, hl_lock_type from HIVE_LOCKS where " + + "hl_partition, hl_lock_state, hl_lock_type, hl_txnid from HIVE_LOCKS where " + "hl_lock_ext_id = " + extLockId; LOG.debug("Going to execute query <" + s + ">"); ResultSet rs = stmt.executeQuery(s); Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java?rev=1674119&r1=1674118&r2=1674119&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java Thu Apr 16 17:58:38 2015 @@ -1322,7 +1322,7 @@ public class Driver implements CommandPr maxthreads = HiveConf.getIntVar(conf, HiveConf.ConfVars.EXECPARALLETHREADNUMBER); try { - LOG.info("Starting command: " + queryStr); + LOG.info("Starting command(queryId=" + queryId + "): " + queryStr); // compile and execute can get called from different threads in case of HS2 // so clear timing in this thread's Hive object before proceeding. 
Hive.get().clearMetaCallTiming(); Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java?rev=1674119&r1=1674118&r2=1674119&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java Thu Apr 16 17:58:38 2015 @@ -72,12 +72,21 @@ public class DbLockManager implements Hi * Send a lock request to the metastore. This is intended for use by * {@link DbTxnManager}. * @param lock lock request + * @param isBlocking if true, will block until locks have been acquired * @throws LockException + * @return the result of the lock attempt */ - List<HiveLock> lock(LockRequest lock) throws LockException { + LockState lock(LockRequest lock, String queryId, boolean isBlocking, List<HiveLock> acquiredLocks) throws LockException { try { - LOG.debug("Requesting lock"); + LOG.debug("Requesting: queryId=" + queryId + " " + lock); LockResponse res = client.lock(lock); + //link lockId to queryId + LOG.debug("Response " + res); + if(!isBlocking) { + if(res.getState() == LockState.WAITING) { + return LockState.WAITING; + } + } while (res.getState() == LockState.WAITING) { backoff(); res = client.checkLock(res.getLockid()); @@ -88,9 +97,8 @@ public class DbLockManager implements Hi if (res.getState() != LockState.ACQUIRED) { throw new LockException(ErrorMsg.LOCK_CANNOT_BE_ACQUIRED.getMsg()); } - List<HiveLock> locks = new ArrayList<HiveLock>(1); - locks.add(hl); - return locks; + acquiredLocks.add(hl); + return res.getState(); } catch (NoSuchTxnException e) { LOG.error("Metastore could not find txnid " + lock.getTxnid()); throw new LockException(ErrorMsg.TXNMGR_NOT_INSTANTIATED.getMsg(), e); @@ -102,6 +110,20 @@ public class DbLockManager implements Hi e); } } 
+ /** + * Used to make another attempt to acquire a lock (in Waiting state) + * @param extLockId + * @return result of the attempt + * @throws LockException + */ + LockState checkLock(long extLockId) throws LockException { + try { + return client.checkLock(extLockId).getState(); + } catch (TException e) { + throw new LockException(ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg(), + e); + } + } @Override public void unlock(HiveLock hiveLock) throws LockException { Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java?rev=1674119&r1=1674118&r2=1674119&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java Thu Apr 16 17:58:38 2015 @@ -37,6 +37,7 @@ import org.apache.hadoop.hive.ql.metadat import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.thrift.TException; +import java.util.ArrayList; import java.util.List; /** @@ -87,6 +88,15 @@ public class DbTxnManager extends HiveTx @Override public void acquireLocks(QueryPlan plan, Context ctx, String username) throws LockException { + acquireLocks(plan, ctx, username, true); + } + + /** + * This is for testing only. 
Normally client should call {@link #acquireLocks(org.apache.hadoop.hive.ql.QueryPlan, org.apache.hadoop.hive.ql.Context, String)} + * @param isBlocking if false, the method will return immediately; thus the locks may be in LockState.WAITING + * @return null if no locks were needed + */ + LockState acquireLocks(QueryPlan plan, Context ctx, String username, boolean isBlocking) throws LockException { init(); // Make sure we've built the lock manager getLockManager(); @@ -94,7 +104,8 @@ public class DbTxnManager extends HiveTx boolean atLeastOneLock = false; LockRequestBuilder rqstBuilder = new LockRequestBuilder(); - LOG.debug("Setting lock request transaction to " + txnId); + //link queryId to txnId + LOG.debug("Setting lock request transaction to " + txnId + " for queryId=" + plan.getQueryId()); rqstBuilder.setTransactionId(txnId) .setUser(username); @@ -206,10 +217,15 @@ public class DbTxnManager extends HiveTx // Make sure we need locks. It's possible there's nothing to lock in // this operation. 
- if (!atLeastOneLock) return; + if (!atLeastOneLock) { + LOG.debug("No locks needed for queryId" + plan.getQueryId()); + return null; + } - List<HiveLock> locks = lockMgr.lock(rqstBuilder.build()); + List<HiveLock> locks = new ArrayList<HiveLock>(1); + LockState lockState = lockMgr.lock(rqstBuilder.build(), plan.getQueryId(), isBlocking, locks); ctx.setHiveLocks(locks); + return lockState; } @Override Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java?rev=1674119&r1=1674118&r2=1674119&view=diff ============================================================================== --- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java (original) +++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java Thu Apr 16 17:58:38 2015 @@ -32,6 +32,7 @@ import org.apache.hadoop.hive.ql.session import org.apache.log4j.Level; import org.apache.log4j.LogManager; import org.junit.After; +import org.junit.AfterClass; import org.junit.Before; import org.junit.Test; @@ -39,6 +40,7 @@ import java.util.*; /** * Unit tests for {@link DbTxnManager}. + * See additional tests in {@link org.apache.hadoop.hive.ql.lockmgr.TestDbTxnManager2} */ public class TestDbTxnManager {