[jira] [Work logged] (BEAM-3741) Proto changes for splitting over Fn API
[ https://issues.apache.org/jira/browse/BEAM-3741?focusedWorklogId=80004&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-80004 ]

ASF GitHub Bot logged work on BEAM-3741:
Author: ASF GitHub Bot
Created on: 13/Mar/18 18:32
Start Date: 13/Mar/18 18:32
Worklog Time Spent: 10m
Work Description: lukecwik commented on a change in pull request #4743: [BEAM-3741] Proto changes for splitting over Fn API
URL: https://github.com/apache/beam/pull/4743#discussion_r174240519

## File path: model/fn-execution/src/main/proto/beam_fn_api.proto
##
@@ -182,6 +182,48 @@ message ProcessBundleDescriptor {
   org.apache.beam.model.pipeline.v1.ApiServiceDescriptor state_api_service_descriptor = 7;
 }
 
+// Represents a partition of the bundle into two bundles: a "primary" and
+// a "residual", with the following properties:
+// - The work in primary and residual doesn't overlap, and combined, adds up
+//   to the work in the current bundle if the split hadn't happened.
+// - The current bundle, if it keeps executing, will have done none of the
+//   work under residual roots.
+// - The current bundle, if no further splits happen, will have done exactly
+//   the work under primary_roots.
+// For more rigorous definitions see https://s.apache.org/beam-breaking-fusion
+message BundleSplit {
+  // One of the root applications specifying the scope of work for a bundle.
+  message Application {
+    // (Required) The primitive transform to which to pass the element
+    string ptransform_id = 1;
+
+    // (Required) Name of the transform's input to which to pass the element.
+    string input_id = 2;
+
+    // (Required) The encoded element to pass to the transform.
+    bytes element = 3;
+
+    // Approximate lower bounds on timestamps of elements that this PTransform
+    // will produce into each of its output PCollections, when invoked on this
+    // element. Keyed by the transform's local output name.
+    map output_watermarks = 4;
+
+    // Approximate fraction of all work of the current bundle (before split)
+    // represented by invoking this Application and its downstream applications.
+    // The sum of fraction_of_work between all primary_roots and residual_roots
+    // must add up to approximately 1.0.
+    DoubleValue fraction_of_work = 5;

Review comment:
That would mean that this value is `optional`. If it becomes a required field we should change it back to double.

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Issue Time Tracking
---

Worklog Id: (was: 80004)
Time Spent: 4h 10m (was: 4h)

> Proto changes for splitting over Fn API
> ---
>
> Key: BEAM-3741
> URL: https://issues.apache.org/jira/browse/BEAM-3741
> Project: Beam
> Issue Type: Sub-task
> Components: beam-model
> Reporter: Eugene Kirpichov
> Assignee: Eugene Kirpichov
> Priority: Major
> Time Spent: 4h 10m
> Remaining Estimate: 0h
>

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
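The BundleSplit comment in this message states an invariant: the fraction_of_work values across primary_roots and residual_roots should add up to approximately 1.0. lukecwik's review note points out that a DoubleValue field is effectively optional. The minimal Python sketch below shows how a consumer might check that invariant. Three assumptions apply: plain dataclasses act as a hypothetical stand-in for the generated proto classes, `None` models an unset DoubleValue, and 0.05 is an arbitrary tolerance for "approximately".

```python
import math
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class Application:
    """Hypothetical mirror of BundleSplit.Application (not generated proto code)."""
    ptransform_id: str
    input_id: str
    element: bytes
    # Keyed by the transform's local output name; int timestamps are an assumption.
    output_watermarks: Dict[str, int] = field(default_factory=dict)
    # None models an unset google.protobuf.DoubleValue, i.e. "unknown".
    fraction_of_work: Optional[float] = None


@dataclass
class BundleSplit:
    primary_roots: List[Application] = field(default_factory=list)
    residual_roots: List[Application] = field(default_factory=list)


def fractions_add_up(split: BundleSplit, tolerance: float = 0.05) -> Optional[bool]:
    """True/False if every root reports fraction_of_work; None if any is unset."""
    roots = split.primary_roots + split.residual_roots
    fractions = [root.fraction_of_work for root in roots]
    if not fractions or any(f is None for f in fractions):
        return None  # the invariant cannot be checked without every estimate
    return math.isclose(sum(fractions), 1.0, abs_tol=tolerance)


# Usage: a bundle split 30/70 between one primary root and one residual root.
split = BundleSplit(
    primary_roots=[Application("read", "in", b"k1", fraction_of_work=0.3)],
    residual_roots=[Application("read", "in", b"k2", fraction_of_work=0.7)],
)
print(fractions_add_up(split))  # True
```

Returning `None` rather than `False` when an estimate is missing keeps "the SDK did not say" separate from "the SDK's numbers are inconsistent".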
[jira] [Work logged] (BEAM-3741) Proto changes for splitting over Fn API
[ https://issues.apache.org/jira/browse/BEAM-3741?focusedWorklogId=80005&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-80005 ]

ASF GitHub Bot logged work on BEAM-3741:
Author: ASF GitHub Bot
Created on: 13/Mar/18 18:33
Start Date: 13/Mar/18 18:33
Worklog Time Spent: 10m
Work Description: lukecwik closed pull request #4743: [BEAM-3741] Proto changes for splitting over Fn API
URL: https://github.com/apache/beam/pull/4743

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/model/fn-execution/src/main/proto/beam_fn_api.proto b/model/fn-execution/src/main/proto/beam_fn_api.proto
index 28c75595024..89c73a7798f 100644
--- a/model/fn-execution/src/main/proto/beam_fn_api.proto
+++ b/model/fn-execution/src/main/proto/beam_fn_api.proto
@@ -41,6 +41,7 @@ option java_outer_classname = "BeamFnApi";
 import "beam_runner_api.proto";
 import "endpoints.proto";
 import "google/protobuf/timestamp.proto";
+import "google/protobuf/wrappers.proto";
 
 /*
  * Constructs that define the pipeline shape.
@@ -182,6 +183,48 @@ message ProcessBundleDescriptor {
   org.apache.beam.model.pipeline.v1.ApiServiceDescriptor state_api_service_descriptor = 7;
 }
 
+// Represents a partition of the bundle into two bundles: a "primary" and
+// a "residual", with the following properties:
+// - The work in primary and residual doesn't overlap, and combined, adds up
+//   to the work in the current bundle if the split hadn't happened.
+// - The current bundle, if it keeps executing, will have done none of the
+//   work under residual roots.
+// - The current bundle, if no further splits happen, will have done exactly
+//   the work under primary_roots.
+// For more rigorous definitions see https://s.apache.org/beam-breaking-fusion
+message BundleSplit {
+  // One of the root applications specifying the scope of work for a bundle.
+  message Application {
+    // (Required) The primitive transform to which to pass the element
+    string ptransform_id = 1;
+
+    // (Required) Name of the transform's input to which to pass the element.
+    string input_id = 2;
+
+    // (Required) The encoded element to pass to the transform.
+    bytes element = 3;
+
+    // Approximate lower bounds on timestamps of elements that this PTransform
+    // will produce into each of its output PCollections, when invoked on this
+    // element. Keyed by the transform's local output name.
+    map output_watermarks = 4;
+
+    // Approximate fraction of all work of the current bundle (before split)
+    // represented by invoking this Application and its downstream applications.
+    // The sum of fraction_of_work between all primary_roots and residual_roots
+    // must add up to approximately 1.0.
+    google.protobuf.DoubleValue fraction_of_work = 5;
+  }
+
+  // Root applications that should replace the current bundle.
+  repeated Application primary_roots = 1;
+
+  // Root applications that have been removed from the current bundle and
+  // have to be executed in a separate bundle (e.g. in parallel on a different
+  // worker, or after the current bundle completes, etc.)
+  repeated Application residual_roots = 2;
+}
+
 // A request to process a given bundle.
 // Stable
 message ProcessBundleRequest {
@@ -199,6 +242,10 @@ message ProcessBundleResponse {
   // (Optional) If metrics reporting is supported by the SDK, this represents
   // the final metrics to record for this bundle.
   Metrics metrics = 1;
+
+  // (Optional) Specifies that the bundle has been split since the last
+  // ProcessBundleProgressResponse was sent.
+  BundleSplit split = 2;
 }
 
 // A request to report progress information for a given bundle.
@@ -330,6 +377,10 @@ message Metrics {
 message ProcessBundleProgressResponse {
   // (Required)
   Metrics metrics = 1;
+
+  // (Optional) Specifies that the bundle has been split since the last
+  // ProcessBundleProgressResponse was sent.
+  BundleSplit split = 2;
 }
 
 message ProcessBundleSplitRequest {
@@ -337,106 +388,22 @@ message ProcessBundleSplitRequest {
   // instruction id.
   string instruction_reference = 1;
 
-  // (Required) The fraction of work (when compared to the known amount of work)
-  // the process bundle request should try to split at.
-  double fraction = 2;
-}
-
-// urn:org.apache.beam:restriction:element-count:1.0
-message ElementCountRestriction {
-  // A restriction representing the number of elements that should be processed.
-  // Effectively the range [0, count]
-  int64 count = 1;
-}
-
-// urn:org.apache.beam:restriction:element-count-skip:1.0
-message ElementCountSkipRestriction {
-  // A restriction representing the number of elements that
[jira] [Work logged] (BEAM-3741) Proto changes for splitting over Fn API
[ https://issues.apache.org/jira/browse/BEAM-3741?focusedWorklogId=80002&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-80002 ]

ASF GitHub Bot logged work on BEAM-3741:
Author: ASF GitHub Bot
Created on: 13/Mar/18 18:30
Start Date: 13/Mar/18 18:30
Worklog Time Spent: 10m
Work Description: lukecwik commented on a change in pull request #4743: [BEAM-3741] Proto changes for splitting over Fn API
URL: https://github.com/apache/beam/pull/4743#discussion_r174239903

## File path: model/fn-execution/src/main/proto/beam_fn_api.proto
##
@@ -182,6 +182,48 @@ message ProcessBundleDescriptor {
   org.apache.beam.model.pipeline.v1.ApiServiceDescriptor state_api_service_descriptor = 7;
 }
 
+// Represents a partition of the bundle into two bundles: a "primary" and
+// a "residual", with the following properties:
+// - The work in primary and residual doesn't overlap, and combined, adds up
+//   to the work in the current bundle if the split hadn't happened.
+// - The current bundle, if it keeps executing, will have done none of the
+//   work under residual roots.
+// - The current bundle, if no further splits happen, will have done exactly
+//   the work under primary_roots.
+// For more rigorous definitions see https://s.apache.org/beam-breaking-fusion
+message BundleSplit {
+  // One of the root applications specifying the scope of work for a bundle.
+  message Application {
+    // (Required) The primitive transform to which to pass the element
+    string ptransform_id = 1;
+
+    // (Required) Name of the transform's input to which to pass the element.
+    string input_id = 2;
+
+    // (Required) The encoded element to pass to the transform.
+    bytes element = 3;
+
+    // Approximate lower bounds on timestamps of elements that this PTransform
+    // will produce into each of its output PCollections, when invoked on this
+    // element. Keyed by the transform's local output name.
+    map output_watermarks = 4;

Review comment:
The issue I see is this: if you perform a split, how do you hold the input watermark of these applications so that state and side inputs aren't garbage collected and watermark-based timers don't fire too early? It may be that we need both, or we could say that the input watermark is always a lower bound for the output watermarks. The change looks fine for now, but I think this is an important point to discuss.

Issue Time Tracking
---

Worklog Id: (was: 80002)
Time Spent: 4h (was: 3h 50m)
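lukecwik's question is how a runner keeps watermarks from advancing past work that a split has moved into residual roots. The Python sketch below shows one possible approach; it is not taken from Beam. It treats each pending residual's output_watermarks as a hold and never lets an output's watermark advance past the smallest hold reported for that output. The function names, the integer timestamps, and the min-based policy are all assumptions made for illustration.

```python
from typing import Dict, Iterable, Mapping


def watermark_holds(pending_residuals: Iterable[Mapping[str, int]]) -> Dict[str, int]:
    """Per output name, the smallest lower bound promised by any pending residual."""
    holds: Dict[str, int] = {}
    for output_watermarks in pending_residuals:
        for output, timestamp in output_watermarks.items():
            holds[output] = min(timestamp, holds.get(output, timestamp))
    return holds


def held_watermark(upstream: int, holds: Mapping[str, int], output: str) -> int:
    """Watermark a runner may advertise for `output` under this hypothetical policy."""
    # With no hold for this output, the upstream watermark passes through unchanged.
    return min(upstream, holds.get(output, upstream))


# Usage: two residual roots are still pending after a split.
holds = watermark_holds([{"main": 100, "errors": 50}, {"main": 80}])
print(holds)                                # {'main': 80, 'errors': 50}
print(held_watermark(120, holds, "main"))   # 80
print(held_watermark(120, holds, "other"))  # 120
```

Under such a hold, a watermark-based timer set for timestamp 90 on "main" would not fire until the residuals completed and released the hold. This is the kind of guarantee the review comment asks for.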
[jira] [Work logged] (BEAM-3741) Proto changes for splitting over Fn API
[ https://issues.apache.org/jira/browse/BEAM-3741?focusedWorklogId=79323&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-79323 ]

ASF GitHub Bot logged work on BEAM-3741:
Author: ASF GitHub Bot
Created on: 12/Mar/18 03:11
Start Date: 12/Mar/18 03:11
Worklog Time Spent: 10m
Work Description: jkff commented on a change in pull request #4743: [BEAM-3741] Proto changes for splitting over Fn API
URL: https://github.com/apache/beam/pull/4743#discussion_r173686600

## File path: model/fn-execution/src/main/proto/beam_fn_api.proto
##
@@ -182,6 +182,48 @@ message ProcessBundleDescriptor {
   org.apache.beam.model.pipeline.v1.ApiServiceDescriptor state_api_service_descriptor = 7;
 }
 
+// Represents a partition of the bundle into two bundles: a "primary" and
+// a "residual", with the following properties:
+// - The work in primary and residual doesn't overlap, and combined, adds up
+//   to the work in the current bundle if the split hadn't happened.
+// - The current bundle, if it keeps executing, will have done none of the
+//   work under residual roots.
+// - The current bundle, if no further splits happen, will have done exactly
+//   the work under primary_roots.
+// For more rigorous definitions see https://s.apache.org/beam-breaking-fusion
+message BundleSplit {
+  // One of the root applications specifying the scope of work for a bundle.
+  message Application {
+    // (Required) The primitive transform to which to pass the element
+    string ptransform_id = 1;
+
+    // (Required) Name of the transform's input to which to pass the element.
+    string input_id = 2;
+
+    // (Required) The encoded element to pass to the transform.
+    bytes element = 3;
+
+    // Approximate lower bounds on timestamps of elements that this PTransform
+    // will produce into each of its output PCollections, when invoked on this
+    // element. Keyed by the transform's local output name.
+    map output_watermarks = 4;

Review comment:
No, this has to be the output watermark. For an Application of the form `f(x)`, the input watermark would be "a promise about timestamps of future elements from the same PCollection that x came from", but the Application has no control over that; the Application can only make promises about what happens if you execute this Application. For example, if this is an Application of an SDF that reads data from Kafka, the "element" is (element:(topic, partition), restriction:offset), there's only 1 output, and we're making a promise about what timestamps will be output if you read this partition from this offset. That is the watermark of this partition; the watermark of the topic/partition PCollection itself is irrelevant.

Issue Time Tracking
---

Worklog Id: (was: 79323)
Time Spent: 3.5h (was: 3h 20m)
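jkff's Kafka example can be made concrete. Two Applications built from the same (topic, partition) element, but with different offset restrictions, promise different output watermarks, and neither depends on the watermark of the input PCollection. The minimal Python sketch below assumes an in-memory list of (offset, event timestamp) records as a hypothetical stand-in for one partition. It computes the bound exactly, whereas a real source would only estimate it.

```python
from typing import List, Optional, Tuple

# Hypothetical stand-in for one Kafka topic partition: (offset, event_timestamp) records.
Partition = List[Tuple[int, int]]


def output_watermark(
    partition: Partition, start_offset: int, end_offset: Optional[int] = None
) -> Optional[int]:
    """Lower bound on timestamps emitted when reading offsets [start_offset, end_offset).

    end_offset=None means "read to the end". Returns None when no records fall in the
    range (an assumption; a real source might report an end-of-time watermark instead).
    """
    in_range = [
        ts
        for offset, ts in partition
        if offset >= start_offset and (end_offset is None or offset < end_offset)
    ]
    return min(in_range) if in_range else None


partition = [(0, 10), (1, 5), (2, 20), (3, 30)]
# Splitting the restriction at offset 2: primary reads [0, 2), residual reads [2, end).
print(output_watermark(partition, 0, 2))  # 5   (primary root's promise)
print(output_watermark(partition, 2))     # 20  (residual root's promise)
```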
[jira] [Work logged] (BEAM-3741) Proto changes for splitting over Fn API
[ https://issues.apache.org/jira/browse/BEAM-3741?focusedWorklogId=79324&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-79324 ]

ASF GitHub Bot logged work on BEAM-3741:
Author: ASF GitHub Bot
Created on: 12/Mar/18 03:11
Start Date: 12/Mar/18 03:11
Worklog Time Spent: 10m
Work Description: jkff commented on a change in pull request #4743: [BEAM-3741] Proto changes for splitting over Fn API
URL: https://github.com/apache/beam/pull/4743#discussion_r173686781

## File path: model/fn-execution/src/main/proto/beam_fn_api.proto
##
@@ -182,6 +182,48 @@ message ProcessBundleDescriptor {
   org.apache.beam.model.pipeline.v1.ApiServiceDescriptor state_api_service_descriptor = 7;
 }
 
+// Represents a partition of the bundle into two bundles: a "primary" and
+// a "residual", with the following properties:
+// - The work in primary and residual doesn't overlap, and combined, adds up
+//   to the work in the current bundle if the split hadn't happened.
+// - The current bundle, if it keeps executing, will have done none of the
+//   work under residual roots.
+// - The current bundle, if no further splits happen, will have done exactly
+//   the work under primary_roots.
+// For more rigorous definitions see https://s.apache.org/beam-breaking-fusion
+message BundleSplit {
+  // One of the root applications specifying the scope of work for a bundle.
+  message Application {
+    // (Required) The primitive transform to which to pass the element
+    string ptransform_id = 1;
+
+    // (Required) Name of the transform's input to which to pass the element.
+    string input_id = 2;
+
+    // (Required) The encoded element to pass to the transform.
+    bytes element = 3;
+
+    // Approximate lower bounds on timestamps of elements that this PTransform
+    // will produce into each of its output PCollections, when invoked on this
+    // element. Keyed by the transform's local output name.
+    map output_watermarks = 4;
+
+    // Approximate fraction of all work of the current bundle (before split)
+    // represented by invoking this Application and its downstream applications.
+    // The sum of fraction_of_work between all primary_roots and residual_roots
+    // must add up to approximately 1.0.
+    DoubleValue fraction_of_work = 5;

Review comment:
I decided to use DoubleValue because there is a difference between specifying 0 and specifying "I don't know". If an SDK fails to specify this, I'd like to be able to tell whether there's a 0 because the SDK doesn't know, or because the SDK really thinks it's 0.

Issue Time Tracking
---

Worklog Id: (was: 79324)
Time Spent: 3h 40m (was: 3.5h)
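jkff is distinguishing two field types. With a proto3 scalar `double`, an unset field reads back as 0.0. With the `google.protobuf.DoubleValue` wrapper, a message-typed field, presence can be detected; in generated Python code this is done with `HasField`. The minimal pure-Python sketch below shows why that matters. `None` stands in for an unset wrapper, and the function names and result strings are hypothetical.

```python
from typing import Optional


def as_plain_double(value: Optional[float]) -> float:
    """What a receiver sees through a proto3 `double` field: unset decodes as 0.0."""
    return 0.0 if value is None else value


def describe_fraction(value: Optional[float]) -> str:
    """Interpret a DoubleValue-like fraction_of_work, where None means unset."""
    if value is None:
        return "unknown"   # the SDK did not provide an estimate
    if value == 0.0:
        return "no work"   # the SDK explicitly estimates zero work
    return f"{value:.0%} of bundle"


print(describe_fraction(None))                   # unknown
print(describe_fraction(as_plain_double(None)))  # no work (the "unknown" is lost)
print(describe_fraction(0.25))                   # 25% of bundle
```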
[jira] [Work logged] (BEAM-3741) Proto changes for splitting over Fn API
[ https://issues.apache.org/jira/browse/BEAM-3741?focusedWorklogId=79325&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-79325 ]

ASF GitHub Bot logged work on BEAM-3741:
Author: ASF GitHub Bot
Created on: 12/Mar/18 03:11
Start Date: 12/Mar/18 03:11
Worklog Time Spent: 10m
Work Description: jkff commented on a change in pull request #4743: [BEAM-3741] Proto changes for splitting over Fn API
URL: https://github.com/apache/beam/pull/4743#discussion_r173687202

## File path: model/fn-execution/src/main/proto/beam_fn_api.proto
##
@@ -199,6 +232,10 @@ message ProcessBundleResponse {
   // (Optional) If metrics reporting is supported by the SDK, this represents
   // the final metrics to record for this bundle.
   Metrics metrics = 1;
+
+  // (Optional) Specifies that the bundle has been split since the last
+  // ProcessBundleProgressResponse was sent.
+  BundleSplit split = 2;

Review comment:
I agree that https://issues.apache.org/jira/browse/BEAM-3787 is the right way to go. However, I'd like to make progress on the current PR and follow-up work, and have some protos on which to build the SDK harness and runner harness changes. I think that as BEAM-3787 is implemented, it will be fairly easy to port this onto the new API; I'm willing to do that for splitting in particular when it's time.

Issue Time Tracking
---

Worklog Id: (was: 79325)
Time Spent: 3h 50m (was: 3h 40m)
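The `split` field added to ProcessBundleResponse and ProcessBundleProgressResponse reports a split that happened since the last progress response. The minimal Python sketch below shows how a runner might react. It uses hypothetical classes in place of the generated protos and opaque string labels for root applications. Following the BundleSplit comments, the primary roots replace the bundle's scope of work and the residual roots are queued for a separate bundle.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class Split:
    """Hypothetical mirror of BundleSplit; roots are opaque labels here."""
    primary_roots: List[str]
    residual_roots: List[str]


@dataclass
class Response:
    """Stands in for ProcessBundleProgressResponse or ProcessBundleResponse."""
    split: Optional[Split] = None


@dataclass
class BundleTracker:
    roots: List[str]
    pending_residuals: List[str] = field(default_factory=list)

    def on_response(self, response: Response) -> None:
        if response.split is None:
            return  # no split since the last progress response
        # The primary roots replace the current bundle's scope of work...
        self.roots = list(response.split.primary_roots)
        # ...and the residual roots must run in a separate bundle.
        self.pending_residuals.extend(response.split.residual_roots)


tracker = BundleTracker(["shard-0"])
tracker.on_response(Response())  # progress without a split
tracker.on_response(Response(Split(["shard-0[0,50)"], ["shard-0[50,100)"])))
print(tracker.roots, tracker.pending_residuals)
```

Because each response reports only the split that happened since the previous one, the tracker accumulates residuals across responses instead of overwriting them.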