This is an automated email from the ASF dual-hosted git repository. yangjie01 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 2aa172dba11 [SPARK-45822][CONNECT] SparkConnectSessionManager may look up a stopped sparkcontext 2aa172dba11 is described below commit 2aa172dba1176de76719021a45a017759379abe5 Author: Kent Yao <y...@apache.org> AuthorDate: Tue Nov 7 23:32:47 2023 +0800 [SPARK-45822][CONNECT] SparkConnectSessionManager may look up a stopped sparkcontext ### What changes were proposed in this pull request? This PR checks whether the sc is still functional before cloning a new isolated session from it. ### Why are the changes needed? SparkSession.active is a thread-local value and not be updated by other thread. This causes https://github.com/LuciferYang/spark/actions/runs/6767960232/job/18426049162 ```java - ReleaseSession: session with different session_id or user_id allowed after release *** FAILED *** (9 milliseconds) [info] org.apache.spark.SparkException: com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext. [info] This stopped SparkContext was created at: [info] [info] org.apache.spark.sql.connect.service.SparkConnectSessionHolderSuite.beforeAll(SparkConnectSessionHolderSuite.scala:37) [info] org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:212) ``` For shared spark sessions in tests, these sessions are created, stopped, and retrieved in different threads. ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? I ran `build/sbt "connect/testOnly *SparkConnect*"` locally and the test consistently failed w/o this patch. Otherwise, it passed. ### Was this patch authored or co-authored using generative AI tooling? no Closes #43701 from yaooqinn/SPARK-45822. Authored-by: Kent Yao <y...@apache.org> Signed-off-by: yangjie01 <yangji...@baidu.com> --- .../spark/sql/connect/service/SparkConnectSessionManager.scala | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectSessionManager.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectSessionManager.scala index 5c8e3c61158..ba402a90a71 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectSessionManager.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectSessionManager.scala @@ -139,7 +139,13 @@ class SparkConnectSessionManager extends Logging { } private def newIsolatedSession(): SparkSession = { - SparkSession.active.newSession() + val active = SparkSession.active + if (active.sparkContext.isStopped) { + assert(SparkSession.getDefaultSession.nonEmpty) + SparkSession.getDefaultSession.get.newSession() + } else { + active.newSession() + } } private def validateSessionCreate(key: SessionKey): Unit = { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org