haiyangsun-db commented on code in PR #55657:
URL: https://github.com/apache/spark/pull/55657#discussion_r3236822298


##########
udf/worker/proto/src/main/protobuf/udf_protocol.proto:
##########
@@ -0,0 +1,681 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto3";
+
+import "common.proto";
+
+package org.apache.spark.udf.worker;
+
+option java_package = "org.apache.spark.udf.worker";
+option java_multiple_files = true;
+
+// =====================================================================
+// Language-agnostic UDF execution protocol.
+//
+// The Spark engine acts as the gRPC client; a UDF worker (in any
+// language) acts as the gRPC server.
+// =====================================================================
+
+// The default UDF gRPC service. A worker that exposes this service
+// MUST do so over the default connection of the worker specification.
+//
+// In future, additional connections (e.g. a separate channel) may be
+// reserved by the worker spec for other purposes.
+service UdfWorker {
+    // Per-execution stream. See [[UdfControlRequest]] for the complete
+    // wire protocol and ordering invariants.
+    //
+    // Error contract: if the gRPC connection breaks at any point, gRPC
+    // surfaces an error on the stream. The engine therefore never needs
+    // to poll or time out waiting for a response -- the absence of a
+    // gRPC error guarantees that a proper protocol response will
+    // eventually arrive. This applies to every in-flight response, not
+    // only [[CancelResponse]].
+    //
+    // Stream lifecycle: the engine MUST half-close the request stream
+    // (call onCompleted() on the gRPC stream) after the session
+    // terminates: on receiving [[FinishResponse]] or [[CancelResponse]]
+    // (clean termination) or on receiving a gRPC error (connection
+    // broke). Deferring the half-close until the outcome is known keeps
+    // the request stream open long enough for [[Cancel]] to follow
+    // [[Finish]] if needed.
+    //
+    // Response observer threading: gRPC does not permit concurrent calls
+    // to the response StreamObserver. Worker implementations that dispatch
+    // processing to a thread pool MUST serialize all writes to the response
+    // observer.
+    //
+    // For stateful execution, the state is maintained per bi-directional
+    // stream, mapping to a `WorkerSession` on the engine side.
+    rpc Execute(stream UdfRequest) returns (stream UdfResponse);
+
+    // Worker-scoped management RPC, independent of any per-execution
+    // stream. Used for heartbeat, capability query, and graceful
+    // shutdown. Kept unary so it does not depend on the lifecycle of an
+    // active Execute stream.
+    rpc Manage(WorkerRequest) returns (WorkerResponse);
+}
+
+// =====================================================================
+// Execute stream -- envelope
+// =====================================================================
+
+// Engine -> Worker. Either a control message ([[Init]] / [[PayloadChunk]]
+// / [[Finish]] / [[Cancel]]) or a data message.
+message UdfRequest {
+    // Exactly one branch MUST be set; receivers MUST reject messages
+    // with no branch set.
+    oneof request {
+        UdfControlRequest control = 1;
+        DataRequest       data    = 2;
+    }
+}
+
+// Worker -> Engine. Either a control response ([[InitResponse]] /
+// [[FinishResponse]] / [[CancelResponse]] / [[ExecutionError]]) or a
+// data response message.
+message UdfResponse {
+    // Exactly one branch MUST be set; receivers MUST reject messages
+    // with no branch set.
+    oneof response {
+        UdfControlResponse control = 1;
+        DataResponse       data    = 2;
+    }
+}
+
+// Engine -> Worker control messages.
+//
+// Wire protocol for one Execute stream (both directions):
+//
+//   Engine -> Worker:  Init -> PayloadChunk* -> (DataRequest)* -> Finish (Cancel)?
+//                                                               | Cancel
+//   Worker -> Engine:          InitResponse  -> (DataResponse)* -> (ExecutionError)? -> (FinishResponse | CancelResponse)
+//
+// DataRequest and DataResponse are independent streams: the worker
+// may emit DataResponse messages at any point after InitResponse,
+// including before the first DataRequest arrives. For generator-style
+// UDFs that produce output without consuming input, there may be zero
+// DataRequest messages -- the engine sends Finish directly after Init.
+// The arrows above denote ordering constraints within each direction,
+// not a request/response pairing.
+//
+// The engine MAY send DataRequests before receiving InitResponse (pipeline
+// mode). The worker MUST buffer such DataRequests and process them in
+// arrival order once init succeeds. They MAY be discarded only if init
+// fails (i.e. the worker sends ExecutionError before InitResponse).
+//
+// Ordering invariants:
+//   - PayloadChunk* only after Init and before the first DataRequest.
+//     [[Init.is_chunking_payload]] = true signals that chunks will follow;
+//     [[PayloadChunk.last]] = true is the canonical end-of-chunking signal.
+//     When [[Init.is_chunking_payload]] is false or absent, [[InitResponse]]
+//     MAY be sent immediately after [[Init]] without waiting for chunks.
+//   - InitResponse MUST be emitted before any DataResponse.
+//   - ExecutionError (if any) MUST be emitted after all DataResponse
+//     messages. After sending it the worker MUST stop processing DataRequests
+//     and wait for the engine to send Finish or Cancel, then respond with
+//     FinishResponse or CancelResponse accordingly. At most one
+//     ExecutionError is sent per stream; the worker aggregates multiple
+//     errors internally.
+//   - The engine terminates with one of:
+//     (a) Finish alone        -> worker sends FinishResponse.
+//     (b) Cancel alone        -> worker sends CancelResponse.
+//     (c) Finish then Cancel  -> worker sends CancelResponse if it has not
+//         yet sent FinishResponse, otherwise FinishResponse (see [[Finish]]).
+//     Cancel MUST NOT precede Finish on the same stream.
+//
+// A worker that receives messages out of order (e.g. a second Init,
+// a PayloadChunk after the first DataRequest, a DataRequest before Init,
+// or a Cancel before Init) MUST send [[ExecutionError]] with a
+// [[ProtocolError]] kind, followed by [[FinishResponse]] or
+// [[CancelResponse]] to close the stream cleanly.
+message UdfControlRequest {
+    // Exactly one branch MUST be set; receivers MUST reject messages
+    // with no branch set.
+    oneof control {
+        Init         init    = 1;
+        PayloadChunk payload = 2;
+        Finish       finish  = 3;
+        Cancel       cancel  = 4;
+    }
+}
+
+// Worker -> Engine control messages.
+message UdfControlResponse {
+    // Exactly one branch MUST be set; receivers MUST reject messages
+    // with no branch set.
+    oneof control {
+        InitResponse   init   = 1;
+        FinishResponse finish = 2;
+        CancelResponse cancel = 3;
+        ExecutionError error  = 4;
+    }
+}
+
+// =====================================================================
+// Init phase
+// =====================================================================
+
+// Sent once, as the first message on an Execute stream. Describes
+// the UDF body to run plus the minimum metadata the worker needs to
+// start processing it.
+//
+// Today the protocol mandates exactly one Init per UDF execution
+// (one Init -> data -> Finish). This is the simplest contract and
+// covers all currently supported UDF kinds. In the future we may
+// evolve to support multiple init phases on the same stream -- e.g.
+// when worker setup requires an interactive handshake (negotiate a
+// schema, exchange capabilities, fetch driver-side metadata, ...)
+// before the data plane opens. Such an extension would be additive
+// and would not change the single-Init semantics already in use.
+//
+// Engine vs. client split:
+//   * Most fields on Init are engine-side. They describe what
+//     flows on the wire for this session ([[data_format]] /
+//     [[input_schema]] / [[output_schema]] -- matching the worker
+//     spec, not the function's view) and what per-session
+//     context the worker needs ([[timezone]], [[session_conf]],
+//     [[task_context]], [[parameters]]).
+//   * [[UdfPayload]] carries everything the client side of Spark
+//     (where the UDF is defined and serialized) packs -- the
+//     serialized callable, an opaque format tag, and any encoder
+//     metadata bundled with the callable. The wire protocol does
+//     not enumerate encoder shapes; that is left to the client and
+//     worker to agree on per UDF type.
+message Init {
+    // (Optional) Protocol version declared by the engine for this stream.
+    // Allows the worker to detect version mismatches early and reject
+    // streams using a protocol revision it does not support. When not set,
+    // the worker SHOULD assume the initial protocol version.
+    optional uint32 protocol_version = 1;
+
+    // (Required) Wire format used for [[DataRequest.data]] and
+    // [[DataResponse.data]] for the life of this session. Must be
+    // one of the formats the worker declared in
+    // [[WorkerCapabilities.supported_data_formats]]; the client side
+    // of the protocol picks one at planning time and sticks with it.
+    //
+    // Workers MUST reject an [[Init]] whose [[data_format]] is
+    // `UDF_WORKER_DATA_FORMAT_UNSPECIFIED`, or whose value is not
+    // present in their declared
+    // [[WorkerCapabilities.supported_data_formats]]. The latter covers
+    // unknown enum values that proto3 passes through as numeric
+    // constants -- e.g. a newer engine selecting a format the worker
+    // does not implement.
+    UDFWorkerDataFormat data_format = 2;
+
+    // (Required) The UDF body to execute on the worker for this
+    // session. Exactly one payload per Execute stream.
+    UdfPayload udf = 3;
+
+    // (Optional) Schema of the input data plane in the wire format
+    // declared by [[data_format]] -- e.g. an Arrow IPC schema when
+    // data_format = ARROW. This is an engine-side requirement: it
+    // describes the bytes the engine will actually put on
+    // [[DataRequest.data]] for this session, matching what the
+    // worker advertised in its spec. It is NOT necessarily the
+    // schema the function definer expressed; the UDF's own type
+    // information lives inside [[UdfPayload]], typically embedded
+    // alongside the callable in [[UdfPayload.payload]] (e.g. as
+    // input/output encoders chosen per UDF type).
+    //
+    // Left unset when the worker can derive the schema from the
+    // payload alone.
+    optional bytes input_schema = 4;
+
+    // (Optional) Schema of the output data plane in the wire format
+    // declared by [[data_format]]. Same semantics as
+    // [[input_schema]] -- engine-side requirement describing the
+    // bytes the engine expects on [[DataResponse.data]].
+    optional bytes output_schema = 5;
+
+    // (Optional; defaults to an empty map.) Per-task context
+    // provided by the engine. Common keys identify the task instance
+    // for diagnostics, logging, and stateful workers -- e.g.
+    // partition id, task attempt id, stage id, micro-batch id.
+    // Engine and worker agree on the keys they share; the protocol
+    // does not enumerate them.
+    map<string, string> task_context = 6;
+
+    // (Optional; defaults to an empty map.) Worker-private knobs not
+    // already captured by typed fields above. Free-form; both sides
+    // agree on the keys they need.
+    //
+    // Any key that two languages converge on is a candidate for
+    // promotion to a structured proto field -- once promoted, it gets
+    // a typed field number from the reserved range right after this
+    // block and is removed from [[session_conf]]. [[timezone]] below
+    // is an example of a key that has already been promoted.
+    map<string, string> session_conf = 7;
+
+    // (Optional) Session timezone, promoted out of [[session_conf]]
+    // because every eval needs it for timestamp encoding/decoding.
+    //
+    // Format follows Spark's `spark.sql.session.timeZone` config --
+    // typically an IANA TZ id (e.g. "America/Los_Angeles") or a
+    // fixed offset (e.g. "+08:00"). The engine MUST pass the value
+    // it would resolve from the session conf without further
+    // transformation, so the worker can interpret it the same way
+    // Spark does.
+    optional string timezone = 8;
+
+    // (Optional) When true, the UDF payload will be delivered via
+    // [[PayloadChunk]] messages rather than inline in [[UdfPayload.payload]].
+    // The worker MUST wait for [[PayloadChunk.last]] = true before sending
+    // [[InitResponse]]. When false or absent, the payload is fully contained
+    // in [[UdfPayload.payload]] and the worker MAY send [[InitResponse]]
+    // immediately after [[Init]] without waiting for any [[PayloadChunk]].
+    optional bool is_chunking_payload = 9;
+
+    // Reserved for future typed Init fields, in particular keys
+    // graduated from [[session_conf]] (see the [[timezone]] precedent
+    // above). Numbers >= 100 are intentionally NOT reserved here; if
+    // a future revision needs an opaque escape-hatch field, give it a
+    // number >= 100 alongside [[parameters]] and add a field-level
+    // comment so the convention stays visible.
+    reserved 10 to 99;
+
+    // (Optional) Engine-packed opaque parameters specific to a
+    // particular kind of UDF execution. The escape hatch for
+    // anything the engine needs the worker to see at init time
+    // that is not already captured by the typed fields above and
+    // does not fit naturally into [[task_context]]. The encoding
+    // is agreed between the engine and the worker; the protocol
+    // does not interpret it. The matching response, also opaque
+    // bytes, is returned via [[InitResponse.data]].
+    //
+    // Numbers >= 100 are reserved by convention for opaque
+    // escape-hatch fields like this one; new typed fields use the
+    // reserved 10..99 range.
+    //
+    // Client-side init data (anything packed by the layer that
+    // defines and serializes the UDF) does NOT belong here -- it
+    // travels inside [[UdfPayload.payload]] instead.
+    optional bytes parameters = 100;
+}
+
+// Acknowledgment for [[Init]]. The worker MUST send exactly one
+// [[InitResponse]] before any [[DataResponse]]. When [[PayloadChunk]]
+// is used to deliver the UDF payload, the worker MUST also wait until
+// end-of-chunking to emit it (see [[PayloadChunk]]).
+//
+// The init phase allows the engine to interact with the UDF before
+// data starts flowing -- the worker can return inline bytes here for
+// the engine (or higher-level code on the engine side) to consume
+// during setup. The semantics of those bytes are agreed between the
+// client side of the protocol and the worker; this message itself is
+// otherwise opaque.
+message InitResponse {
+    // (Optional) Inline init result returned by the worker. Opaque
+    // to the protocol; the client side of the protocol and the
+    // worker agree on what (if anything) it carries.
+    optional bytes data = 1;
+}
+
+// Optional. Used to stream the single UDF payload when it does not
+// fit in a single gRPC message. The default is to send the payload
+// inline on [[UdfPayload.payload]]; chunking is only needed when a
+// payload exceeds the gRPC message size limit.
+//
+// When used, chunks are sent zero or more times after [[Init]] and
+// before the first [[DataRequest]]. The worker concatenates the
+// inline [[UdfPayload.payload]] (if any) followed by all chunks in
+// arrival order to form the final payload.
+//
+// Chunks are part of the Init handshake, not standalone control
+// messages: they extend [[Init.udf.payload]] and are not
+// individually acknowledged. The single [[InitResponse]] covers
+// Init plus all of its chunks together. [[PayloadChunk.last]] = true
+// is the canonical end-of-chunking signal; the worker MUST NOT send
+// [[InitResponse]] before receiving it.
+//
+// When [[UdfPayload.payload_size]] is set on [[Init.udf]], receivers
+// MAY validate that the total assembled payload (inline
+// [[UdfPayload.payload]] bytes plus all chunk bytes) matches it; a
+// mismatch is a protocol error.
+message PayloadChunk {
+    // (Required, non-empty.) Bytes appended to the [[Init.udf]]
+    // payload.
+    bytes data = 1;
+
+    // Marks the final chunk. When the engine uses [[PayloadChunk]]
+    // at all, it MUST set `last = true` on the last chunk. This is
+    // the canonical end-of-chunking signal: the worker MUST wait for
+    // it before emitting [[InitResponse]] and before treating any
+    // subsequent message as a [[DataRequest]]. Non-final chunks omit
+    // this field.
+    //
+    // Kept `optional` so future revisions can distinguish "engine did
+    // not set this field" from "engine set false" without renumbering.
+    optional bool last = 2;
+}
+
+// =====================================================================
+// Data phase
+//
+// `data` is intentionally a top-level `bytes` field on both request
+// and response messages -- not nested inside a wrapper -- so that
+// implementations can avoid an extra copy when reading or writing
+// the payload. The wire format (Arrow IPC etc.) is declared once per
+// session via [[Init.data_format]] and stays the same for the life
+// of the stream.
+//
+// Backpressure: this protocol currently relies on gRPC's transport-level
+// (HTTP/2) flow control for backpressure. Application-level backpressure
+// is not yet defined and may be introduced in a future revision.
+// =====================================================================
+
+// Engine -> Worker per-batch payload.
+message DataRequest {
+    // (Required) Encoded data bytes for one batch in the
+    // session-declared format. What "empty" means for a batch is
+    // defined by the session's [[Init.data_format]] -- for Arrow IPC
+    // even a zero-row batch carries a non-empty header, while future
+    // formats may permit truly zero-length payloads. Validation
+    // beyond "non-empty bytes" is delegated to the format decoder.
+    bytes data = 1;
+}
+
+// Worker -> Engine per-batch payload. The worker emits zero or more
+// [[DataResponse]]s between [[InitResponse]] and [[FinishResponse]] /
+// [[CancelResponse]]. Sink-style UDFs (which consume input but
+// produce no output rows on the data plane) emit exactly zero.
+message DataResponse {
+    // (Required) Encoded data bytes for one batch in the
+    // session-declared format. See [[DataRequest.data]] for the
+    // meaning of "empty".
+    bytes data = 1;
+}
+
+// =====================================================================
+// Finish / Cancel phase
+// =====================================================================
+
+// Sent by the engine when all input data has been submitted and normal
+// completion is expected. The worker MUST drain any remaining output,
+// then emit [[FinishResponse]] and close the response stream.
+//
+// [[Cancel]] MAY follow [[Finish]] on the same stream if the engine
+// wants to abort processing of already-submitted data (e.g. a Spark
+// task is interrupted after all input batches were sent). [[Cancel]]
+// MUST NOT precede [[Finish]]; if the engine cancels before sending
+// all data it sends [[Cancel]] without [[Finish]].
+//
+// Worker behavior when [[Cancel]] follows [[Finish]]:
+//   - If [[FinishResponse]] has not yet been sent, the worker MUST
+//     abort output, run cleanup, and send [[CancelResponse]].
+//   - If [[FinishResponse]] has already been sent, [[Cancel]] arrives
+//     too late and is ignored; the engine receives [[FinishResponse]].
+// The engine MUST therefore be prepared to receive either
+// [[FinishResponse]] or [[CancelResponse]] when it sends both.
+message Finish {}
+
+// Worker -> Engine completion message. May carry summary metrics.
+message FinishResponse {
+    // Final metrics aggregated over the whole session (e.g. rows
+    // in/out, time per phase). Free-form; names are worker-defined.
+    map<string, string> metrics = 1;
+
+    // (Optional) Inline finish result returned by the worker.
+    // Mirrors [[InitResponse.data]] -- the finish phase allows the
+    // engine to interact with the UDF after data has stopped
+    // flowing, with the worker returning opaque bytes the engine (or
+    // higher-level code) may consume during teardown. The semantics
+    // of those bytes are agreed between the client side of the
+    // protocol and the worker.
+    optional bytes data = 2;
+}
+
+// Engine -> Worker explicit cancel. Distinct from a gRPC stream error
+// so the worker can run cleanup deterministically (release file
+// handles, drop temp state, etc.). After receiving [[Cancel]] the
+// worker MUST stop emitting [[DataResponse]] messages, run cleanup,
+// and emit [[CancelResponse]] before closing.
+//
+// [[Cancel]] is the cooperative cancellation path and may be sent
+// either instead of [[Finish]] (engine cancels before all data is
+// submitted) or after [[Finish]] (engine aborts processing of
+// already-submitted data -- see [[Finish]] for the full contract).
+// A broken gRPC connection is the involuntary fallback -- in that
+// case gRPC surfaces an error on the stream (see [[Execute]]).
+//
+// Worker behavior on involuntary stream error: when the worker observes
+// a gRPC error on the Execute stream (i.e. the engine-side connection
+// dropped), it MUST treat this as equivalent to [[Cancel]] for cleanup
+// purposes -- stop producing output and release resources. The worker
+// MUST NOT attempt to send [[CancelResponse]] or any other message,
+// since the stream is already dead.
+message Cancel {
+    // (Optional) Free-form reason for diagnostics.
+    optional string reason = 1;
+}
+
+// Worker -> Engine acknowledgment of [[Cancel]].
+message CancelResponse {}
+
+// Worker -> Engine. Signals a non-gRPC application-level error.
+// Distinct from a gRPC stream error: gRPC errors indicate transport or
+// protocol failures; [[ExecutionError]] carries errors that should be
+// raised as user-facing exceptions.
+//
+// Wire position: emitted at most once, after all [[DataResponse]]
+// messages. After sending [[ExecutionError]] the worker MUST stop
+// processing [[DataRequest]] messages and wait for the engine to send
+// [[Finish]] or [[Cancel]], then respond with [[FinishResponse]] or
+// [[CancelResponse]] accordingly.
+//
+// Engine behavior on receipt: the engine records the error and then:
+//   - If [[Finish]] has already been sent: waits for [[FinishResponse]]
+//     and raises the error afterwards.
+//   - If [[Finish]] has not yet been sent: sends [[Cancel]], waits for
+//     [[CancelResponse]], and raises the error afterwards.
+// If more than one [[ExecutionError]] arrives (protocol violation),
+// the engine SHOULD surface the first as the primary exception and
+// attach subsequent ones as suppressed exceptions.
+//
+// Cancel interaction: if [[Cancel]] arrives while the worker is
+// preparing to send [[ExecutionError]], the cancel-vs-finish race rules
+// in [[Finish]] apply. The worker emits exactly one terminator.
+message ExecutionError {
+    // Exactly one kind MUST be set.
+    oneof kind {
+        UserError     user     = 1;
+        WorkerError   worker   = 2;
+        ProtocolError protocol = 3;
+    }
+}
+
+// Error raised by the user's UDF code.
+message UserError {
+    // (Required) Human-readable error message from the UDF.
+    string message = 1;
+
+    // (Optional) Full stack trace or traceback in the worker's
+    // language-specific format. Forwarded to the user as-is.
+    optional string traceback = 2;
+
+    // (Optional) Language-specific error class name (e.g. "ValueError"
+    // in Python, "RuntimeException" in Java).
+    optional string error_class = 3;
+}
+
+// Error originating from the worker implementation itself, not user code.
+message WorkerError {
+    // (Required) Human-readable description of the worker error.
+    string message = 1;
+
+    // (Optional) Stack trace for diagnostics.
+    optional string traceback = 2;
+}
+
+// Protocol violation detected by the worker (e.g. messages received out
+// of order, unsupported [[Init.protocol_version]]). Sending this type
+// instead of closing with a gRPC error keeps the stream lifecycle intact:
+// [[FinishResponse]] or [[CancelResponse]] still follows.
+message ProtocolError {
+    // (Required) Description of the protocol violation.
+    string message = 1;
+}
+
+// The single UDF body delivered to the worker on [[Init]]. Opaque to
+// the engine: the engine forwards [[payload]] and [[format]]
+// unchanged, and the worker decodes them per the format the client
+// and worker have agreed on.
+message UdfPayload {
+    // (Required, may be empty when chunked.) Serialized UDF bundle,
+    // opaque to the engine. The encoding is declared in [[format]].
+    //
+    // The bundle is not necessarily just the serialized callable;
+    // it is up to the client side of the protocol and the worker to
+    // agree on what is packed inside it -- e.g. custom encoders for
+    // user-defined types, type hints, or any other metadata the
+    // worker needs to invoke the UDF.
+    //
+    // For payloads too large to fit on a single gRPC message, this
+    // field MAY be left empty (zero-length bytes) and the bytes
+    // delivered via the [[PayloadChunk]] mechanism instead. See
+    // [[PayloadChunk]] for chunking semantics.
+    bytes payload = 1;
+
+    // (Required, non-empty.) Format tag identifying the encoding of
+    // [[payload]]. The protocol does not enumerate the values: the
+    // client side of the protocol and the worker agree on the
+    // namespace, and each worker recognises the tags it knows how
+    // to decode. The engine forwards this string unchanged.
+    string format = 2;
+
+    // (Optional) Total payload size in bytes. Useful when chunked
+    // streaming is used so the worker can pre-allocate buffers.
+    optional int64 payload_size = 3;
+
+    // (Optional) Human-readable name for diagnostics and metrics.
+    optional string name = 4;
+
+    // (Optional) Worker / language-specific dispatch hint. A
+    // free-form string the worker uses to pick the code path that
+    // handles this payload. The protocol does not enumerate eval
+    // types because they are language-specific; the client side of
+    // the protocol and the worker agree on the namespace and the
+    // values.
+    //
+    // When the worker can derive the eval type from the payload
+    // itself (embedded metadata, format tag, etc.), this field is
+    // left unset. Otherwise the client side of the protocol sets it
+    // explicitly.
+    optional string eval_type = 5;
+}
+
+// =====================================================================
+// Manage RPC -- worker-scoped operations independent of Execute
+// =====================================================================
+
+// Engine -> Worker. Wraps the manage operations in a oneof so the RPC
+// is a single typed call, leaving room for future operations
+// (capability query, profiling, ...).
+message WorkerRequest {
+    // Exactly one branch MUST be set; receivers MUST reject messages
+    // with no branch set.
+    oneof manage {
+        Heartbeat       heartbeat = 1;
+        ShutdownRequest shutdown  = 2;
+    }
+}
+
+// Worker -> Engine.
+message WorkerResponse {
+    // Exactly one branch MUST be set, and it MUST be the branch
+    // matching the request's oneof (e.g. [[Heartbeat]] is answered
+    // with [[HeartbeatResponse]], not [[ShutdownResponse]]). A
+    // mismatched response is a protocol error.
+    oneof manage {
+        HeartbeatResponse heartbeat = 1;
+        ShutdownResponse  shutdown  = 2;
+    }
+}
+
+// Liveness probe. The engine may send this periodically to detect a
+// hung worker process. The worker SHOULD reply within a small bounded
+// time.
+//
+// This is an application-level liveness check distinct from gRPC's
+// transport-level keepalive: gRPC keepalive proves the TCP connection
+// is alive, whereas [[Heartbeat]] proves the worker's request-handling
+// thread is responsive. Deployments may use either or both; they do
+// not replace each other.
+//
+// What the engine does in response to a missed heartbeat (e.g.,
+// tearing down the worker) is outside the scope of this protocol and
+// depends on the worker management mode defined in the worker
+// specification.
+//
+// Note: [[Heartbeat]] can only detect a hung worker process (one
+// whose request-handling thread is unresponsive). It cannot detect
+// user code that is executing but taking unexpectedly long -- during
+// init, data processing, or finish -- because such code is
+// indistinguishable from legitimately slow UDF execution. Handling
+// hanging user code (e.g. via UDF-level timeouts) is the
+// responsibility of the UDF author or the worker implementation, and
+// is outside the scope of this protocol.
+message Heartbeat {
+    // Reserved for future additive fields (e.g. an engine-side
+    // sequence number or a request-id tag for correlating heartbeats
+    // when sent over a long-lived connection).
+    reserved 1;
+}
+
+// Acknowledgment for [[Heartbeat]].
+message HeartbeatResponse {

Review Comment:
   Sure, I can add that.
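
   For reference, the engine-to-worker ordering rules in the quoted comment block (Init, then optional PayloadChunk messages, then DataRequest messages, then Finish and/or Cancel) can be sketched as a small state machine. This is only an illustrative sketch in plain Python, not code from the PR and not built on the generated protobuf classes. The names `RequestOrderChecker`, `Phase`, and `ProtocolViolation` are hypothetical. It also assumes a few points the proto comments leave open: that a PayloadChunk without `is_chunking_payload = true` is rejected, that DataRequest or Finish before the last chunk is rejected, and that a second Cancel is rejected.

```python
from dataclasses import dataclass, field
from enum import Enum, auto
from typing import Optional


class Phase(Enum):
    AWAIT_INIT = auto()   # nothing received yet
    CHUNKING = auto()     # Init announced chunks; waiting for last = true
    DATA = auto()         # DataRequest, Finish, or Cancel may arrive
    FINISHED = auto()     # Finish seen; only Cancel may still follow
    CANCELLED = auto()    # Cancel seen; nothing may follow


class ProtocolViolation(Exception):
    """Hypothetical stand-in for an ExecutionError with a ProtocolError kind."""


@dataclass
class RequestOrderChecker:
    """Tracks one Execute stream's request side and assembles the payload."""

    phase: Phase = Phase.AWAIT_INIT
    payload: bytearray = field(default_factory=bytearray)
    expected_size: Optional[int] = None

    def on_init(self, inline_payload: bytes = b"",
                is_chunking_payload: bool = False,
                payload_size: Optional[int] = None) -> None:
        if self.phase is not Phase.AWAIT_INIT:
            raise ProtocolViolation("second Init on the same stream")
        self.payload += inline_payload
        self.expected_size = payload_size
        if is_chunking_payload:
            self.phase = Phase.CHUNKING
        else:
            self._complete_payload()

    def on_payload_chunk(self, data: bytes, last: bool = False) -> None:
        # Assumption: chunks are only legal while chunking was announced.
        if self.phase is not Phase.CHUNKING:
            raise ProtocolViolation("PayloadChunk outside the chunking phase")
        if not data:
            raise ProtocolViolation("PayloadChunk.data must be non-empty")
        self.payload += data  # inline bytes first, then chunks in arrival order
        if last:
            self._complete_payload()

    def on_data_request(self) -> None:
        if self.phase is not Phase.DATA:
            raise ProtocolViolation(f"DataRequest in phase {self.phase.name}")

    def on_finish(self) -> None:
        if self.phase is not Phase.DATA:
            raise ProtocolViolation(f"Finish in phase {self.phase.name}")
        self.phase = Phase.FINISHED

    def on_cancel(self) -> None:
        # Cancel may replace Finish or follow it, but never precede Init.
        if self.phase in (Phase.AWAIT_INIT, Phase.CANCELLED):
            raise ProtocolViolation(f"Cancel in phase {self.phase.name}")
        self.phase = Phase.CANCELLED

    def _complete_payload(self) -> None:
        # Optional check: payload_size, when set, must match the assembled size.
        if self.expected_size is not None and len(self.payload) != self.expected_size:
            raise ProtocolViolation("assembled payload does not match payload_size")
        self.phase = Phase.DATA
```

   In a real worker, each `ProtocolViolation` would presumably be reported as an `ExecutionError` carrying a `ProtocolError`, followed by `FinishResponse` or `CancelResponse`, rather than raised as an exception.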



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

