hvanhovell commented on code in PR #55657:
URL: https://github.com/apache/spark/pull/55657#discussion_r3235001349


##########
udf/worker/proto/src/main/protobuf/udf_protocol.proto:
##########
@@ -0,0 +1,681 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto3";
+
+import "common.proto";
+
+package org.apache.spark.udf.worker;
+
+option java_package = "org.apache.spark.udf.worker";
+option java_multiple_files = true;
+
+// =====================================================================
+// Language-agnostic UDF execution protocol.
+//
+// The Spark engine acts as the gRPC client; a UDF worker (in any
+// language) acts as the gRPC server.
+// =====================================================================
+
+// The default UDF gRPC service. A worker that exposes this service
+// MUST do so over the default connection of the worker specification.
+//
+// In future, additional connections (e.g. a separate channel) may be
+// reserved by the worker spec for other purposes.
+service UdfWorker {
+    // Per-execution stream. See [[UdfControlRequest]] for the complete
+    // wire protocol and ordering invariants.
+    //
+    // Error contract: if the gRPC connection breaks at any point, gRPC
+    // surfaces an error on the stream. The engine therefore never needs
+    // to poll or time out waiting for a response -- the absence of a
+    // gRPC error guarantees that a proper protocol response will
+    // eventually arrive. This applies to every in-flight response, not
+    // only [[CancelResponse]].
+    //
+    // Stream lifecycle: the engine MUST half-close the request stream
+    // (call onCompleted() on the gRPC stream) after the session
+    // terminates: on receiving [[FinishResponse]] or [[CancelResponse]]
+    // (clean termination) or on receiving a gRPC error (connection
+    // broke). Deferring the half-close until the outcome is known keeps
+    // the request stream open long enough for [[Cancel]] to follow
+    // [[Finish]] if needed.
+    //
+    // Response observer threading: gRPC does not permit concurrent calls
+    // to the response StreamObserver. Worker implementations that dispatch
+    // processing to a thread pool MUST serialize all writes to the response
+    // observer.
+    //
+    // For stateful execution, the state is maintained per bi-directional
+    // stream, mapping to a `WorkerSession` on the engine side.
+    rpc Execute(stream UdfRequest) returns (stream UdfResponse);
+
+    // Worker-scoped management RPC, independent of any per-execution
+    // stream. Used for heartbeat, capability query, and graceful
+    // shutdown. Kept unary so it does not depend on the lifecycle of an
+    // active Execute stream.
+    rpc Manage(WorkerRequest) returns (WorkerResponse);
+}
+
+// =====================================================================
+// Execute stream -- envelope
+// =====================================================================
+
+// Engine -> Worker. Either a control message ([[Init]] / [[PayloadChunk]]
+// / [[Finish]] / [[Cancel]]) or a data message.
+message UdfRequest {
+    // Exactly one branch MUST be set; receivers MUST reject messages
+    // with no branch set.
+    oneof request {
+        UdfControlRequest control = 1;
+        DataRequest       data    = 2;
+    }
+}
+
+// Worker -> Engine. Either a control response ([[InitResponse]] /
+// [[FinishResponse]] / [[CancelResponse]] / [[ExecutionError]]) or a
+// data response message.
+message UdfResponse {
+    // Exactly one branch MUST be set; receivers MUST reject messages
+    // with no branch set.
+    oneof response {
+        UdfControlResponse control = 1;
+        DataResponse       data    = 2;
+    }
+}
+
+// Engine -> Worker control messages.
+//
+// Wire protocol for one Execute stream (both directions):
+//
+//   Engine -> Worker:  Init -> PayloadChunk* -> (DataRequest)* -> Finish (Cancel)?
+//                                                               | Cancel
+//   Worker -> Engine:          InitResponse  -> (DataResponse)* -> (ExecutionError)? -> (FinishResponse | CancelResponse)
+//
+// DataRequest and DataResponse are independent streams: the worker
+// may emit DataResponse messages at any point after InitResponse,
+// including before the first DataRequest arrives. For generator-style
+// UDFs that produce output without consuming input, there may be zero
+// DataRequest messages -- the engine sends Finish directly after Init.
+// The arrows above denote ordering constraints within each direction,
+// not a request/response pairing.
+//
+// The engine MAY send DataRequests before receiving InitResponse (pipeline
+// mode). The worker MUST buffer such DataRequests and process them in
+// arrival order once init succeeds. They MAY be discarded only if init
+// fails (i.e. the worker sends ExecutionError before InitResponse).
+//
+// Ordering invariants:
+//   - PayloadChunk* only after Init and before the first DataRequest.
+//     [[Init.is_chunking_payload]] = true signals that chunks will follow;
+//     [[PayloadChunk.last]] = true is the canonical end-of-chunking signal.
+//     When [[Init.is_chunking_payload]] is false or absent, [[InitResponse]]
+//     MAY be sent immediately after [[Init]] without waiting for chunks.
+//   - InitResponse MUST be emitted before any DataResponse.
+//   - ExecutionError (if any) MUST be emitted after all DataResponse
+//     messages. After sending it the worker MUST stop processing DataRequests
+//     and wait for the engine to send Finish or Cancel, then respond with
+//     FinishResponse or CancelResponse accordingly. At most one
+//     ExecutionError is sent per stream; the worker aggregates multiple
+//     errors internally.
+//   - The engine terminates with one of:
+//     (a) Finish alone        -> worker sends FinishResponse.
+//     (b) Cancel alone        -> worker sends CancelResponse.
+//     (c) Finish then Cancel  -> worker sends CancelResponse if it has not
+//         yet sent FinishResponse, otherwise FinishResponse (see [[Finish]]).
+//     Cancel MUST NOT precede Finish on the same stream.
+//
+// A worker that receives messages out of order (e.g. a second Init,
+// a PayloadChunk after the first DataRequest, a DataRequest before Init,
+// or a Cancel before Init) MUST send [[ExecutionError]] with a
+// [[ProtocolError]] kind, followed by [[FinishResponse]] or
+// [[CancelResponse]] to close the stream cleanly.
+message UdfControlRequest {
+    // Exactly one branch MUST be set; receivers MUST reject messages
+    // with no branch set.
+    oneof control {
+        Init         init    = 1;
+        PayloadChunk payload = 2;
+        Finish       finish  = 3;
+        Cancel       cancel  = 4;
+    }
+}
+
+// Worker -> Engine control messages.
+message UdfControlResponse {
+    // Exactly one branch MUST be set; receivers MUST reject messages
+    // with no branch set.
+    oneof control {
+        InitResponse   init   = 1;
+        FinishResponse finish = 2;
+        CancelResponse cancel = 3;
+        ExecutionError error  = 4;
+    }
+}
+
+// =====================================================================
+// Init phase
+// =====================================================================
+
+// Sent once, as the first message on an Execute stream. Describes
+// the UDF body to run plus the minimum metadata the worker needs to
+// start processing it.
+//
+// Today the protocol mandates exactly one Init per UDF execution
+// (one Init -> data -> Finish). This is the simplest contract and
+// covers all currently supported UDF kinds. In the future we may
+// evolve to support multiple init phases on the same stream -- e.g.
+// when worker setup requires an interactive handshake (negotiate a
+// schema, exchange capabilities, fetch driver-side metadata, ...)
+// before the data plane opens. Such an extension would be additive
+// and would not change the single-Init semantics already in use.
+//
+// Engine vs. client split:
+//   * Most fields on Init are engine-side. They describe what
+//     flows on the wire for this session ([[data_format]] /
+//     [[input_schema]] / [[output_schema]] -- matching the worker
+//     spec, not the function's view) and what per-session
+//     context the worker needs ([[timezone]], [[session_conf]],
+//     [[task_context]], [[parameters]]).
+//   * [[UdfPayload]] carries everything the client side of Spark
+//     (where the UDF is defined and serialized) packs -- the
+//     serialized callable, an opaque format tag, and any encoder
+//     metadata bundled with the callable. The wire protocol does
+//     not enumerate encoder shapes; that is left to the client and
+//     worker to agree on per UDF type.
+message Init {
+    // (Optional) Protocol version declared by the engine for this stream.
+    // Allows the worker to detect version mismatches early and reject
+    // streams using a protocol revision it does not support. When not set,
+    // the worker SHOULD assume the initial protocol version.
+    optional uint32 protocol_version = 1;
+
+    // (Required) Wire format used for [[DataRequest.data]] and
+    // [[DataResponse.data]] for the life of this session. Must be
+    // one of the formats the worker declared in
+    // [[WorkerCapabilities.supported_data_formats]]; the client side
+    // of the protocol picks one at planning time and sticks with it.
+    //
+    // Workers MUST reject an [[Init]] whose [[data_format]] is
+    // `UDF_WORKER_DATA_FORMAT_UNSPECIFIED`, or whose value is not
+    // present in their declared
+    // [[WorkerCapabilities.supported_data_formats]]. The latter covers
+    // unknown enum values that proto3 passes through as numeric
+    // constants -- e.g. a newer engine selecting a format the worker
+    // does not implement.
+    UDFWorkerDataFormat data_format = 2;
+
+    // (Required) The UDF body to execute on the worker for this
+    // session. Exactly one payload per Execute stream.
+    UdfPayload udf = 3;
+
+    // (Optional) Schema of the input data plane in the wire format
+    // declared by [[data_format]] -- e.g. an Arrow IPC schema when
+    // data_format = ARROW. This is an engine-side requirement: it
+    // describes the bytes the engine will actually put on
+    // [[DataRequest.data]] for this session, matching what the
+    // worker advertised in its spec. It is NOT necessarily the
+    // schema the function definer expressed; the UDF's own type
+    // information lives inside [[UdfPayload]], typically embedded
+    // alongside the callable in [[UdfPayload.payload]] (e.g. as
+    // input/output encoders chosen per UDF type).
+    //
+    // Left unset when the worker can derive the schema from the
+    // payload alone.
+    optional bytes input_schema = 4;
+
+    // (Optional) Schema of the output data plane in the wire format
+    // declared by [[data_format]]. Same semantics as
+    // [[input_schema]] -- engine-side requirement describing the
+    // bytes the engine expects on [[DataResponse.data]].
+    optional bytes output_schema = 5;
+
+    // (Optional; defaults to an empty map.) Per-task context
+    // provided by the engine. Common keys identify the task instance
+    // for diagnostics, logging, and stateful workers -- e.g.
+    // partition id, task attempt id, stage id, micro-batch id.
+    // Engine and worker agree on the keys they share; the protocol
+    // does not enumerate them.
+    map<string, string> task_context = 6;
+
+    // (Optional; defaults to an empty map.) Worker-private knobs not
+    // already captured by typed fields above. Free-form; both sides
+    // agree on the keys they need.
+    //
+    // Any key that two languages converge on is a candidate for
+    // promotion to a structured proto field -- once promoted, it gets
+    // a typed field number from the reserved range right after this
+    // block and is removed from [[session_conf]]. [[timezone]] below
+    // is an example of a key that has already been promoted.
+    map<string, string> session_conf = 7;
+
+    // (Optional) Session timezone, promoted out of [[session_conf]]
+    // because every eval needs it for timestamp encoding/decoding.
+    //
+    // Format follows Spark's `spark.sql.session.timeZone` config --
+    // typically an IANA TZ id (e.g. "America/Los_Angeles") or a
+    // fixed offset (e.g. "+08:00"). The engine MUST pass the value
+    // it would resolve from the session conf without further
+    // transformation, so the worker can interpret it the same way
+    // Spark does.
+    optional string timezone = 8;
+
+    // (Optional) When true, the UDF payload will be delivered via
+    // [[PayloadChunk]] messages rather than inline in [[UdfPayload.payload]].
+    // The worker MUST wait for [[PayloadChunk.last]] = true before sending
+    // [[InitResponse]]. When false or absent, the payload is fully contained
+    // in [[UdfPayload.payload]] and the worker MAY send [[InitResponse]]
+    // immediately after [[Init]] without waiting for any [[PayloadChunk]].
+    optional bool is_chunking_payload = 9;
+
+    // Reserved for future typed Init fields, in particular keys
+    // graduated from [[session_conf]] (see the [[timezone]] precedent
+    // above). Numbers >= 100 are intentionally NOT reserved here; if
+    // a future revision needs an opaque escape-hatch field, give it a
+    // number >= 100 alongside [[parameters]] and add a field-level
+    // comment so the convention stays visible.
+    reserved 10 to 99;
+
+    // (Optional) Engine-packed opaque parameters specific to a
+    // particular kind of UDF execution. The escape hatch for
+    // anything the engine needs the worker to see at init time
+    // that is not already captured by the typed fields above and
+    // does not fit naturally into [[task_context]]. The encoding
+    // is agreed between the engine and the worker; the protocol
+    // does not interpret it. The matching response, also opaque
+    // bytes, is returned via [[InitResponse.data]].
+    //
+    // Numbers >= 100 are reserved by convention for opaque
+    // escape-hatch fields like this one; new typed fields use the
+    // reserved 10..99 range.
+    //
+    // Client-side init data (anything packed by the layer that
+    // defines and serializes the UDF) does NOT belong here -- it
+    // travels inside [[UdfPayload.payload]] instead.
+    optional bytes parameters = 100;
+}
+
+// Acknowledgment for [[Init]]. The worker MUST send exactly one
+// [[InitResponse]] before any [[DataResponse]]. When [[PayloadChunk]]
+// is used to deliver the UDF payload, the worker MUST also wait until
+// end-of-chunking to emit it (see [[PayloadChunk]]).
+//
+// The init phase allows the engine to interact with the UDF before
+// data starts flowing -- the worker can return inline bytes here for
+// the engine (or higher-level code on the engine side) to consume
+// during setup. The semantics of those bytes are agreed between the
+// client side of the protocol and the worker; this message itself is
+// otherwise opaque.
+message InitResponse {
+    // (Optional) Inline init result returned by the worker. Opaque
+    // to the protocol; the client side of the protocol and the
+    // worker agree on what (if anything) it carries.
+    optional bytes data = 1;
+}
+
+// Optional. Used to stream the single UDF payload when it does not
+// fit in a single gRPC message. The default is to send the payload
+// inline on [[UdfPayload.payload]]; chunking is only needed when a
+// payload exceeds the gRPC message size limit.
+//
+// When used, chunks are sent zero or more times after [[Init]] and
+// before the first [[DataRequest]]. The worker concatenates the
+// inline [[UdfPayload.payload]] (if any) followed by all chunks in
+// arrival order to form the final payload.
+//
+// Chunks are part of the Init handshake, not standalone control
+// messages: they extend [[Init.udf.payload]] and are not
+// individually acknowledged. The single [[InitResponse]] covers
+// Init plus all of its chunks together. [[PayloadChunk.last]] = true
+// is the canonical end-of-chunking signal; the worker MUST NOT send
+// [[InitResponse]] before receiving it.
+//
+// When [[UdfPayload.payload_size]] is set on [[Init.udf]], receivers
+// MAY validate that the total assembled payload (inline
+// [[UdfPayload.payload]] bytes plus all chunk bytes) matches it; a
+// mismatch is a protocol error.
+message PayloadChunk {
+    // (Required, non-empty.) Bytes appended to the [[Init.udf]]
+    // payload.
+    bytes data = 1;
+
+    // Marks the final chunk. When the engine uses [[PayloadChunk]]
+    // at all, it MUST set `last = true` on the last chunk. This is
+    // the canonical end-of-chunking signal: the worker MUST wait for
+    // it before emitting [[InitResponse]] and before treating any
+    // subsequent message as a [[DataRequest]]. Non-final chunks omit
+    // this field.
+    //
+    // Kept `optional` so future revisions can distinguish "engine did
+    // not set this field" from "engine set false" without renumbering.
+    optional bool last = 2;
+}
+
+// =====================================================================
+// Data phase
+//
+// `data` is intentionally a top-level `bytes` field on both request
+// and response messages -- not nested inside a wrapper -- so that
+// implementations can avoid an extra copy when reading or writing
+// the payload. The wire format (Arrow IPC etc.) is declared once per
+// session via [[Init.data_format]] and stays the same for the life
+// of the stream.
+//
+// Backpressure: this protocol currently relies on gRPC's transport-level
+// (HTTP/2) flow control for backpressure. Application-level backpressure
+// is not yet defined and may be introduced in a future revision.
+// =====================================================================
+
+// Engine -> Worker per-batch payload.
+message DataRequest {
+    // (Required) Encoded data bytes for one batch in the
+    // session-declared format. What "empty" means for a batch is
+    // defined by the session's [[Init.data_format]] -- for Arrow IPC
+    // even a zero-row batch carries a non-empty header, while future
+    // formats may permit truly zero-length payloads. Validation
+    // beyond "non-empty bytes" is delegated to the format decoder.
+    bytes data = 1;
+}
+
+// Worker -> Engine per-batch payload. The worker emits zero or more
+// [[DataResponse]]s between [[InitResponse]] and [[FinishResponse]] /
+// [[CancelResponse]]. Sink-style UDFs (which consume input but
+// produce no output rows on the data plane) emit exactly zero.
+message DataResponse {
+    // (Required) Encoded data bytes for one batch in the
+    // session-declared format. See [[DataRequest.data]] for the
+    // meaning of "empty".
+    bytes data = 1;
+}
+
+// =====================================================================
+// Finish / Cancel phase
+// =====================================================================
+
+// Sent by the engine when all input data has been submitted and normal
+// completion is expected. The worker MUST drain any remaining output,
+// then emit [[FinishResponse]] and close the response stream.
+//
+// [[Cancel]] MAY follow [[Finish]] on the same stream if the engine
+// wants to abort processing of already-submitted data (e.g. a Spark
+// task is interrupted after all input batches were sent). [[Cancel]]
+// MUST NOT precede [[Finish]]; if the engine cancels before sending
+// all data it sends [[Cancel]] without [[Finish]].
+//
+// Worker behavior when [[Cancel]] follows [[Finish]]:
+//   - If [[FinishResponse]] has not yet been sent, the worker MUST
+//     abort output, run cleanup, and send [[CancelResponse]].
+//   - If [[FinishResponse]] has already been sent, [[Cancel]] arrives
+//     too late and is ignored; the engine receives [[FinishResponse]].
+// The engine MUST therefore be prepared to receive either
+// [[FinishResponse]] or [[CancelResponse]] when it sends both.
+message Finish {}
+
+// Worker -> Engine completion message. May carry summary metrics.
+message FinishResponse {
+    // Final metrics aggregated over the whole session (e.g. rows
+    // in/out, time per phase). Free-form; names are worker-defined.
+    map<string, string> metrics = 1;

Review Comment:
   I am very much in favor of metrics. I am just wondering whether this is the
right place to emit them. I guess folks might want more real-time metrics, for
example. Perhaps put them in a separate message? Another question is whether
metrics should be part of the UDF execution flow or the management RPC.
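
To make the separate-message option concrete, here is a rough sketch of one
possible shape. It is only an illustration: `MetricsUpdate`, the `metrics`
branch, and field number 5 are assumptions and do not appear in the PR, and the
sketch does not settle whether `FinishResponse.metrics` would stay.

```protobuf
// Hypothetical sketch only; names and field numbers are assumptions.

// Worker -> Engine. A snapshot of worker-defined metrics that could be
// emitted while the session is still running, not only at completion.
message MetricsUpdate {
    // Free-form metric names and values, same shape as the existing
    // FinishResponse.metrics map.
    map<string, string> metrics = 1;
}

// The existing control-response envelope with one additional branch.
message UdfControlResponse {
    oneof control {
        InitResponse   init    = 1;
        FinishResponse finish  = 2;
        CancelResponse cancel  = 3;
        ExecutionError error   = 4;
        MetricsUpdate  metrics = 5;  // hypothetical new branch
    }
}
```

Keeping the update on the Execute stream would tie each snapshot to one
session. Moving it to the management RPC would instead suit worker-wide
metrics, at the cost of losing the per-session association unless an
identifier is added.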


