bogao007 commented on code in PR #42460:
URL: https://github.com/apache/spark/pull/42460#discussion_r1293994248
##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingForeachBatchHelper.scala:
##########
@@ -111,17 +128,79 @@ object StreamingForeachBatchHelper extends Logging {
logInfo(s"Python foreach batch for dfId ${args.dfId} completed (ret: $ret)")
}
- dataFrameCachingWrapper(foreachBatchRunnerFn, sessionHolder)
+ (dataFrameCachingWrapper(foreachBatchRunnerFn, sessionHolder), RunnerCleaner(runner))
}
- // TODO(SPARK-44433): Improve termination of Processes
- // The goal is that when a query is terminated, the python process associated with foreachBatch
- // should be terminated. One way to do that is by registering streaming query listener:
- // After pythonForeachBatchWrapper() is invoked by the SparkConnectPlanner.
- // At that time, we don't have the streaming queries yet.
- // Planner should call back into this helper with the query id when it starts it immediately
- // after. Save the query id to StreamingPythonRunner mapping. This mapping should be
- // part of the SessionHolder.
- // When a query is terminated, check the mapping and terminate any associated runner.
- // These runners should be terminated when a session is deleted (due to timeout, etc).
+ /**
+ * This manages cache from queries to cleaner for runners used for streaming queries. This is
+ * used in [[SessionHolder]].
+ */
+ class CleanerCache(sessionHolder: SessionHolder) {
+
+ private case class CacheKey(queryId: String, runId: String)
+
+ // Mapping from streaming (queryId, runId) to runner cleaner. Used for Python foreachBatch.
+ private val cleanerCache: ConcurrentMap[CacheKey, AutoCloseable] = new ConcurrentHashMap()
+
+ private lazy val streamingListener = { // Initialized on first registered query
+ val listener = new StreamingRunnerCleanerListener
+ sessionHolder.session.streams.addListener(listener)
+ logInfo(s"Registered runner clean up listener for session ${sessionHolder.sessionId}")
+ listener
+ }
+
+ private[connect] def registerCleanerForQuery(
+ query: StreamingQuery,
+ cleaner: AutoCloseable): Unit = {
+
+ streamingListener // Access to initialize
+ val key = CacheKey(query.id.toString, query.runId.toString)
+
+ Option(cleanerCache.putIfAbsent(key, cleaner)) match {
+ case Some(_) =>
+ throw new IllegalStateException(s"Unexpected: a cleaner for query $key is already set")
Review Comment:
Ah OK, then it should be fine, thanks!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]