Luke Cwik created BEAM-3787:
-------------------------------

             Summary: Migrate Fn API to be bidirectional instruction/request 
stream
                 Key: BEAM-3787
                 URL: https://issues.apache.org/jira/browse/BEAM-3787
             Project: Beam
          Issue Type: Improvement
          Components: beam-model
            Reporter: Luke Cwik
            Assignee: Robert Bradshaw


Allow the SDK to request the Runner to do something on its behalf. This 
mechanism can be used for:
* Reporting final counters
* Work shedding (SDK can choose to reduce the amount of work it wants to do 
(checkpointing))
* Requesting process bundle descriptors (instead of requiring the Runner to 
send them and have the SDK cache them).
* Decoupling the message type in control allows for new types of messages to be 
added which are not one to one.

Example API change below (note that SdkMessage/RunnerMessage should use a 
different name):

// An API that describes control messages between the SDK and Runner to process
// bundles, split bundles, report progress, ...
service BeamFnControl {
  // 
  rpc Control(
    // A stream of SDK requests/responses.
    stream SdkMessage
  ) returns (
    // A stream of Runner requests/responses.
    stream RunnerMessage
  ) {}
}

// Messages a Runner can send over the control plane.
message RunnerMessage {
  // (Required) An unique identifier provided by the runner which represents
  // this requests execution. The RunnerInstructionResponse MUST have the 
matching id.
  string id = 1;

  oneof message {
    ErrorResponse error = 999;
    RegisterRequest register = 1000;
    ProcessBundleRequest process_bundle = 1001;
    ProcessBundleProgressRequest process_bundle_progress = 1002;
    ProcessBundleSplitRequest process_bundle_split = 1003;
    ShedBundleResponse shed_bundle = 1000;
  }
}

// Messages an SDK can send over the control plane.
message SdkMessage {
  oneof message {
    RunnerInstructionResponse runner_instruction_response = 1000;
    SdkInstructionRequest sdk_instruction_request = 1001;
  }
}

// A request sent by a runner which the SDK is asked to fulfill.
// For any unsupported request type, an error should be returned with a
// matching instruction id.
// Stable
message RunnerInstructionRequest {
  // (Required) An unique identifier provided by the runner which represents
  // this requests execution. The RunnerInstructionResponse MUST have the 
matching id.
  string instruction_id = 1;

  // (Required) A request that the SDK Harness needs to interpret.
  oneof request {
    RegisterRequest register = 1000;
    ProcessBundleRequest process_bundle = 1001;
    ProcessBundleProgressRequest process_bundle_progress = 1002;
    ProcessBundleSplitRequest process_bundle_split = 1003;
  }
}

// The response for an associated request the SDK had been asked to fulfill.
// Stable
message RunnerInstructionResponse {
  // (Required) A reference provided by the runner which represents a requests
  // execution. The RunnerInstructionResponse MUST have the matching id when
  // responding to the runner.
  string instruction_id = 1;

  // If this is specified, then this instruction has failed.
  // A human readable string representing the reason as to why processing has
  // failed.
  string error = 2;

  // If the instruction did not fail, it is required to return an equivalent
  // response type depending on the request this matches.
  oneof response {
    RegisterResponse register = 1000;
    ProcessBundleResponse process_bundle = 1001;
    ProcessBundleProgressResponse process_bundle_progress = 1002;
    ProcessBundleSplitResponse process_bundle_split = 1003;
  }
}

message SdkInstructionRequest {
  // (Required) An unique identifier provided by the SDK which represents
  // this requests execution. The SdkInstructionResponse MUST have the matching 
id.
  string instruction_id = 1;

  // (Required) A request that the Runner needs to interpret.
  oneof request {
    ShedBundleRequest shed_bundle = 1000;
  }
}

// The response for an associated request the Runner had been asked to fulfill.
// Stable
message RunnerInstructionResponse {
  // (Required) A reference provided by the SDK which represents a requests
  // execution. The RunnerInstructionResponse MUST have the matching id when
  // responding to the SDK.
  string instruction_id = 1;

  // If this is specified, then this instruction has failed.
  // A human readable string representing the reason as to why processing has
  // failed.
  string error = 2;

  // If the instruction did not fail, it is required to return an equivalent
  // response type depending on the request this matches.
  oneof response {
    ShedBundleResponse shed_bundle = 1000;
  }
}




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to