HIVE-14799: Query operations are not thread safe during cancellation (Chaoyu Tang, reviewed by Sergey Shelukhin, Yongzhi Chen)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/1901e3a6 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/1901e3a6 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/1901e3a6 Branch: refs/heads/hive-14535 Commit: 1901e3a6ab97c150905c04c591b33b2c640e4b87 Parents: c71ef4f Author: ctang <ct...@cloudera.com> Authored: Sat Oct 15 08:55:36 2016 -0400 Committer: ctang <ct...@cloudera.com> Committed: Sat Oct 15 08:55:36 2016 -0400 ---------------------------------------------------------------------- .../java/org/apache/hadoop/hive/ql/Driver.java | 665 +++++++++++++------ 1 file changed, 468 insertions(+), 197 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/1901e3a6/ql/src/java/org/apache/hadoop/hive/ql/Driver.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java index dd55434..9e5fd37 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -164,8 +164,6 @@ public class Driver implements CommandProcessor { private int maxthreads; private int tryCount = Integer.MAX_VALUE; - private boolean destroyed; - private String userName; // HS2 operation handle guid string @@ -180,6 +178,28 @@ public class Driver implements CommandProcessor { // Query hooks that execute before compilation and after execution List<QueryLifeTimeHook> queryHooks; + // a lock is used for synchronizing the state transition and its associated + // resource releases + private final ReentrantLock stateLock = new ReentrantLock(); + private DriverState driverState = DriverState.INITIALIZED; + + private enum DriverState { + INITIALIZED, + COMPILING, + COMPILED, + EXECUTING, + EXECUTED, + // a state that the driver enters after close() has been called 
to interrupt its running + // query in the query cancellation + INTERRUPT, + // a state that the driver enters after close() has been called to clean the query results + // and release the resources after the query has been executed + CLOSED, + // a state that the driver enters after destroy() is called and it is the end of driver life cycle + DESTROYED, + ERROR + } + private boolean checkConcurrency() { boolean supportConcurrency = conf.getBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY); if (!supportConcurrency) { @@ -350,9 +370,22 @@ public class Driver implements CommandProcessor { * @return 0 for ok */ public int compile(String command, boolean resetTaskIds) { + return compile(command, resetTaskIds, false); + } + + // deferClose indicates if the close/destroy should be deferred when the process has been + // interrupted, it should be set to true if the compile is called within another method like + // runInternal, which defers the close to the called in that method. + public int compile(String command, boolean resetTaskIds, boolean deferClose) { PerfLogger perfLogger = SessionState.getPerfLogger(true); perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.DRIVER_RUN); perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.COMPILE); + stateLock.lock(); + try { + driverState = DriverState.COMPILING; + } finally { + stateLock.unlock(); + } command = new VariableSubstitution(new HiveVariableSource() { @Override @@ -370,8 +403,13 @@ public class Driver implements CommandProcessor { LOG.warn("WARNING! Query command could not be redacted." 
+ e); } + if (isInterrupted()) { + return handleInterruption("at beginning of compilation."); //indicate if need clean resource + } + if (ctx != null && ctx.getExplainAnalyze() != AnalyzeState.RUNNING) { - close(); + // close the existing ctx etc before compiling a new query, but does not destroy driver + closeInProcess(false); } if (resetTaskIds) { @@ -411,9 +449,13 @@ public class Driver implements CommandProcessor { }; ShutdownHookManager.addShutdownHook(shutdownRunner, SHUTDOWN_HOOK_PRIORITY); + if (isInterrupted()) { + return handleInterruption("before parsing and analysing the query"); + } if (ctx == null) { ctx = new Context(conf); } + ctx.setTryCount(getTryCount()); ctx.setCmd(command); ctx.setHDFSCleanup(true); @@ -477,9 +519,12 @@ public class Driver implements CommandProcessor { acidInQuery = sem.hasAcidInQuery(); perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.ANALYZE); + if (isInterrupted()) { + return handleInterruption("after analyzing query."); + } + // get the output schema schema = getSchema(sem, conf); - plan = new QueryPlan(queryStr, sem, perfLogger.getStartTime(PerfLogger.DRIVER_RUN), queryId, queryState.getHiveOperation(), schema); @@ -523,11 +568,13 @@ public class Driver implements CommandProcessor { } } } - return 0; } catch (Exception e) { - compileError = true; + if (isInterrupted()) { + return handleInterruption("during query compilation: " + e.getMessage()); + } + compileError = true; ErrorMsg error = ErrorMsg.getErrorMsg(e.getMessage()); errorMessage = "FAILED: " + e.getClass().getSimpleName(); if (error != ErrorMsg.GENERIC_ERROR) { @@ -552,23 +599,66 @@ public class Driver implements CommandProcessor { return error.getErrorCode();//todo: this is bad if returned as cmd shell exit // since it exceeds valid range of shell return values } finally { - // Trigger post compilation hook. Note that if the compilation fails here then // before/after execution hook will never be executed. 
- if (queryHooks != null && !queryHooks.isEmpty()) { - QueryLifeTimeHookContext qhc = new QueryLifeTimeHookContextImpl(); - qhc.setHiveConf(conf); - qhc.setCommand(command); - - for (QueryLifeTimeHook hook : queryHooks) { - hook.afterCompile(qhc, compileError); + try { + if (queryHooks != null && !queryHooks.isEmpty()) { + QueryLifeTimeHookContext qhc = new QueryLifeTimeHookContextImpl(); + qhc.setHiveConf(conf); + qhc.setCommand(command); + for (QueryLifeTimeHook hook : queryHooks) { + hook.afterCompile(qhc, compileError); + } } + } catch (Exception e) { + LOG.warn("Failed when invoking query after-compilation hook.", e); } double duration = perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.COMPILE)/1000.00; ImmutableMap<String, Long> compileHMSTimings = dumpMetaCallTimingWithoutEx("compilation"); queryDisplay.setHmsTimings(QueryDisplay.Phase.COMPILATION, compileHMSTimings); - LOG.info("Completed compiling command(queryId=" + queryId + "); Time taken: " + duration + " seconds"); + + boolean isInterrupted = isInterrupted(); + if (isInterrupted && !deferClose) { + closeInProcess(true); + } + stateLock.lock(); + try { + if (isInterrupted) { + driverState = deferClose ? DriverState.EXECUTING : DriverState.ERROR; + } else { + driverState = compileError ? 
DriverState.ERROR : DriverState.COMPILED; + } + } finally { + stateLock.unlock(); + } + + if (isInterrupted) { + LOG.info("Compiling command(queryId=" + queryId + ") has been interrupted after " + duration + " seconds"); + } else { + LOG.info("Completed compiling command(queryId=" + queryId + "); Time taken: " + duration + " seconds"); + } + } + } + + private int handleInterruption(String msg) { + SQLState = "HY008"; //SQLState for cancel operation + errorMessage = "FAILED: command has been interrupted: " + msg; + console.printError(errorMessage); + return 1000; + } + + private boolean isInterrupted() { + stateLock.lock(); + try { + if (driverState == DriverState.INTERRUPT) { + Thread.currentThread().interrupt(); + return true; + } else { + return false; + } + } finally { + stateLock.unlock(); } } @@ -1040,7 +1130,7 @@ public class Driver implements CommandProcessor { } return 0; - } catch (LockException e) { + } catch (Exception e) { errorMessage = "FAILED: Error in acquiring locks: " + e.getMessage(); SQLState = ErrorMsg.findSQLState(e.getMessage()); downstreamError = e; @@ -1106,23 +1196,11 @@ public class Driver implements CommandProcessor { * while keeping the result around. 
*/ private void releaseResources() { + releasePlan(); + releaseDriverContext(); if (SessionState.get() != null) { SessionState.get().getLineageState().clear(); } - - if (plan != null) { - fetchTask = plan.getFetchTask(); - if (fetchTask != null) { - fetchTask.setDriverContext(null); - fetchTask.setQueryPlan(null); - } - } - - if (driverCxt != null) { - driverCxt.shutdown(); - driverCxt = null; - } - plan = null; } @Override @@ -1138,12 +1216,7 @@ public class Driver implements CommandProcessor { public CommandProcessorResponse run(String command, boolean alreadyCompiled) throws CommandNeedRetryException { - CommandProcessorResponse cpr; - try { - cpr = runInternal(command, alreadyCompiled); - } finally { - releaseResources(); - } + CommandProcessorResponse cpr = runInternal(command, alreadyCompiled); if(cpr.getResponseCode() == 0) { return cpr; @@ -1201,11 +1274,11 @@ public class Driver implements CommandProcessor { } public CommandProcessorResponse compileAndRespond(String command) { - return createProcessorResponse(compileInternal(command)); + return createProcessorResponse(compileInternal(command, false)); } private static final ReentrantLock globalCompileLock = new ReentrantLock(); - private int compileInternal(String command) { + private int compileInternal(String command, boolean deferClose) { int ret; Metrics metrics = MetricsFactory.getInstance(); @@ -1223,7 +1296,7 @@ public class Driver implements CommandProcessor { if (metrics != null) { metrics.decrementCounter(MetricsConstant.WAITING_COMPILE_OPS, 1); } - ret = compile(command); + ret = compile(command, true, deferClose); } finally { compileLock.unlock(); } @@ -1318,135 +1391,178 @@ public class Driver implements CommandProcessor { errorMessage = null; SQLState = null; downstreamError = null; - - HiveDriverRunHookContext hookContext = new HiveDriverRunHookContextImpl(conf, - alreadyCompiled ? ctx.getCmd() : command); - // Get all the driver run hooks and pre-execute them. 
- List<HiveDriverRunHook> driverRunHooks; + stateLock.lock(); try { - driverRunHooks = getHooks(HiveConf.ConfVars.HIVE_DRIVER_RUN_HOOKS, - HiveDriverRunHook.class); - for (HiveDriverRunHook driverRunHook : driverRunHooks) { - driverRunHook.preDriverRun(hookContext); + if (alreadyCompiled) { + if (driverState == DriverState.COMPILED) { + driverState = DriverState.EXECUTING; + } else { + errorMessage = "FAILED: Precompiled query has been cancelled or closed."; + console.printError(errorMessage); + return createProcessorResponse(12); + } + } else { + driverState = DriverState.COMPILING; } - } catch (Exception e) { - errorMessage = "FAILED: Hive Internal Error: " + Utilities.getNameMessage(e); - SQLState = ErrorMsg.findSQLState(e.getMessage()); - downstreamError = e; - console.printError(errorMessage + "\n" - + org.apache.hadoop.util.StringUtils.stringifyException(e)); - return createProcessorResponse(12); + } finally { + stateLock.unlock(); } - PerfLogger perfLogger = null; - - int ret; - if (!alreadyCompiled) { - // compile internal will automatically reset the perf logger - ret = compileInternal(command); - // then we continue to use this perf logger - perfLogger = SessionState.getPerfLogger(); - if (ret != 0) { - return createProcessorResponse(ret); + // a flag that helps to set the correct driver state in finally block by tracking if + // the method has been returned by an error or not. + boolean isFinishedWithError = true; + try { + HiveDriverRunHookContext hookContext = new HiveDriverRunHookContextImpl(conf, + alreadyCompiled ? ctx.getCmd() : command); + // Get all the driver run hooks and pre-execute them. 
+ List<HiveDriverRunHook> driverRunHooks; + try { + driverRunHooks = getHooks(HiveConf.ConfVars.HIVE_DRIVER_RUN_HOOKS, + HiveDriverRunHook.class); + for (HiveDriverRunHook driverRunHook : driverRunHooks) { + driverRunHook.preDriverRun(hookContext); + } + } catch (Exception e) { + errorMessage = "FAILED: Hive Internal Error: " + Utilities.getNameMessage(e); + SQLState = ErrorMsg.findSQLState(e.getMessage()); + downstreamError = e; + console.printError(errorMessage + "\n" + + org.apache.hadoop.util.StringUtils.stringifyException(e)); + return createProcessorResponse(12); } - } else { - // reuse existing perf logger. - perfLogger = SessionState.getPerfLogger(); - // Since we're reusing the compiled plan, we need to update its start time for current run - plan.setQueryStartTime(perfLogger.getStartTime(PerfLogger.DRIVER_RUN)); - } - // the reason that we set the txn manager for the cxt here is because each - // query has its own ctx object. The txn mgr is shared across the - // same instance of Driver, which can run multiple queries. - HiveTxnManager txnManager = SessionState.get().getTxnMgr(); - ctx.setHiveTxnManager(txnManager); - - boolean startTxnImplicitly = false; - { - //this block ensures op makes sense in given context, e.g. COMMIT is valid only if txn is open - //DDL is not allowed in a txn, etc. 
- //an error in an open txn does a rollback of the txn - if (txnManager.isTxnOpen() && !plan.getOperation().isAllowedInTransaction()) { - assert !txnManager.getAutoCommit() : "didn't expect AC=true"; - return rollback(new CommandProcessorResponse(12, ErrorMsg.OP_NOT_ALLOWED_IN_TXN, null, - plan.getOperationName(), Long.toString(txnManager.getCurrentTxnId()))); - } - if(!txnManager.isTxnOpen() && plan.getOperation().isRequiresOpenTransaction()) { - return rollback(new CommandProcessorResponse(12, ErrorMsg.OP_NOT_ALLOWED_WITHOUT_TXN, null, plan.getOperationName())); - } - if(!txnManager.isTxnOpen() && plan.getOperation() == HiveOperation.QUERY && !txnManager.getAutoCommit()) { - //this effectively makes START TRANSACTION optional and supports JDBC setAutoCommit(false) semantics - //also, indirectly allows DDL to be executed outside a txn context - startTxnImplicitly = true; - } - if(txnManager.getAutoCommit() && plan.getOperation() == HiveOperation.START_TRANSACTION) { + + PerfLogger perfLogger = null; + + int ret; + if (!alreadyCompiled) { + // compile internal will automatically reset the perf logger + ret = compileInternal(command, true); + // then we continue to use this perf logger + perfLogger = SessionState.getPerfLogger(); + if (ret != 0) { + return createProcessorResponse(ret); + } + } else { + // reuse existing perf logger. + perfLogger = SessionState.getPerfLogger(); + // Since we're reusing the compiled plan, we need to update its start time for current run + plan.setQueryStartTime(perfLogger.getStartTime(PerfLogger.DRIVER_RUN)); + } + // the reason that we set the txn manager for the cxt here is because each + // query has its own ctx object. The txn mgr is shared across the + // same instance of Driver, which can run multiple queries. + HiveTxnManager txnManager = SessionState.get().getTxnMgr(); + ctx.setHiveTxnManager(txnManager); + + boolean startTxnImplicitly = false; + { + //this block ensures op makes sense in given context, e.g. 
COMMIT is valid only if txn is open + //DDL is not allowed in a txn, etc. + //an error in an open txn does a rollback of the txn + if (txnManager.isTxnOpen() && !plan.getOperation().isAllowedInTransaction()) { + assert !txnManager.getAutoCommit() : "didn't expect AC=true"; + return rollback(new CommandProcessorResponse(12, ErrorMsg.OP_NOT_ALLOWED_IN_TXN, null, + plan.getOperationName(), Long.toString(txnManager.getCurrentTxnId()))); + } + if(!txnManager.isTxnOpen() && plan.getOperation().isRequiresOpenTransaction()) { + return rollback(new CommandProcessorResponse(12, ErrorMsg.OP_NOT_ALLOWED_WITHOUT_TXN, null, plan.getOperationName())); + } + if(!txnManager.isTxnOpen() && plan.getOperation() == HiveOperation.QUERY && !txnManager.getAutoCommit()) { + //this effectively makes START TRANSACTION optional and supports JDBC setAutoCommit(false) semantics + //also, indirectly allows DDL to be executed outside a txn context + startTxnImplicitly = true; + } + if(txnManager.getAutoCommit() && plan.getOperation() == HiveOperation.START_TRANSACTION) { return rollback(new CommandProcessorResponse(12, ErrorMsg.OP_NOT_ALLOWED_IN_AUTOCOMMIT, null, plan.getOperationName())); + } } - } - if(plan.getOperation() == HiveOperation.SET_AUTOCOMMIT) { - try { - if(plan.getAutoCommitValue() && !txnManager.getAutoCommit()) { - /*here, if there is an open txn, we want to commit it; this behavior matches - * https://docs.oracle.com/javase/6/docs/api/java/sql/Connection.html#setAutoCommit(boolean)*/ - releaseLocksAndCommitOrRollback(true, null); - txnManager.setAutoCommit(true); + if(plan.getOperation() == HiveOperation.SET_AUTOCOMMIT) { + try { + if(plan.getAutoCommitValue() && !txnManager.getAutoCommit()) { + /*here, if there is an open txn, we want to commit it; this behavior matches + * https://docs.oracle.com/javase/6/docs/api/java/sql/Connection.html#setAutoCommit(boolean)*/ + releaseLocksAndCommitOrRollback(true, null); + txnManager.setAutoCommit(true); + } + else 
if(!plan.getAutoCommitValue() && txnManager.getAutoCommit()) { + txnManager.setAutoCommit(false); + } + else {/*didn't change autoCommit value - no-op*/} } - else if(!plan.getAutoCommitValue() && txnManager.getAutoCommit()) { - txnManager.setAutoCommit(false); + catch(LockException e) { + return handleHiveException(e, 12); } - else {/*didn't change autoCommit value - no-op*/} - } - catch(LockException e) { - return handleHiveException(e, 12); } - } - if (requiresLock()) { - ret = acquireLocksAndOpenTxn(startTxnImplicitly); + if (requiresLock()) { + // a checkpoint to see if the thread is interrupted or not before an expensive operation + if (isInterrupted()) { + ret = handleInterruption("at acquiring the lock."); + } else { + ret = acquireLocksAndOpenTxn(startTxnImplicitly); + } + if (ret != 0) { + return rollback(createProcessorResponse(ret)); + } + } + ret = execute(true); if (ret != 0) { + //if needRequireLock is false, the release here will do nothing because there is no lock return rollback(createProcessorResponse(ret)); } - } - ret = execute(); - if (ret != 0) { - //if needRequireLock is false, the release here will do nothing because there is no lock - return rollback(createProcessorResponse(ret)); - } - //if needRequireLock is false, the release here will do nothing because there is no lock - try { - if(txnManager.getAutoCommit() || plan.getOperation() == HiveOperation.COMMIT) { - releaseLocksAndCommitOrRollback(true, null); - } - else if(plan.getOperation() == HiveOperation.ROLLBACK) { - releaseLocksAndCommitOrRollback(false, null); - } - else { - //txn (if there is one started) is not finished + //if needRequireLock is false, the release here will do nothing because there is no lock + try { + if(txnManager.getAutoCommit() || plan.getOperation() == HiveOperation.COMMIT) { + releaseLocksAndCommitOrRollback(true, null); + } + else if(plan.getOperation() == HiveOperation.ROLLBACK) { + releaseLocksAndCommitOrRollback(false, null); + } + else { + //txn (if 
there is one started) is not finished + } + } catch (LockException e) { + return handleHiveException(e, 12); } - } catch (LockException e) { - return handleHiveException(e, 12); - } - perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.DRIVER_RUN); - queryDisplay.setPerfLogStarts(QueryDisplay.Phase.EXECUTION, perfLogger.getStartTimes()); - queryDisplay.setPerfLogEnds(QueryDisplay.Phase.EXECUTION, perfLogger.getEndTimes()); + perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.DRIVER_RUN); + queryDisplay.setPerfLogStarts(QueryDisplay.Phase.EXECUTION, perfLogger.getStartTimes()); + queryDisplay.setPerfLogEnds(QueryDisplay.Phase.EXECUTION, perfLogger.getEndTimes()); - // Take all the driver run hooks and post-execute them. - try { - for (HiveDriverRunHook driverRunHook : driverRunHooks) { - driverRunHook.postDriverRun(hookContext); + // Take all the driver run hooks and post-execute them. + try { + for (HiveDriverRunHook driverRunHook : driverRunHooks) { + driverRunHook.postDriverRun(hookContext); + } + } catch (Exception e) { + errorMessage = "FAILED: Hive Internal Error: " + Utilities.getNameMessage(e); + SQLState = ErrorMsg.findSQLState(e.getMessage()); + downstreamError = e; + console.printError(errorMessage + "\n" + + org.apache.hadoop.util.StringUtils.stringifyException(e)); + return createProcessorResponse(12); + } + isFinishedWithError = false; + return createProcessorResponse(ret); + } finally { + if (isInterrupted()) { + closeInProcess(true); + } else { + // only release the related resources ctx, driverContext as normal + releaseResources(); + } + stateLock.lock(); + try { + if (driverState == DriverState.INTERRUPT) { + driverState = DriverState.ERROR; + } else { + driverState = isFinishedWithError ? 
DriverState.ERROR : DriverState.EXECUTED; + } + } finally { + stateLock.unlock(); } - } catch (Exception e) { - errorMessage = "FAILED: Hive Internal Error: " + Utilities.getNameMessage(e); - SQLState = ErrorMsg.findSQLState(e.getMessage()); - downstreamError = e; - console.printError(errorMessage + "\n" - + org.apache.hadoop.util.StringUtils.stringifyException(e)); - return createProcessorResponse(12); } - - return createProcessorResponse(ret); } private CommandProcessorResponse rollback(CommandProcessorResponse cpr) { @@ -1556,16 +1672,39 @@ public class Driver implements CommandProcessor { } public int execute() throws CommandNeedRetryException { + return execute(false); + } + + public int execute(boolean deferClose) throws CommandNeedRetryException { PerfLogger perfLogger = SessionState.getPerfLogger(); perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.DRIVER_EXECUTE); + boolean noName = StringUtils.isEmpty(conf.get(MRJobConfig.JOB_NAME)); int maxlen = conf.getIntVar(HiveConf.ConfVars.HIVEJOBNAMELENGTH); - String queryId = plan.getQueryId(); + String queryId = conf.getVar(HiveConf.ConfVars.HIVEQUERYID); // Get the query string from the conf file as the compileInternal() method might // hide sensitive information during query redaction. String queryStr = conf.getQueryString(); + stateLock.lock(); + try { + // if query is not in compiled state, or executing state which is carried over from + // a combined compile/execute in runInternal, throws the error + if (driverState != DriverState.COMPILED && + driverState != DriverState.EXECUTING) { + SQLState = "HY008"; + errorMessage = "FAILED: query " + queryStr + " has " + + (driverState == DriverState.INTERRUPT ? 
"been cancelled" : "not been compiled."); + console.printError(errorMessage); + return 1000; + } else { + driverState = DriverState.EXECUTING; + } + } finally { + stateLock.unlock(); + } + maxthreads = HiveConf.getIntVar(conf, HiveConf.ConfVars.EXECPARALLETHREADNUMBER); HookContext hookContext = null; @@ -1614,7 +1753,7 @@ public class Driver implements CommandProcessor { if (queryHooks != null && !queryHooks.isEmpty()) { QueryLifeTimeHookContext qhc = new QueryLifeTimeHookContextImpl(); qhc.setHiveConf(conf); - qhc.setCommand(ctx.getCmd()); + qhc.setCommand(queryStr); qhc.setHookContext(hookContext); for (QueryLifeTimeHook hook : queryHooks) { @@ -1629,7 +1768,7 @@ public class Driver implements CommandProcessor { + Utilities.getSparkTasks(plan.getRootTasks()).size(); if (jobs > 0) { logMrWarning(mrJobs); - console.printInfo("Query ID = " + plan.getQueryId()); + console.printInfo("Query ID = " + queryId); console.printInfo("Total jobs = " + jobs); } if (SessionState.get() != null) { @@ -1645,11 +1784,13 @@ public class Driver implements CommandProcessor { // At any time, at most maxthreads tasks can be running // The main thread polls the TaskRunners to check if they have finished. + if (isInterrupted()) { + return handleInterruption("before running tasks."); + } DriverContext driverCxt = new DriverContext(ctx); driverCxt.prepare(plan); ctx.setHDFSCleanup(true); - this.driverCxt = driverCxt; // for canceling the query (should be bound to session?) SessionState.get().setMapRedStats(new LinkedHashMap<String, MapRedStats>()); @@ -1671,7 +1812,7 @@ public class Driver implements CommandProcessor { perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.RUN_TASKS); // Loop while you either have tasks running, or tasks queued up - while (!destroyed && driverCxt.isRunning()) { + while (driverCxt.isRunning()) { // Launch upto maxthreads tasks Task<? 
extends Serializable> task; @@ -1694,6 +1835,9 @@ public class Driver implements CommandProcessor { TaskResult result = tskRun.getTaskResult(); int exitVal = result.getExitVal(); + if (isInterrupted()) { + return handleInterruption("when checking the execution result."); + } if (exitVal != 0) { if (tsk.ifRetryCmdWhenFail()) { driverCxt.shutdown(); @@ -1798,11 +1942,16 @@ public class Driver implements CommandProcessor { String.valueOf(0)); SessionState.get().getHiveHistory().printRowCount(queryId); } + releasePlan(plan); } catch (CommandNeedRetryException e) { executionError = true; throw e; } catch (Exception e) { executionError = true; + if (isInterrupted()) { + return handleInterruption("during query execution: \n" + e.getMessage()); + } + ctx.restoreOriginalTracker(); if (SessionState.get() != null) { SessionState.get().getHiveHistory().setQueryProperty(queryId, Keys.QUERY_RET_CODE, @@ -1824,15 +1973,19 @@ public class Driver implements CommandProcessor { return (12); } finally { // Trigger query hooks after query completes its execution. 
- if (queryHooks != null && !queryHooks.isEmpty()) { - QueryLifeTimeHookContext qhc = new QueryLifeTimeHookContextImpl(); - qhc.setHiveConf(conf); - qhc.setCommand(ctx.getCmd()); - qhc.setHookContext(hookContext); - - for (QueryLifeTimeHook hook : queryHooks) { - hook.afterExecution(qhc, executionError); + try { + if (queryHooks != null && !queryHooks.isEmpty()) { + QueryLifeTimeHookContext qhc = new QueryLifeTimeHookContextImpl(); + qhc.setHiveConf(conf); + qhc.setCommand(queryStr); + qhc.setHookContext(hookContext); + + for (QueryLifeTimeHook hook : queryHooks) { + hook.afterExecution(qhc, executionError); + } } + } catch (Exception e) { + LOG.warn("Failed when invoking query after execution hook", e); } if (SessionState.get() != null) { @@ -1856,11 +2009,29 @@ public class Driver implements CommandProcessor { } console.printInfo("Total MapReduce CPU Time Spent: " + Utilities.formatMsecToStr(totalCpu)); } - LOG.info("Completed executing command(queryId=" + queryId + "); Time taken: " + duration + " seconds"); + boolean isInterrupted = isInterrupted(); + if (isInterrupted && !deferClose) { + closeInProcess(true); + } + stateLock.lock(); + try { + if (isInterrupted) { + if (!deferClose) { + driverState = DriverState.ERROR; + } + } else { + driverState = executionError ? 
DriverState.ERROR : DriverState.EXECUTED; + } + } finally { + stateLock.unlock(); + } + if (isInterrupted) { + LOG.info("Executing command(queryId=" + queryId + ") has been interrupted after " + duration + " seconds"); + } else { + LOG.info("Completed executing command(queryId=" + queryId + "); Time taken: " + duration + " seconds"); + } } - releasePlan(plan); - if (console != null) { console.printInfo("OK"); } @@ -1868,18 +2039,23 @@ public class Driver implements CommandProcessor { return (0); } - private synchronized void releasePlan(QueryPlan plan) { + private void releasePlan(QueryPlan plan) { // Plan maybe null if Driver.close is called in another thread for the same Driver object - if (plan != null) { - plan.setDone(); - if (SessionState.get() != null) { - try { - SessionState.get().getHiveHistory().logPlanProgress(plan); - } catch (Exception e) { - // Log and ignore - LOG.warn("Could not log query plan progress", e); + stateLock.lock(); + try { + if (plan != null) { + plan.setDone(); + if (SessionState.get() != null) { + try { + SessionState.get().getHiveHistory().logPlanProgress(plan); + } catch (Exception e) { + // Log and ignore + LOG.warn("Could not log query plan progress", e); + } } } + } finally { + stateLock.unlock(); } } @@ -1996,9 +2172,10 @@ public class Driver implements CommandProcessor { @SuppressWarnings("unchecked") public boolean getResults(List res) throws IOException, CommandNeedRetryException { - if (destroyed) { - throw new IOException("FAILED: Operation cancelled"); + if (driverState == DriverState.DESTROYED || driverState == DriverState.CLOSED) { + throw new IOException("FAILED: query has been cancelled, closed, or destroyed."); } + if (isFetchingTable()) { /** * If resultset serialization to thrift object is enabled, and if the destination table is @@ -2062,6 +2239,9 @@ public class Driver implements CommandProcessor { } public void resetFetch() throws IOException { + if (driverState == DriverState.DESTROYED || driverState == 
DriverState.CLOSED) { + throw new IOException("FAILED: driver has been cancelled, closed or destroyed."); + } if (isFetchingTable()) { try { fetchTask.clearFetch(); @@ -2084,21 +2264,39 @@ public class Driver implements CommandProcessor { this.tryCount = tryCount; } - public int close() { + // DriverContext could be released in the query and close processes at same + // time, which needs to be thread protected. + private void releaseDriverContext() { + stateLock.lock(); try { - try { - releaseResources(); - } catch (Exception e) { - LOG.info("Exception while releasing resources", e); + if (driverCxt != null) { + driverCxt.shutdown(); + driverCxt = null; } - if (fetchTask != null) { - try { - fetchTask.clearFetch(); - } catch (Exception e) { - LOG.debug(" Exception while clearing the Fetch task ", e); + } catch (Exception e) { + LOG.debug("Exception while shutting down the task runner", e); + } finally { + stateLock.unlock(); + } + } + + private void releasePlan() { + try { + if (plan != null) { + fetchTask = plan.getFetchTask(); + if (fetchTask != null) { + fetchTask.setDriverContext(null); + fetchTask.setQueryPlan(null); } - fetchTask = null; } + plan = null; + } catch (Exception e) { + LOG.debug("Exception while clearing the Fetch task", e); + } + } + + private void releaseContext() { + try { if (ctx != null) { ctx.clear(); if (ctx.getHiveLocks() != null) { @@ -2107,27 +2305,99 @@ public class Driver implements CommandProcessor { } ctx = null; } - if (null != resStream) { + } catch (Exception e) { + LOG.debug("Exception while clearing the context ", e); + } + } + + private void releaseResStream() { + try { + if (resStream != null) { + ((FSDataInputStream) resStream).close(); + resStream = null; + } + } catch (Exception e) { + LOG.debug(" Exception while closing the resStream ", e); + } + } + + private void releaseFetchTask() { + try { + if (fetchTask != null) { + fetchTask.clearFetch(); + fetchTask = null; + } + } catch (Exception e) { + LOG.debug(" Exception 
while clearing the FetchTask ", e); + } + } + // Close and release resources within a running query process. Since it runs under + // driver state COMPILING, EXECUTING or INTERRUPT, it would not have race condition + // with the releases probably running in the other closing thread. + private int closeInProcess(boolean destroyed) { + releaseDriverContext(); + releasePlan(); + releaseFetchTask(); + releaseResStream(); + releaseContext(); + if (SessionState.get() != null) { + SessionState.get().getLineageState().clear(); + } + if(destroyed) { + if (!hiveLocks.isEmpty()) { try { - ((FSDataInputStream) resStream).close(); - } catch (Exception e) { - LOG.debug(" Exception while closing the resStream ", e); + releaseLocksAndCommitOrRollback(false, null); + } catch (LockException e) { + LOG.warn("Exception when releasing locking in destroy: " + + e.getMessage()); } } - } catch (Exception e) { - console.printError("FAILED: Hive Internal Error: " + Utilities.getNameMessage(e) + "\n" - + org.apache.hadoop.util.StringUtils.stringifyException(e)); - return 13; + ShutdownHookManager.removeShutdownHook(shutdownRunner); } + return 0; + } + // is called to stop the query if it is running, clean query results, and release resources. + public int close() { + stateLock.lock(); + try { + releaseDriverContext(); + if (driverState == DriverState.COMPILING || + driverState == DriverState.EXECUTING || + driverState == DriverState.INTERRUPT) { + driverState = DriverState.INTERRUPT; + return 0; + } + releasePlan(); + releaseFetchTask(); + releaseResStream(); + releaseContext(); + driverState = DriverState.CLOSED; + } finally { + stateLock.unlock(); + } + if (SessionState.get() != null) { + SessionState.get().getLineageState().clear(); + } return 0; } + // is usually called after close() to commit or rollback a query and end the driver life cycle. + // do not understand why it is needed and wonder if it could be combined with close. 
public void destroy() { - if (destroyed) { - return; + stateLock.lock(); + try { + // in the cancel case where the driver state is INTERRUPTED, destroy will be deferred to + // the query process + if (driverState == DriverState.DESTROYED || + driverState == DriverState.INTERRUPT) { + return; + } else { + driverState = DriverState.DESTROYED; + } + } finally { + stateLock.unlock(); } - destroyed = true; if (!hiveLocks.isEmpty()) { try { releaseLocksAndCommitOrRollback(false, null); @@ -2139,6 +2409,7 @@ public class Driver implements CommandProcessor { ShutdownHookManager.removeShutdownHook(shutdownRunner); } + public org.apache.hadoop.hive.ql.plan.api.Query getQueryPlan() throws IOException { return plan.getQueryPlan(); }