gatorsmile commented on a change in pull request #29204:
URL: https://github.com/apache/spark/pull/29204#discussion_r475722597
##########
File path:
sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkOperation.scala
##########
@@ -94,15 +96,32 @@ private[hive] trait SparkOperation extends Operation with
Logging {
throw new IllegalArgumentException(s"Unknown table type is found: $t")
}
- protected def onError(): PartialFunction[Throwable, Unit] = {
+ protected def onError(needCancel: Boolean = false):
PartialFunction[Throwable, Unit] = {
+ // Actually do need to catch Throwable as some failures don't inherit from
Exception and
+ // HiveServer will silently swallow them.
case e: Throwable =>
- logError(s"Error operating $getType with $statementId", e)
- super.setState(OperationState.ERROR)
- HiveThriftServer2.eventManager.onStatementError(
- statementId, e.getMessage, Utils.exceptionString(e))
- e match {
- case _: HiveSQLException => throw e
- case _ => throw new HiveSQLException(s"Error operating $getType
${e.getMessage}", e)
+ // When cancel() or close() is called very quickly after the query is
started,
+ // then they may both call cleanup() before Spark Jobs are started. But
before the background
+ // task is interrupted, it may have started some Spark jobs, so we need to
cancel again to
+ // make sure those jobs are cancelled when the background thread is interrupted.
+ if (needCancel) sqlContext.sparkContext.cancelJobGroup(statementId)
+ val currentState = getStatus.getState
+ if (currentState.isTerminal) {
+ // This may happen if the execution was cancelled, and then closed
from another thread.
+ logWarning(s"Ignore exception in terminal state with $statementId: $e")
+ } else {
+ super.setState(OperationState.ERROR)
+ HiveThriftServer2.eventManager.onStatementError(
+ statementId, e.getMessage, Utils.exceptionString(e))
+ e match {
+ case _: HiveSQLException => throw e
+ case rejected: RejectedExecutionException =>
+ throw new HiveSQLException("The background threadpool cannot
accept" +
+ " new task for execution, please retry the operation", rejected)
+ case _ =>
+ val tips = if (shouldRunAsync()) " in background" else ""
+ throw new HiveSQLException(s"Error operating $getType$tips:
${e.getMessage}", e)
Review comment:
+1
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]