sarutak opened a new pull request, #52704:
URL: https://github.com/apache/spark/pull/52704
### What changes were proposed in this pull request?
This PR aims to reenable
`SparkSessionJobTaggingAndCancellationSuite.Cancellation APIs in SparkSession
are isolated`.
#48736 disabled this test because it was flaky. In this test, futures ran
on threads managed by a `ForkJoinPool`.
Each future invokes `SparkSession#addTag` and `SparkSession#getTag`, and
tags are implemented using `InheritableThreadLocal`, so the root cause of this
issue is the same as #52417.
However, #48906 replaced the `ForkJoinPool` with
`Executors.newFixedThreadPool(3)`, so I believe this issue no longer occurs.
In fact, the issue can be reproduced by replacing
`Executors.newFixedThreadPool(3)` with `new ForkJoinPool(3)` and inserting a
sleep as follows.
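The timing sensitivity comes from how `InheritableThreadLocal` works: a child thread copies the parent's value only at the moment the child thread is created, so which tags a pool worker "inherits" depends on when the pool happens to spawn that worker. Below is a minimal sketch in plain Java illustrating this behavior (no Spark classes involved; `InheritDemo`, `TAG`, and `demo` are illustrative names, not part of the test):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class InheritDemo {
    // An InheritableThreadLocal copies the parent's value into a child
    // thread only at the moment the child thread is constructed.
    static final InheritableThreadLocal<String> TAG = new InheritableThreadLocal<>();

    static String[] demo() throws Exception {
        TAG.set("tag-A");
        ExecutorService pool = Executors.newFixedThreadPool(1);
        // The first submit lazily creates the worker thread,
        // which inherits "tag-A" from the submitting thread.
        String first = pool.submit(TAG::get).get();
        // Updating the parent afterwards has no effect on the
        // already-created worker thread.
        TAG.set("tag-B");
        String second = pool.submit(TAG::get).get();
        pool.shutdown();
        return new String[] {first, second};
    }

    public static void main(String[] args) throws Exception {
        String[] r = demo();
        // Both tasks observe "tag-A": the worker's value was fixed
        // when the thread was created, not when the task was submitted.
        System.out.println(r[0] + " " + r[1]);
    }
}
```

With a `ForkJoinPool`, worker threads can be created (and compensated) at less predictable points, so the value a task observes depends on what the submitting thread's tags happened to be at thread-creation time, which is what made the test flaky.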
```
// global ExecutionContext has only 2 threads in Apache Spark CI
// create own thread pool for four Futures used in this test
- val threadPool = Executors.newFixedThreadPool(3)
+ val threadPool = new ForkJoinPool(3)
...
+ Thread.sleep(1000)
val jobB = Future {
sessionB = globalSession.cloneSession()
import globalSession.implicits._
```
Then, run the test as follows.
```
$ build/sbt 'sql/testOnly org.apache.spark.sql.SparkSessionJobTaggingAndCancellationSuite -- -z "Cancellation APIs in SparkSession are isolated"'
```
```
[info] - Cancellation APIs in SparkSession are isolated *** FAILED *** (2 seconds, 726 milliseconds)
[info] ArrayBuffer({"spark.app.startTime"="1761192376305",
"spark.rdd.scope"="{"id":"3","name":"Exchange"}",
"spark.hadoop.fs.s3a.vectored.read.min.seek.size"="128K",
"spark.hadoop.hadoop.caller.context.enabled"="true",
"spark.memory.debugFill"="true", "spark.master.rest.enabled"="false",
"spark.sql.warehouse.dir"="file:/Users/sarutak/oss/spark/sql/core/spark-warehouse",
"spark.master"="local[2]", "spark.job.interruptOnCancel"="true",
"spark.app.name"="test", "spark.driver.host"="192.168.1.109",
"spark.app.id"="local-1761192376735",
"spark.job.tags"="spark-session-e2dd839b-2170-43c9-a8c9-1c8a24fe583c,spark-session-8c09c25f-089c-41ee-add1-1de463658349-thread-6b832f9d-3a55-4d1f-b47d-418fc2ed05e4-one,spark-session-e2dd839b-2170-43c9-a8c9-1c8a24fe583c-execution-root-id-0,spark-session-e2dd839b-2170-43c9-a8c9-1c8a24fe583c-thread-a4b5b347-6e56-4416-b3a5-37a312bdfe34-one,spark-session-8c09c25f-089c-41ee-add1-1de463658349,spark-session-8c09c25f-089c-41ee-add1-1de463658349-thread-6b832f9
d-3a55-4d1f-b47d-418fc2ed05e4-two",
"spark.unsafe.exceptionOnMemoryLeak"="true", "spark.sql.execution.root.id"="0",
"spark.ui.showConsoleProgress"="false",
"spark.driver.extraJavaOptions"="-Djava.net.preferIPv6Addresses=false
-XX:+IgnoreUnrecognizedVMOptions --add-modules=jdk.incubator.vector
--add-opens=java.base/java.lang=ALL-UNNAMED
--add-opens=java.base/java.lang.invoke=ALL-UNNAMED
--add-opens=java.base/java.lang.reflect=ALL-UNNAMED
--add-opens=java.base/java.io=ALL-UNNAMED
--add-opens=java.base/java.net=ALL-UNNAMED
--add-opens=java.base/java.nio=ALL-UNNAMED
--add-opens=java.base/java.util=ALL-UNNAMED
--add-opens=java.base/java.util.concurrent=ALL-UNNAMED
--add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED
--add-opens=java.base/jdk.internal.ref=ALL-UNNAMED
--add-opens=java.base/sun.nio.ch=ALL-UNNAMED
--add-opens=java.base/sun.nio.cs=ALL-UNNAMED
--add-opens=java.base/sun.security.action=ALL-UNNAMED
--add-opens=java.base/sun.util.calendar=ALL-UNNAMED
--add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED
-Djdk.reflect.useDirectMethodHandle=false
-Dio.netty.tryReflectionSetAccessible=true -Dio.netty.allocator.type=pooled
-Dio.netty.handler.ssl.defaultEndpointVerificationAlgorithm=NONE
--enable-native-access=ALL-UNNAMED", "spark.driver.port"="56972",
"spark.testing"="true",
"spark.hadoop.fs.s3a.vectored.read.max.merged.size"="2M",
"spark.sql.execution.id"="1", "spark.rdd.scope.noOverride"="true",
"spark.executor.id"="driver", "spark.port.maxRetries"="100",
"spark.executor.extraJavaOptions"="-Djava.net.preferIPv6Addresses=false
-XX:+IgnoreUnrecognizedVMOptions --add-modules=jdk.incubator.vector
--add-opens=java.base/java.lang=ALL-UNNAMED
--add-opens=java.base/java.lang.invoke=ALL-UNNAMED
--add-opens=java.base/java.lang.reflect=ALL-UNNAMED
--add-opens=java.base/java.io=ALL-UNNAMED
--add-opens=java.base/java.net=ALL-UNNAMED
--add-opens=java.base/java.nio=ALL-UNNAMED
--add-opens=java.base/java.util=ALL-UNNAMED
--add-opens=java.base/java.util.concurrent=ALL-UNNAMED
--add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED
--add-opens=java.base/jdk.internal.ref=ALL-UNNAMED
--add-opens=java.base/sun.nio.ch=ALL-UNNAMED
--add-opens=java.base/sun.nio.cs=ALL-UNNAMED
--add-opens=java.base/sun.security.action=ALL-UNNAMED
--add-opens=java.base/sun.util.calendar=ALL-UNNAMED
--add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED
-Djdk.reflect.useDirectMethodHandle=false
-Dio.netty.tryReflectionSetAccessible=true -Dio.netty.allocator.type=pooled
-Dio.netty.handler.ssl.defaultEndpointVerificationAlgorithm=NONE
--enable-native-access=ALL-UNNAMED",
"spark.test.home"="/Users/sarutak/oss/spark", "spark.ui.enabled"="false"},
{"spark.app.startTime"="1761192376305",
"spark.rdd.scope"="{"id":"5","name":"Exchange"}",
"spark.hadoop.fs.s3a.vectored.read.min.seek.size"="128K",
"spark.hadoop.hadoop.caller.context.enabled"="true",
"spark.memory.debugFill"="true", "spark.master.rest.enabled"="false",
"spark.sql.warehouse.dir"="file:/Users/sarut
ak/oss/spark/sql/core/spark-warehouse", "spark.master"="local[2]",
"spark.job.interruptOnCancel"="true", "spark.app.name"="test",
"spark.driver.host"="192.168.1.109", "spark.app.id"="local-1761192376735",
"spark.job.tags"="spark-session-e2dd839b-2170-43c9-a8c9-1c8a24fe583c-execution-root-id-0,spark-session-e2dd839b-2170-43c9-a8c9-1c8a24fe583c-thread-a4b5b347-6e56-4416-b3a5-37a312bdfe34-one,spark-session-e2dd839b-2170-43c9-a8c9-1c8a24fe583c",
"spark.unsafe.exceptionOnMemoryLeak"="true",
"spark.sql.execution.root.id"="0", "spark.ui.showConsoleProgress"="false",
"spark.driver.extraJavaOptions"="-Djava.net.preferIPv6Addresses=false
-XX:+IgnoreUnrecognizedVMOptions --add-modules=jdk.incubator.vector
--add-opens=java.base/java.lang=ALL-UNNAMED
--add-opens=java.base/java.lang.invoke=ALL-UNNAMED
--add-opens=java.base/java.lang.reflect=ALL-UNNAMED
--add-opens=java.base/java.io=ALL-UNNAMED
--add-opens=java.base/java.net=ALL-UNNAMED
--add-opens=java.base/java.nio=ALL-UNNAMED
--add-opens=java.base/java.util=ALL-UNNAMED
--add-opens=java.base/java.util.concurrent=ALL-UNNAMED
--add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED
--add-opens=java.base/jdk.internal.ref=ALL-UNNAMED
--add-opens=java.base/sun.nio.ch=ALL-UNNAMED
--add-opens=java.base/sun.nio.cs=ALL-UNNAMED
--add-opens=java.base/sun.security.action=ALL-UNNAMED
--add-opens=java.base/sun.util.calendar=ALL-UNNAMED
--add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED
-Djdk.reflect.useDirectMethodHandle=false
-Dio.netty.tryReflectionSetAccessible=true -Dio.netty.allocator.type=pooled
-Dio.netty.handler.ssl.defaultEndpointVerificationAlgorithm=NONE
--enable-native-access=ALL-UNNAMED", "spark.driver.port"="56972",
"spark.testing"="true",
"spark.hadoop.fs.s3a.vectored.read.max.merged.size"="2M",
"spark.sql.execution.id"="0", "spark.rdd.scope.noOverride"="true",
"spark.executor.id"="driver", "spark.port.maxRetries"="100",
"spark.executor.extraJavaOptions"="-Djava.net.preferIPv6Addresses=false
-XX:+IgnoreUnrecognizedVMOptions --add-modules=jdk.incubator.vector
--add-opens=java.base/java.lang=ALL-UNNAMED
--add-opens=java.base/java.lang.invoke=ALL-UNNAMED
--add-opens=java.base/java.lang.reflect=ALL-UNNAMED
--add-opens=java.base/java.io=ALL-UNNAMED
--add-opens=java.base/java.net=ALL-UNNAMED
--add-opens=java.base/java.nio=ALL-UNNAMED
--add-opens=java.base/java.util=ALL-UNNAMED
--add-opens=java.base/java.util.concurrent=ALL-UNNAMED
--add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED
--add-opens=java.base/jdk.internal.ref=ALL-UNNAMED
--add-opens=java.base/sun.nio.ch=ALL-UNNAMED
--add-opens=java.base/sun.nio.cs=ALL-UNNAMED
--add-opens=java.base/sun.security.action=ALL-UNNAMED
--add-opens=java.base/sun.util.calendar=ALL-UNNAMED
--add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED
-Djdk.reflect.useDirectMethodHandle=false
-Dio.netty.tryReflectionSetAccessible=true -Dio.netty.allocator.type=pooled
-Dio.netty.handler.ssl.defaultEndpointVerificationAlgorithm=NONE
--enable-native-access=ALL-UNNAMED", "spark.test.home"="/Users/sarutak/oss/spark",
"spark.ui.enabled"="false"}) had size 2 instead of expected size 1
(SparkSessionJobTaggingAndCancellationSuite.scala:229)
[info] org.scalatest.exceptions.TestFailedException:
[info] at
org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
[info] at
org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
[info] at
org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231)
[info] at
org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295)
[info] at
org.apache.spark.sql.SparkSessionJobTaggingAndCancellationSuite.$anonfun$new$13(SparkSessionJobTaggingAndCancellationSuite.scala:229)
[info] at scala.collection.immutable.List.foreach(List.scala:323)
[info] at
org.apache.spark.sql.SparkSessionJobTaggingAndCancellationSuite.$anonfun$new$6(SparkSessionJobTaggingAndCancellationSuite.scala:226)
[info] at
org.scalatest.enablers.Timed$$anon$1.timeoutAfter(Timed.scala:127)
[info] at
org.scalatest.concurrent.TimeLimits$.failAfterImpl(TimeLimits.scala:282)
[info] at
org.scalatest.concurrent.TimeLimits.failAfter(TimeLimits.scala:231)
[info] at
org.scalatest.concurrent.TimeLimits.failAfter$(TimeLimits.scala:230)
[info] at org.apache.spark.SparkFunSuite.failAfter(SparkFunSuite.scala:68)
[info] at
org.apache.spark.SparkFunSuite.$anonfun$test$2(SparkFunSuite.scala:154)
[info] at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
[info] at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
[info] at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info] at org.scalatest.Transformer.apply(Transformer.scala:22)
[info] at org.scalatest.Transformer.apply(Transformer.scala:20)
[info] at
org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:226)
[info] at
org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:226)
[info] at
org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:224)
[info] at
org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:236)
[info] at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
[info] at
org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:236)
[info] at
org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:218)
[info] at
org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:68)
[info] at
org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:234)
[info] at
org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:227)
[info] at org.apache.spark.SparkFunSuite.runTest(SparkFunSuite.scala:68)
[info] at
org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:269)
[info] at
org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
[info] at scala.collection.immutable.List.foreach(List.scala:323)
[info] at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
[info] at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
[info] at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
[info] at
org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:269)
[info] at
org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:268)
[info] at
org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1564)
[info] at org.scalatest.Suite.run(Suite.scala:1114)
[info] at org.scalatest.Suite.run$(Suite.scala:1096)
[info] at
org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1564)
[info] at
org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:273)
[info] at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
[info] at
org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:273)
[info] at
org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:272)
[info] at
org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:68)
[info] at
org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
[info] at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
[info] at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
[info] at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:68)
[info] at
org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:321)
[info] at
org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:517)
[info] at sbt.ForkMain$Run.lambda$runTest$1(ForkMain.java:414)
[info] at
java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
[info] at
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
[info] at
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
[info] at java.base/java.lang.Thread.run(Thread.java:840)
```
On the other hand, if the sleep is inserted but
`Executors.newFixedThreadPool(3)` is left as is, the test always seems to pass.
So, we can now reenable this test.
### Why are the changes needed?
For better test coverage.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
The test always passes in my dev environment, even with the sleep inserted as
explained above.
### Was this patch authored or co-authored using generative AI tooling?
No.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]