LantaoJin commented on pull request #31119:
URL: https://github.com/apache/spark/pull/31119#issuecomment-761926681
```
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 50cc47d0f8..3d2827ecb7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -977,6 +977,15 @@ object SQLConf {
.timeConf(TimeUnit.SECONDS)
.createWithDefault(0L)
+ val THRIFTSERVER_BROADCAST_CANCEL =
+ buildConf("spark.sql.thriftServer.broadcastCancel")
+ .internal()
+      .doc("When true, cancel the related broadcast sub-jobs when SQL statement is cancelled. " +
+        "This configuration is only used internally and don't set it manually.")
+ .version("3.2.0")
+ .booleanConf
+ .createWithDefault(false)
+
val THRIFTSERVER_UI_STATEMENT_LIMIT =
buildConf("spark.sql.thriftserver.ui.retainedStatements")
    .doc("The number of SQL statements kept in the JDBC/ODBC web UI history.")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
index c322d5eef5..39ea035d62 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
@@ -76,8 +76,12 @@ case class BroadcastExchangeExec(
// Cancelling a SQL statement from Spark ThriftServer needs to cancel
   // its related broadcast sub-jobs. So set the run id to job group id if exists.
-  override val runId: UUID = Option(sparkContext.getLocalProperty(SparkContext.SPARK_JOB_GROUP_ID))
-    .map(UUID.fromString).getOrElse(UUID.randomUUID)
+ override val runId: UUID =
+ if (SQLConf.get.getConf(SQLConf.THRIFTSERVER_BROADCAST_CANCEL)) {
+      UUID.fromString(sparkContext.getLocalProperty(SparkContext.SPARK_JOB_GROUP_ID))
+ } else {
+ UUID.randomUUID
+ }
override lazy val metrics = Map(
"dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
index 8ca0ab91a7..68a02842ef 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -285,7 +285,7 @@ private[hive] class SparkExecuteStatementOperation(
if (!runInBackground) {
parentSession.getSessionState.getConf.setClassLoader(executionHiveClassLoader)
}
-
+ sqlContext.conf.setConf(SQLConf.THRIFTSERVER_BROADCAST_CANCEL, true)
     sqlContext.sparkContext.setJobGroup(statementId, substitutorStatement, forceCancel)
result = sqlContext.sql(statement)
logDebug(result.queryExecution.toString())
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]