bogao007 commented on code in PR #42460:
URL: https://github.com/apache/spark/pull/42460#discussion_r1293994248


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingForeachBatchHelper.scala:
##########
@@ -111,17 +128,79 @@ object StreamingForeachBatchHelper extends Logging {
       logInfo(s"Python foreach batch for dfId ${args.dfId} completed (ret: 
$ret)")
     }
 
-    dataFrameCachingWrapper(foreachBatchRunnerFn, sessionHolder)
+    (dataFrameCachingWrapper(foreachBatchRunnerFn, sessionHolder), 
RunnerCleaner(runner))
   }
 
-  // TODO(SPARK-44433): Improve termination of Processes
-  //   The goal is that when a query is terminated, the python process 
associated with foreachBatch
-  //   should be terminated. One way to do that is by registering streaming 
query listener:
-  //   After pythonForeachBatchWrapper() is invoked by the SparkConnectPlanner.
-  //   At that time, we don't have the streaming queries yet.
-  //   Planner should call back into this helper with the query id when it 
starts it immediately
-  //   after. Save the query id to StreamingPythonRunner mapping. This mapping 
should be
-  //   part of the SessionHolder.
-  //   When a query is terminated, check the mapping and terminate any 
associated runner.
-  //   These runners should be terminated when a session is deleted (due to 
timeout, etc).
+  /**
+   * This manages cache from queries to cleaner for runners used for streaming 
queries. This is
+   * used in [[SessionHolder]].
+   */
+  class CleanerCache(sessionHolder: SessionHolder) {
+
+    private case class CacheKey(queryId: String, runId: String)
+
+    // Mapping from streaming (queryId, runId) to runner cleaner. Used for 
Python foreachBatch.
+    private val cleanerCache: ConcurrentMap[CacheKey, AutoCloseable] = new 
ConcurrentHashMap()
+
+    private lazy val streamingListener = { // Initialized on first registered 
query
+      val listener = new StreamingRunnerCleanerListener
+      sessionHolder.session.streams.addListener(listener)
+      logInfo(s"Registered runner clean up listener for session 
${sessionHolder.sessionId}")
+      listener
+    }
+
+    private[connect] def registerCleanerForQuery(
+        query: StreamingQuery,
+        cleaner: AutoCloseable): Unit = {
+
+      streamingListener // Access to initialize
+      val key = CacheKey(query.id.toString, query.runId.toString)
+
+      Option(cleanerCache.putIfAbsent(key, cleaner)) match {
+        case Some(_) =>
+          throw new IllegalStateException(s"Unexpected: a cleaner for query 
$key is already set")

Review Comment:
   Ah OK, then it should be fine, thanks!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to