zhengruifeng opened a new pull request, #53749:
URL: https://github.com/apache/spark/pull/53749

   ### What changes were proposed in this pull request?
   Timeout command `_delete_ml_cache`
   
   
   ### Why are the changes needed?
   the `test_parity_clustering` still hangs, even though the probabality is 
lower than before
   
   The all threads dumps 
https://github.com/gaogaotiantian/spark/actions/runs/20841838810/job/59877763802
   
   shows it get stuck at `_delete_ml_cache`
   
   
   looks like there is still a dead lock in grpc on python side
   
   ```
   Traceback for thread 24397 (python3.11) [] (most recent call last):
       (Python) File "/usr/lib/python3.11/threading.py", line 1002, in 
_bootstrap
           self._bootstrap_inner()
       (Python) File "/usr/lib/python3.11/threading.py", line 1045, in 
_bootstrap_inner
           self.run()
       (Python) File "/usr/lib/python3.11/threading.py", line 982, in run
           self._target(*self._args, **self._kwargs)
       (Python) File "/usr/lib/python3.11/concurrent/futures/thread.py", line 
83, in _worker
           work_item.run()
       (Python) File "/usr/lib/python3.11/concurrent/futures/thread.py", line 
58, in run
           result = self.fn(*self.args, **self.kwargs)
       (Python) File 
"/__w/spark/spark/python/pyspark/sql/connect/client/reattach.py", line 238, in 
target
           self._stub.ReleaseExecute(request, metadata=self._metadata)
       (Python) File 
"/usr/local/lib/python3.11/dist-packages/grpc/_channel.py", line 1163, in 
__call__
           state, call = self._blocking(
       (Python) File 
"/usr/local/lib/python3.11/dist-packages/grpc/_channel.py", line 1134, in 
_blocking
           call = self._channel.segregated_call(
       (Python) File "/usr/lib/python3.11/threading.py", line 272, in __enter__
           return self._lock.__enter__()
   
   
   Traceback for thread 24268 (python3.11) [] (most recent call last):
       (Python) File "<frozen runpy>", line 198, in _run_module_as_main
       (Python) File "<frozen runpy>", line 88, in _run_code
       ...
       (Python) File 
"/__w/spark/spark/python/pyspark/ml/tests/test_clustering.py", line 466, in 
test_distributed_lda
           self.assertEqual(str(model), str(model2))
       (Python) File "/__w/spark/spark/python/pyspark/ml/wrapper.py", line 474, 
in __repr__
           return self._call_java("toString")
       (Python) File "/__w/spark/spark/python/pyspark/ml/util.py", line 322, in 
wrapped
           return remote_call()
       (Python) File "/__w/spark/spark/python/pyspark/ml/util.py", line 308, in 
remote_call
           (_, properties, _) = session.client.execute_command(command)
       (Python) File 
"/__w/spark/spark/python/pyspark/sql/connect/client/core.py", line 1162, in 
execute_command
           data, _, metrics, observed_metrics, properties = 
self._execute_and_fetch(
       (Python) File 
"/__w/spark/spark/python/pyspark/sql/connect/client/core.py", line 1664, in 
_execute_and_fetch
           for response in self._execute_and_fetch_as_iterator(
       (Python) File 
"/__w/spark/spark/python/pyspark/sql/connect/client/core.py", line 1621, in 
_execute_and_fetch_as_iterator
           generator = ExecutePlanResponseReattachableIterator(
       (Python) File 
"/__w/spark/spark/python/pyspark/sql/connect/client/reattach.py", line 127, in 
__init__
           self._stub.ExecutePlan(self._initial_request, metadata=metadata)
       (Python) File 
"/usr/local/lib/python3.11/dist-packages/grpc/_channel.py", line 1396, in 
__call__
           call = self._managed_call(
       (Python) File 
"/usr/local/lib/python3.11/dist-packages/grpc/_channel.py", line 1785, in create
           call = state.channel.integrated_call(
       (Python) File "/__w/spark/spark/python/pyspark/ml/util.py", line 379, in 
wrapped
           self._remote_model_obj.release_ref()
       (Python) File "/__w/spark/spark/python/pyspark/ml/util.py", line 162, in 
release_ref
           del_remote_cache(self.ref_id)
       (Python) File "/__w/spark/spark/python/pyspark/ml/util.py", line 358, in 
del_remote_cache
           session.client._delete_ml_cache([ref_id])
       (Python) File 
"/__w/spark/spark/python/pyspark/sql/connect/client/core.py", line 2137, in 
_delete_ml_cache
           (_, properties, _) = self.execute_command(command)
       (Python) File 
"/__w/spark/spark/python/pyspark/sql/connect/client/core.py", line 1162, in 
execute_command
           data, _, metrics, observed_metrics, properties = 
self._execute_and_fetch(
       (Python) File 
"/__w/spark/spark/python/pyspark/sql/connect/client/core.py", line 1664, in 
_execute_and_fetch
           for response in self._execute_and_fetch_as_iterator(
       (Python) File 
"/__w/spark/spark/python/pyspark/sql/connect/client/core.py", line 1621, in 
_execute_and_fetch_as_iterator
           generator = ExecutePlanResponseReattachableIterator(
       (Python) File 
"/__w/spark/spark/python/pyspark/sql/connect/client/reattach.py", line 127, in 
__init__
           self._stub.ExecutePlan(self._initial_request, metadata=metadata)
       (Python) File 
"/usr/local/lib/python3.11/dist-packages/grpc/_channel.py", line 1396, in 
__call__
           call = self._managed_call(
       (Python) File 
"/usr/local/lib/python3.11/dist-packages/grpc/_channel.py", line 1784, in create
           with state.lock:
   ```
   
   The `ReleaseExecute` in thread 24397 is for retry/reattach
   
   
https://github.com/apache/spark/blob/d2cc107560e71c4116a115ea85f6ca18baab8849/python/pyspark/sql/connect/client/reattach.py#L39-L57
   
   It is not clear how it affect the `_delete_ml_cache`, but command 
`_delete_ml_cache` itself doesn't need retry, so I rewrite it without retry and 
set a timeout=3 sec.
   
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   
   ### How was this patch tested?
   Will monitor the CI
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   No
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to