[jira] [Work logged] (BEAM-3741) Proto changes for splitting over Fn API

2018-03-13 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3741?focusedWorklogId=80004&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-80004
 ]

ASF GitHub Bot logged work on BEAM-3741:


Author: ASF GitHub Bot
Created on: 13/Mar/18 18:32
Start Date: 13/Mar/18 18:32
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on a change in pull request #4743: 
[BEAM-3741] Proto changes for splitting over Fn API
URL: https://github.com/apache/beam/pull/4743#discussion_r174240519
 
 

 ##
 File path: model/fn-execution/src/main/proto/beam_fn_api.proto
 ##
 @@ -182,6 +182,48 @@ message ProcessBundleDescriptor {
   org.apache.beam.model.pipeline.v1.ApiServiceDescriptor state_api_service_descriptor = 7;
 }
 
+// Represents a partition of the bundle into two bundles: a "primary" and
+// a "residual", with the following properties:
+// - The work in primary and residual doesn't overlap, and combined, adds up
+//   to the work in the current bundle if the split hadn't happened.
+// - The current bundle, if it keeps executing, will have done none of the
+//   work under residual roots.
+// - The current bundle, if no further splits happen, will have done exactly
+//   the work under primary_roots.
+// For more rigorous definitions see https://s.apache.org/beam-breaking-fusion
+message BundleSplit {
+  // One of the root applications specifying the scope of work for a bundle.
+  message Application {
+    // (Required) The primitive transform to which to pass the element
+    string ptransform_id = 1;
+
+    // (Required) Name of the transform's input to which to pass the element.
+    string input_id = 2;
+
+    // (Required) The encoded element to pass to the transform.
+    bytes element = 3;
+
+    // Approximate lower bounds on timestamps of elements that this PTransform
+    // will produce into each of its output PCollections, when invoked on this
+    // element. Keyed by the transform's local output name.
+    map<string, google.protobuf.Timestamp> output_watermarks = 4;
+
+    // Approximate fraction of all work of the current bundle (before split)
+    // represented by invoking this Application and its downstream applications.
+    // The sum of fraction_of_work between all primary_roots and residual_roots
+    // must add up to approximately 1.0.
+    DoubleValue fraction_of_work = 5;
 
 Review comment:
   That would mean that this value is `optional`. If it becomes a required 
field we should change it back to double.
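
A rough sketch of what the `optional` reading implies for a consumer that checks the documented "adds up to approximately 1.0" invariant. The function name and the tolerance are illustrative assumptions (the proto comment does not quantify "approximately"), and `None` stands in for an unset wrapper value:

```python
import math
from typing import Optional, Sequence


def fractions_consistent(
    primary: Sequence[Optional[float]],
    residual: Sequence[Optional[float]],
    tolerance: float = 0.05,  # assumed; the proto only says "approximately"
) -> Optional[bool]:
    """Return None if any fraction is unset, else whether the sum is about 1.0."""
    fractions = list(primary) + list(residual)
    # With an optional field, the check cannot be made when a value is missing.
    if any(f is None for f in fractions):
        return None
    return math.isclose(sum(fractions), 1.0, abs_tol=tolerance)
```

With a required plain `double`, the `None` branch would disappear and an unreported value would be indistinguishable from 0.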


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 80004)
Time Spent: 4h 10m  (was: 4h)

> Proto changes for splitting over Fn API
> ---
>
> Key: BEAM-3741
> URL: https://issues.apache.org/jira/browse/BEAM-3741
> Project: Beam
>  Issue Type: Sub-task
>  Components: beam-model
>Reporter: Eugene Kirpichov
>Assignee: Eugene Kirpichov
>Priority: Major
>  Time Spent: 4h 10m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3741) Proto changes for splitting over Fn API

2018-03-13 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3741?focusedWorklogId=80005&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-80005
 ]

ASF GitHub Bot logged work on BEAM-3741:


Author: ASF GitHub Bot
Created on: 13/Mar/18 18:33
Start Date: 13/Mar/18 18:33
Worklog Time Spent: 10m 
  Work Description: lukecwik closed pull request #4743: [BEAM-3741] Proto 
changes for splitting over Fn API
URL: https://github.com/apache/beam/pull/4743
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/model/fn-execution/src/main/proto/beam_fn_api.proto b/model/fn-execution/src/main/proto/beam_fn_api.proto
index 28c75595024..89c73a7798f 100644
--- a/model/fn-execution/src/main/proto/beam_fn_api.proto
+++ b/model/fn-execution/src/main/proto/beam_fn_api.proto
@@ -41,6 +41,7 @@ option java_outer_classname = "BeamFnApi";
 import "beam_runner_api.proto";
 import "endpoints.proto";
 import "google/protobuf/timestamp.proto";
+import "google/protobuf/wrappers.proto";
 
 /*
  * Constructs that define the pipeline shape.
@@ -182,6 +183,48 @@ message ProcessBundleDescriptor {
   org.apache.beam.model.pipeline.v1.ApiServiceDescriptor state_api_service_descriptor = 7;
 }
 
+// Represents a partition of the bundle into two bundles: a "primary" and
+// a "residual", with the following properties:
+// - The work in primary and residual doesn't overlap, and combined, adds up
+//   to the work in the current bundle if the split hadn't happened.
+// - The current bundle, if it keeps executing, will have done none of the
+//   work under residual roots.
+// - The current bundle, if no further splits happen, will have done exactly
+//   the work under primary_roots.
+// For more rigorous definitions see https://s.apache.org/beam-breaking-fusion
+message BundleSplit {
+  // One of the root applications specifying the scope of work for a bundle.
+  message Application {
+    // (Required) The primitive transform to which to pass the element
+    string ptransform_id = 1;
+
+    // (Required) Name of the transform's input to which to pass the element.
+    string input_id = 2;
+
+    // (Required) The encoded element to pass to the transform.
+    bytes element = 3;
+
+    // Approximate lower bounds on timestamps of elements that this PTransform
+    // will produce into each of its output PCollections, when invoked on this
+    // element. Keyed by the transform's local output name.
+    map<string, google.protobuf.Timestamp> output_watermarks = 4;
+
+    // Approximate fraction of all work of the current bundle (before split)
+    // represented by invoking this Application and its downstream applications.
+    // The sum of fraction_of_work between all primary_roots and residual_roots
+    // must add up to approximately 1.0.
+    google.protobuf.DoubleValue fraction_of_work = 5;
+  }
+
+  // Root applications that should replace the current bundle.
+  repeated Application primary_roots = 1;
+
+  // Root applications that have been removed from the current bundle and
+  // have to be executed in a separate bundle (e.g. in parallel on a different
+  // worker, or after the current bundle completes, etc.)
+  repeated Application residual_roots = 2;
+}
+
 // A request to process a given bundle.
 // Stable
 message ProcessBundleRequest {
@@ -199,6 +242,10 @@ message ProcessBundleResponse {
   // (Optional) If metrics reporting is supported by the SDK, this represents
   // the final metrics to record for this bundle.
   Metrics metrics = 1;
+
+  // (Optional) Specifies that the bundle has been split since the last
+  // ProcessBundleProgressResponse was sent.
+  BundleSplit split = 2;
 }
 
 // A request to report progress information for a given bundle.
@@ -330,6 +377,10 @@ message Metrics {
 message ProcessBundleProgressResponse {
   // (Required)
   Metrics metrics = 1;
+
+  // (Optional) Specifies that the bundle has been split since the last
+  // ProcessBundleProgressResponse was sent.
+  BundleSplit split = 2;
 }
 
 message ProcessBundleSplitRequest {
@@ -337,106 +388,22 @@ message ProcessBundleSplitRequest {
   // instruction id.
   string instruction_reference = 1;
 
-  // (Required) The fraction of work (when compared to the known amount of work)
-  // the process bundle request should try to split at.
-  double fraction = 2;
-}
-
-// urn:org.apache.beam:restriction:element-count:1.0
-message ElementCountRestriction {
-  // A restriction representing the number of elements that should be processed.
-  // Effectively the range [0, count]
-  int64 count = 1;
-}
-
-// urn:org.apache.beam:restriction:element-count-skip:1.0
-message ElementCountSkipRestriction {
-  // A restriction representing the number of elements that

[jira] [Work logged] (BEAM-3741) Proto changes for splitting over Fn API

2018-03-13 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3741?focusedWorklogId=80002&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-80002
 ]

ASF GitHub Bot logged work on BEAM-3741:


Author: ASF GitHub Bot
Created on: 13/Mar/18 18:30
Start Date: 13/Mar/18 18:30
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on a change in pull request #4743: 
[BEAM-3741] Proto changes for splitting over Fn API
URL: https://github.com/apache/beam/pull/4743#discussion_r174239903
 
 

 ##
 File path: model/fn-execution/src/main/proto/beam_fn_api.proto
 ##
 @@ -182,6 +182,48 @@ message ProcessBundleDescriptor {
   org.apache.beam.model.pipeline.v1.ApiServiceDescriptor state_api_service_descriptor = 7;
 }
 
+// Represents a partition of the bundle into two bundles: a "primary" and
+// a "residual", with the following properties:
+// - The work in primary and residual doesn't overlap, and combined, adds up
+//   to the work in the current bundle if the split hadn't happened.
+// - The current bundle, if it keeps executing, will have done none of the
+//   work under residual roots.
+// - The current bundle, if no further splits happen, will have done exactly
+//   the work under primary_roots.
+// For more rigorous definitions see https://s.apache.org/beam-breaking-fusion
+message BundleSplit {
+  // One of the root applications specifying the scope of work for a bundle.
+  message Application {
+    // (Required) The primitive transform to which to pass the element
+    string ptransform_id = 1;
+
+    // (Required) Name of the transform's input to which to pass the element.
+    string input_id = 2;
+
+    // (Required) The encoded element to pass to the transform.
+    bytes element = 3;
+
+    // Approximate lower bounds on timestamps of elements that this PTransform
+    // will produce into each of its output PCollections, when invoked on this
+    // element. Keyed by the transform's local output name.
+    map<string, google.protobuf.Timestamp> output_watermarks = 4;
 
 Review comment:
   The issue I see is this: if you perform a split, how do you hold the input 
watermark of these applications so that state and side inputs aren't garbage 
collected and watermark-based timers don't fire too early?
   
   It may be that we need both, or we could say that the input watermark is 
always a lower bound for the output watermarks.
   
   The change looks fine for now, but I think this is an important point to 
discuss.
   




Issue Time Tracking
---

Worklog Id: (was: 80002)
Time Spent: 4h  (was: 3h 50m)

> Proto changes for splitting over Fn API
> ---
>
> Key: BEAM-3741
> URL: https://issues.apache.org/jira/browse/BEAM-3741
> Project: Beam
>  Issue Type: Sub-task
>  Components: beam-model
>Reporter: Eugene Kirpichov
>Assignee: Eugene Kirpichov
>Priority: Major
>  Time Spent: 4h
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-3741) Proto changes for splitting over Fn API

2018-03-11 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3741?focusedWorklogId=79323&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-79323
 ]

ASF GitHub Bot logged work on BEAM-3741:


Author: ASF GitHub Bot
Created on: 12/Mar/18 03:11
Start Date: 12/Mar/18 03:11
Worklog Time Spent: 10m 
  Work Description: jkff commented on a change in pull request #4743: 
[BEAM-3741] Proto changes for splitting over Fn API
URL: https://github.com/apache/beam/pull/4743#discussion_r173686600
 
 

 ##
 File path: model/fn-execution/src/main/proto/beam_fn_api.proto
 ##
 @@ -182,6 +182,48 @@ message ProcessBundleDescriptor {
   org.apache.beam.model.pipeline.v1.ApiServiceDescriptor state_api_service_descriptor = 7;
 }
 
+// Represents a partition of the bundle into two bundles: a "primary" and
+// a "residual", with the following properties:
+// - The work in primary and residual doesn't overlap, and combined, adds up
+//   to the work in the current bundle if the split hadn't happened.
+// - The current bundle, if it keeps executing, will have done none of the
+//   work under residual roots.
+// - The current bundle, if no further splits happen, will have done exactly
+//   the work under primary_roots.
+// For more rigorous definitions see https://s.apache.org/beam-breaking-fusion
+message BundleSplit {
+  // One of the root applications specifying the scope of work for a bundle.
+  message Application {
+    // (Required) The primitive transform to which to pass the element
+    string ptransform_id = 1;
+
+    // (Required) Name of the transform's input to which to pass the element.
+    string input_id = 2;
+
+    // (Required) The encoded element to pass to the transform.
+    bytes element = 3;
+
+    // Approximate lower bounds on timestamps of elements that this PTransform
+    // will produce into each of its output PCollections, when invoked on this
+    // element. Keyed by the transform's local output name.
+    map<string, google.protobuf.Timestamp> output_watermarks = 4;
 
 Review comment:
   No, this has to be the output watermark. For an Application of the form 
`f(x)`, the input watermark would be "a promise about timestamps of future 
elements from the same PCollection that x came from", but the Application has 
no control over that: it can only make promises about what happens if you 
execute this Application.
   
   For example, if this is an Application of an SDF that reads data from Kafka, 
the "element" is (element:(topic, partition), restriction:offset) and there is 
only one output. We are making a promise about what timestamps will be output 
if you read this partition from this offset, which is the watermark of this 
partition; the watermark of the topic/partition PCollection itself is 
irrelevant.
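
To make the Kafka example concrete, here is a minimal sketch using a hypothetical Python stand-in for `BundleSplit.Application` (not the generated proto class). The element encoding, the epoch-second timestamps, and the idea that a consumer combines per-application promises by taking the minimum are all assumptions made for illustration:

```python
from dataclasses import dataclass, field
from typing import Dict, Iterable, Optional


@dataclass
class Application:
    """Hypothetical stand-in for BundleSplit.Application."""
    ptransform_id: str
    input_id: str
    element: bytes
    # Local output name -> lower bound (epoch seconds) on the timestamps this
    # application promises to produce into that output.
    output_watermarks: Dict[str, float] = field(default_factory=dict)


def combined_output_watermark(
    apps: Iterable[Application], output: str
) -> Optional[float]:
    """Lowest promise among the applications that report one for `output`."""
    bounds = [
        a.output_watermarks[output] for a in apps if output in a.output_watermarks
    ]
    return min(bounds) if bounds else None


# Two residual reads of different partitions; each promise is that
# partition's own watermark, independent of the input PCollection.
residuals = [
    Application("kafka_read", "in", b"topic-a/0@offset=120", {"out": 1520900000.0}),
    Application("kafka_read", "in", b"topic-a/1@offset=45", {"out": 1520899000.0}),
]
print(combined_output_watermark(residuals, "out"))
```

Each promise depends only on the (partition, offset) pair being read, which is why it can be attached to the Application itself.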




Issue Time Tracking
---

Worklog Id: (was: 79323)
Time Spent: 3.5h  (was: 3h 20m)

> Proto changes for splitting over Fn API
> ---
>
> Key: BEAM-3741
> URL: https://issues.apache.org/jira/browse/BEAM-3741
> Project: Beam
>  Issue Type: Sub-task
>  Components: beam-model
>Reporter: Eugene Kirpichov
>Assignee: Eugene Kirpichov
>Priority: Major
>  Time Spent: 3.5h
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-3741) Proto changes for splitting over Fn API

2018-03-11 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3741?focusedWorklogId=79324&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-79324
 ]

ASF GitHub Bot logged work on BEAM-3741:


Author: ASF GitHub Bot
Created on: 12/Mar/18 03:11
Start Date: 12/Mar/18 03:11
Worklog Time Spent: 10m 
  Work Description: jkff commented on a change in pull request #4743: 
[BEAM-3741] Proto changes for splitting over Fn API
URL: https://github.com/apache/beam/pull/4743#discussion_r173686781
 
 

 ##
 File path: model/fn-execution/src/main/proto/beam_fn_api.proto
 ##
 @@ -182,6 +182,48 @@ message ProcessBundleDescriptor {
   org.apache.beam.model.pipeline.v1.ApiServiceDescriptor state_api_service_descriptor = 7;
 }
 
+// Represents a partition of the bundle into two bundles: a "primary" and
+// a "residual", with the following properties:
+// - The work in primary and residual doesn't overlap, and combined, adds up
+//   to the work in the current bundle if the split hadn't happened.
+// - The current bundle, if it keeps executing, will have done none of the
+//   work under residual roots.
+// - The current bundle, if no further splits happen, will have done exactly
+//   the work under primary_roots.
+// For more rigorous definitions see https://s.apache.org/beam-breaking-fusion
+message BundleSplit {
+  // One of the root applications specifying the scope of work for a bundle.
+  message Application {
+    // (Required) The primitive transform to which to pass the element
+    string ptransform_id = 1;
+
+    // (Required) Name of the transform's input to which to pass the element.
+    string input_id = 2;
+
+    // (Required) The encoded element to pass to the transform.
+    bytes element = 3;
+
+    // Approximate lower bounds on timestamps of elements that this PTransform
+    // will produce into each of its output PCollections, when invoked on this
+    // element. Keyed by the transform's local output name.
+    map<string, google.protobuf.Timestamp> output_watermarks = 4;
+
+    // Approximate fraction of all work of the current bundle (before split)
+    // represented by invoking this Application and its downstream applications.
+    // The sum of fraction_of_work between all primary_roots and residual_roots
+    // must add up to approximately 1.0.
+    DoubleValue fraction_of_work = 5;
 
 Review comment:
   I decided to use DoubleValue because there is a difference between 
specifying 0 and specifying "I don't know": if an SDK fails to specify this, 
I'd like to be able to tell whether there's a 0 because the SDK doesn't know or 
because the SDK really thinks it's 0.
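
The distinction can be illustrated with a small sketch in plain Python. The `Application` class below is a hypothetical model, not the generated proto class; `None` plays the role of an unset `DoubleValue` wrapper, while `0.0` plays the role of a wrapper explicitly set to zero:

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Application:
    """Hypothetical stand-in for BundleSplit.Application."""
    ptransform_id: str
    # None models "the SDK did not report a fraction";
    # 0.0 models "the SDK reported that this is no work at all".
    fraction_of_work: Optional[float] = None


def describe_fraction(app: Application) -> str:
    """Report the fraction, keeping 'unknown' distinct from zero."""
    if app.fraction_of_work is None:
        return "unknown"
    return f"{app.fraction_of_work:.2f}"


print(describe_fraction(Application("read")))       # prints "unknown"
print(describe_fraction(Application("read", 0.0)))  # prints "0.00"
```

With a plain proto3 `double`, both cases would read back as 0, so the two print statements would be indistinguishable.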




Issue Time Tracking
---

Worklog Id: (was: 79324)
Time Spent: 3h 40m  (was: 3.5h)

> Proto changes for splitting over Fn API
> ---
>
> Key: BEAM-3741
> URL: https://issues.apache.org/jira/browse/BEAM-3741
> Project: Beam
>  Issue Type: Sub-task
>  Components: beam-model
>Reporter: Eugene Kirpichov
>Assignee: Eugene Kirpichov
>Priority: Major
>  Time Spent: 3h 40m
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-3741) Proto changes for splitting over Fn API

2018-03-11 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3741?focusedWorklogId=79325&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-79325
 ]

ASF GitHub Bot logged work on BEAM-3741:


Author: ASF GitHub Bot
Created on: 12/Mar/18 03:11
Start Date: 12/Mar/18 03:11
Worklog Time Spent: 10m 
  Work Description: jkff commented on a change in pull request #4743: 
[BEAM-3741] Proto changes for splitting over Fn API
URL: https://github.com/apache/beam/pull/4743#discussion_r173687202
 
 

 ##
 File path: model/fn-execution/src/main/proto/beam_fn_api.proto
 ##
 @@ -199,6 +232,10 @@ message ProcessBundleResponse {
   // (Optional) If metrics reporting is supported by the SDK, this represents
   // the final metrics to record for this bundle.
   Metrics metrics = 1;
+
+  // (Optional) Specifies that the bundle has been split since the last
+  // ProcessBundleProgressResponse was sent.
+  BundleSplit split = 2;
 
 Review comment:
   I agree that https://issues.apache.org/jira/browse/BEAM-3787 is the right 
way to go. However, I'd like to make progress on the current PR and follow-up 
work, and to have some protos upon which to build the SDK harness and runner 
harness changes. Once BEAM-3787 is implemented, I think it will be fairly easy 
to port onto that new API; I'm willing to do it for splitting in particular 
when it's time.




Issue Time Tracking
---

Worklog Id: (was: 79325)
Time Spent: 3h 50m  (was: 3h 40m)

> Proto changes for splitting over Fn API
> ---
>
> Key: BEAM-3741
> URL: https://issues.apache.org/jira/browse/BEAM-3741
> Project: Beam
>  Issue Type: Sub-task
>  Components: beam-model
>Reporter: Eugene Kirpichov
>Assignee: Eugene Kirpichov
>Priority: Major
>  Time Spent: 3h 50m
>  Remaining Estimate: 0h
>



