xianzhe-databricks commented on code in PR #55657: URL: https://github.com/apache/spark/pull/55657#discussion_r3181070526
########## udf/worker/proto/src/main/protobuf/udf_protocol.proto: ########## @@ -0,0 +1,459 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +syntax = "proto3"; + +import "common.proto"; + +package org.apache.spark.udf.worker; + +option java_package = "org.apache.spark.udf.worker"; +option java_multiple_files = true; + +// ===================================================================== +// Language-agnostic UDF execution protocol. +// +// The Spark engine acts as the gRPC client; a UDF worker (in any +// language) acts as the gRPC server. +// ===================================================================== + +// The default UDF gRPC service. A worker that exposes this service +// MUST do so over the default connection of the worker specification. +// +// In future, additional connections (e.g. a separate channel) may be +// reserved by the worker spec for other purposes. +service Worker { + // Per-execution stream. Exactly one [[Init]] is sent first, followed + // by 0..N data batches in either direction, terminated by exactly + // one [[Finish]] or [[Cancel]] from the engine. The worker MUST + // respond with the matching Init / Finish / Cancel responses on the + // response stream. 
+ // + // For stateful execution, the state is maintained per bi-directional + // stream, mapping to a `WorkerSession` on the engine side + // (`org.apache.spark.udf.worker.core.WorkerSession`). + rpc Execute(stream UdfRequest) returns (stream UdfResponse); + + // Worker-scoped management RPC, independent of any per-execution + // stream. Used for heartbeat, capability query, and graceful + // shutdown. Kept unary so it does not depend on the lifecycle of an + // active Execute stream. + rpc Manage(WorkerRequest) returns (WorkerResponse); +} + +// ===================================================================== +// Execute stream -- envelope +// ===================================================================== + +// Engine -> Worker. Either a control message ([[Init]] / [[PayloadChunk]] +// / [[Finish]] / [[Cancel]]) or a data message. +message UdfRequest { + // Exactly one branch MUST be set; receivers MUST reject messages + // with no branch set. + oneof request { + UdfControlRequest control = 1; + DataRequest data = 2; + } +} + +// Worker -> Engine. Either a control response ([[InitResponse]] / +// [[FinishResponse]] / [[CancelResponse]]) or a data response message. +message UdfResponse { + // Exactly one branch MUST be set; receivers MUST reject messages + // with no branch set. + oneof response { + UdfControlResponse control = 1; + DataResponse data = 2; + } +} + +// Engine -> Worker control messages. +// +// Wire order on an Execute stream is exactly: +// Init { ... } +// PayloadChunk { ... }* // optional; 0..N chunks, only used when +// // the single UDF payload on Init is too +// // large to fit inline. +// ( DataRequest | <worker DataResponse> )* +// Finish { ... } OR Cancel { ... } // exactly one terminator +// +// The worker MUST emit [[InitResponse]] before sending any +// [[DataResponse]], and MUST emit exactly one [[FinishResponse]] or +// [[CancelResponse]] before closing the response stream. 
+// +// A worker that receives messages out of this order (e.g. a second Init, +// a PayloadChunk after the first DataRequest, a DataRequest before Init, +// or a Cancel before Init) MUST close the stream with an error. +message UdfControlRequest { + // Exactly one branch MUST be set; receivers MUST reject messages + // with no branch set. + oneof control { + Init init = 1; + PayloadChunk payload = 2; + Finish finish = 3; + Cancel cancel = 4; + } +} + +// Worker -> Engine control messages. +message UdfControlResponse { + // Exactly one branch MUST be set; receivers MUST reject messages + // with no branch set. + oneof control { + InitResponse init = 1; + FinishResponse finish = 2; + CancelResponse cancel = 3; + } +} + +// ===================================================================== +// Init phase +// ===================================================================== + +// Sent once, as the first message on an Execute stream. Describes +// the UDF body to run plus the minimum metadata the worker needs to +// start processing it. +// +// Today the protocol mandates exactly one Init per UDF execution +// (one Init -> data -> Finish). This is the simplest contract and +// covers all currently supported UDF kinds. In the future we may +// evolve to support multiple init phases on the same stream -- e.g. +// when worker setup requires an interactive handshake (negotiate a +// schema, exchange capabilities, fetch driver-side metadata, ...) +// before the data plane opens. Such an extension would be additive +// and would not change the single-Init semantics already in use. +// +// Engine vs. client split: +// * Most fields on Init are engine-side. They describe what +// flows on the wire for this session ([[data_format]] / +// [[input_schema]] / [[output_schema]] -- matching the worker +// spec, not the function's view) and what per-session +// context the worker needs ([[timezone]], [[session_conf]], +// [[task_context]], [[parameters]]). 
+// * [[UdfPayload]] carries everything the client side of Spark +// (where the UDF is defined and serialized) packs -- the +// callable bytes themselves, plus optional custom encoders +// that override the worker's built-in decoders only when the +// UDF deals in types the worker doesn't already know how to +// convert (e.g. recovering Arrow batches into client-provided +// Scala case classes or other user-defined types). +message Init { + // (Required) Wire format used for [[DataRequest.data]] and + // [[DataResponse.data]] for the life of this session. Must be + // one of the formats the worker declared in + // [[WorkerCapabilities.supported_data_formats]]; the client side + // of the protocol picks one at planning time and sticks with it. + // Receivers MUST reject an Init whose [[data_format]] is + // `UDF_WORKER_DATA_FORMAT_UNSPECIFIED`. + UDFWorkerDataFormat data_format = 1; + + // (Required) The UDF body to execute on the worker for this + // session. Exactly one payload per Execute stream. + UdfPayload udf = 2; + + // (Optional) Schema of the input data plane in the wire format + // declared by [[data_format]] -- e.g. an Arrow IPC schema when + // data_format = ARROW. This is an engine-side requirement: it + // describes the bytes the engine will actually put on + // [[DataRequest.data]] for this session, matching what the + // worker advertised in its spec. It is NOT necessarily the + // schema the function definer expressed; the UDF's own type + // information lives inside [[UdfPayload]] (embedded in the + // payload, or as a structured [[UdfPayload.input_encoder]] + // that converts wire bytes into language-native values). + // + // Left unset when the worker can derive the schema from the + // payload alone. + optional bytes input_schema = 3; + + // (Optional) Schema of the output data plane in the wire format + // declared by [[data_format]]. 
Same semantics as + // [[input_schema]] -- engine-side requirement describing the + // bytes the engine expects on [[DataResponse.data]]. + optional bytes output_schema = 4; + + // (Optional; defaults to an empty map.) Per-task context + // provided by the engine. Common keys identify the task instance + // for diagnostics, logging, and stateful workers -- e.g. + // partition id, task attempt id, stage id, micro-batch id. + // Engine and worker agree on the keys they share; the protocol + // does not enumerate them. + map<string, string> task_context = 5; + + // (Optional; defaults to an empty map.) Worker-private knobs not + // already captured by typed fields above. Free-form; both sides + // agree on the keys they need. + // + // Any key that two languages converge on is a candidate for + // promotion to a structured proto field -- once promoted, it gets + // a typed field number from the reserved range right after this + // block and is removed from [[session_conf]]. [[timezone]] below + // is an example of a key that has already been promoted. + map<string, string> session_conf = 6; + + // (Optional) Session timezone, promoted out of [[session_conf]] + // because every eval needs it for timestamp encoding/decoding. + optional string timezone = 7; + + // Reserved for future typed Init fields, in particular keys + // graduated from [[session_conf]] (see the [[timezone]] precedent + // above). Numbers >= 100 are intentionally NOT reserved here; if + // a future revision needs an opaque escape-hatch field, give it a + // number >= 100 alongside [[parameters]] and add a field-level + // comment so the convention stays visible. + reserved 8 to 99; + + // (Optional) Engine-packed opaque parameters specific to a + // particular kind of UDF execution. The escape hatch for + // anything the engine needs the worker to see at init time + // that is not already captured by the typed fields above and + // does not fit naturally into [[task_context]]. 
The encoding + // is agreed between the engine and the worker; the protocol + // does not interpret it. The matching response, also opaque + // bytes, is returned via [[InitResponse.data]]. + // + // Numbers >= 100 are reserved by convention for opaque + // escape-hatch fields like this one; new typed fields use the + // reserved 8..99 range. + // + // Client-side init data (anything packed by the layer that + // defines and serializes the UDF) does NOT belong here -- it + // travels inside [[UdfPayload.payload]] instead. + optional bytes parameters = 100; +} + +// Acknowledgment for [[Init]]. The worker MUST send exactly one +// [[InitResponse]] before any [[DataResponse]]. +// +// The init phase allows the engine to interact with the UDF before +// data starts flowing -- the worker can return inline bytes here for +// the engine (or higher-level code on the engine side) to consume +// during setup. The semantics of those bytes are agreed between the +// client side of the protocol and the worker; this message itself is +// otherwise opaque. +message InitResponse { + // (Optional) Inline init result returned by the worker. Opaque + // to the protocol; the client side of the protocol and the + // worker agree on what (if anything) it carries. + optional bytes data = 1; +} + +// Optional. Used to stream the single UDF payload when it does not +// fit in a single gRPC message. The default is to send the payload +// inline on [[UdfPayload.payload]]; chunking is only needed when a +// payload exceeds the gRPC message size limit. +// +// When used, chunks are sent zero or more times after [[Init]] and +// before the first [[DataRequest]]. The worker concatenates the +// inline [[UdfPayload.payload]] (if any) followed by all chunks in +// arrival order to form the final payload. +message PayloadChunk { + // (Required, non-empty.) Bytes appended to the [[Init.udf]] + // payload. + bytes data = 1; + + // (Optional) Set to true on the final chunk. 
Receivers MAY use + // this as an early signal that the payload is complete and + // decoding can begin; receivers that prefer to wait for the + // first [[DataRequest]] (which marks the end of the chunking + // phase) MAY ignore this. When unset, the receiver determines + // completeness by the arrival of the first [[DataRequest]]. + optional bool last = 2; +} + +// ===================================================================== +// Data phase +// +// `data` is intentionally a top-level `bytes` field on both request +// and response messages -- not nested inside a wrapper -- so that +// implementations can avoid an extra copy when reading or writing +// the payload. The wire format (Arrow IPC etc.) is declared once per +// session via [[Init.data_format]] and stays the same for the life +// of the stream. +// ===================================================================== + +// Engine -> Worker per-batch payload. +message DataRequest { + // (Required, non-empty.) Encoded data bytes for one batch in the + // session-declared format. + bytes data = 1; +} + +// Worker -> Engine per-batch payload. The worker emits zero or more +// [[DataResponse]]s between [[InitResponse]] and [[FinishResponse]] / +// [[CancelResponse]]. Sink-style UDFs (which consume input but +// produce no output rows on the data plane) emit exactly zero. +message DataResponse { + // (Required, non-empty.) Encoded data bytes for one batch in the + // session-declared format. + bytes data = 1; +} + +// ===================================================================== +// Finish / Cancel phase +// ===================================================================== + +// Sent by the engine when no more input batches will arrive. The +// worker MUST drain any remaining output, then emit +// [[FinishResponse]] and close the response stream. +// +// Exactly one of [[Finish]] or [[Cancel]] is sent per Execute stream; +// they are mutually exclusive. 
If the engine has already sent +// [[Finish]] it MUST NOT send [[Cancel]] afterwards (and vice versa). +message Finish {} + +// Worker -> Engine completion message. May carry summary metrics. +message FinishResponse { + // Final metrics aggregated over the whole session (e.g. rows + // in/out, time per phase). Free-form; names are worker-defined. + map<string, string> metrics = 1; + + // (Optional) Inline finish result returned by the worker. + // Mirrors [[InitResponse.data]] -- the finish phase allows the + // engine to interact with the UDF after data has stopped + // flowing, with the worker returning opaque bytes the engine (or + // higher-level code) may consume during teardown. The semantics + // of those bytes are agreed between the client side of the + // protocol and the worker. + optional bytes data = 2; +} + +// Engine -> Worker explicit cancel. Distinct from a gRPC stream error +// so the worker can run cleanup deterministically (release file +// handles, drop temp state, etc.). After receiving [[Cancel]] the +// worker MUST stop emitting [[DataResponse]] messages, run cleanup, +// and emit [[CancelResponse]] before closing. +// +// Exactly one of [[Finish]] or [[Cancel]] is sent per Execute stream; +// see [[Finish]]. [[Cancel]] is the cooperative cancellation path; +// gRPC-level stream errors are the involuntary fallback. If the +// stream breaks before [[CancelResponse]] arrives, the engine +// considers the worker uncancellable for this session and relies on +// process-level cleanup. +message Cancel { + // (Optional) Free-form reason for diagnostics. + optional string reason = 1; +} + +// Worker -> Engine acknowledgment of [[Cancel]]. +message CancelResponse {} + +// The single UDF body delivered to the worker on [[Init]]. Opaque to +// the engine: the engine forwards [[payload]] and [[format]] +// unchanged, and the worker decodes them per the format the client +// and worker have agreed on. 
+message UdfPayload { Review Comment: What about the payload language? I assume the worker does not need to know which language this UDF payload is written in, because each worker is already tied to a specific language -- is that right? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
