haiyangsun-db opened a new pull request, #56397:
URL: https://github.com/apache/spark/pull/56397

   ### What changes were proposed in this pull request?
   
   This PR refactors the experimental UDF worker **session interface** in the 
`udf-worker-core` module into an explicit state machine with well-defined 
terminal outcomes, so that a concrete transport backend can plug in cleanly. 
(The first such backend -- a gRPC-over-UDS dispatcher -- lands in a follow-up 
PR.)
   
   Core interface changes:
   
   - **`WorkerSession`** becomes a single `AtomicReference`-driven state 
machine (`Created -> Initializing -> Initialized -> Streaming -> Finishing`, 
with a transitional `Cancelling` state and `Finished`/`Cancelled`/`Failed`/`TransportFailed` terminals). 
`init` now returns an `InitResponse`, `process` takes a `finish` thunk, and 
`close` returns a `Termination` instead of throwing; the previous separate 
`cancel()` is folded into `close()`. Subclasses implement `doInit` / 
`doProcess` / `doClose` hooks and drive transitions via protected helpers.
   - **`Termination`** (new) is a sealed trait modeling the four terminal 
outcomes: `Finished(FinishResponse)`, `Cancelled(CancelResponse)`, 
`Failed(ExecutionError)`, and `TransportFailed(Throwable)`.
   - **`WorkerHandle`** (new) is a small trait that decouples a session from 
the concrete worker-provisioning model. `DirectWorkerProcess` implements it and 
gains a cleanup-hook registry, so a dispatcher can register transport-specific 
cleanup (e.g. deleting a socket file) without the core hard-coding it.
   - **`DirectWorkerDispatcher`** is generalized from a Unix-socket-specific 
class into a class with abstract transport hooks (`newEndpointAddress` / `waitForReady` / 
`cleanupEndpointAddress` / `closeTransport` / `validateTransportSupport` / 
`newConnection` / `newSession` / `initialize`). The concrete Unix-socket 
dispatcher and session (`DirectUnixSocketWorkerDispatcher`, 
`DirectWorkerSession`) are removed -- the first concrete backend is provided by 
the follow-up gRPC change.
   - **`WorkerConnection`** becomes a trait.
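
   The `WorkerSession` lifecycle described above can be pictured with a minimal sketch, assuming transitions are applied with a compare-and-set on a single `AtomicReference`. The state names come from this description; `Lifecycle`, `transition`, `isTerminal`, and the `terminal` set are hypothetical names chosen for illustration and are not taken from the actual `udf-worker-core` code.

```scala
import java.util.concurrent.atomic.AtomicReference

object SessionStateSketch {
  // State names mirror the PR description; the real definitions may differ.
  sealed trait State
  case object Created extends State
  case object Initializing extends State
  case object Initialized extends State
  case object Streaming extends State
  case object Finishing extends State
  case object Cancelling extends State
  case object Finished extends State
  case object Cancelled extends State
  case object Failed extends State
  case object TransportFailed extends State

  // Assumption: these four states are terminal and are never left again.
  val terminal: Set[State] = Set(Finished, Cancelled, Failed, TransportFailed)

  final class Lifecycle {
    private val state = new AtomicReference[State](Created)

    def current: State = state.get()

    def isTerminal: Boolean = terminal(current)

    // Moves from `from` to `to` only if the session is still in `from`.
    // Returns false when another thread won the race or `from` is terminal.
    def transition(from: State, to: State): Boolean =
      !terminal(from) && state.compareAndSet(from, to)
  }
}
```

   Because every transition is a single compare-and-set, two threads racing to move the same session (for example, one finishing and one cancelling) cannot both succeed; the loser sees `false` and can react to the state the winner installed.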
   
   Caller/test changes:
   
   - `ExternalUDFExec` (sql/core) is updated to the new `close()` / 
`Termination` finalizer: a single task-completion listener now handles both the 
success path (a `FinishResponse` has already arrived) and the 
failure/early-stop path (send `Cancel`, await `CancelResponse`), replacing the 
previous separate cancel-on-failure listener.
   - The in-core test scaffolding (`TestDirectWorkerHelpers`, 
`DirectWorkerDispatcherSuite`) is adapted to the new API as a small UDS-backed 
test dispatcher, so `udf-worker-core` and `sql/core` stay green. This 
scaffolding moves to the `udf-worker-grpc` module alongside the concrete gRPC 
dispatcher in the follow-up PR.
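
   To illustrate how a caller might consume the result of `close()`, here is a minimal sketch of matching on the four `Termination` cases. The case names follow this description; the payload classes are placeholder stand-ins for the real protocol types, and `summarize` is a hypothetical helper, not the logic in `ExternalUDFExec`.

```scala
object TerminationSketch {
  // Placeholder payloads; the real FinishResponse, CancelResponse and
  // ExecutionError are protocol types that are not modeled here.
  final case class FinishResponse(note: String)
  final case class CancelResponse(note: String)
  final case class ExecutionError(message: String)

  sealed trait Termination
  final case class Finished(response: FinishResponse) extends Termination
  final case class Cancelled(response: CancelResponse) extends Termination
  final case class Failed(error: ExecutionError) extends Termination
  final case class TransportFailed(cause: Throwable) extends Termination

  // Hypothetical helper: one handler covers every terminal outcome.
  // Because the trait is sealed, the compiler warns if a case is missing.
  def summarize(t: Termination): String = t match {
    case Finished(r)        => s"finished: ${r.note}"
    case Cancelled(r)       => s"cancelled: ${r.note}"
    case Failed(e)          => s"failed: ${e.message}"
    case TransportFailed(c) => s"transport failed: ${c.getMessage}"
  }
}
```

   Returning a value instead of throwing lets a single completion handler treat success and early-stop paths uniformly, which is the shape the updated listener in this PR is described as having.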
   
   Class hierarchy after this change:
   
   ```
   WorkerSession (abstract state machine; close(): Termination)
   WorkerHandle  (trait)  <- DirectWorkerProcess
   DirectWorkerDispatcher (abstract transport hooks)
   WorkerConnection (trait)
   Termination = Finished | Cancelled | Failed | TransportFailed
   ```
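
   The cleanup-hook registry on `WorkerHandle` / `DirectWorkerProcess` mentioned earlier could look roughly like the sketch below. This is an assumption-laden illustration: `registerCleanupHook`, `InMemoryHandle`, the reverse-order execution, and the swallow-and-continue error handling are all hypothetical choices, not the actual API in this PR.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

object CleanupHookSketch {
  // Hypothetical shape of a handle that owns transport-agnostic cleanup.
  trait WorkerHandle {
    def registerCleanupHook(hook: () => Unit): Unit
    def close(): Unit
  }

  // Stand-in for a process-backed handle; no real process is spawned.
  final class InMemoryHandle extends WorkerHandle {
    private val hooks = ArrayBuffer.empty[() => Unit]
    private var closed = false

    override def registerCleanupHook(hook: () => Unit): Unit = synchronized {
      hooks += hook
      ()
    }

    // Runs each hook at most once, most recently registered first.
    // A failing hook does not prevent the remaining hooks from running.
    override def close(): Unit = {
      val toRun: List[() => Unit] = synchronized {
        if (closed) Nil
        else {
          closed = true
          hooks.toList.reverse
        }
      }
      toRun.foreach { hook =>
        try hook()
        catch { case NonFatal(_) => () }
      }
    }
  }
}
```

   A dispatcher could then register something like "delete the socket file" at spawn time, and the core would run it on close without knowing which transport created the file.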
   
   ### Why are the changes needed?
   
   The previous `WorkerSession` tracked its lifecycle with a pair of 
`AtomicBoolean`s and signaled completion/cancellation by throwing. That design 
neither cleanly expresses the several terminal outcomes a worker session can reach 
(clean finish, cancellation, user/worker error, transport failure) nor makes 
worker reuse decisions explicit. It was also tied to a single Unix-socket 
transport. Reshaping it into an explicit state machine with a `Termination` 
result and abstract transport hooks gives a precise, testable lifecycle 
contract and lets a concrete transport (gRPC over UDS, in the follow-up) 
implement a small set of hooks rather than re-deriving lifecycle and 
termination handling. `WorkerHandle` and the cleanup-hook registry remove the 
core's dependence on a specific provisioning model.
   
   ### Does this PR introduce _any_ user-facing change?
   
   No. The UDF worker framework is experimental and currently unused by any 
released code path; this PR only reshapes its internal interfaces.
   
   ### How was this patch tested?
   
   Existing and adapted unit tests, run on top of `master`:
   
   - `build/sbt udf-worker-core/Test/compile` and `build/sbt sql/Test/compile` 
succeed.
   - `build/sbt "udf-worker-core/testOnly *DirectWorkerDispatcherSuite"` -- the 
dispatcher process-lifecycle suite (spawn / wait-for-ready / cleanup / error / 
timeout / concurrency) passes (31 tests) against the adapted in-core UDS test 
dispatcher.
   - `PythonUDFWorkerSpecificationSuite` (sql/core) is updated to the new 
`close()` signature and continues to spawn a real Python worker and verify 
reachability.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   Yes


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

