zhengruifeng opened a new pull request, #53749: URL: https://github.com/apache/spark/pull/53749
### What changes were proposed in this pull request?
Add a timeout to the `_delete_ml_cache` command.

### Why are the changes needed?
`test_parity_clustering` still hangs, even though the probability is lower than before.
The all-threads dump at https://github.com/gaogaotiantian/spark/actions/runs/20841838810/job/59877763802 shows it gets stuck in `_delete_ml_cache`; it looks like there is still a deadlock in gRPC on the Python side.

```
Traceback for thread 24397 (python3.11) [] (most recent call last):
    (Python) File "/usr/lib/python3.11/threading.py", line 1002, in _bootstrap
        self._bootstrap_inner()
    (Python) File "/usr/lib/python3.11/threading.py", line 1045, in _bootstrap_inner
        self.run()
    (Python) File "/usr/lib/python3.11/threading.py", line 982, in run
        self._target(*self._args, **self._kwargs)
    (Python) File "/usr/lib/python3.11/concurrent/futures/thread.py", line 83, in _worker
        work_item.run()
    (Python) File "/usr/lib/python3.11/concurrent/futures/thread.py", line 58, in run
        result = self.fn(*self.args, **self.kwargs)
    (Python) File "/__w/spark/spark/python/pyspark/sql/connect/client/reattach.py", line 238, in target
        self._stub.ReleaseExecute(request, metadata=self._metadata)
    (Python) File "/usr/local/lib/python3.11/dist-packages/grpc/_channel.py", line 1163, in __call__
        state, call = self._blocking(
    (Python) File "/usr/local/lib/python3.11/dist-packages/grpc/_channel.py", line 1134, in _blocking
        call = self._channel.segregated_call(
    (Python) File "/usr/lib/python3.11/threading.py", line 272, in __enter__
        return self._lock.__enter__()

Traceback for thread 24268 (python3.11) [] (most recent call last):
    (Python) File "<frozen runpy>", line 198, in _run_module_as_main
    (Python) File "<frozen runpy>", line 88, in _run_code
    ...
    (Python) File "/__w/spark/spark/python/pyspark/ml/tests/test_clustering.py", line 466, in test_distributed_lda
        self.assertEqual(str(model), str(model2))
    (Python) File "/__w/spark/spark/python/pyspark/ml/wrapper.py", line 474, in __repr__
        return self._call_java("toString")
    (Python) File "/__w/spark/spark/python/pyspark/ml/util.py", line 322, in wrapped
        return remote_call()
    (Python) File "/__w/spark/spark/python/pyspark/ml/util.py", line 308, in remote_call
        (_, properties, _) = session.client.execute_command(command)
    (Python) File "/__w/spark/spark/python/pyspark/sql/connect/client/core.py", line 1162, in execute_command
        data, _, metrics, observed_metrics, properties = self._execute_and_fetch(
    (Python) File "/__w/spark/spark/python/pyspark/sql/connect/client/core.py", line 1664, in _execute_and_fetch
        for response in self._execute_and_fetch_as_iterator(
    (Python) File "/__w/spark/spark/python/pyspark/sql/connect/client/core.py", line 1621, in _execute_and_fetch_as_iterator
        generator = ExecutePlanResponseReattachableIterator(
    (Python) File "/__w/spark/spark/python/pyspark/sql/connect/client/reattach.py", line 127, in __init__
        self._stub.ExecutePlan(self._initial_request, metadata=metadata)
    (Python) File "/usr/local/lib/python3.11/dist-packages/grpc/_channel.py", line 1396, in __call__
        call = self._managed_call(
    (Python) File "/usr/local/lib/python3.11/dist-packages/grpc/_channel.py", line 1785, in create
        call = state.channel.integrated_call(
    (Python) File "/__w/spark/spark/python/pyspark/ml/util.py", line 379, in wrapped
        self._remote_model_obj.release_ref()
    (Python) File "/__w/spark/spark/python/pyspark/ml/util.py", line 162, in release_ref
        del_remote_cache(self.ref_id)
    (Python) File "/__w/spark/spark/python/pyspark/ml/util.py", line 358, in del_remote_cache
        session.client._delete_ml_cache([ref_id])
    (Python) File "/__w/spark/spark/python/pyspark/sql/connect/client/core.py", line 2137, in _delete_ml_cache
        (_, properties, _) = self.execute_command(command)
    (Python) File "/__w/spark/spark/python/pyspark/sql/connect/client/core.py", line 1162, in execute_command
        data, _, metrics, observed_metrics, properties = self._execute_and_fetch(
    (Python) File "/__w/spark/spark/python/pyspark/sql/connect/client/core.py", line 1664, in _execute_and_fetch
        for response in self._execute_and_fetch_as_iterator(
    (Python) File "/__w/spark/spark/python/pyspark/sql/connect/client/core.py", line 1621, in _execute_and_fetch_as_iterator
        generator = ExecutePlanResponseReattachableIterator(
    (Python) File "/__w/spark/spark/python/pyspark/sql/connect/client/reattach.py", line 127, in __init__
        self._stub.ExecutePlan(self._initial_request, metadata=metadata)
    (Python) File "/usr/local/lib/python3.11/dist-packages/grpc/_channel.py", line 1396, in __call__
        call = self._managed_call(
    (Python) File "/usr/local/lib/python3.11/dist-packages/grpc/_channel.py", line 1784, in create
        with state.lock:
```

The `ReleaseExecute` in thread 24397 is issued for retry/reattach:
https://github.com/apache/spark/blob/d2cc107560e71c4116a115ea85f6ca18baab8849/python/pyspark/sql/connect/client/reattach.py#L39-L57

It is not clear how it affects `_delete_ml_cache`, but the `_delete_ml_cache` command itself does not need retries, so this PR rewrites it without retry and sets a 3-second timeout (see the sketch at the end of this message).

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Will monitor the CI.

### Was this patch authored or co-authored using generative AI tooling?
No
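For illustration only, here is a minimal sketch of the idea: issue `ExecutePlan` directly with a gRPC deadline instead of going through the retrying `ExecutePlanResponseReattachableIterator`. The helper name `delete_ml_cache_with_deadline` and the `stub`/`request`/`metadata` arguments are placeholders, not the exact code in this PR.

```python
import grpc

# Illustrative sketch only: `stub` and `request` stand in for the Spark Connect
# gRPC stub and the ExecutePlan request carrying the delete-ML-cache command;
# this is not the exact implementation in the PR.
def delete_ml_cache_with_deadline(stub, request, metadata, timeout_sec=3.0):
    try:
        # Call ExecutePlan with an explicit deadline instead of using
        # ExecutePlanResponseReattachableIterator, which retries/reattaches.
        for _ in stub.ExecutePlan(request, metadata=metadata, timeout=timeout_sec):
            pass  # drain the response stream; a delete returns nothing useful
    except grpc.RpcError as e:
        if e.code() == grpc.StatusCode.DEADLINE_EXCEEDED:
            # Cache deletion is best-effort cleanup: if the channel is wedged
            # (e.g. blocked on an internal lock), give up after the deadline
            # instead of hanging the calling thread.
            return
        raise
```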
