Github user liancheng commented on a diff in the pull request:
https://github.com/apache/spark/pull/6207#discussion_r31146263
--- Diff:
sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala
---
@@ -160,9 +170,69 @@ private[hive] class SparkExecuteStatementOperation(
}
}
- def run(): Unit = {
- val statementId = UUID.randomUUID().toString
- logInfo(s"Running query '$statement'")
+ override def run(): Unit = {
+ setState(OperationState.PENDING)
+ setHasResultSet(true) // avoid no resultset for async run
+
+ if (!runInBackground) {
+ runInternal()
+ } else {
+ val parentSessionState = SessionState.get()
+ val hiveConf = getConfigForOperation()
+ val sparkServiceUGI =
ShimLoader.getHadoopShims.getUGIForConf(hiveConf)
+ val sessionHive = getCurrentHive()
+ val currentSqlSession = hiveContext.currentSession
+
+ // Runnable impl to call runInternal asynchronously,
+ // from a different thread
+ val backgroundOperation = new Runnable() {
+
+ override def run(): Unit = {
+ val doAsAction = new PrivilegedExceptionAction[Object]() {
+ override def run(): Object = {
+
+ // User information is part of the metastore client member
in Hive
+ hiveContext.setSession(currentSqlSession)
+ Hive.set(sessionHive)
+ SessionState.setCurrentSessionState(parentSessionState)
+ try {
+ runInternal()
+ } catch {
+ case e: HiveSQLException =>
+ setOperationException(e)
+ log.error("Error running hive query: ", e)
+ }
+ return null
+ }
+ }
+
+ try {
+ ShimLoader.getHadoopShims().doAs(sparkServiceUGI, doAsAction)
+ } catch {
+ case e: Exception =>
+ setOperationException(new HiveSQLException(e))
+ logError("Error running hive query as user : " +
+ sparkServiceUGI.getShortUserName(), e)
+ }
+ }
+ }
+ try {
+ // This submit blocks if no background threads are available to
run this operation
+ val backgroundHandle =
+
getParentSession().getSessionManager().submitBackgroundOperation(backgroundOperation)
+ setBackgroundHandle(backgroundHandle)
+ } catch {
+ case rejected: RejectedExecutionException =>
+ setState(OperationState.ERROR);
+ throw new HiveSQLException("The background threadpool cannot
accept" +
+ " new task for execution, please retry the operation",
rejected)
+ }
--- End diff --
I know that this part of the code is mostly translated from vanilla Hive code,
but should we also catch more general exceptions here (not only
`RejectedExecutionException`) and call `setState(OperationState.ERROR)` accordingly?
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled but would like it, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]