xianzhe-databricks commented on code in PR #55657:
URL: https://github.com/apache/spark/pull/55657#discussion_r3181112659


##########
udf/worker/proto/src/main/protobuf/udf_protocol.proto:
##########
@@ -0,0 +1,459 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto3";
+
+import "common.proto";
+
+package org.apache.spark.udf.worker;
+
+option java_package = "org.apache.spark.udf.worker";
+option java_multiple_files = true;
+
+// =====================================================================
+// Language-agnostic UDF execution protocol.
+//
+// The Spark engine acts as the gRPC client; a UDF worker (in any
+// language) acts as the gRPC server.
+// =====================================================================
+
+// The default UDF gRPC service. A worker that exposes this service
+// MUST do so over the default connection of the worker specification.
+//
+// In the future, additional connections (e.g. a separate channel) may
+// be reserved by the worker spec for other purposes.
+service Worker {
+    // Per-execution stream. Exactly one [[Init]] is sent first (then
+    // optional [[PayloadChunk]]s), followed by 0..N data batches in
+    // either direction, terminated by exactly one [[Finish]] or
+    // [[Cancel]] from the engine. The worker MUST respond with the
+    // matching Init / Finish / Cancel responses on the response stream.
+    //
+    // For stateful execution, the state is maintained per bi-directional
+    // stream, mapping to a `WorkerSession` on the engine side
+    // (`org.apache.spark.udf.worker.core.WorkerSession`).
+    rpc Execute(stream UdfRequest) returns (stream UdfResponse);
+
+    // Worker-scoped management RPC, independent of any per-execution
+    // stream. Currently carries heartbeat and graceful shutdown (see
+    // [[WorkerRequest]]). Kept unary so it does not depend on the
+    // lifecycle of an active Execute stream.
+    rpc Manage(WorkerRequest) returns (WorkerResponse);
+}
+
+// =====================================================================
+// Execute stream -- envelope
+// =====================================================================
+
+// Engine -> Worker envelope. Either a [[UdfControlRequest]] ([[Init]] /
+// [[PayloadChunk]] / [[Finish]] / [[Cancel]]) or a [[DataRequest]].
+message UdfRequest {
+    // Exactly one branch MUST be set; receivers MUST reject messages
+    // with no branch set.
+    oneof request {
+        UdfControlRequest control = 1;
+        DataRequest       data    = 2;
+    }
+}
+
+// Worker -> Engine envelope. Either a [[UdfControlResponse]] (Init /
+// Finish / Cancel response) or a [[DataResponse]].
+message UdfResponse {
+    // Exactly one branch MUST be set; receivers MUST reject messages
+    // with no branch set.
+    oneof response {
+        UdfControlResponse control = 1;
+        DataResponse       data    = 2;
+    }
+}
+
+// Engine -> Worker control messages (see [[UdfRequest.control]]).
+//
+// Wire order of engine messages on an Execute stream is exactly:
+//   Init { ... }
+//   PayloadChunk { ... }*    // optional; 0..N chunks, only used when
+//                            // the single UDF payload on Init is too
+//                            // large to fit inline.
+//   ( DataRequest | <worker DataResponse> )*
+//   Finish { ... }    OR    Cancel { ... }    // exactly one terminator
+//
+// The worker MUST emit [[InitResponse]] before sending any
+// [[DataResponse]], and MUST emit exactly one [[FinishResponse]] or
+// [[CancelResponse]] before closing the response stream.
+//
+// A worker that receives messages out of this order (e.g. a second Init,
+// a PayloadChunk after the first DataRequest, a DataRequest before Init,
+// or a Cancel before Init) MUST close the stream with an error.
+message UdfControlRequest {
+    // Exactly one branch MUST be set; receivers MUST reject messages
+    // with no branch set.
+    oneof control {
+        Init         init    = 1;
+        PayloadChunk payload = 2;
+        Finish       finish  = 3;
+        Cancel       cancel  = 4;
+    }
+}
+
+// Worker -> Engine control messages (see [[UdfResponse.control]]).
+message UdfControlResponse {
+    // Exactly one branch MUST be set; receivers MUST reject messages
+    // with no branch set.
+    oneof control {
+        InitResponse   init   = 1;
+        FinishResponse finish = 2;
+        CancelResponse cancel = 3;
+    }
+}
+
+// =====================================================================
+// Init phase
+// =====================================================================
+
+// Sent once, as the first message on an Execute stream. Describes
+// the UDF body to run plus the minimum metadata the worker needs to
+// start processing it.
+//
+// Today the protocol mandates exactly one Init per UDF execution
+// (Init -> data -> Finish or Cancel). This is the simplest contract and
+// covers all currently supported UDF kinds. In the future we may
+// evolve to support multiple init phases on the same stream -- e.g.
+// when worker setup requires an interactive handshake (negotiate a
+// schema, exchange capabilities, fetch driver-side metadata, ...)
+// before the data plane opens. Such an extension would be additive
+// and would not change the single-Init semantics already in use.
+//
+// Engine vs. client split:
+//   * Most fields on Init are engine-side. They describe what
+//     flows on the wire for this session ([[data_format]] /
+//     [[input_schema]] / [[output_schema]] -- matching the worker
+//     spec, not the function's view) and what per-session
+//     context the worker needs ([[timezone]], [[session_conf]],
+//     [[task_context]], [[parameters]]).
+//   * [[UdfPayload]] carries everything the client side of Spark
+//     (where the UDF is defined and serialized) packs -- the
+//     callable bytes themselves, plus optional custom encoders
+//     that override the worker's built-in decoders only when the
+//     UDF deals in types the worker doesn't already know how to
+//     convert (e.g. recovering Arrow batches into client-provided
+//     Scala case classes or other user-defined types).
+message Init {
+    // (Required) Wire format used for [[DataRequest.data]] and
+    // [[DataResponse.data]] for the life of this session. Must be
+    // one of the formats the worker declared in
+    // [[WorkerCapabilities.supported_data_formats]]; the client side
+    // of the protocol picks one at planning time and sticks with it.
+    // Receivers MUST reject an Init whose [[data_format]] is
+    // `UDF_WORKER_DATA_FORMAT_UNSPECIFIED`.
+    UDFWorkerDataFormat data_format = 1;
+
+    // (Required) The UDF body to execute on the worker for this
+    // session. Exactly one payload per Execute stream.
+    UdfPayload udf = 2;
+
+    // (Optional) Schema of the input data plane in the wire format
+    // declared by [[data_format]] -- e.g. an Arrow IPC schema when
+    // data_format = ARROW. This is an engine-side requirement: it
+    // describes the bytes the engine will actually put on
+    // [[DataRequest.data]] for this session, matching what the
+    // worker advertised in its spec. It is NOT necessarily the
+    // schema the function definer expressed; the UDF's own type
+    // information lives inside [[UdfPayload]] (embedded in the
+    // payload, or as a structured [[UdfPayload.input_encoder]]
+    // that converts wire bytes into language-native values).
+    //
+    // Left unset when the worker can derive the schema from the
+    // payload alone.
+    optional bytes input_schema = 3;
+
+    // (Optional) Schema of the output data plane in the wire format
+    // declared by [[data_format]]. Same semantics as
+    // [[input_schema]] -- engine-side requirement describing the
+    // bytes the engine expects on [[DataResponse.data]].
+    optional bytes output_schema = 4;
+
+    // (Optional; defaults to an empty map.) Per-task context
+    // provided by the engine. Common keys identify the task instance
+    // for diagnostics, logging, and stateful workers -- e.g.
+    // partition id, task attempt id, stage id, micro-batch id.
+    // Engine and worker agree on the keys they share; the protocol
+    // does not enumerate them.
+    map<string, string> task_context = 5;
+
+    // (Optional; defaults to an empty map.) Worker-private knobs not
+    // already captured by typed fields on this message. Free-form;
+    // both sides agree on the keys they need.
+    //
+    // Any key that two languages converge on is a candidate for
+    // promotion to a structured proto field -- once promoted, it gets
+    // a typed field number from the reserved 8..99 range below and
+    // is removed from [[session_conf]]. [[timezone]] below is an
+    // example of a key that has already been promoted.
+    map<string, string> session_conf = 6;
+
+    // (Optional) Session timezone, promoted out of [[session_conf]]
+    // because every eval needs it for timestamp encoding/decoding.
+    optional string timezone = 7;
+
+    // Reserved for future typed Init fields, in particular keys
+    // graduated from [[session_conf]] (see the [[timezone]] precedent
+    // above). Numbers >= 100 are intentionally NOT reserved here; if
+    // a future revision needs an opaque escape-hatch field, give it a
+    // number >= 100 alongside [[parameters]] and add a field-level
+    // comment so the convention stays visible.
+    reserved 8 to 99;
+
+    // (Optional) Engine-packed opaque parameters specific to a
+    // particular kind of UDF execution. The escape hatch for
+    // anything the engine needs the worker to see at init time
+    // that is not already captured by the typed fields above and
+    // does not fit naturally into [[task_context]]. The encoding
+    // is agreed between the engine and the worker; the protocol
+    // does not interpret it. The matching response, also opaque
+    // bytes, is returned via [[InitResponse.data]].
+    //
+    // Numbers >= 100 are reserved by convention for opaque
+    // escape-hatch fields like this one; new typed fields use the
+    // reserved 8..99 range.
+    //
+    // Client-side init data (anything packed by the layer that
+    // defines and serializes the UDF) does NOT belong here -- it
+    // travels inside [[UdfPayload.payload]] instead.
+    optional bytes parameters = 100;
+}
+
+// Acknowledgment for [[Init]]. The worker MUST send exactly one
+// [[InitResponse]] before any [[DataResponse]].
+//
+// The init phase allows the engine to interact with the UDF before
+// data starts flowing -- the worker can return inline bytes here for
+// the engine (or higher-level code on the engine side) to consume
+// during setup. The semantics of those bytes are agreed between the
+// client side of the protocol and the worker; this message itself is
+// otherwise opaque.
+message InitResponse {
+    // (Optional) Inline init result returned by the worker (e.g. the
+    // reply to [[Init.parameters]]). Opaque to the protocol; the
+    // sending side and the worker agree on what (if anything) it carries.
+    optional bytes data = 1;
+}
+
+// Optional. Used to stream the single UDF payload when it does not
+// fit in a single gRPC message. The default is to send the payload
+// inline on [[UdfPayload.payload]]; chunking is only needed when a
+// payload exceeds the gRPC message size limit.
+//
+// When used, one or more chunks are sent after [[Init]] and before
+// the first [[DataRequest]]. The worker concatenates the inline
+// [[UdfPayload.payload]] (if any) followed by all chunks in
+// arrival order to form the final payload.
+message PayloadChunk {
+    // (Required, non-empty.) Bytes appended to the [[Init.udf]]
+    // payload.
+    bytes data = 1;
+
+    // (Optional) Set to true on the final chunk. Receivers MAY use
+    // this as an early signal that the payload is complete and
+    // decoding can begin; receivers that prefer to wait for the
+    // first [[DataRequest]] (which marks the end of the chunking
+    // phase) MAY ignore this. When unset, the receiver determines
+    // completeness by the arrival of the first [[DataRequest]].
+    optional bool last = 2;
+}
+
+// =====================================================================
+// Data phase
+//
+// `data` is intentionally a top-level `bytes` field on both request
+// and response messages -- not nested inside a wrapper -- so that
+// implementations can avoid an extra copy when reading or writing
+// the payload. The wire format (Arrow IPC etc.) is declared once per
+// session via [[Init.data_format]] and stays the same for the life
+// of the stream.
+// =====================================================================
+
+// Engine -> Worker per-batch payload (see [[UdfRequest.data]]).
+message DataRequest {
+    // (Required, non-empty.) Encoded data bytes for one batch in the
+    // format declared by [[Init.data_format]].
+    bytes data = 1;
+}
+
+// Worker -> Engine per-batch payload. The worker emits zero or more
+// [[DataResponse]]s between [[InitResponse]] and [[FinishResponse]] /
+// [[CancelResponse]]. Sink-style UDFs (which consume input but
+// produce no output rows on the data plane) emit none.
+message DataResponse {
+    // (Required, non-empty.) Encoded data bytes for one batch in the
+    // format declared by [[Init.data_format]].
+    bytes data = 1;
+}
+
+// =====================================================================
+// Finish / Cancel phase
+// =====================================================================
+
+// Sent by the engine once no more input batches will follow. The
+// worker MUST drain any remaining output, then emit
+// [[FinishResponse]] and close the response stream.
+//
+// Exactly one of [[Finish]] or [[Cancel]] is sent per Execute stream;
+// they are mutually exclusive. If the engine has already sent
+// [[Finish]] it MUST NOT send [[Cancel]] afterwards (and vice versa).
+message Finish {}
+
+// Worker -> Engine completion message. May carry summary metrics.
+message FinishResponse {
+    // (Optional) Final metrics aggregated over the whole session (e.g.
+    // rows in/out, time per phase). Free-form; names are worker-defined.
+    map<string, string> metrics = 1;
+
+    // (Optional) Inline finish result returned by the worker.
+    // Mirrors [[InitResponse.data]] -- the finish phase allows the
+    // engine to interact with the UDF after data has stopped
+    // flowing, with the worker returning opaque bytes the engine (or
+    // higher-level code) may consume during teardown. The semantics
+    // of those bytes are agreed between the client side of the
+    // protocol and the worker.
+    optional bytes data = 2;
+}
+
+// Engine -> Worker cooperative cancel. Distinct from a gRPC stream
+// error so the worker can run cleanup deterministically (release file
+// handles, drop temp state, etc.). After receiving [[Cancel]] the
+// worker MUST stop emitting [[DataResponse]] messages, run cleanup,
+// and emit [[CancelResponse]] before closing.
+//
+// Exactly one of [[Finish]] or [[Cancel]] is sent per Execute stream;
+// see [[Finish]]. [[Cancel]] is the cooperative cancellation path;
+// gRPC-level stream errors are the involuntary fallback. If the
+// stream breaks before [[CancelResponse]] arrives, the engine
+// considers the worker uncancellable for this session and relies on
+// process-level cleanup.
+message Cancel {
+    // (Optional) Free-form reason for diagnostics.
+    optional string reason = 1;
+}
+
+// Worker -> Engine acknowledgment of [[Cancel]], sent after cleanup.
+message CancelResponse {}
+
+// The single UDF body delivered to the worker via [[Init.udf]]. Opaque
+// to the engine: the engine forwards [[payload]] and [[format]]
+// unchanged, and the worker decodes them per the format the client
+// and worker have agreed on.
+message UdfPayload {
+    // (Required, may be empty when chunked.) Serialized callable
+    // bundle, opaque to the engine. The encoding is declared in
+    // [[format]].
+    //
+    // For payloads too large to fit on a single gRPC message, this
+    // field MAY be left empty (zero-length bytes) and the bytes
+    // delivered via the [[PayloadChunk]] mechanism instead. See
+    // [[PayloadChunk]] for chunking semantics.
+    bytes payload = 1;
+
+    // (Required, non-empty.) Format tag identifying the encoding of
+    // [[payload]] (e.g. "py-cloudpickle-v3", "wasm-v1"). Engine does
+    // not interpret this; the client side of the protocol and the
+    // worker agree on its meaning.
+    string format = 2;
+
+    // (Optional) Total payload size in bytes. Useful when chunked
+    // streaming is used so the worker can pre-allocate buffers.
+    optional int64 payload_size = 3;
+
+    // (Optional) Human-readable name for diagnostics and metrics.
+    optional string name = 4;
+
+    // (Optional) Worker / language-specific dispatch hint. A
+    // free-form string the worker uses to pick the code path that
+    // handles this payload. The protocol does not enumerate eval
+    // types because they are language-specific; the client side of
+    // the protocol and the worker agree on the namespace and the
+    // values.
+    //
+    // When the worker can derive the eval type from the payload
+    // itself (embedded metadata, format tag, etc.), this field is
+    // left unset. Otherwise the client side of the protocol sets it
+    // explicitly.
+    optional string eval_type = 5;
+
+    // (Optional) Custom input encoder bytes. The worker already
+    // ships with built-in decoders for its standard types (e.g. a
+    // Python worker turns Arrow batches into pandas / pyarrow
+    // values out of the box; a JVM worker has its own defaults).
+    // Set this field only when the UDF needs a conversion the
+    // worker doesn't know about -- for example, recovering Arrow
+    // batches into client-provided Scala case classes, or any
+    // other user-defined type the function definer requires.
+    //
+    // Packed by the client side of the protocol; opaque to the
+    // wire protocol. Left unset whenever the worker's built-in
+    // decoders are sufficient.
+    optional bytes input_encoder = 6;
+
+    // (Optional) Custom output encoder bytes. Mirror of
+    // [[input_encoder]]: set only when the UDF produces values the
+    // worker cannot convert to [[DataResponse.data]] using its
+    // built-in encoders, and the client side of the protocol needs
+    // to ship the conversion alongside the UDF.
+    optional bytes output_encoder = 7;
+}
+
+// =====================================================================
+// Manage RPC -- worker-scoped operations independent of Execute
+// =====================================================================
+
+// Engine -> Worker request of the Manage RPC. Wraps the manage
+// operations in a oneof so the RPC is a single typed call, leaving
+// room for future operations (capability query, profiling, ...).
+message WorkerRequest {
+    // Exactly one branch MUST be set; receivers MUST reject messages
+    // with no branch set.
+    oneof manage {
+        Heartbeat       heartbeat = 1;
+        ShutdownRequest shutdown  = 2;
+    }
+}
+
+// Worker -> Engine response of the Manage RPC.
+message WorkerResponse {
+    // Exactly one branch MUST be set, mirroring the request oneof.
+    oneof manage {
+        HeartbeatAck     heartbeat = 1;
+        ShutdownResponse shutdown  = 2;
+    }
+}
+
+// Liveness probe (Manage RPC). The engine may send this periodically to
+// detect a hung worker. The worker SHOULD reply within a small bounded time.
+message Heartbeat {}
+
+// Acknowledgment for [[Heartbeat]] (see [[WorkerResponse.heartbeat]]).
+message HeartbeatAck {}

Review Comment:
   ```suggestion
   message HeartbeatResponse {}
   ```
   
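   Just for consistency with the other `*Response` messages in this file (`InitResponse`, `FinishResponse`, `CancelResponse`, `ShutdownResponse`)? The `heartbeat` field in `WorkerResponse` would need the same type rename.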



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to