Luke Cwik created BEAM-3787:
-------------------------------

             Summary: Migrate Fn API to be bidirectional instruction/request stream
                 Key: BEAM-3787
                 URL: https://issues.apache.org/jira/browse/BEAM-3787
             Project: Beam
          Issue Type: Improvement
          Components: beam-model
            Reporter: Luke Cwik
            Assignee: Robert Bradshaw

Allow the SDK to request the Runner to do something on its behalf. This mechanism can be used for:
* Reporting final counters
* Work shedding: the SDK can choose to reduce the amount of work it wants to do (checkpointing)
* Requesting process bundle descriptors (instead of requiring the Runner to send them and have the SDK cache them)
* Decoupling the message type in control, which allows new types of messages to be added that are not one-to-one

Example API change below (note that SdkMessage/RunnerMessage should use a different name):

// An API that describes control messages between the SDK and Runner to process
// bundles, split bundles, report progress, ...
service BeamFnControl {
  rpc Control(
    // A stream of SDK requests/responses.
    stream SdkMessage
  ) returns (
    // A stream of Runner requests/responses.
    stream RunnerMessage
  ) {}
}

// Messages a Runner can send over the control plane.
message RunnerMessage {
  // (Required) A unique identifier provided by the runner which represents
  // this request's execution. The RunnerInstructionResponse MUST have the matching id.
  string id = 1;

  oneof message {
    ErrorResponse error = 999;
    RegisterRequest register = 1000;
    ProcessBundleRequest process_bundle = 1001;
    ProcessBundleProgressRequest process_bundle_progress = 1002;
    ProcessBundleSplitRequest process_bundle_split = 1003;
    ShedBundleResponse shed_bundle = 1004;
  }
}

// Messages an SDK can send over the control plane.
message SdkMessage {
  oneof message {
    RunnerInstructionResponse runner_instruction_response = 1000;
    SdkInstructionRequest sdk_instruction_request = 1001;
  }
}

// A request sent by a runner which the SDK is asked to fulfill.
// For any unsupported request type, an error should be returned with a
// matching instruction id.
// Stable
message RunnerInstructionRequest {
  // (Required) A unique identifier provided by the runner which represents
  // this request's execution. The RunnerInstructionResponse MUST have the matching id.
  string instruction_id = 1;

  // (Required) A request that the SDK Harness needs to interpret.
  oneof request {
    RegisterRequest register = 1000;
    ProcessBundleRequest process_bundle = 1001;
    ProcessBundleProgressRequest process_bundle_progress = 1002;
    ProcessBundleSplitRequest process_bundle_split = 1003;
  }
}

// The response for an associated request the SDK had been asked to fulfill.
// Stable
message RunnerInstructionResponse {
  // (Required) A reference provided by the runner which represents a request's
  // execution. The RunnerInstructionResponse MUST have the matching id when
  // responding to the runner.
  string instruction_id = 1;

  // If this is specified, then this instruction has failed.
  // A human readable string representing the reason as to why processing has
  // failed.
  string error = 2;

  // If the instruction did not fail, it is required to return an equivalent
  // response type depending on the request this matches.
  oneof response {
    RegisterResponse register = 1000;
    ProcessBundleResponse process_bundle = 1001;
    ProcessBundleProgressResponse process_bundle_progress = 1002;
    ProcessBundleSplitResponse process_bundle_split = 1003;
  }
}

message SdkInstructionRequest {
  // (Required) A unique identifier provided by the SDK which represents
  // this request's execution. The SdkInstructionResponse MUST have the matching id.
  string instruction_id = 1;

  // (Required) A request that the Runner needs to interpret.
  oneof request {
    ShedBundleRequest shed_bundle = 1000;
  }
}

// The response for an associated request the Runner had been asked to fulfill.
// Stable
message SdkInstructionResponse {
  // (Required) A reference provided by the SDK which represents a request's
  // execution. The SdkInstructionResponse MUST have the matching id when
  // responding to the SDK.
  string instruction_id = 1;

  // If this is specified, then this instruction has failed.
  // A human readable string representing the reason as to why processing has
  // failed.
  string error = 2;

  // If the instruction did not fail, it is required to return an equivalent
  // response type depending on the request this matches.
  oneof response {
    ShedBundleResponse shed_bundle = 1000;
  }
}
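
To make the id-matching rules in the proposal concrete, here is a minimal Python sketch of how an SDK harness might multiplex one bidirectional control stream. It is an illustration only, not Beam code: the `Message` dataclass, the `SdkControlMux` class, and the "sdk-N" id scheme are hypothetical stand-ins for the proto messages, and payloads are placeholder values. It shows requests from the Runner being answered with the same instruction id (including an error for unsupported request types), and responses to SDK-initiated requests being matched back to their caller by id.

```python
from dataclasses import dataclass
from typing import Callable, Dict
import itertools


@dataclass
class Message:
    """Hypothetical stand-in for the proto messages; not generated code."""
    kind: str                # "request" or "response"
    instruction_id: str
    payload_type: str        # e.g. "process_bundle", "shed_bundle"
    payload: object = None   # placeholder for the typed request/response
    error: str = ""          # non-empty means the instruction failed


class SdkControlMux:
    """SDK-side view of one bidirectional control stream (illustrative)."""

    def __init__(self, send: Callable[[Message], None]):
        self._send = send  # writes a message to the outbound stream
        self._handlers: Dict[str, Callable[[object], object]] = {}
        self._pending: Dict[str, Callable[[Message], None]] = {}
        self._ids = itertools.count(1)

    def register_handler(self, payload_type, handler):
        """Register how the SDK fulfills one kind of Runner request."""
        self._handlers[payload_type] = handler

    def request(self, payload_type, payload, on_response):
        """Send an SDK-initiated request and remember who awaits the reply."""
        instruction_id = f"sdk-{next(self._ids)}"
        self._pending[instruction_id] = on_response
        self._send(Message("request", instruction_id, payload_type, payload))
        return instruction_id

    def on_message(self, msg: Message):
        """Handle one message read from the inbound (Runner) stream."""
        if msg.kind == "response":
            # Reply to an SDK request: route it by matching instruction id.
            callback = self._pending.pop(msg.instruction_id, None)
            if callback is not None:
                callback(msg)
            return
        handler = self._handlers.get(msg.payload_type)
        if handler is None:
            # Unsupported request type: return an error with the matching id.
            self._send(Message("response", msg.instruction_id, msg.payload_type,
                               error=f"unsupported request: {msg.payload_type}"))
            return
        try:
            result = handler(msg.payload)
            self._send(Message("response", msg.instruction_id,
                               msg.payload_type, result))
        except Exception as exc:
            self._send(Message("response", msg.instruction_id,
                               msg.payload_type, error=str(exc)))


# Usage with an in-memory list standing in for the outbound stream.
sent = []
mux = SdkControlMux(sent.append)
mux.register_handler("process_bundle", lambda p: {"done": p})
mux.on_message(Message("request", "runner-1", "process_bundle", "bundle-a"))
mux.on_message(Message("request", "runner-2", "unknown_op"))
replies = []
sdk_id = mux.request("shed_bundle", None, replies.append)
mux.on_message(Message("response", sdk_id, "shed_bundle", "ok"))
```

Keeping a separate pending table for SDK-initiated requests is one possible design; it lets both directions share a single stream without the two id spaces interfering, which is the property the bidirectional proposal relies on.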