juliuszsompolski commented on code in PR #42929:
URL: https://github.com/apache/spark/pull/42929#discussion_r1329071826


##########
python/pyspark/sql/connect/client/reattach.py:
##########
@@ -53,7 +55,30 @@ class ExecutePlanResponseReattachableIterator(Generator):
     ReleaseExecute RPCs that instruct the server to release responses that it already processed.
     """
 
-    _release_thread_pool = ThreadPool(os.cpu_count() if os.cpu_count() else 8)
+    # Lock to manage the pool
+    _lock: ClassVar[RLockBase] = RLock()
+    _release_thread_pool: Optional[ThreadPool] = ThreadPool(os.cpu_count() if os.cpu_count() else 8)
+
+    @classmethod
+    def shutdown(cls: Type["ExecutePlanResponseReattachableIterator"]) -> None:
+        """
+        When the channel is closed, this method will be called before, to make sure all
+        outstanding calls are closed.
+        """
+        with cls._lock:
+            if cls._release_thread_pool is not None:
+                cls._release_thread_pool.close()
+                cls._release_thread_pool.join()
+                cls._release_thread_pool = None

Review Comment:
   I've seen (and ignored for now...) the Scala equivalent of this failing when 
we do `SparkConnectClient.shutdown`, which does `channel.shutdownNow()`. In 
Scala, we don't have a dedicated thread pool for that, but (ab)use a gRPC thread 
in  
https://github.com/apache/spark/blob/master/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala#L179
   I wonder if a more graceful shutdown of the channel would fix it?
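
   To make the "graceful" part concrete, here is a minimal standalone sketch of the close-then-join pattern from the diff above. `ReleasePool`, `start` and `submit` are hypothetical names used only for illustration; they are not part of the PR or of PySpark. The idea is that `close()` stops new submissions, `join()` waits for already queued work to drain, and later submissions are rejected rather than raising.

```python
from multiprocessing.pool import ThreadPool
from threading import RLock
from typing import Callable, List, Optional


class ReleasePool:
    """Hypothetical stand-in for the class-level release pool in the diff."""

    _lock = RLock()
    _pool: Optional[ThreadPool] = None

    @classmethod
    def start(cls, workers: int = 2) -> None:
        # Create the shared pool if it does not exist yet.
        with cls._lock:
            if cls._pool is None:
                cls._pool = ThreadPool(workers)

    @classmethod
    def submit(cls, fn: Callable[[], None]) -> bool:
        # Schedule fn; return False instead of failing once the pool is gone.
        with cls._lock:
            if cls._pool is None:
                return False
            cls._pool.apply_async(fn)
            return True

    @classmethod
    def shutdown(cls) -> None:
        # Graceful shutdown: refuse new work, then wait for queued work.
        with cls._lock:
            if cls._pool is not None:
                cls._pool.close()
                cls._pool.join()
                cls._pool = None


done: List[int] = []
ReleasePool.start()
for i in range(5):
    # Bind i as a default argument so each task records its own value.
    ReleasePool.submit(lambda i=i: done.append(i))
ReleasePool.shutdown()   # returns only after all five tasks have run
ReleasePool.shutdown()   # second call is a no-op
accepted_after_shutdown = ReleasePool.submit(lambda: None)
```

   Note that `shutdown` holds the lock while joining, so this sketch assumes the queued tasks never call `submit` or `shutdown` themselves from a worker thread.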


