haiyangsun-db opened a new pull request, #56397: URL: https://github.com/apache/spark/pull/56397
### What changes were proposed in this pull request?

This PR refactors the experimental UDF worker **session interface** in the `udf-worker-core` module into an explicit state machine with well-defined terminal outcomes, so that a concrete transport backend can plug in cleanly. (The first such backend, a gRPC-over-UDS dispatcher, lands in a follow-up PR.)

Core interface changes:

- **`WorkerSession`** becomes a single `AtomicReference`-driven state machine (`Created -> Initializing -> Initialized -> Streaming -> Finishing`, with `Cancelling`/`Finished`/`Cancelled`/`Failed`/`TransportFailed` terminals). `init` now returns an `InitResponse`, `process` takes a `finish` thunk, and `close` returns a `Termination` instead of throwing; the previous separate `cancel()` is folded into `close()`. Subclasses implement `doInit` / `doProcess` / `doClose` hooks and drive transitions via protected helpers.
- **`Termination`** (new) is a sealed trait modeling the four terminal outcomes: `Finished(FinishResponse)`, `Cancelled(CancelResponse)`, `Failed(ExecutionError)`, and `TransportFailed(Throwable)`.
- **`WorkerHandle`** (new) is a small trait that decouples a session from the concrete worker-provisioning model. `DirectWorkerProcess` implements it and gains a cleanup-hook registry, so a dispatcher can register transport-specific cleanup (e.g. deleting a socket file) without the core hard-coding it.
- **`DirectWorkerDispatcher`** is generalized from a Unix-socket-specific class into abstract transport hooks (`newEndpointAddress` / `waitForReady` / `cleanupEndpointAddress` / `closeTransport` / `validateTransportSupport` / `newConnection` / `newSession` / `initialize`). The concrete Unix-socket dispatcher and session (`DirectUnixSocketWorkerDispatcher`, `DirectWorkerSession`) are removed; the first concrete backend is provided by the follow-up gRPC change.
- **`WorkerConnection`** becomes a trait.
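
For reviewers, the lifecycle described above can be pictured with the rough sketch below. It is not the code in this PR: `SketchSession`, the payload fields of `FinishResponse` / `CancelResponse` / `ExecutionError`, the simplified `init` / `process` / `close` signatures, and the `advance` helper are all assumptions made for illustration. It only shows the general technique of guarding transitions with `AtomicReference.compareAndSet` and reporting the outcome as a `Termination` value rather than by throwing.

```scala
import java.util.concurrent.atomic.AtomicReference

// Hypothetical payload types; the real messages carry different fields.
final case class FinishResponse(rows: Long)
final case class CancelResponse(reason: String)
final case class ExecutionError(message: String)

// The four terminal outcomes listed in the description.
sealed trait Termination
object Termination {
  final case class Finished(response: FinishResponse) extends Termination
  final case class Cancelled(response: CancelResponse) extends Termination
  final case class Failed(error: ExecutionError) extends Termination
  final case class TransportFailed(cause: Throwable) extends Termination
}

// A subset of the lifecycle states, enough to show the happy path and cancel.
sealed trait State
object State {
  case object Created extends State
  case object Initializing extends State
  case object Initialized extends State
  case object Streaming extends State
  case object Finishing extends State
  case object Finished extends State
  case object Cancelled extends State
}

final class SketchSession {
  private val state = new AtomicReference[State](State.Created)
  @volatile private var finishResponse: Option[FinishResponse] = None

  def current: State = state.get()

  // Atomically move from `from` to `to`; reject out-of-order calls.
  private def advance(from: State, to: State): Unit =
    if (!state.compareAndSet(from, to)) {
      throw new IllegalStateException(s"expected $from but was ${state.get()}")
    }

  def init(): Unit = {
    advance(State.Created, State.Initializing)
    // A real doInit hook would exchange messages with the worker here.
    advance(State.Initializing, State.Initialized)
  }

  // Simplified: runs the whole stream and then evaluates the finish thunk.
  def process(finish: () => FinishResponse): Unit = {
    advance(State.Initialized, State.Streaming)
    advance(State.Streaming, State.Finishing)
    finishResponse = Some(finish())
    advance(State.Finishing, State.Finished)
  }

  // Never throws in this sketch: the outcome is reported as a value.
  def close(): Termination = finishResponse match {
    case Some(r) if state.get() == State.Finished =>
      Termination.Finished(r)
    case _ =>
      // Not race-free; a real implementation would use guarded transitions.
      state.set(State.Cancelled)
      Termination.Cancelled(CancelResponse("closed before finish"))
  }
}
```

In this sketch a caller that runs `init()` and `process(...)` to completion gets `Termination.Finished` from `close()`, while a caller that stops early gets `Termination.Cancelled`. Either way the caller pattern-matches on the result instead of catching an exception.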

Caller/test changes:

- `ExternalUDFExec` (sql/core) is updated to the new `close()` / `Termination` finalizer: a single task-completion listener now handles both the success path (a `FinishResponse` has already arrived) and the failure/early-stop path (send `Cancel`, await `CancelResponse`), replacing the previous separate cancel-on-failure listener.
- The in-core test scaffolding (`TestDirectWorkerHelpers`, `DirectWorkerDispatcherSuite`) is adapted to the new API as a small UDS-backed test dispatcher, so `udf-worker-core` and `sql/core` stay green. This scaffolding moves to the `udf-worker-grpc` module alongside the concrete gRPC dispatcher in the follow-up PR.

Class hierarchy after this change:

```
WorkerSession           (abstract state machine; close(): Termination)
WorkerHandle            (trait) <- DirectWorkerProcess
DirectWorkerDispatcher  (abstract transport hooks)
WorkerConnection        (trait)
Termination             = Finished | Cancelled | Failed | TransportFailed
```

### Why are the changes needed?

The previous `WorkerSession` tracked its lifecycle with a pair of `AtomicBoolean`s and signaled completion/cancellation by throwing. That approach does not cleanly express the several terminal outcomes a worker session can reach (clean finish, cancellation, user/worker error, transport failure), nor does it make worker reuse decisions explicit. It was also tied to a single Unix-socket transport.

Reshaping it into an explicit state machine with a `Termination` result and abstract transport hooks gives a precise, testable lifecycle contract and lets a concrete transport (gRPC over UDS, in the follow-up) implement a small set of hooks rather than re-deriving lifecycle and termination handling. `WorkerHandle` and the cleanup-hook registry remove the core's dependence on a specific provisioning model.

### Does this PR introduce _any_ user-facing change?

No. The UDF worker framework is experimental and currently unused by any released code path; this PR only reshapes its internal interfaces.
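
As a reviewer aid for the cleanup-hook registry mentioned in the rationale above, here is a minimal sketch of how such a registry could behave. The names `SketchWorkerHandle`, `SketchWorkerProcess`, and `registerCleanupHook` are hypothetical and are not taken from the PR. The run-once, most-recent-first, errors-swallowed behavior is likewise an assumption, not a statement about `DirectWorkerProcess`.

```scala
import scala.util.control.NonFatal

// Hypothetical minimal handle; the real WorkerHandle trait may expose more.
trait SketchWorkerHandle {
  def registerCleanupHook(hook: () => Unit): Unit
  def close(): Unit
}

final class SketchWorkerProcess extends SketchWorkerHandle {
  // Both fields are guarded by this object's monitor.
  private var hooks: List[() => Unit] = Nil
  private var closed = false

  override def registerCleanupHook(hook: () => Unit): Unit = synchronized {
    hooks = hook :: hooks
  }

  override def close(): Unit = {
    // Take the hooks exactly once so that a repeated close() is a no-op.
    val toRun = synchronized {
      if (closed) {
        Nil
      } else {
        closed = true
        val pending = hooks
        hooks = Nil
        pending
      }
    }
    // Most recently registered hook runs first; one failing hook does not
    // prevent the remaining hooks from running.
    toRun.foreach { hook =>
      try hook() catch { case NonFatal(_) => () }
    }
  }
}
```

With a shape like this, a dispatcher can register transport-specific cleanup, such as deleting a socket file, while the core only knows that hooks exist and must run when the worker is closed.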

### How was this patch tested?

Existing and adapted unit tests, run on top of `master`:

- `build/sbt udf-worker-core/Test/compile` and `build/sbt sql/Test/compile` succeed.
- `build/sbt "udf-worker-core/testOnly *DirectWorkerDispatcherSuite"`: the dispatcher process-lifecycle suite (spawn / wait-for-ready / cleanup / error / timeout / concurrency) passes (31 tests) against the adapted in-core UDS test dispatcher.
- `PythonUDFWorkerSpecificationSuite` (sql/core) is updated to the new `close()` signature and continues to spawn a real Python worker and verify reachability.

### Was this patch authored or co-authored using generative AI tooling?

Yes

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
