LantaoJin commented on a change in pull request #31119:
URL: https://github.com/apache/spark/pull/31119#discussion_r559304940
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
##########
@@ -74,7 +74,10 @@ case class BroadcastExchangeExec(
     child: SparkPlan) extends BroadcastExchangeLike {
   import BroadcastExchangeExec._
-  override val runId: UUID = UUID.randomUUID
+  // Cancelling a SQL statement from Spark ThriftServer needs to cancel
+  // its related broadcast sub-jobs. So set the run id to job group id if exists.
+  override val runId: UUID = Option(sparkContext.getLocalProperty(SparkContext.SPARK_JOB_GROUP_ID))
Review comment:
Ok, I know. To keep this transparent to users, how about adding a new thread-local property `SparkContext.SPARK_RESERVED_JOB_GROUP_ID` or `SPARK_THRIFTSERVER_JOB_GROUP_ID` to separate it?
```
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index f6e8a5694d..cc3efed713 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -760,9 +760,13 @@ class SparkContext(config: SparkConf) extends Logging {
    * may respond to Thread.interrupt() by marking nodes as dead.
    */
   def setJobGroup(groupId: String,
-      description: String, interruptOnCancel: Boolean = false): Unit = {
+      description: String, interruptOnCancel: Boolean = false, reserved: Boolean = false): Unit = {
     setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, description)
-    setLocalProperty(SparkContext.SPARK_JOB_GROUP_ID, groupId)
+    if (reserved) {
+      setLocalProperty(SparkContext.SPARK_RESERVED_JOB_GROUP_ID, groupId)
+    } else {
+      setLocalProperty(SparkContext.SPARK_JOB_GROUP_ID, groupId)
+    }
     // Note: Specifying interruptOnCancel in setJobGroup (rather than cancelJobGroup) avoids
     // changing several public APIs and allows Spark cancellations outside of the cancelJobGroup
     // APIs to also take advantage of this property (e.g., internal job failures or canceling from
@@ -2760,6 +2764,7 @@ object SparkContext extends Logging {
   private[spark] val SPARK_JOB_DESCRIPTION = "spark.job.description"
   private[spark] val SPARK_JOB_GROUP_ID = "spark.jobGroup.id"
+  private[spark] val SPARK_RESERVED_JOB_GROUP_ID = "spark.reservedJobGroup.id"
   private[spark] val SPARK_JOB_INTERRUPT_ON_CANCEL = "spark.job.interruptOnCancel"
   private[spark] val SPARK_SCHEDULER_POOL = "spark.scheduler.pool"
   private[spark] val RDD_SCOPE_KEY = "spark.rdd.scope"
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
index c322d5eef5..25abb4f2d3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
@@ -76,7 +76,8 @@ case class BroadcastExchangeExec(
   // Cancelling a SQL statement from Spark ThriftServer needs to cancel
   // its related broadcast sub-jobs. So set the run id to job group id if exists.
-  override val runId: UUID = Option(sparkContext.getLocalProperty(SparkContext.SPARK_JOB_GROUP_ID))
+  override val runId: UUID =
+    Option(sparkContext.getLocalProperty(SparkContext.SPARK_RESERVED_JOB_GROUP_ID))
     .map(UUID.fromString).getOrElse(UUID.randomUUID)
   override lazy val metrics = Map(
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
index 8ca0ab91a7..4db50e8d00 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -286,7 +286,7 @@ private[hive] class SparkExecuteStatementOperation(
         parentSession.getSessionState.getConf.setClassLoader(executionHiveClassLoader)
       }
-      sqlContext.sparkContext.setJobGroup(statementId, substitutorStatement, forceCancel)
+      sqlContext.sparkContext.setJobGroup(statementId, substitutorStatement, forceCancel, true)
       result = sqlContext.sql(statement)
       logDebug(result.queryExecution.toString())
       HiveThriftServer2.eventManager.onStatementParsed(statementId,
```
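For reference, a minimal sketch of how the ThriftServer side would drive this — it only compiles against a build carrying the diff above, and the object name, app settings, and statement text are placeholders:
```
import java.util.UUID

import org.apache.spark.{SparkConf, SparkContext}

object ReservedJobGroupSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("reserved-job-group-sketch"))

    // ThriftServer already uses a UUID statementId as its group id, so it can
    // opt into the reserved slot without touching SPARK_JOB_GROUP_ID.
    val statementId = UUID.randomUUID.toString
    sc.setJobGroup(statementId, "SELECT ...", interruptOnCancel = true, reserved = true)

    // A user-set group id on the same thread goes to the normal slot and is
    // never parsed as a UUID.
    sc.setJobGroup("my-user-group", "user job")

    // Cancelling the statement's group also cancels its broadcast sub-jobs,
    // since BroadcastExchangeExec picks the reserved id up as its runId.
    sc.cancelJobGroup(statementId)

    sc.stop()
  }
}
```
One nice side effect of the separate key: an arbitrary user-chosen group id never reaches `UUID.fromString`, so a non-UUID group id can't break broadcast planning.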