Repository: beam
Updated Branches:
  refs/heads/master 3bd68ecfd -> 80c86f81b
Add fn API progress reporting protos

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/53d8b27c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/53d8b27c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/53d8b27c

Branch: refs/heads/master
Commit: 53d8b27ca1bc186d01c36ae663c8e3ab82011d00
Parents: 3bd68ec
Author: Vikas Kedigehalli <vika...@google.com>
Authored: Fri Sep 1 09:45:22 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Thu Sep 7 13:48:07 2017 -0700

----------------------------------------------------------------------
 .../fn-api/src/main/proto/beam_fn_api.proto | 98 ++++++++++++++++++--
 1 file changed, 89 insertions(+), 9 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/beam/blob/53d8b27c/sdks/common/fn-api/src/main/proto/beam_fn_api.proto
----------------------------------------------------------------------
diff --git a/sdks/common/fn-api/src/main/proto/beam_fn_api.proto b/sdks/common/fn-api/src/main/proto/beam_fn_api.proto
index 53d67bc..9bf1b5f 100644
--- a/sdks/common/fn-api/src/main/proto/beam_fn_api.proto
+++ b/sdks/common/fn-api/src/main/proto/beam_fn_api.proto
@@ -97,7 +97,9 @@ service BeamFnControl {
   ) {}
 }
 
-// A request sent by a runner which it the SDK is asked to fulfill.
+// A request sent by a runner which the SDK is asked to fulfill.
+// For any unsupported request type, an error should be returned with a
+// matching instruction id.
 // Stable
 message InstructionRequest {
   // (Required) An unique identifier provided by the runner which represents
@@ -189,23 +191,101 @@ message ProcessBundleRequest {
 
 // Stable
 message ProcessBundleResponse {
+  // (Optional) If metrics reporting is supported by the SDK, this represents
+  // the final metrics to record for this bundle.
+  Metrics metrics = 1;
 }
 
+// A request to report progress information for a given bundle.
+// This is an optional request to be handled and is used to support advanced
+// SDK features such as SplittableDoFn, user level metrics etc.
 message ProcessBundleProgressRequest {
   // (Required) A reference to an active process bundle request with the given
   // instruction id.
   string instruction_reference = 1;
 }
 
+message Metrics {
+  // PTransform level metrics.
+  // These metrics are split into processed and active element groups for
+  // progress reporting purposes. This allows a Runner to see what is measured,
+  // what is estimated and what can be extrapolated to be able to accurately
+  // estimate the backlog of remaining work.
+  message PTransform {
+    // Metrics that are measured for processed and active element groups.
+    message Measured {
+      // (Required) Map from local input name to number of elements processed
+      // from this input.
+      map<string, int64> input_element_counts = 1;
+
+      // (Required) Map from local output name to number of elements produced
+      // for this output.
+      map<string, int64> output_element_counts = 2;
+
+      // (Optional) The total time spent so far in processing the elements in
+      // this group.
+      int64 total_time_spent = 3;
+
+      // TODO: Add other element group level metrics.
+    }
+
+    // Metrics for fully processed elements.
+    message ProcessedElements {
+      // (Required)
+      Measured measured = 1;
+    }
+
+    // Metrics for active elements.
+    // An element is considered active if the SDK has started but not finished
+    // processing it yet.
+    message ActiveElements {
+      // (Required)
+      Measured measured = 1;
+
+      // Estimated metrics.
+
+      // (Optional) Sum of estimated fraction of known work remaining for all
+      // active elements, as reported by this transform.
+      // If not reported, a Runner could extrapolate this from the processed
+      // elements.
+      // TODO: Handle the case when known work is infinite.
+      double fraction_remaining = 2;
+
+      // (Optional) Map from local output name to sum of estimated number
+      // of elements remaining for this output from all active elements,
+      // as reported by this transform.
+      // If not reported, a Runner could extrapolate this from the processed
+      // elements.
+      map<string, int64> output_elements_remaining = 3;
+    }
+
+    // (Required): Metrics for processed elements.
+    ProcessedElements processed_elements = 1;
+    // (Required): Metrics for active elements.
+    ActiveElements active_elements = 2;
+
+    // (Optional): Map from local output name to its watermark.
+    // The watermarks reported are tentative, to get a better sense of progress
+    // while processing a bundle but before it is committed. At bundle commit
+    // time, a Runner needs to also take into account the timers set to compute
+    // the actual watermarks.
+    map<string, int64> watermarks = 3;
+
+    // TODO: Define other transform level system metrics.
+  }
+
+  // User defined metrics
+  message User {
+    // TODO: Define it.
+  }
+
+  map<string, PTransform> ptransforms = 1;
+  map<string, User> user = 2;
+}
+
 message ProcessBundleProgressResponse {
-  // (Required) The finished amount of work. A monotonically increasing
-  // unitless measure of work finished.
-  double finished_work = 1;
-
-  // (Required) The known amount of backlog for the process bundle request.
-  // Computed as:
-  // (estimated known work - finish work) / finished work
-  double backlog = 2;
+  // (Required)
+  Metrics metrics = 1;
 }
 
 message ProcessBundleSplitRequest {