HIVE-14799: Query operations are not thread safe during cancellation (Chaoyu 
Tang, reviewed by Sergey Shelukhin, Yongzhi Chen)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/1901e3a6
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/1901e3a6
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/1901e3a6

Branch: refs/heads/hive-14535
Commit: 1901e3a6ab97c150905c04c591b33b2c640e4b87
Parents: c71ef4f
Author: ctang <ct...@cloudera.com>
Authored: Sat Oct 15 08:55:36 2016 -0400
Committer: ctang <ct...@cloudera.com>
Committed: Sat Oct 15 08:55:36 2016 -0400

----------------------------------------------------------------------
 .../java/org/apache/hadoop/hive/ql/Driver.java  | 665 +++++++++++++------
 1 file changed, 468 insertions(+), 197 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/1901e3a6/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java 
b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index dd55434..9e5fd37 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -164,8 +164,6 @@ public class Driver implements CommandProcessor {
   private int maxthreads;
   private int tryCount = Integer.MAX_VALUE;
 
-  private boolean destroyed;
-
   private String userName;
 
   // HS2 operation handle guid string
@@ -180,6 +178,28 @@ public class Driver implements CommandProcessor {
   // Query hooks that execute before compilation and after execution
   List<QueryLifeTimeHook> queryHooks;
 
+  // a lock is used for synchronizing the state transition and its associated
+  // resource releases
+  private final ReentrantLock stateLock = new ReentrantLock();
+  private DriverState driverState = DriverState.INITIALIZED;
+
+  private enum DriverState {
+    INITIALIZED,
+    COMPILING,
+    COMPILED,
+    EXECUTING,
+    EXECUTED,
+    // a state that the driver enters after close() has been called to 
interrupt its running
+    // query in the query cancellation
+    INTERRUPT,
+    // a state that the driver enters after close() has been called to clean 
the query results
+    // and release the resources after the query has been executed
+    CLOSED,
+    // a state that the driver enters after destroy() is called and it is the 
end of driver life cycle
+    DESTROYED,
+    ERROR
+  }
+
   private boolean checkConcurrency() {
     boolean supportConcurrency = 
conf.getBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY);
     if (!supportConcurrency) {
@@ -350,9 +370,22 @@ public class Driver implements CommandProcessor {
    * @return 0 for ok
    */
   public int compile(String command, boolean resetTaskIds) {
+    return compile(command, resetTaskIds, false);
+  }
+
+  // deferClose indicates if the close/destroy should be deferred when the 
process has been
+  // interrupted, it should be set to true if the compile is called within 
another method like
+  // runInternal, which defers the close to be called in that method.
+  public int compile(String command, boolean resetTaskIds, boolean deferClose) 
{
     PerfLogger perfLogger = SessionState.getPerfLogger(true);
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.DRIVER_RUN);
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.COMPILE);
+    stateLock.lock();
+    try {
+      driverState = DriverState.COMPILING;
+    } finally {
+      stateLock.unlock();
+    }
 
     command = new VariableSubstitution(new HiveVariableSource() {
       @Override
@@ -370,8 +403,13 @@ public class Driver implements CommandProcessor {
       LOG.warn("WARNING! Query command could not be redacted." + e);
     }
 
+    if (isInterrupted()) {
+      return handleInterruption("at beginning of compilation."); //indicate if 
need clean resource
+    }
+
     if (ctx != null && ctx.getExplainAnalyze() != AnalyzeState.RUNNING) {
-      close();
+      // close the existing ctx etc. before compiling a new query, but do not 
destroy the driver
+      closeInProcess(false);
     }
 
     if (resetTaskIds) {
@@ -411,9 +449,13 @@ public class Driver implements CommandProcessor {
       };
       ShutdownHookManager.addShutdownHook(shutdownRunner, 
SHUTDOWN_HOOK_PRIORITY);
 
+      if (isInterrupted()) {
+        return handleInterruption("before parsing and analysing the query");
+      }
       if (ctx == null) {
         ctx = new Context(conf);
       }
+
       ctx.setTryCount(getTryCount());
       ctx.setCmd(command);
       ctx.setHDFSCleanup(true);
@@ -477,9 +519,12 @@ public class Driver implements CommandProcessor {
       acidInQuery = sem.hasAcidInQuery();
       perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.ANALYZE);
 
+      if (isInterrupted()) {
+        return handleInterruption("after analyzing query.");
+      }
+
       // get the output schema
       schema = getSchema(sem, conf);
-
       plan = new QueryPlan(queryStr, sem, 
perfLogger.getStartTime(PerfLogger.DRIVER_RUN), queryId,
         queryState.getHiveOperation(), schema);
 
@@ -523,11 +568,13 @@ public class Driver implements CommandProcessor {
           }
         }
       }
-
       return 0;
     } catch (Exception e) {
-      compileError = true;
+      if (isInterrupted()) {
+        return handleInterruption("during query compilation: " + 
e.getMessage());
+      }
 
+      compileError = true;
       ErrorMsg error = ErrorMsg.getErrorMsg(e.getMessage());
       errorMessage = "FAILED: " + e.getClass().getSimpleName();
       if (error != ErrorMsg.GENERIC_ERROR) {
@@ -552,23 +599,66 @@ public class Driver implements CommandProcessor {
       return error.getErrorCode();//todo: this is bad if returned as cmd shell 
exit
       // since it exceeds valid range of shell return values
     } finally {
-
       // Trigger post compilation hook. Note that if the compilation fails 
here then
       // before/after execution hook will never be executed.
-      if (queryHooks != null && !queryHooks.isEmpty()) {
-        QueryLifeTimeHookContext qhc = new QueryLifeTimeHookContextImpl();
-        qhc.setHiveConf(conf);
-        qhc.setCommand(command);
-
-        for (QueryLifeTimeHook hook : queryHooks) {
-          hook.afterCompile(qhc, compileError);
+      try {
+        if (queryHooks != null && !queryHooks.isEmpty()) {
+          QueryLifeTimeHookContext qhc = new QueryLifeTimeHookContextImpl();
+          qhc.setHiveConf(conf);
+          qhc.setCommand(command);
+          for (QueryLifeTimeHook hook : queryHooks) {
+            hook.afterCompile(qhc, compileError);
+          }
         }
+      } catch (Exception e) {
+        LOG.warn("Failed when invoking query after-compilation hook.", e);
       }
 
       double duration = perfLogger.PerfLogEnd(CLASS_NAME, 
PerfLogger.COMPILE)/1000.00;
       ImmutableMap<String, Long> compileHMSTimings = 
dumpMetaCallTimingWithoutEx("compilation");
       queryDisplay.setHmsTimings(QueryDisplay.Phase.COMPILATION, 
compileHMSTimings);
-      LOG.info("Completed compiling command(queryId=" + queryId + "); Time 
taken: " + duration + " seconds");
+
+      boolean isInterrupted = isInterrupted();
+      if (isInterrupted && !deferClose) {
+        closeInProcess(true);
+      }
+      stateLock.lock();
+      try {
+        if (isInterrupted) {
+          driverState = deferClose ? DriverState.EXECUTING : DriverState.ERROR;
+        } else {
+          driverState = compileError ? DriverState.ERROR : 
DriverState.COMPILED;
+        }
+      } finally {
+        stateLock.unlock();
+      }
+
+      if (isInterrupted) {
+        LOG.info("Compiling command(queryId=" + queryId + ") has been 
interrupted after " + duration + " seconds");
+      } else {
+        LOG.info("Completed compiling command(queryId=" + queryId + "); Time 
taken: " + duration + " seconds");
+      }
+    }
+  }
+
+  private int handleInterruption(String msg) {
+    SQLState = "HY008";  //SQLState for cancel operation
+    errorMessage = "FAILED: command has been interrupted: " + msg;
+    console.printError(errorMessage);
+    return 1000;
+  }
+
+  private boolean isInterrupted() {
+    stateLock.lock();
+    try {
+      if (driverState == DriverState.INTERRUPT) {
+        Thread.currentThread().interrupt();
+        return true;
+      } else {
+        return false;
+      }
+    } finally {
+      stateLock.unlock();
     }
   }
 
@@ -1040,7 +1130,7 @@ public class Driver implements CommandProcessor {
       }
 
       return 0;
-    } catch (LockException e) {
+    } catch (Exception e) {
       errorMessage = "FAILED: Error in acquiring locks: " + e.getMessage();
       SQLState = ErrorMsg.findSQLState(e.getMessage());
       downstreamError = e;
@@ -1106,23 +1196,11 @@ public class Driver implements CommandProcessor {
    * while keeping the result around.
    */
   private void releaseResources() {
+    releasePlan();
+    releaseDriverContext();
     if (SessionState.get() != null) {
       SessionState.get().getLineageState().clear();
     }
-
-    if (plan != null) {
-      fetchTask = plan.getFetchTask();
-      if (fetchTask != null) {
-        fetchTask.setDriverContext(null);
-        fetchTask.setQueryPlan(null);
-      }
-    }
-
-    if (driverCxt != null) {
-      driverCxt.shutdown();
-      driverCxt = null;
-    }
-    plan = null;
   }
 
   @Override
@@ -1138,12 +1216,7 @@ public class Driver implements CommandProcessor {
 
   public CommandProcessorResponse run(String command, boolean alreadyCompiled)
         throws CommandNeedRetryException {
-    CommandProcessorResponse cpr;
-    try {
-      cpr = runInternal(command, alreadyCompiled);
-    } finally {
-      releaseResources();
-    }
+    CommandProcessorResponse cpr = runInternal(command, alreadyCompiled);
 
     if(cpr.getResponseCode() == 0) {
       return cpr;
@@ -1201,11 +1274,11 @@ public class Driver implements CommandProcessor {
   }
 
   public CommandProcessorResponse compileAndRespond(String command) {
-    return createProcessorResponse(compileInternal(command));
+    return createProcessorResponse(compileInternal(command, false));
   }
 
   private static final ReentrantLock globalCompileLock = new ReentrantLock();
-  private int compileInternal(String command) {
+  private int compileInternal(String command, boolean deferClose) {
     int ret;
 
     Metrics metrics = MetricsFactory.getInstance();
@@ -1223,7 +1296,7 @@ public class Driver implements CommandProcessor {
       if (metrics != null) {
         metrics.decrementCounter(MetricsConstant.WAITING_COMPILE_OPS, 1);
       }
-      ret = compile(command);
+      ret = compile(command, true, deferClose);
     } finally {
       compileLock.unlock();
     }
@@ -1318,135 +1391,178 @@ public class Driver implements CommandProcessor {
     errorMessage = null;
     SQLState = null;
     downstreamError = null;
-
-    HiveDriverRunHookContext hookContext = new 
HiveDriverRunHookContextImpl(conf,
-        alreadyCompiled ? ctx.getCmd() : command);
-    // Get all the driver run hooks and pre-execute them.
-    List<HiveDriverRunHook> driverRunHooks;
+    stateLock.lock();
     try {
-      driverRunHooks = getHooks(HiveConf.ConfVars.HIVE_DRIVER_RUN_HOOKS,
-          HiveDriverRunHook.class);
-      for (HiveDriverRunHook driverRunHook : driverRunHooks) {
-          driverRunHook.preDriverRun(hookContext);
+      if (alreadyCompiled) {
+        if (driverState == DriverState.COMPILED) {
+          driverState = DriverState.EXECUTING;
+        } else {
+          errorMessage = "FAILED: Precompiled query has been cancelled or 
closed.";
+          console.printError(errorMessage);
+          return createProcessorResponse(12);
+        }
+      } else {
+        driverState = DriverState.COMPILING;
       }
-    } catch (Exception e) {
-      errorMessage = "FAILED: Hive Internal Error: " + 
Utilities.getNameMessage(e);
-      SQLState = ErrorMsg.findSQLState(e.getMessage());
-      downstreamError = e;
-      console.printError(errorMessage + "\n"
-          + org.apache.hadoop.util.StringUtils.stringifyException(e));
-      return createProcessorResponse(12);
+    } finally {
+      stateLock.unlock();
     }
 
-    PerfLogger perfLogger = null;
-
-    int ret;
-    if (!alreadyCompiled) {
-      // compile internal will automatically reset the perf logger
-      ret = compileInternal(command);
-      // then we continue to use this perf logger
-      perfLogger = SessionState.getPerfLogger();
-      if (ret != 0) {
-        return createProcessorResponse(ret);
+    // a flag that helps to set the correct driver state in the finally block by 
tracking whether
+    // the method has returned with an error or not.
+    boolean isFinishedWithError = true;
+    try {
+      HiveDriverRunHookContext hookContext = new 
HiveDriverRunHookContextImpl(conf,
+          alreadyCompiled ? ctx.getCmd() : command);
+      // Get all the driver run hooks and pre-execute them.
+      List<HiveDriverRunHook> driverRunHooks;
+      try {
+        driverRunHooks = getHooks(HiveConf.ConfVars.HIVE_DRIVER_RUN_HOOKS,
+            HiveDriverRunHook.class);
+        for (HiveDriverRunHook driverRunHook : driverRunHooks) {
+            driverRunHook.preDriverRun(hookContext);
+        }
+      } catch (Exception e) {
+        errorMessage = "FAILED: Hive Internal Error: " + 
Utilities.getNameMessage(e);
+        SQLState = ErrorMsg.findSQLState(e.getMessage());
+        downstreamError = e;
+        console.printError(errorMessage + "\n"
+            + org.apache.hadoop.util.StringUtils.stringifyException(e));
+        return createProcessorResponse(12);
       }
-    } else {
-      // reuse existing perf logger.
-      perfLogger = SessionState.getPerfLogger();
-      // Since we're reusing the compiled plan, we need to update its start 
time for current run
-      plan.setQueryStartTime(perfLogger.getStartTime(PerfLogger.DRIVER_RUN));
-    }
-    // the reason that we set the txn manager for the cxt here is because each
-    // query has its own ctx object. The txn mgr is shared across the
-    // same instance of Driver, which can run multiple queries.
-    HiveTxnManager txnManager = SessionState.get().getTxnMgr();
-    ctx.setHiveTxnManager(txnManager);
-
-    boolean startTxnImplicitly = false;
-    {
-      //this block ensures op makes sense in given context, e.g. COMMIT is 
valid only if txn is open
-      //DDL is not allowed in a txn, etc.
-      //an error in an open txn does a rollback of the txn
-      if (txnManager.isTxnOpen() && 
!plan.getOperation().isAllowedInTransaction()) {
-        assert !txnManager.getAutoCommit() : "didn't expect AC=true";
-        return rollback(new CommandProcessorResponse(12, 
ErrorMsg.OP_NOT_ALLOWED_IN_TXN, null,
-          plan.getOperationName(), 
Long.toString(txnManager.getCurrentTxnId())));
-      }
-      if(!txnManager.isTxnOpen() && 
plan.getOperation().isRequiresOpenTransaction()) {
-        return rollback(new CommandProcessorResponse(12, 
ErrorMsg.OP_NOT_ALLOWED_WITHOUT_TXN, null, plan.getOperationName()));
-      }
-      if(!txnManager.isTxnOpen() && plan.getOperation() == HiveOperation.QUERY 
&& !txnManager.getAutoCommit()) {
-        //this effectively makes START TRANSACTION optional and supports JDBC 
setAutoCommit(false) semantics
-        //also, indirectly allows DDL to be executed outside a txn context
-        startTxnImplicitly = true;
-      }
-      if(txnManager.getAutoCommit() && plan.getOperation() == 
HiveOperation.START_TRANSACTION) {
+
+      PerfLogger perfLogger = null;
+
+      int ret;
+      if (!alreadyCompiled) {
+        // compile internal will automatically reset the perf logger
+        ret = compileInternal(command, true);
+        // then we continue to use this perf logger
+        perfLogger = SessionState.getPerfLogger();
+        if (ret != 0) {
+          return createProcessorResponse(ret);
+        }
+      } else {
+        // reuse existing perf logger.
+        perfLogger = SessionState.getPerfLogger();
+        // Since we're reusing the compiled plan, we need to update its start 
time for current run
+        plan.setQueryStartTime(perfLogger.getStartTime(PerfLogger.DRIVER_RUN));
+      }
+      // the reason that we set the txn manager for the cxt here is because 
each
+      // query has its own ctx object. The txn mgr is shared across the
+      // same instance of Driver, which can run multiple queries.
+      HiveTxnManager txnManager = SessionState.get().getTxnMgr();
+      ctx.setHiveTxnManager(txnManager);
+
+      boolean startTxnImplicitly = false;
+      {
+        //this block ensures op makes sense in given context, e.g. COMMIT is 
valid only if txn is open
+        //DDL is not allowed in a txn, etc.
+        //an error in an open txn does a rollback of the txn
+        if (txnManager.isTxnOpen() && 
!plan.getOperation().isAllowedInTransaction()) {
+          assert !txnManager.getAutoCommit() : "didn't expect AC=true";
+          return rollback(new CommandProcessorResponse(12, 
ErrorMsg.OP_NOT_ALLOWED_IN_TXN, null,
+            plan.getOperationName(), 
Long.toString(txnManager.getCurrentTxnId())));
+        }
+        if(!txnManager.isTxnOpen() && 
plan.getOperation().isRequiresOpenTransaction()) {
+          return rollback(new CommandProcessorResponse(12, 
ErrorMsg.OP_NOT_ALLOWED_WITHOUT_TXN, null, plan.getOperationName()));
+        }
+        if(!txnManager.isTxnOpen() && plan.getOperation() == 
HiveOperation.QUERY && !txnManager.getAutoCommit()) {
+          //this effectively makes START TRANSACTION optional and supports 
JDBC setAutoCommit(false) semantics
+          //also, indirectly allows DDL to be executed outside a txn context
+          startTxnImplicitly = true;
+        }
+        if(txnManager.getAutoCommit() && plan.getOperation() == 
HiveOperation.START_TRANSACTION) {
           return rollback(new CommandProcessorResponse(12, 
ErrorMsg.OP_NOT_ALLOWED_IN_AUTOCOMMIT, null, plan.getOperationName()));
+        }
       }
-    }
-    if(plan.getOperation() == HiveOperation.SET_AUTOCOMMIT) {
-      try {
-        if(plan.getAutoCommitValue() && !txnManager.getAutoCommit()) {
-          /*here, if there is an open txn, we want to commit it; this behavior 
matches
-          * 
https://docs.oracle.com/javase/6/docs/api/java/sql/Connection.html#setAutoCommit(boolean)*/
-          releaseLocksAndCommitOrRollback(true, null);
-          txnManager.setAutoCommit(true);
+      if(plan.getOperation() == HiveOperation.SET_AUTOCOMMIT) {
+        try {
+          if(plan.getAutoCommitValue() && !txnManager.getAutoCommit()) {
+            /*here, if there is an open txn, we want to commit it; this 
behavior matches
+            * 
https://docs.oracle.com/javase/6/docs/api/java/sql/Connection.html#setAutoCommit(boolean)*/
+            releaseLocksAndCommitOrRollback(true, null);
+            txnManager.setAutoCommit(true);
+          }
+          else if(!plan.getAutoCommitValue() && txnManager.getAutoCommit()) {
+            txnManager.setAutoCommit(false);
+          }
+          else {/*didn't change autoCommit value - no-op*/}
         }
-        else if(!plan.getAutoCommitValue() && txnManager.getAutoCommit()) {
-          txnManager.setAutoCommit(false);
+        catch(LockException e) {
+          return handleHiveException(e, 12);
         }
-        else {/*didn't change autoCommit value - no-op*/}
-      }
-      catch(LockException e) {
-        return handleHiveException(e, 12);
       }
-    }
 
-    if (requiresLock()) {
-      ret = acquireLocksAndOpenTxn(startTxnImplicitly);
+      if (requiresLock()) {
+        // a checkpoint to see if the thread is interrupted or not before an 
expensive operation
+        if (isInterrupted()) {
+          ret = handleInterruption("at acquiring the lock.");
+        } else {
+          ret = acquireLocksAndOpenTxn(startTxnImplicitly);
+        }
+        if (ret != 0) {
+          return rollback(createProcessorResponse(ret));
+        }
+      }
+      ret = execute(true);
       if (ret != 0) {
+        //if needRequireLock is false, the release here will do nothing 
because there is no lock
         return rollback(createProcessorResponse(ret));
       }
-    }
-    ret = execute();
-    if (ret != 0) {
-      //if needRequireLock is false, the release here will do nothing because 
there is no lock
-      return rollback(createProcessorResponse(ret));
-    }
 
-    //if needRequireLock is false, the release here will do nothing because 
there is no lock
-    try {
-      if(txnManager.getAutoCommit() || plan.getOperation() == 
HiveOperation.COMMIT) {
-        releaseLocksAndCommitOrRollback(true, null);
-      }
-      else if(plan.getOperation() == HiveOperation.ROLLBACK) {
-        releaseLocksAndCommitOrRollback(false, null);
-      }
-      else {
-        //txn (if there is one started) is not finished
+      //if needRequireLock is false, the release here will do nothing because 
there is no lock
+      try {
+        if(txnManager.getAutoCommit() || plan.getOperation() == 
HiveOperation.COMMIT) {
+          releaseLocksAndCommitOrRollback(true, null);
+        }
+        else if(plan.getOperation() == HiveOperation.ROLLBACK) {
+          releaseLocksAndCommitOrRollback(false, null);
+        }
+        else {
+          //txn (if there is one started) is not finished
+        }
+      } catch (LockException e) {
+        return handleHiveException(e, 12);
       }
-    } catch (LockException e) {
-      return handleHiveException(e, 12);
-    }
 
-    perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.DRIVER_RUN);
-    queryDisplay.setPerfLogStarts(QueryDisplay.Phase.EXECUTION, 
perfLogger.getStartTimes());
-    queryDisplay.setPerfLogEnds(QueryDisplay.Phase.EXECUTION, 
perfLogger.getEndTimes());
+      perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.DRIVER_RUN);
+      queryDisplay.setPerfLogStarts(QueryDisplay.Phase.EXECUTION, 
perfLogger.getStartTimes());
+      queryDisplay.setPerfLogEnds(QueryDisplay.Phase.EXECUTION, 
perfLogger.getEndTimes());
 
-    // Take all the driver run hooks and post-execute them.
-    try {
-      for (HiveDriverRunHook driverRunHook : driverRunHooks) {
-          driverRunHook.postDriverRun(hookContext);
+      // Take all the driver run hooks and post-execute them.
+      try {
+        for (HiveDriverRunHook driverRunHook : driverRunHooks) {
+            driverRunHook.postDriverRun(hookContext);
+        }
+      } catch (Exception e) {
+        errorMessage = "FAILED: Hive Internal Error: " + 
Utilities.getNameMessage(e);
+        SQLState = ErrorMsg.findSQLState(e.getMessage());
+        downstreamError = e;
+        console.printError(errorMessage + "\n"
+            + org.apache.hadoop.util.StringUtils.stringifyException(e));
+        return createProcessorResponse(12);
+      }
+      isFinishedWithError = false;
+      return createProcessorResponse(ret);
+    } finally {
+      if (isInterrupted()) {
+        closeInProcess(true);
+      } else {
+        // only release the related resources ctx, driverContext as normal
+        releaseResources();
+      }
+      stateLock.lock();
+      try {
+        if (driverState == DriverState.INTERRUPT) {
+          driverState = DriverState.ERROR;
+        } else {
+          driverState = isFinishedWithError ? DriverState.ERROR : 
DriverState.EXECUTED;
+        }
+      } finally {
+        stateLock.unlock();
       }
-    } catch (Exception e) {
-      errorMessage = "FAILED: Hive Internal Error: " + 
Utilities.getNameMessage(e);
-      SQLState = ErrorMsg.findSQLState(e.getMessage());
-      downstreamError = e;
-      console.printError(errorMessage + "\n"
-          + org.apache.hadoop.util.StringUtils.stringifyException(e));
-      return createProcessorResponse(12);
     }
-
-    return createProcessorResponse(ret);
   }
 
   private CommandProcessorResponse rollback(CommandProcessorResponse cpr) {
@@ -1556,16 +1672,39 @@ public class Driver implements CommandProcessor {
   }
 
   public int execute() throws CommandNeedRetryException {
+    return execute(false);
+  }
+
+  public int execute(boolean deferClose) throws CommandNeedRetryException {
     PerfLogger perfLogger = SessionState.getPerfLogger();
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.DRIVER_EXECUTE);
+
     boolean noName = StringUtils.isEmpty(conf.get(MRJobConfig.JOB_NAME));
     int maxlen = conf.getIntVar(HiveConf.ConfVars.HIVEJOBNAMELENGTH);
 
-    String queryId = plan.getQueryId();
+    String queryId = conf.getVar(HiveConf.ConfVars.HIVEQUERYID);
     // Get the query string from the conf file as the compileInternal() method 
might
     // hide sensitive information during query redaction.
     String queryStr = conf.getQueryString();
 
+    stateLock.lock();
+    try {
+      // if the query is in neither the compiled state nor the executing state (the latter is 
carried over from
+      // a combined compile/execute in runInternal), return an error
+      if (driverState != DriverState.COMPILED &&
+          driverState != DriverState.EXECUTING) {
+        SQLState = "HY008";
+        errorMessage = "FAILED: query " + queryStr + " has " +
+            (driverState == DriverState.INTERRUPT ? "been cancelled" : "not 
been compiled.");
+        console.printError(errorMessage);
+        return 1000;
+      } else {
+        driverState = DriverState.EXECUTING;
+      }
+    } finally {
+      stateLock.unlock();
+    }
+
     maxthreads = HiveConf.getIntVar(conf, 
HiveConf.ConfVars.EXECPARALLETHREADNUMBER);
 
     HookContext hookContext = null;
@@ -1614,7 +1753,7 @@ public class Driver implements CommandProcessor {
       if (queryHooks != null && !queryHooks.isEmpty()) {
         QueryLifeTimeHookContext qhc = new QueryLifeTimeHookContextImpl();
         qhc.setHiveConf(conf);
-        qhc.setCommand(ctx.getCmd());
+        qhc.setCommand(queryStr);
         qhc.setHookContext(hookContext);
 
         for (QueryLifeTimeHook hook : queryHooks) {
@@ -1629,7 +1768,7 @@ public class Driver implements CommandProcessor {
         + Utilities.getSparkTasks(plan.getRootTasks()).size();
       if (jobs > 0) {
         logMrWarning(mrJobs);
-        console.printInfo("Query ID = " + plan.getQueryId());
+        console.printInfo("Query ID = " + queryId);
         console.printInfo("Total jobs = " + jobs);
       }
       if (SessionState.get() != null) {
@@ -1645,11 +1784,13 @@ public class Driver implements CommandProcessor {
       // At any time, at most maxthreads tasks can be running
       // The main thread polls the TaskRunners to check if they have finished.
 
+      if (isInterrupted()) {
+        return handleInterruption("before running tasks.");
+      }
       DriverContext driverCxt = new DriverContext(ctx);
       driverCxt.prepare(plan);
 
       ctx.setHDFSCleanup(true);
-
       this.driverCxt = driverCxt; // for canceling the query (should be bound 
to session?)
 
       SessionState.get().setMapRedStats(new LinkedHashMap<String, 
MapRedStats>());
@@ -1671,7 +1812,7 @@ public class Driver implements CommandProcessor {
 
       perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.RUN_TASKS);
       // Loop while you either have tasks running, or tasks queued up
-      while (!destroyed && driverCxt.isRunning()) {
+      while (driverCxt.isRunning()) {
 
         // Launch upto maxthreads tasks
         Task<? extends Serializable> task;
@@ -1694,6 +1835,9 @@ public class Driver implements CommandProcessor {
         TaskResult result = tskRun.getTaskResult();
 
         int exitVal = result.getExitVal();
+        if (isInterrupted()) {
+          return handleInterruption("when checking the execution result.");
+        }
         if (exitVal != 0) {
           if (tsk.ifRetryCmdWhenFail()) {
             driverCxt.shutdown();
@@ -1798,11 +1942,16 @@ public class Driver implements CommandProcessor {
             String.valueOf(0));
         SessionState.get().getHiveHistory().printRowCount(queryId);
       }
+      releasePlan(plan);
     } catch (CommandNeedRetryException e) {
       executionError = true;
       throw e;
     } catch (Exception e) {
       executionError = true;
+      if (isInterrupted()) {
+        return handleInterruption("during query execution: \n" + 
e.getMessage());
+      }
+
       ctx.restoreOriginalTracker();
       if (SessionState.get() != null) {
         SessionState.get().getHiveHistory().setQueryProperty(queryId, 
Keys.QUERY_RET_CODE,
@@ -1824,15 +1973,19 @@ public class Driver implements CommandProcessor {
       return (12);
     } finally {
       // Trigger query hooks after query completes its execution.
-      if (queryHooks != null && !queryHooks.isEmpty()) {
-        QueryLifeTimeHookContext qhc = new QueryLifeTimeHookContextImpl();
-        qhc.setHiveConf(conf);
-        qhc.setCommand(ctx.getCmd());
-        qhc.setHookContext(hookContext);
-
-        for (QueryLifeTimeHook hook : queryHooks) {
-          hook.afterExecution(qhc, executionError);
+      try {
+        if (queryHooks != null && !queryHooks.isEmpty()) {
+          QueryLifeTimeHookContext qhc = new QueryLifeTimeHookContextImpl();
+          qhc.setHiveConf(conf);
+          qhc.setCommand(queryStr);
+          qhc.setHookContext(hookContext);
+
+          for (QueryLifeTimeHook hook : queryHooks) {
+            hook.afterExecution(qhc, executionError);
+          }
         }
+      } catch (Exception e) {
+        LOG.warn("Failed when invoking query after execution hook", e);
       }
 
       if (SessionState.get() != null) {
@@ -1856,11 +2009,29 @@ public class Driver implements CommandProcessor {
         }
         console.printInfo("Total MapReduce CPU Time Spent: " + 
Utilities.formatMsecToStr(totalCpu));
       }
-      LOG.info("Completed executing command(queryId=" + queryId + "); Time 
taken: " + duration + " seconds");
+      boolean isInterrupted = isInterrupted();
+      if (isInterrupted && !deferClose) {
+        closeInProcess(true);
+      }
+      stateLock.lock();
+      try {
+        if (isInterrupted) {
+          if (!deferClose) {
+            driverState = DriverState.ERROR;
+          }
+        } else {
+          driverState = executionError ? DriverState.ERROR : 
DriverState.EXECUTED;
+        }
+      } finally {
+        stateLock.unlock();
+      }
+      if (isInterrupted) {
+        LOG.info("Executing command(queryId=" + queryId + ") has been 
interrupted after " + duration + " seconds");
+      } else {
+        LOG.info("Completed executing command(queryId=" + queryId + "); Time 
taken: " + duration + " seconds");
+      }
     }
 
-    releasePlan(plan);
-
     if (console != null) {
       console.printInfo("OK");
     }
@@ -1868,18 +2039,23 @@ public class Driver implements CommandProcessor {
     return (0);
   }
 
-  private synchronized void releasePlan(QueryPlan plan) {
+  private void releasePlan(QueryPlan plan) {
     // Plan maybe null if Driver.close is called in another thread for the 
same Driver object
-    if (plan != null) {
-      plan.setDone();
-      if (SessionState.get() != null) {
-        try {
-          SessionState.get().getHiveHistory().logPlanProgress(plan);
-        } catch (Exception e) {
-          // Log and ignore
-          LOG.warn("Could not log query plan progress", e);
+    stateLock.lock();
+    try {
+      if (plan != null) {
+        plan.setDone();
+        if (SessionState.get() != null) {
+          try {
+            SessionState.get().getHiveHistory().logPlanProgress(plan);
+          } catch (Exception e) {
+            // Log and ignore
+            LOG.warn("Could not log query plan progress", e);
+          }
         }
       }
+    } finally {
+      stateLock.unlock();
     }
   }
 
@@ -1996,9 +2172,10 @@ public class Driver implements CommandProcessor {
 
   @SuppressWarnings("unchecked")
   public boolean getResults(List res) throws IOException, 
CommandNeedRetryException {
-    if (destroyed) {
-      throw new IOException("FAILED: Operation cancelled");
+    if (driverState == DriverState.DESTROYED || driverState == 
DriverState.CLOSED) {
+      throw new IOException("FAILED: query has been cancelled, closed, or 
destroyed.");
     }
+
     if (isFetchingTable()) {
       /**
        * If resultset serialization to thrift object is enabled, and if the 
destination table is
@@ -2062,6 +2239,9 @@ public class Driver implements CommandProcessor {
   }
 
   public void resetFetch() throws IOException {
+    if (driverState == DriverState.DESTROYED || driverState == 
DriverState.CLOSED) {
+      throw new IOException("FAILED: driver has been cancelled, closed or 
destroyed.");
+    }
     if (isFetchingTable()) {
       try {
         fetchTask.clearFetch();
@@ -2084,21 +2264,39 @@ public class Driver implements CommandProcessor {
     this.tryCount = tryCount;
   }
 
-  public int close() {
+  // DriverContext may be released by the query and close processes at the
+  // same time, so its release must be protected by stateLock.
+  private void releaseDriverContext() {
+    stateLock.lock();
     try {
-      try {
-        releaseResources();
-      } catch (Exception e) {
-        LOG.info("Exception while releasing resources", e);
+      if (driverCxt != null) {
+        driverCxt.shutdown();
+        driverCxt = null;
       }
-      if (fetchTask != null) {
-        try {
-          fetchTask.clearFetch();
-        } catch (Exception e) {
-          LOG.debug(" Exception while clearing the Fetch task ", e);
+    } catch (Exception e) {
+      LOG.debug("Exception while shutting down the task runner", e);
+    } finally {
+      stateLock.unlock();
+    }
+  }
+
+  private void releasePlan() {
+    try {
+      if (plan != null) {
+        fetchTask = plan.getFetchTask();
+        if (fetchTask != null) {
+          fetchTask.setDriverContext(null);
+          fetchTask.setQueryPlan(null);
         }
-        fetchTask = null;
       }
+      plan = null;
+    } catch (Exception e) {
+      LOG.debug("Exception while clearing the Fetch task", e);
+    }
+  }
+
+  private void releaseContext() {
+    try {
       if (ctx != null) {
         ctx.clear();
         if (ctx.getHiveLocks() != null) {
@@ -2107,27 +2305,99 @@ public class Driver implements CommandProcessor {
         }
         ctx = null;
       }
-      if (null != resStream) {
+    } catch (Exception e) {
+      LOG.debug("Exception while clearing the context ", e);
+    }
+  }
+
+  private void releaseResStream() {
+    try {
+      if (resStream != null) {
+        ((FSDataInputStream) resStream).close();
+        resStream = null;
+      }
+    } catch (Exception e) {
+      LOG.debug(" Exception while closing the resStream ", e);
+    }
+  }
+
+  private void releaseFetchTask() {
+    try {
+      if (fetchTask != null) {
+        fetchTask.clearFetch();
+        fetchTask = null;
+      }
+    } catch (Exception e) {
+      LOG.debug(" Exception while clearing the FetchTask ", e);
+    }
+  }
+  // Close and release resources from within a running query process. Because
+  // it runs in driver state COMPILING, EXECUTING or INTERRUPT, it does not
+  // race with the releases that may be running in another, closing thread.
+  private int closeInProcess(boolean destroyed) {
+    releaseDriverContext();
+    releasePlan();
+    releaseFetchTask();
+    releaseResStream();
+    releaseContext();
+    if (SessionState.get() != null) {
+      SessionState.get().getLineageState().clear();
+    }
+    if(destroyed) {
+      if (!hiveLocks.isEmpty()) {
         try {
-          ((FSDataInputStream) resStream).close();
-        } catch (Exception e) {
-          LOG.debug(" Exception while closing the resStream ", e);
+          releaseLocksAndCommitOrRollback(false, null);
+        } catch (LockException e) {
+          LOG.warn("Exception when releasing locking in destroy: " +
+              e.getMessage());
         }
       }
-    } catch (Exception e) {
-      console.printError("FAILED: Hive Internal Error: " + 
Utilities.getNameMessage(e) + "\n"
-          + org.apache.hadoop.util.StringUtils.stringifyException(e));
-      return 13;
+      ShutdownHookManager.removeShutdownHook(shutdownRunner);
     }
+    return 0;
+  }
 
+  // is called to stop the query if it is running, clean query results, and 
release resources.
+  public int close() {
+    stateLock.lock();
+    try {
+      releaseDriverContext();
+      if (driverState == DriverState.COMPILING ||
+          driverState == DriverState.EXECUTING ||
+          driverState == DriverState.INTERRUPT) {
+        driverState = DriverState.INTERRUPT;
+        return 0;
+      }
+      releasePlan();
+      releaseFetchTask();
+      releaseResStream();
+      releaseContext();
+      driverState = DriverState.CLOSED;
+    } finally {
+      stateLock.unlock();
+    }
+    if (SessionState.get() != null) {
+      SessionState.get().getLineageState().clear();
+    }
     return 0;
   }
 
+  // is usually called after close() to commit or rollback a query and end the 
driver life cycle.
+  // do not understand why it is needed and wonder if it could be combined 
with close.
   public void destroy() {
-    if (destroyed) {
-      return;
+    stateLock.lock();
+    try {
+      // in the cancel case where the driver state is INTERRUPT, destroy is
+      // deferred to the query process
+      if (driverState == DriverState.DESTROYED ||
+          driverState == DriverState.INTERRUPT) {
+        return;
+      } else {
+        driverState = DriverState.DESTROYED;
+      }
+    } finally {
+      stateLock.unlock();
     }
-    destroyed = true;
     if (!hiveLocks.isEmpty()) {
       try {
         releaseLocksAndCommitOrRollback(false, null);
@@ -2139,6 +2409,7 @@ public class Driver implements CommandProcessor {
     ShutdownHookManager.removeShutdownHook(shutdownRunner);
   }
 
+
   public org.apache.hadoop.hive.ql.plan.api.Query getQueryPlan() throws 
IOException {
     return plan.getQueryPlan();
   }

Reply via email to