LantaoJin commented on a change in pull request #31119:
URL: https://github.com/apache/spark/pull/31119#discussion_r559304940



##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
##########
@@ -74,7 +74,10 @@ case class BroadcastExchangeExec(
     child: SparkPlan) extends BroadcastExchangeLike {
   import BroadcastExchangeExec._
 
-  override val runId: UUID = UUID.randomUUID
+  // Cancelling a SQL statement from Spark ThriftServer needs to cancel
+  // its related broadcast sub-jobs. So set the run id to job group id if 
exists.
+  override val runId: UUID = 
Option(sparkContext.getLocalProperty(SparkContext.SPARK_JOB_GROUP_ID))

Review comment:
       Ok, I know. To be transparent to users, how about add a new thread local 
property `SparkContext.SPARK_RESERVED_JOB_GROUP_ID` or 
`SPARK_THRIFTSERVER_JOB_GROUP_ID` to separate it.
   ```
   diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
   index f6e8a5694d..cc3efed713 100644
   --- a/core/src/main/scala/org/apache/spark/SparkContext.scala
   +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
   @@ -760,9 +760,13 @@ class SparkContext(config: SparkConf) extends Logging {
       * may respond to Thread.interrupt() by marking nodes as dead.
       */
      def setJobGroup(groupId: String,
   -      description: String, interruptOnCancel: Boolean = false): Unit = {
   +      description: String, interruptOnCancel: Boolean = false, reserved: 
Boolean = false): Unit = {
        setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, description)
   -    setLocalProperty(SparkContext.SPARK_JOB_GROUP_ID, groupId)
   +    if (reserved) {
   +      setLocalProperty(SparkContext.SPARK_RESERVED_JOB_GROUP_ID, groupId)
   +    } else {
   +      setLocalProperty(SparkContext.SPARK_JOB_GROUP_ID, groupId)
   +    }
        // Note: Specifying interruptOnCancel in setJobGroup (rather than 
cancelJobGroup) avoids
        // changing several public APIs and allows Spark cancellations outside 
of the cancelJobGroup
        // APIs to also take advantage of this property (e.g., internal job 
failures or canceling from
   @@ -2760,6 +2764,7 @@ object SparkContext extends Logging {
   
      private[spark] val SPARK_JOB_DESCRIPTION = "spark.job.description"
      private[spark] val SPARK_JOB_GROUP_ID = "spark.jobGroup.id"
   +  private[spark] val SPARK_RESERVED_JOB_GROUP_ID = 
"spark.reservedJobGroup.id"
      private[spark] val SPARK_JOB_INTERRUPT_ON_CANCEL = 
"spark.job.interruptOnCancel"
      private[spark] val SPARK_SCHEDULER_POOL = "spark.scheduler.pool"
      private[spark] val RDD_SCOPE_KEY = "spark.rdd.scope
   diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
   index c322d5eef5..25abb4f2d3 100644
   --- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
   +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
   @@ -76,7 +76,8 @@ case class BroadcastExchangeExec(
   
      // Cancelling a SQL statement from Spark ThriftServer needs to cancel
      // its related broadcast sub-jobs. So set the run id to job group id if 
exists.
   -  override val runId: UUID = 
Option(sparkContext.getLocalProperty(SparkContext.SPARK_JOB_GROUP_ID))
   +  override val runId: UUID =
   +    
Option(sparkContext.getLocalProperty(SparkContext.SPARK_RESERVED_JOB_GROUP_ID))
          .map(UUID.fromString).getOrElse(UUID.randomUUID)
   
      override lazy val metrics = Map(
   diff --git 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
   index 8ca0ab91a7..4db50e8d00 100644
   --- 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
   +++ 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
   @@ -286,7 +286,7 @@ private[hive] class SparkExecuteStatementOperation(
            
parentSession.getSessionState.getConf.setClassLoader(executionHiveClassLoader)
          }
   
   -      sqlContext.sparkContext.setJobGroup(statementId, 
substitutorStatement, forceCancel)
   +      sqlContext.sparkContext.setJobGroup(statementId, 
substitutorStatement, forceCancel, true)
          result = sqlContext.sql(statement)
          logDebug(result.queryExecution.toString())
          HiveThriftServer2.eventManager.onStatementParsed(statementId,
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to