hvanhovell commented on code in PR #47815:
URL: https://github.com/apache/spark/pull/47815#discussion_r1745473650


##########
sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala:
##########
@@ -649,6 +671,77 @@ class SparkSession private(
     artifactManager.addLocalArtifacts(uri.flatMap(Artifact.parseArtifacts))
   }
 
+  /** @inheritdoc */
+  override def addTag(tag: String): Unit = {
+    SparkContext.throwIfInvalidTag(tag)
+    managedJobTags.put(tag, s"spark-session-$sessionUUID-$tag")
+  }
+
+  /** @inheritdoc */
+  override def removeTag(tag: String): Unit = {
+    SparkContext.throwIfInvalidTag(tag)
+    managedJobTags.remove(tag)
+  }
+
+  /** @inheritdoc */
+  override def getTags(): Set[String] = managedJobTags.keys().asScala.toSet
+
+  /** @inheritdoc */
+  override def clearTags(): Unit = managedJobTags.clear()
+
+  /**
+   * Request to interrupt all currently running operations of this session.
+   *
+   * @note This method will wait up to 60 seconds for the interruption request to be issued.
+
+   * @return Sequence of SQL execution IDs requested to be interrupted.
+
+   * @since 4.0.0
+   */
+  override def interruptAll(): Seq[String] =
+    doInterruptTag(sessionJobTag, "as part of cancellation of all jobs")
+
+  /**
+   * Request to interrupt all currently running operations of this session with the given job tag.
+   *
+   * @note This method will wait up to 60 seconds for the interruption request to be issued.
+   *
+   * @return Sequence of SQL execution IDs requested to be interrupted.
+   */
+  override def interruptTag(tag: String): Seq[String] = {
+    val realTag = managedJobTags.get(tag)
+    if (realTag == null) return Seq.empty
+    doInterruptTag(realTag, s"part of cancelled job tags $tag")
+  }
+
+  private def doInterruptTag(tag: String, reason: String): Seq[String] = {
+    val cancelledTags =
+      sparkContext.cancelJobsWithTagWithFuture(tag, reason)
+
+    ThreadUtils.awaitResult(cancelledTags, 60.seconds)
+      .flatMap(job => Option(job.properties.getProperty(SQLExecution.EXECUTION_ROOT_ID_KEY)))
+  }
+
+  /**
+   * Request to interrupt an operation of this session, given its SQL execution ID.
+   *
+   * @note This method will wait up to 60 seconds for the interruption request to be issued.
+   *
+   * @return The execution ID requested to be interrupted, as a single-element sequence, or an empty
+   *    sequence if the operation is not started by this session.
+   *
+   * @since 4.0.0
+   */
+  override def interruptOperation(operationId: String): Seq[String] = {
+    scala.util.Try(operationId.toLong).toOption match {
+      case Some(executionIdToBeCancelled) =>
+        val tagToBeCancelled = SQLExecution.executionIdJobTag(this, executionIdToBeCancelled)
+        doInterruptTag(tagToBeCancelled, reason = "")

Review Comment:
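   Perhaps set a reason here instead of passing an empty string? The other `doInterruptTag` calls in this change (`interruptAll`, `interruptTag`) both pass a descriptive reason.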



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to