juliuszsompolski commented on code in PR #42009:
URL: https://github.com/apache/spark/pull/42009#discussion_r1269125099
##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SparkSessionE2ESuite.scala:
##########
@@ -96,5 +103,151 @@ class SparkSessionE2ESuite extends RemoteSparkSession {
assert(e2.getMessage.contains("OPERATION_CANCELED"), s"Unexpected
exception: $e2")
finished = true
assert(ThreadUtils.awaitResult(interruptor, 10.seconds))
+ assert(interrupted.distinct.length == 2, s"Interrupted operations:
${interrupted.distinct}.")
+ }
+
+ test("interrupt tag") {
+ val session = spark
+ import session.implicits._
+
+ // global ExecutionContext has only 2 threads in Apache Spark CI
+ // create own thread pool for four Futures used in this test
+ val numThreads = 4
+ val fpool = ThreadUtils.newForkJoinPool("job-tags-test-thread-pool",
numThreads)
+ val executionContext = ExecutionContext.fromExecutorService(fpool)
+
+ val q1 = Future {
+ assert(spark.getTags() == Set())
+ spark.addTag("two")
+ assert(spark.getTags() == Set("two"))
+ spark.clearTags() // check that clearing all tags works
+ assert(spark.getTags() == Set())
+ spark.addTag("one")
+ assert(spark.getTags() == Set("one"))
+ try {
+ spark
+ .range(10)
+ .map(n => {
+ Thread.sleep(30000); n
+ })
+ .collect()
+ } finally {
+ spark.clearTags() // clear for the case of thread reuse by another
Future
+ }
+ }(executionContext)
+ val q2 = Future {
+ assert(spark.getTags() == Set())
+ spark.addTag("one")
+ spark.addTag("two")
+ spark.addTag("one")
+ spark.addTag("two") // duplicates shouldn't matter
+ try {
+ spark
+ .range(10)
+ .map(n => {
+ Thread.sleep(30000); n
+ })
+ .collect()
+ } finally {
+ spark.clearTags() // clear for the case of thread reuse by another
Future
+ }
+ }(executionContext)
+ val q3 = Future {
+ assert(spark.getTags() == Set())
+ spark.addTag("foo")
+ spark.removeTag("foo")
+ assert(spark.getTags() == Set()) // check that remove works removing the
last tag
+ spark.addTag("two")
+ assert(spark.getTags() == Set("two"))
+ try {
+ spark
+ .range(10)
+ .map(n => {
+ Thread.sleep(30000); n
+ })
+ .collect()
+ } finally {
+ spark.clearTags() // clear for the case of thread reuse by another
Future
+ }
+ }(executionContext)
+ val q4 = Future {
+ assert(spark.getTags() == Set())
+ spark.addTag("one")
+ spark.addTag("two")
+ spark.addTag("two")
+ assert(spark.getTags() == Set("one", "two"))
+ spark.removeTag("two") // check that remove works, despite duplicate add
+ assert(spark.getTags() == Set("one"))
+ try {
+ spark
+ .range(10)
+ .map(n => {
+ Thread.sleep(30000); n
+ })
+ .collect()
+ } finally {
+ spark.clearTags() // clear for the case of thread reuse by another
Future
+ }
+ }(executionContext)
+ val interrupted = mutable.ListBuffer[String]()
+
+ // q2 and q3 should be cancelled
+ interrupted.clear()
+ eventually(timeout(20.seconds), interval(1.seconds)) {
+ val ids = spark.interruptTag("two")
+ interrupted ++= ids
+ assert(
+ interrupted.distinct.length == 2,
+ s"Interrupted operations: ${interrupted.distinct}.")
+ }
+ val e2 = intercept[SparkException] {
+ ThreadUtils.awaitResult(q2, 1.minute)
+ }
+ assert(e2.getCause.getMessage contains "OPERATION_CANCELED")
+ val e3 = intercept[SparkException] {
+ ThreadUtils.awaitResult(q3, 1.minute)
+ }
+ assert(e3.getCause.getMessage contains "OPERATION_CANCELED")
+ assert(interrupted.distinct.length == 2, s"Interrupted operations:
${interrupted.distinct}.")
+
+ // q1 and q4 should be cancelled
+ interrupted.clear()
+ eventually(timeout(20.seconds), interval(1.seconds)) {
+ val ids = spark.interruptTag("one")
+ interrupted ++= ids
+ assert(
+ interrupted.distinct.length == 2,
+ s"Interrupted operations: ${interrupted.distinct}.")
+ }
+ val e1 = intercept[SparkException] {
+ ThreadUtils.awaitResult(q1, 1.minute)
+ }
+ assert(e1.getCause.getMessage contains "OPERATION_CANCELED")
+ val e4 = intercept[SparkException] {
+ ThreadUtils.awaitResult(q4, 1.minute)
+ }
+ assert(e4.getCause.getMessage contains "OPERATION_CANCELED")
+ assert(interrupted.distinct.length == 2, s"Interrupted operations:
${interrupted.distinct}.")
+ }
+
+ test("interrupt operation") {
+ val session = spark
+ import session.implicits._
+
+ val result = spark
+ .range(10)
+ .map(n => {
+ Thread.sleep(5000); n
+ })
+ .collectResult()
+ // cancel
+ val operationId = result.operationId
Review Comment:
Currently, getting the operationId is indeed a bit awkward; adding tags and
calling interruptTag is a more convenient API to use.
This will still be improved in a followup PR for async and detachable API
execution. Some preparation is here already -
https://github.com/apache/spark/pull/42009/files#diff-3cad257dc0c15b4d091beebdfd42659f803193c23667425d8926b84113a2a312R288
operationId can also be passed in ExecutePlanRequest, so that the client
already knows it from the start and doesn't need to take it from the first response.
In my followup PR I also plan to add a response that would always be sent
right at the beginning of the query, so that even if the client did not set the
operationId, it can get it right away.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]