juliuszsompolski commented on code in PR #42929:
URL: https://github.com/apache/spark/pull/42929#discussion_r1329071826
##########
python/pyspark/sql/connect/client/reattach.py:
##########
@@ -53,7 +55,30 @@ class ExecutePlanResponseReattachableIterator(Generator):
ReleaseExecute RPCs that instruct the server to release responses that it
already processed.
"""
- _release_thread_pool = ThreadPool(os.cpu_count() if os.cpu_count() else 8)
+ # Lock to manage the pool
+ _lock: ClassVar[RLockBase] = RLock()
+ _release_thread_pool: Optional[ThreadPool] = ThreadPool(os.cpu_count() if os.cpu_count() else 8)
+
+ @classmethod
+ def shutdown(cls: Type["ExecutePlanResponseReattachableIterator"]) -> None:
+ """
+ When the channel is closed, this method will be called first, to make sure all
+ outstanding calls are closed.
+ """
+ with cls._lock:
+ if cls._release_thread_pool is not None:
+ cls._release_thread_pool.close()
+ cls._release_thread_pool.join()
+ cls._release_thread_pool = None
Review Comment:
I've seen (and ignored for now...) the Scala equivalent of this failing when
we do `SparkConnectClient.shutdown`, which calls `channel.shutdownNow()`. In
Scala, we don't have a dedicated thread pool for that, but (ab)use a grpc
thread in
https://github.com/databricks/runtime/blob/master/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala#L179
I wonder if a more graceful shutdown of the channel would fix it?
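For reference, the lock-guarded, idempotent shutdown that the diff introduces can be exercised standalone. This is a minimal sketch, not the actual Spark Connect code: `PoolHolder` is a hypothetical stand-in for `ExecutePlanResponseReattachableIterator`, keeping only the class-level pool and `shutdown` classmethod from the diff:

```python
import os
from multiprocessing.pool import ThreadPool
from threading import RLock
from typing import Optional


class PoolHolder:
    # Class-level lock guarding the shared pool, as in the diff.
    _lock = RLock()
    # Shared pool sized to the CPU count, falling back to 8.
    _release_thread_pool: Optional[ThreadPool] = ThreadPool(os.cpu_count() or 8)

    @classmethod
    def shutdown(cls) -> None:
        # Idempotent: after the first call the pool is None, so a
        # second call (e.g. from another close path) is a no-op.
        with cls._lock:
            if cls._release_thread_pool is not None:
                cls._release_thread_pool.close()
                cls._release_thread_pool.join()
                cls._release_thread_pool = None


PoolHolder.shutdown()
PoolHolder.shutdown()  # safe to call again; pool stays None
```

The `Optional` type plus the `None` sentinel is what makes repeated shutdown safe, which matters when both a channel-close hook and explicit client teardown may race to release the pool.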
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]