dtaniwaki opened a new pull request, #56497: URL: https://github.com/apache/spark/pull/56497
JIRA: https://issues.apache.org/jira/browse/SPARK-57425

### What changes were proposed in this pull request?

This PR makes `ExecutePlanResponseReattachableIterator` (`python/pyspark/sql/connect/client/reattach.py`) survive short-TTL credential expiry. Two changes:

**1. Per-RPC metadata refresh.** `metadata` now accepts a zero-arg `Callable` in addition to an `Iterable`. When a callable is supplied, the iterator invokes it immediately before every gRPC call:

- initial `ExecutePlan`,
- retry `ExecutePlan` after `INVALID_HANDLE.OPERATION_NOT_FOUND`,
- `ReattachExecute`,
- `ReleaseExecute` (`release_until` and `release_all`).

`SparkConnectClient` now passes the bound method `self._builder.metadata` rather than calling it once at construction, so a custom `ChannelBuilder` that refreshes its parameters takes effect across reattach. Static iterables are materialized to a list so single-shot iterators stay usable. Provider failures inside `_release_*` log a distinct message to avoid being mistaken for server-side release errors.

**2. One `PERMISSION_DENIED` retry per iterator.** A first mid-stream `PERMISSION_DENIED` is treated as a candidate token-expiry signal: the iterator clears its underlying stream and raises `RetryException`, mirroring the existing `DEADLINE_EXCEEDED` branch. The retry attempt issues `ReattachExecute`, the metadata provider is re-evaluated, and a refreshed credential takes effect. A second `PERMISSION_DENIED` per iterator propagates as a genuine auth failure.

### Why are the changes needed?

The reattach mechanism exists to recover from a broken gRPC stream before `ResultComplete`. That recovery is structurally impossible today when the server enforces a short auth-token TTL (e.g. AWS Athena Spark, 30 min) and a query outlives the TTL:

- the default retry policy does not treat `PERMISSION_DENIED` as retryable, so the 403 propagates before reattach is attempted, and
- even if reattach did fire, it would carry the expired token captured at `__init__`.

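
For illustration only, the two gaps and the two proposed fixes can be sketched without gRPC or Spark. This is not the code in `reattach.py`: `ReattachSketch`, `make_metadata_provider`, `PermissionDenied`, `fake_rpc`, and `rotating_provider` are hypothetical names, and the retry is modelled as a plain loop, whereas the PR describes raising `RetryException` and letting the existing retry machinery issue `ReattachExecute`.

```python
from typing import Callable, Iterable, Iterator, List, Tuple, Union

Pair = Tuple[str, str]
MetadataSource = Union[Iterable[Pair], Callable[[], Iterable[Pair]]]


class PermissionDenied(Exception):
    """Hypothetical stand-in for a gRPC PERMISSION_DENIED error."""


def make_metadata_provider(source: MetadataSource) -> Callable[[], List[Pair]]:
    """Normalize a static iterable or a zero-arg callable into a provider."""
    if callable(source):
        # Re-evaluated on every call, so a refreshed credential is picked up.
        return lambda: list(source())
    # Materialize once so a single-shot iterator stays usable across RPCs.
    cached = list(source)
    return lambda: list(cached)


class ReattachSketch:
    """Toy driver: resolves metadata per call, retries PERMISSION_DENIED once."""

    def __init__(self, rpc: Callable[[List[Pair]], str], metadata: MetadataSource):
        self._rpc = rpc
        self._metadata = make_metadata_provider(metadata)
        self._permission_denied_retried = False  # budget: one per instance

    def call(self) -> str:
        while True:
            try:
                # Metadata is resolved immediately before each attempt.
                return self._rpc(self._metadata())
            except PermissionDenied:
                if self._permission_denied_retried:
                    raise  # second occurrence: treat as a genuine auth failure
                self._permission_denied_retried = True


def fake_rpc(metadata: List[Pair]) -> str:
    """Pretend server: rejects the expired token, accepts anything else."""
    if dict(metadata)["authorization"] == "expired":
        raise PermissionDenied("token expired")
    return "ok"


def rotating_provider(tokens: Iterator[str]) -> Callable[[], List[Pair]]:
    """Provider that hands out the next token on each call (simulated refresh)."""
    return lambda: [("authorization", next(tokens))]


if __name__ == "__main__":
    refreshed = ReattachSketch(fake_rpc, rotating_provider(iter(["expired", "fresh"])))
    # The single retry re-evaluates the provider and sees the fresh token.
    print(refreshed.call())
```

With a static `[("authorization", "expired")]` list in place of the provider, the retry in this sketch resends the same token and the second `PermissionDenied` propagates, which is the behavior described for an iterator that captured its metadata at `__init__`.
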
Both gaps must change for the iterator's contract to hold for this class of scenarios.

This has stayed invisible in typical deployments because four conditions must align: a short server TTL, a stream that outlives it, a server that actively kills the stream on expiry, and reattach actually firing (no `ResultComplete`). Local dev without auth, on-prem with long-lived tokens, and short ad-hoc queries each violate at least one; managed federated-credential environments hit all four.

`ChannelBuilder.metadata()` is already rebuilt per call, so non-reattach RPCs (`AnalyzePlan`, `Config`, ...) already pick up refreshed credentials; only the reattach iterator was stuck.

The dbt-athena Spark adapter ships three runtime monkey-patches (stash the builder on the stub, refresh metadata before `ReattachExecute`, allow one `PERMISSION_DENIED` retry per iterator) that have been running in production. This PR folds those moving parts into upstream so the workaround becomes unnecessary.

**Backport requested to branch-4.0, branch-4.1, branch-4.2.** Athena's Spark Connect endpoint and most current managed deployments run on the 4.x line, and 5.x adoption is realistically far off. The touched code is structurally identical across 4.x and master, so the backport is near-mechanical.

### Does this PR introduce _any_ user-facing change?

No. `ExecutePlanResponseReattachableIterator` is internal. Existing callers passing an iterable see no behavior change, and a `PERMISSION_DENIED` that does not stem from a recoverable mid-stream condition still propagates as before (the new retry budget is one per iterator).

### How was this patch tested?

Two test classes in `python/pyspark/sql/tests/connect/client/test_client.py`, 10 new cases:

- **`SparkConnectClientReattachTestCase`**: seven mock-stub cases using an extended `MockSparkConnectStub` that records the metadata of every RPC.
  Cover: per-RPC callable invocation, refresh on `INVALID_HANDLE.OPERATION_NOT_FOUND`, static-iterable caching, one-shot iterator materialization across a forced reattach, isolation of provider errors in `_release_*`, mid-stream `PERMISSION_DENIED` retry, and propagation after a second `PERMISSION_DENIED`.
- **`SparkConnectClientReattachGrpcEndToEndTestCase`**: three cases that drive the iterator through a real in-process gRPC server (`grpc.server()` + a minimal `SparkConnectServiceServicer`) so the metadata callable, mid-stream `PERMISSION_DENIED` recovery, and the once-per-iterator retry budget are exercised at the wire level without a JVM or any external Spark process.

Existing `SparkConnectClientReattachTestCase` and `SparkConnectClientRetriesTestCase` cases continue to pass.

```
python -m pytest python/pyspark/sql/tests/connect/client/test_client.py \
    python/pyspark/sql/tests/connect/client/test_client_retries.py
```

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code (Anthropic), Opus 4.7 (1M context)
