This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push: new fb5495f64ff [SPARK-45061][SS][CONNECT] Clean up Running python StreamingQueryListener processes when session expires fb5495f64ff is described below commit fb5495f64ffdedf3006a6f1a66cda128e164ad32 Author: Wei Liu <wei....@databricks.com> AuthorDate: Mon Sep 4 09:36:49 2023 +0900 [SPARK-45061][SS][CONNECT] Clean up Running python StreamingQueryListener processes when session expires ### What changes were proposed in this pull request? Clean up all running Python StreamingQueryListener processes when the session expires. ### Why are the changes needed? Improvement ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? A test will be added in SPARK-44462. Currently there is no way to test this because the session will never expire. This is because the started Python listener process (on the server) establishes a connection to the server process with the same session id and pings it all the time. ### Was this patch authored or co-authored using generative AI tooling? No Closes #42687 from WweiL/SPARK-44433-followup-listener-cleanup. 
Authored-by: Wei Liu <wei....@databricks.com> Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> (cherry picked from commit 7a01ba65b7408bc3b907aa7b0b27279913caafe9) Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> --- .../sql/connect/planner/SparkConnectPlanner.scala | 4 +++- .../spark/sql/connect/service/SessionHolder.scala | 21 ++++++++++++++++----- 2 files changed, 19 insertions(+), 6 deletions(-) diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala index 46c465e4deb..2abbacc5a9b 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala @@ -2902,7 +2902,9 @@ class SparkConnectPlanner(val sessionHolder: SessionHolder) extends Logging { SparkConnectService.streamingSessionManager.registerNewStreamingQuery(sessionHolder, query) // Register the runner with the query if Python foreachBatch is enabled. 
foreachBatchRunnerCleaner.foreach { cleaner => - sessionHolder.streamingRunnerCleanerCache.registerCleanerForQuery(query, cleaner) + sessionHolder.streamingForeachBatchRunnerCleanerCache.registerCleanerForQuery( + query, + cleaner) } executeHolder.eventsManager.postFinished() diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala index 2034a97fce9..1cef02d7e34 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala @@ -57,7 +57,7 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio new ConcurrentHashMap() // Handles Python process clean up for streaming queries. Initialized on first use in a query. - private[connect] lazy val streamingRunnerCleanerCache = + private[connect] lazy val streamingForeachBatchRunnerCleanerCache = new StreamingForeachBatchHelper.CleanerCache(this) /** Add ExecuteHolder to this session. Called only by SparkConnectExecutionManager. */ @@ -160,7 +160,8 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio eventManager.postClosed() // Clean up running queries SparkConnectService.streamingSessionManager.cleanupRunningQueries(this) - streamingRunnerCleanerCache.cleanUpAll() // Clean up any streaming workers. + streamingForeachBatchRunnerCleanerCache.cleanUpAll() // Clean up any streaming workers. + removeAllListeners() // removes all listener and stop python listener processes if necessary. } /** @@ -237,11 +238,21 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio * Spark Connect PythonStreamingQueryListener. 
*/ private[connect] def removeCachedListener(id: String): Unit = { - listenerCache.get(id) match { - case pyListener: PythonStreamingQueryListener => pyListener.stopListenerProcess() + Option(listenerCache.remove(id)) match { + case Some(pyListener: PythonStreamingQueryListener) => pyListener.stopListenerProcess() case _ => // do nothing } - listenerCache.remove(id) + } + + /** + * Stop all streaming listener threads, and removes all python process if applicable. Only + * called when session is expired. + */ + private def removeAllListeners(): Unit = { + listenerCache.forEach((id, listener) => { + session.streams.removeListener(listener) + removeCachedListener(id) + }) } /** --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org