Repository: beam
Updated Branches:
  refs/heads/master 3bd68ecfd -> 80c86f81b


Add fn API progress reporting protos


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/53d8b27c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/53d8b27c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/53d8b27c

Branch: refs/heads/master
Commit: 53d8b27ca1bc186d01c36ae663c8e3ab82011d00
Parents: 3bd68ec
Author: Vikas Kedigehalli <vika...@google.com>
Authored: Fri Sep 1 09:45:22 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Thu Sep 7 13:48:07 2017 -0700

----------------------------------------------------------------------
 .../fn-api/src/main/proto/beam_fn_api.proto     | 98 ++++++++++++++++++--
 1 file changed, 89 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/53d8b27c/sdks/common/fn-api/src/main/proto/beam_fn_api.proto
----------------------------------------------------------------------
diff --git a/sdks/common/fn-api/src/main/proto/beam_fn_api.proto b/sdks/common/fn-api/src/main/proto/beam_fn_api.proto
index 53d67bc..9bf1b5f 100644
--- a/sdks/common/fn-api/src/main/proto/beam_fn_api.proto
+++ b/sdks/common/fn-api/src/main/proto/beam_fn_api.proto
@@ -97,7 +97,9 @@ service BeamFnControl {
   ) {}
 }
 
-// A request sent by a runner which it the SDK is asked to fulfill.
+// A request sent by a runner which the SDK is asked to fulfill.
+// For any unsupported request type, an error should be returned with a
+// matching instruction id.
 // Stable
 message InstructionRequest {
   // (Required) An unique identifier provided by the runner which represents
@@ -189,23 +191,101 @@ message ProcessBundleRequest {
 
 // Stable
 message ProcessBundleResponse {
+  // (Optional) If metrics reporting is supported by the SDK, this represents
+  // the final metrics to record for this bundle.
+  Metrics metrics = 1;
 }
 
+// A request to report progress information for a given bundle.
+// Handling this request is optional; it is used to support advanced SDK
+// features such as SplittableDoFn, user-level metrics, etc.
 message ProcessBundleProgressRequest {
   // (Required) A reference to an active process bundle request with the given
   // instruction id.
   string instruction_reference = 1;
 }
 
+message Metrics {
+  // PTransform-level metrics.
+  // These metrics are split into processed and active element groups for
+  // progress reporting purposes. This allows a Runner to see what is measured,
+  // what is estimated and what can be extrapolated to be able to accurately
+  // estimate the backlog of remaining work.
+  message PTransform {
+    // Metrics that are measured for processed and active element groups.
+    message Measured {
+      // (Required) Map from local input name to number of elements processed
+      // from this input.
+      map<string, int64> input_element_counts = 1;
+
+      // (Required) Map from local output name to number of elements produced
+      // for this output.
+      map<string, int64> output_element_counts = 2;
+
+      // (Optional) The total time spent so far in processing the elements in
+      // this group.
+      int64 total_time_spent = 3;
+
+      // TODO: Add other element group level metrics.
+    }
+
+    // Metrics for fully processed elements.
+    message ProcessedElements {
+      // (Required)
+      Measured measured = 1;
+    }
+
+    // Metrics for active elements.
+    // An element is considered active if the SDK has started but not finished
+    // processing it yet.
+    message ActiveElements {
+      // (Required)
+      Measured measured = 1;
+
+      // Estimated metrics.
+
+      // (Optional) Sum of the estimated fractions of known work remaining for
+      // all active elements, as reported by this transform.
+      // If not reported, a Runner could extrapolate this from the processed
+      // elements.
+      // TODO: Handle the case when known work is infinite.
+      double fraction_remaining = 2;
+
+      // (Optional) Map from local output name to the sum of the estimated
+      // number of elements remaining for this output from all active elements,
+      // as reported by this transform.
+      // If not reported, a Runner could extrapolate this from the processed
+      // elements.
+      map<string, int64> output_elements_remaining = 3;
+    }
+
+    // (Required) Metrics for processed elements.
+    ProcessedElements processed_elements = 1;
+    // (Required) Metrics for active elements.
+    ActiveElements active_elements = 2;
+
+    // (Optional) Map from local output name to its watermark.
+    // The watermarks reported are tentative; they give a better sense of
+    // progress while a bundle is being processed but before it is committed.
+    // At bundle commit time, a Runner also needs to take into account the
+    // timers set in order to compute the actual watermarks.
+    map<string, int64> watermarks = 3;
+
+    // TODO: Define other transform level system metrics.
+  }
+
+  // User-defined metrics.
+  message User {
+    // TODO: Define it.
+  }
+
+  map<string, PTransform> ptransforms = 1;
+  map<string, User> user = 2;
+}
+
 message ProcessBundleProgressResponse {
-  // (Required) The finished amount of work. A monotonically increasing
-  // unitless measure of work finished.
-  double finished_work = 1;
-
-  // (Required) The known amount of backlog for the process bundle request.
-  // Computed as:
-  //   (estimated known work - finish work) / finished work
-  double backlog = 2;
+  // (Required)
+  Metrics metrics = 1;
 }
 
 message ProcessBundleSplitRequest {

Reply via email to