haiyangsun-db commented on code in PR #55657: URL: https://github.com/apache/spark/pull/55657#discussion_r3236781511
########## udf/worker/proto/src/main/protobuf/udf_protocol.proto: ########## @@ -0,0 +1,681 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +syntax = "proto3"; + +import "common.proto"; + +package org.apache.spark.udf.worker; + +option java_package = "org.apache.spark.udf.worker"; +option java_multiple_files = true; + +// ===================================================================== +// Language-agnostic UDF execution protocol. +// +// The Spark engine acts as the gRPC client; a UDF worker (in any +// language) acts as the gRPC server. +// ===================================================================== + +// The default UDF gRPC service. A worker that exposes this service +// MUST do so over the default connection of the worker specification. +// +// In future, additional connections (e.g. a separate channel) may be +// reserved by the worker spec for other purposes. +service UdfWorker { + // Per-execution stream. See [[UdfControlRequest]] for the complete + // wire protocol and ordering invariants. + // + // Error contract: if the gRPC connection breaks at any point, gRPC + // surfaces an error on the stream. The engine therefore never needs + // to poll or time out waiting for a response -- the absence of a + // gRPC error guarantees that a proper protocol response will + // eventually arrive. This applies to every in-flight response, not + // only [[CancelResponse]]. + // + // Stream lifecycle: the engine MUST half-close the request stream + // (call onCompleted() on the gRPC stream) after the session + // terminates: on receiving [[FinishResponse]] or [[CancelResponse]] + // (clean termination) or on receiving a gRPC error (connection + // broke). Deferring the half-close until the outcome is known keeps + // the request stream open long enough for [[Cancel]] to follow + // [[Finish]] if needed. + // + // Response observer threading: gRPC does not permit concurrent calls + // to the response StreamObserver. Worker implementations that dispatch + // processing to a thread pool MUST serialize all writes to the response + // observer. + // + // For stateful execution, the state is maintained per bi-directional + // stream, mapping to a `WorkerSession` on the engine side. + rpc Execute(stream UdfRequest) returns (stream UdfResponse); + + // Worker-scoped management RPC, independent of any per-execution + // stream. Used for heartbeat, capability query, and graceful + // shutdown. Kept unary so it does not depend on the lifecycle of an + // active Execute stream. + rpc Manage(WorkerRequest) returns (WorkerResponse); +} + +// ===================================================================== +// Execute stream -- envelope +// ===================================================================== + +// Engine -> Worker. Either a control message ([[Init]] / [[PayloadChunk]] +// / [[Finish]] / [[Cancel]]) or a data message. +message UdfRequest { + // Exactly one branch MUST be set; receivers MUST reject messages + // with no branch set. + oneof request { + UdfControlRequest control = 1; + DataRequest data = 2; + } +} + +// Worker -> Engine. Either a control response ([[InitResponse]] / +// [[FinishResponse]] / [[CancelResponse]] / [[ExecutionError]]) or a +// data response message. +message UdfResponse { + // Exactly one branch MUST be set; receivers MUST reject messages + // with no branch set. + oneof response { + UdfControlResponse control = 1; + DataResponse data = 2; + } +} + +// Engine -> Worker control messages. +// +// Wire protocol for one Execute stream (both directions): +// +// Engine -> Worker: Init -> PayloadChunk* -> (DataRequest)* -> Finish (Cancel)? +// | Cancel +// Worker -> Engine: InitResponse -> (DataResponse)* -> (ExecutionError)? -> (FinishResponse | CancelResponse) +// +// DataRequest and DataResponse are independent streams: the worker +// may emit DataResponse messages at any point after InitResponse, +// including before the first DataRequest arrives. For generator-style +// UDFs that produce output without consuming input, there may be zero +// DataRequest messages -- the engine sends Finish directly after Init. +// The arrows above denote ordering constraints within each direction, +// not a request/response pairing. +// +// The engine MAY send DataRequests before receiving InitResponse (pipeline +// mode). The worker MUST buffer such DataRequests and process them in +// arrival order once init succeeds. They MAY be discarded only if init +// fails (i.e. the worker sends ExecutionError before InitResponse). +// +// Ordering invariants: +// - PayloadChunk* only after Init and before the first DataRequest. +// [[Init.is_chunking_payload]] = true signals that chunks will follow; +// [[PayloadChunk.last]] = true is the canonical end-of-chunking signal. +// When [[Init.is_chunking_payload]] is false or absent, [[InitResponse]] +// MAY be sent immediately after [[Init]] without waiting for chunks. +// - InitResponse MUST be emitted before any DataResponse. +// - ExecutionError (if any) MUST be emitted after all DataResponse +// messages. After sending it the worker MUST stop processing DataRequests +// and wait for the engine to send Finish or Cancel, then respond with +// FinishResponse or CancelResponse accordingly. At most one +// ExecutionError is sent per stream; the worker aggregates multiple +// errors internally. +// - The engine terminates with one of: +// (a) Finish alone -> worker sends FinishResponse. +// (b) Cancel alone -> worker sends CancelResponse. +// (c) Finish then Cancel -> worker sends CancelResponse if it has not +// yet sent FinishResponse, otherwise FinishResponse (see [[Finish]]). +// Cancel MUST NOT precede Finish on the same stream. +// +// A worker that receives messages out of order (e.g. a second Init, +// a PayloadChunk after the first DataRequest, a DataRequest before Init, +// or a Cancel before Init) MUST send [[ExecutionError]] with a +// [[ProtocolError]] kind, followed by [[FinishResponse]] or +// [[CancelResponse]] to close the stream cleanly. +message UdfControlRequest { + // Exactly one branch MUST be set; receivers MUST reject messages + // with no branch set. + oneof control { + Init init = 1; + PayloadChunk payload = 2; + Finish finish = 3; + Cancel cancel = 4; + } +} + +// Worker -> Engine control messages. +message UdfControlResponse { + // Exactly one branch MUST be set; receivers MUST reject messages + // with no branch set. + oneof control { + InitResponse init = 1; + FinishResponse finish = 2; + CancelResponse cancel = 3; + ExecutionError error = 4; + } +} + +// ===================================================================== +// Init phase +// ===================================================================== + +// Sent once, as the first message on an Execute stream. Describes +// the UDF body to run plus the minimum metadata the worker needs to +// start processing it. +// +// Today the protocol mandates exactly one Init per UDF execution +// (one Init -> data -> Finish). This is the simplest contract and +// covers all currently supported UDF kinds. In the future we may +// evolve to support multiple init phases on the same stream -- e.g. +// when worker setup requires an interactive handshake (negotiate a +// schema, exchange capabilities, fetch driver-side metadata, ...) +// before the data plane opens. Such an extension would be additive +// and would not change the single-Init semantics already in use. +// +// Engine vs. client split: +// * Most fields on Init are engine-side. They describe what +// flows on the wire for this session ([[data_format]] / +// [[input_schema]] / [[output_schema]] -- matching the worker +// spec, not the function's view) and what per-session +// context the worker needs ([[timezone]], [[session_conf]], +// [[task_context]], [[parameters]]). +// * [[UdfPayload]] carries everything the client side of Spark +// (where the UDF is defined and serialized) packs -- the +// serialized callable, an opaque format tag, and any encoder +// metadata bundled with the callable. The wire protocol does +// not enumerate encoder shapes; that is left to the client and +// worker to agree on per UDF type. +message Init { + // (Optional) Protocol version declared by the engine for this stream. + // Allows the worker to detect version mismatches early and reject + // streams using a protocol revision it does not support. When not set, + // the worker SHOULD assume the initial protocol version. + optional uint32 protocol_version = 1; + + // (Required) Wire format used for [[DataRequest.data]] and + // [[DataResponse.data]] for the life of this session. Must be + // one of the formats the worker declared in + // [[WorkerCapabilities.supported_data_formats]]; the client side + // of the protocol picks one at planning time and sticks with it. + // + // Workers MUST reject an [[Init]] whose [[data_format]] is + // `UDF_WORKER_DATA_FORMAT_UNSPECIFIED`, or whose value is not + // present in their declared + // [[WorkerCapabilities.supported_data_formats]]. The latter covers + // unknown enum values that proto3 passes through as numeric + // constants -- e.g. a newer engine selecting a format the worker + // does not implement. + UDFWorkerDataFormat data_format = 2; + + // (Required) The UDF body to execute on the worker for this + // session. Exactly one payload per Execute stream. + UdfPayload udf = 3; + + // (Optional) Schema of the input data plane in the wire format + // declared by [[data_format]] -- e.g. an Arrow IPC schema when + // data_format = ARROW. This is an engine-side requirement: it + // describes the bytes the engine will actually put on + // [[DataRequest.data]] for this session, matching what the + // worker advertised in its spec. It is NOT necessarily the + // schema the function definer expressed; the UDF's own type + // information lives inside [[UdfPayload]], typically embedded + // alongside the callable in [[UdfPayload.payload]] (e.g. as + // input/output encoders chosen per UDF type). + // + // Left unset when the worker can derive the schema from the + // payload alone. + optional bytes input_schema = 4; + + // (Optional) Schema of the output data plane in the wire format + // declared by [[data_format]]. Same semantics as + // [[input_schema]] -- engine-side requirement describing the + // bytes the engine expects on [[DataResponse.data]]. + optional bytes output_schema = 5; + + // (Optional; defaults to an empty map.) Per-task context + // provided by the engine. Common keys identify the task instance + // for diagnostics, logging, and stateful workers -- e.g. + // partition id, task attempt id, stage id, micro-batch id. + // Engine and worker agree on the keys they share; the protocol + // does not enumerate them. + map<string, string> task_context = 6; + + // (Optional; defaults to an empty map.) Worker-private knobs not + // already captured by typed fields above. Free-form; both sides + // agree on the keys they need. + // + // Any key that two languages converge on is a candidate for + // promotion to a structured proto field -- once promoted, it gets + // a typed field number from the reserved range right after this + // block and is removed from [[session_conf]]. [[timezone]] below + // is an example of a key that has already been promoted. + map<string, string> session_conf = 7; + + // (Optional) Session timezone, promoted out of [[session_conf]] + // because every eval needs it for timestamp encoding/decoding. + // + // Format follows Spark's `spark.sql.session.timeZone` config -- + // typically an IANA TZ id (e.g. "America/Los_Angeles") or a + // fixed offset (e.g. "+08:00"). The engine MUST pass the value + // it would resolve from the session conf without further + // transformation, so the worker can interpret it the same way + // Spark does. + optional string timezone = 8; + + // (Optional) When true, the UDF payload will be delivered via + // [[PayloadChunk]] messages rather than inline in [[UdfPayload.payload]]. + // The worker MUST wait for [[PayloadChunk.last]] = true before sending + // [[InitResponse]]. When false or absent, the payload is fully contained + // in [[UdfPayload.payload]] and the worker MAY send [[InitResponse]] + // immediately after [[Init]] without waiting for any [[PayloadChunk]]. + optional bool is_chunking_payload = 9; + + // Reserved for future typed Init fields, in particular keys + // graduated from [[session_conf]] (see the [[timezone]] precedent + // above). Numbers >= 100 are intentionally NOT reserved here; if + // a future revision needs an opaque escape-hatch field, give it a + // number >= 100 alongside [[parameters]] and add a field-level + // comment so the convention stays visible. + reserved 10 to 99; + + // (Optional) Engine-packed opaque parameters specific to a + // particular kind of UDF execution. The escape hatch for + // anything the engine needs the worker to see at init time + // that is not already captured by the typed fields above and + // does not fit naturally into [[task_context]]. The encoding + // is agreed between the engine and the worker; the protocol + // does not interpret it. The matching response, also opaque + // bytes, is returned via [[InitResponse.data]]. + // + // Numbers >= 100 are reserved by convention for opaque + // escape-hatch fields like this one; new typed fields use the + // reserved 10..99 range. + // + // Client-side init data (anything packed by the layer that + // defines and serializes the UDF) does NOT belong here -- it + // travels inside [[UdfPayload.payload]] instead. + optional bytes parameters = 100; +} + +// Acknowledgment for [[Init]]. The worker MUST send exactly one +// [[InitResponse]] before any [[DataResponse]]. When [[PayloadChunk]] +// is used to deliver the UDF payload, the worker MUST also wait until +// end-of-chunking to emit it (see [[PayloadChunk]]). +// +// The init phase allows the engine to interact with the UDF before +// data starts flowing -- the worker can return inline bytes here for +// the engine (or higher-level code on the engine side) to consume +// during setup. The semantics of those bytes are agreed between the +// client side of the protocol and the worker; this message itself is +// otherwise opaque. +message InitResponse { + // (Optional) Inline init result returned by the worker. Opaque + // to the protocol; the client side of the protocol and the + // worker agree on what (if anything) it carries. + optional bytes data = 1; +} + +// Optional. Used to stream the single UDF payload when it does not +// fit in a single gRPC message. The default is to send the payload +// inline on [[UdfPayload.payload]]; chunking is only needed when a +// payload exceeds the gRPC message size limit. +// +// When used, chunks are sent zero or more times after [[Init]] and +// before the first [[DataRequest]]. The worker concatenates the +// inline [[UdfPayload.payload]] (if any) followed by all chunks in +// arrival order to form the final payload. +// +// Chunks are part of the Init handshake, not standalone control +// messages: they extend [[Init.udf.payload]] and are not +// individually acknowledged. The single [[InitResponse]] covers +// Init plus all of its chunks together. [[PayloadChunk.last]] = true +// is the canonical end-of-chunking signal; the worker MUST NOT send +// [[InitResponse]] before receiving it. +// +// When [[UdfPayload.payload_size]] is set on [[Init.udf]], receivers Review Comment: For security or for integrity? How could this break? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
