juliuszsompolski commented on a change in pull request #25611:
[SPARK-28901][SQL] SparkThriftServer's Cancel SQL Operation show it in JDBC Tab UI
URL: https://github.com/apache/spark/pull/25611#discussion_r322305264
##########
File path: sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
##########
@@ -201,33 +208,38 @@ private[hive] class SparkExecuteStatementOperation(
         setBackgroundHandle(backgroundHandle)
       } catch {
         case rejected: RejectedExecutionException =>
+          logError("Error submitting query in background, query rejected", rejected)
           setState(OperationState.ERROR)
+          HiveThriftServer2.listener.onStatementError(
+            statementId, rejected.getMessage, SparkUtils.exceptionString(rejected))
           throw new HiveSQLException("The background threadpool cannot accept" +
             " new task for execution, please retry the operation", rejected)
         case NonFatal(e) =>
           logError(s"Error executing query in background", e)
           setState(OperationState.ERROR)
+          HiveThriftServer2.listener.onStatementError(
+            statementId, e.getMessage, SparkUtils.exceptionString(e))
           throw new HiveSQLException(e)
       }
     }
   }

   private def execute(): Unit = withSchedulerPool {
-    statementId = UUID.randomUUID().toString
-    logInfo(s"Running query '$statement' with $statementId")
-    setState(OperationState.RUNNING)
-    // Always use the latest class loader provided by executionHive's state.
-    val executionHiveClassLoader = sqlContext.sharedState.jarClassLoader
-    Thread.currentThread().setContextClassLoader(executionHiveClassLoader)
-
-    HiveThriftServer2.listener.onStatementStart(
-      statementId,
-      parentSession.getSessionHandle.getSessionId.toString,
-      statement,
-      statementId,
-      parentSession.getUsername)
-    sqlContext.sparkContext.setJobGroup(statementId, statement)
     try {
+      synchronized {
+        if (getStatus.getState.isTerminal) {
+          logInfo(s"Query with $statementId in terminal state before it started running")
+          return
+        } else {
+          logInfo(s"Running query with $statementId")
+          setState(OperationState.RUNNING)
+        }
+      }
+      // Always use the latest class loader provided by executionHive's state.
+      val executionHiveClassLoader = sqlContext.sharedState.jarClassLoader
+      Thread.currentThread().setContextClassLoader(executionHiveClassLoader)
+
+      sqlContext.sparkContext.setJobGroup(statementId, statement)
Review comment:
Consider the following sequence:
1. Thread1 is in `execute()` past this synchronized block, but has not started any jobs yet.
2. Thread2 is a user connection calling `cancel()`. It cancels all jobs in the job group and notifies Thread1.
3. Before Thread1 receives the notification and throws `InterruptedException`, it starts some jobs.
4. Thread1 then gets the `InterruptedException` and exits through the catch and finally blocks. The finally block calls `clearJobGroup`, but that does not cancel the jobs started in step 3, so they keep running after Thread1 exits and nobody ever cancels them.

I think the only way to prevent this is to call `cancelJobGroup` again from the catch block when an exception arrives; a sketch of that pattern is below.
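To make the suggestion concrete, here is a minimal sketch (not the actual patch) of re-issuing `cancelJobGroup` from the catch block, so that jobs started between Thread2's `cancel()` and Thread1's `InterruptedException` are also cancelled. The object name, `runStatement`, and the `body` parameter are illustrative; only `setJobGroup`, `cancelJobGroup`, and `clearJobGroup` are real `SparkContext` calls.

```scala
import org.apache.spark.SparkContext

// Minimal sketch of the suggested fix, not the actual patch. Assumes a
// SparkContext `sc`, a job-group id `statementId`, and a `body` that may
// still start jobs after another thread has already called cancel().
object CancelOnErrorSketch {
  def runStatement(sc: SparkContext, statementId: String, statement: String)(body: => Unit): Unit = {
    // interruptOnCancel = true so cancelling the group also interrupts running tasks.
    sc.setJobGroup(statementId, statement, interruptOnCancel = true)
    try {
      body // step 3 above: jobs may start here even after cancel() already ran
    } catch {
      case e: Throwable =>
        // Cancel the group again: this also kills jobs that slipped in between
        // the other thread's cancel() and the exception observed here.
        sc.cancelJobGroup(statementId)
        throw e
    } finally {
      // clearJobGroup only detaches this thread from the group; it does not
      // cancel jobs that are already running, hence the explicit cancel above.
      sc.clearJobGroup()
    }
  }
}
```

Re-running `cancelJobGroup` for a group whose jobs were already cancelled should be harmless, so the extra call in the error path seems like a cheap safeguard.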
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]