[jira] [Work logged] (BEAM-4276) Implement the portable lifted Combiner transforms in Go SDK
[ https://issues.apache.org/jira/browse/BEAM-4276?focusedWorklogId=131447&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-131447 ]

ASF GitHub Bot logged work on BEAM-4276:

Author: ASF GitHub Bot
Created on: 06/Aug/18 15:55
Start Date: 06/Aug/18 15:55
Worklog Time Spent: 10m
Work Description: herohde closed pull request #6143: [BEAM-4276] Combiner lifting for Dataflow
URL: https://github.com/apache/beam/pull/6143

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/translate.go b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/translate.go
index 56625242c4f..062b12ae0e7 100644
--- a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/translate.go
+++ b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/translate.go
@@ -17,6 +17,7 @@ package dataflowlib

 import (
 	"bytes"
+	"encoding/json"
 	"fmt"
 	"net/url"
 	"path"
@@ -39,6 +40,7 @@ import (
 const (
 	impulseKind    = "CreateCollection"
 	parDoKind      = "ParallelDo"
+	combineKind    = "CombineValues"
 	flattenKind    = "Flatten"
 	gbkKind        = "GroupByKey"
 	windowIntoKind = "Bucket"
@@ -166,6 +168,37 @@ func (x *translator) translateTransform(trunk string, id string) ([]*df.Step, er
 		prop.ParallelInput = x.pcollections[in]
 		prop.SerializedFn = id // == reference into the proto pipeline
 		return append(steps, x.newStep(id, parDoKind, prop)), nil
+	case graphx.URNCombinePerKey:
+		// Dataflow uses a GBK followed by a CombineValues to determine when it can lift.
+		// To achieve this, we use the combine composite's subtransforms, and modify the
+		// Combine ParDo with the CombineValues kind, set its SerializedFn to map to the
+		// composite payload, and the accumulator coding.
+		if len(t.Subtransforms) != 2 {
+			return nil, fmt.Errorf("invalid CombinePerKey, expected 2 subtransforms but got %d in %v", len(t.Subtransforms), t)
+		}
+		steps, err := x.translateTransforms(fmt.Sprintf("%v%v/", trunk, path.Base(t.UniqueName)), t.Subtransforms)
+		if err != nil {
+			return nil, fmt.Errorf("invalid CombinePerKey, couldn't extract GBK from %v: %v", t, err)
+		}
+		var payload pb.CombinePayload
+		if err := proto.Unmarshal(t.Spec.Payload, &payload); err != nil {
+			return nil, fmt.Errorf("invalid Combine payload for %v: %v", t, err)
+		}
+
+		c, err := x.coders.Coder(payload.AccumulatorCoderId)
+		if err != nil {
+			return nil, fmt.Errorf("invalid Combine payload , missing Accumulator Coder %v: %v", t, err)
+		}
+		enc, err := graphx.EncodeCoderRef(c)
+		if err != nil {
+			return nil, fmt.Errorf("invalid Combine payload, couldn't encode Accumulator Coder %v: %v", t, err)
+		}
+		json.Unmarshal([]byte(steps[1].Properties), &prop)
+		prop.Encoding = enc
+		prop.SerializedFn = id
+		steps[1].Kind = combineKind
+		steps[1].Properties = newMsg(prop)
+		return steps, nil

 	case graphx.URNFlatten:
 		for _, in := range t.Inputs {
@@ -177,7 +210,6 @@ func (x *translator) translateTransform(trunk string, id string) ([]*df.Step, er
 		in := stringx.SingleValue(t.Inputs)

 		prop.ParallelInput = x.pcollections[in]
-		prop.DisallowCombinerLifting = true
 		prop.SerializedFn = encodeSerializedFn(x.extractWindowingStrategy(in))
 		return []*df.Step{x.newStep(id, gbkKind, prop)}, nil

@@ -224,8 +256,6 @@ func (x *translator) translateTransform(trunk string, id string) ([]*df.Step, er
 		}

 	default:
-		// TODO: graphx.URNCombinePerKey:
-
 		if len(t.Subtransforms) > 0 {
 			return x.translateTransforms(fmt.Sprintf("%v%v/", trunk, path.Base(t.UniqueName)), t.Subtransforms)
 		}

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
---

Worklog Id: (was: 131447)
Time Spent: 9h
[jira] [Work logged] (BEAM-4276) Implement the portable lifted Combiner transforms in Go SDK
[ https://issues.apache.org/jira/browse/BEAM-4276?focusedWorklogId=131162&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-131162 ]

ASF GitHub Bot logged work on BEAM-4276:

Author: ASF GitHub Bot
Created on: 04/Aug/18 01:32
Start Date: 04/Aug/18 01:32
Worklog Time Spent: 10m
Work Description: lostluck commented on issue #6143: [BEAM-4276] Combiner lifting for Dataflow
URL: https://github.com/apache/beam/pull/6143#issuecomment-410412665

R: @herohde @youngoli I'm pretty sure I've got this working properly, but the integration tests are too small to show a meaningful difference when combiners are lifted. Oddly, the wordcount Dataflow jobs seem to like taking ~3m3s to complete.

Issue Time Tracking
---

Worklog Id: (was: 131162)
Time Spent: 9.5h (was: 9h 20m)

> Implement the portable lifted Combiner transforms in Go SDK
> ---
>
> Key: BEAM-4276
> URL: https://issues.apache.org/jira/browse/BEAM-4276
> Project: Beam
> Issue Type: Sub-task
> Components: sdk-go
> Reporter: Henning Rohde
> Assignee: Robert Burke
> Priority: Major
> Time Spent: 9.5h
> Remaining Estimate: 0h
>
> Specifically add the necessary code to produce a Combine Composite with the
> correct URN, and permit the SDK harness to understand the lifted parts when
> receiving a bundle plan from the worker.
> Not expected as part of this issue is:
> Additional performance tweaks to the in memory cache (See
> [BEAM-4468|https://issues.apache.org/jira/browse/BEAM-4468])

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Work logged] (BEAM-4276) Implement the portable lifted Combiner transforms in Go SDK
[ https://issues.apache.org/jira/browse/BEAM-4276?focusedWorklogId=131161&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-131161 ]

ASF GitHub Bot logged work on BEAM-4276:

Author: ASF GitHub Bot
Created on: 04/Aug/18 01:20
Start Date: 04/Aug/18 01:20
Worklog Time Spent: 10m
Work Description: lostluck commented on issue #6143: [BEAM-4276] Combiner lifting for Dataflow
URL: https://github.com/apache/beam/pull/6143#issuecomment-410411914

Run Go PostCommit

Issue Time Tracking
---

Worklog Id: (was: 131161)
Time Spent: 9h 20m (was: 9h 10m)
[jira] [Work logged] (BEAM-4276) Implement the portable lifted Combiner transforms in Go SDK
[ https://issues.apache.org/jira/browse/BEAM-4276?focusedWorklogId=131158&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-131158 ]

ASF GitHub Bot logged work on BEAM-4276:

Author: ASF GitHub Bot
Created on: 04/Aug/18 01:08
Start Date: 04/Aug/18 01:08
Worklog Time Spent: 10m
Work Description: lostluck commented on issue #6143: [BEAM-4276] Combiner lifting for Dataflow
URL: https://github.com/apache/beam/pull/6143#issuecomment-410410983

Run Go PostCommit

Issue Time Tracking
---

Worklog Id: (was: 131158)
Time Spent: 9h 10m (was: 9h)
[jira] [Work logged] (BEAM-4276) Implement the portable lifted Combiner transforms in Go SDK
[ https://issues.apache.org/jira/browse/BEAM-4276?focusedWorklogId=131157&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-131157 ]

ASF GitHub Bot logged work on BEAM-4276:

Author: ASF GitHub Bot
Created on: 04/Aug/18 01:04
Start Date: 04/Aug/18 01:04
Worklog Time Spent: 10m
Work Description: lostluck commented on issue #6143: [BEAM-4276] Combiner lifting for Dataflow
URL: https://github.com/apache/beam/pull/6143#issuecomment-410410696

Run Go PreCommit

Issue Time Tracking
---

Worklog Id: (was: 131157)
Time Spent: 9h (was: 8h 50m)
[jira] [Work logged] (BEAM-4276) Implement the portable lifted Combiner transforms in Go SDK
[ https://issues.apache.org/jira/browse/BEAM-4276?focusedWorklogId=131156&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-131156 ]

ASF GitHub Bot logged work on BEAM-4276:

Author: ASF GitHub Bot
Created on: 04/Aug/18 01:02
Start Date: 04/Aug/18 01:02
Worklog Time Spent: 10m
Work Description: lostluck opened a new pull request #6143: [BEAM-4276] Combiner lifting for Dataflow
URL: https://github.com/apache/beam/pull/6143

The previous changes set up combiner lifting for the portable representation, but Dataflow still uses its own configuration. This change closes the gap.

Follow this checklist to help us incorporate your contribution quickly and easily:

- [X] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
- [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).

It will help us expedite review of your Pull Request if you tag someone (e.g. `@username`) to look at it.

Post-Commit Tests Status (on master branch)

Lang | SDK | Apex | Dataflow | Flink | Gearpump | Samza | Spark
--- | --- | --- | --- | --- | --- | --- | ---
Go | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go_GradleBuild/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_GradleBuild/lastCompletedBuild/) | --- | --- | --- | --- | --- | ---
Java | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_GradleBuild/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_GradleBuild/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex_Gradle/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex_Gradle/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Gradle/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Gradle/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Gradle/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Gradle/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump_Gradle/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump_Gradle/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza_Gradle/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza_Gradle/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark_Gradle/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark_Gradle/lastCompletedBuild/)
Python | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Python_Verify/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python_Verify/lastCompletedBuild/) | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/) [![Build Status](https://builds.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/) | --- | --- | --- | ---

Issue Time Tracking
---

Worklog Id: (was: 131156)
Time Spent: 8h 50m (was: 8h 40m)
[jira] [Work logged] (BEAM-4276) Implement the portable lifted Combiner transforms in Go SDK
[ https://issues.apache.org/jira/browse/BEAM-4276?focusedWorklogId=111385&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111385 ]

ASF GitHub Bot logged work on BEAM-4276:

Author: ASF GitHub Bot
Created on: 13/Jun/18 02:43
Start Date: 13/Jun/18 02:43
Worklog Time Spent: 10m
Work Description: herohde closed pull request #5620: [BEAM-4276] Surface graph.Fn encoding functions.
URL: https://github.com/apache/beam/pull/5620

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/sdks/go/pkg/beam/core/runtime/graphx/user.go b/sdks/go/pkg/beam/core/runtime/graphx/user.go
index ab75b3d9fea..969a513f491 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/user.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/user.go
@@ -23,6 +23,7 @@ import (
 	"encoding/base64"

 	"github.com/apache/beam/sdks/go/pkg/beam/core/funcx"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx/v1"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/util/protox"
@@ -80,6 +81,28 @@ func DecodeFn(data string) (reflectx.Func, error) {
 	return fn.Fn, nil
 }

+// EncodeGraphFn encodes a *graph.Fn as a string.
+func EncodeGraphFn(u *graph.Fn) (string, error) {
+	ref, err := encodeFn(u)
+	if err != nil {
+		return "", err
+	}
+	return protox.EncodeBase64(ref)
+}
+
+// DecodeGraphFn decodes an encoded *graph.Fn.
+func DecodeGraphFn(data string) (*graph.Fn, error) {
+	var ref v1.Fn
+	if err := protox.DecodeBase64(data, &ref); err != nil {
+		return nil, err
+	}
+	fn, err := decodeFn(&ref)
+	if err != nil {
+		return nil, err
+	}
+	return fn, nil
+}
+
 // EncodeCoder encodes a coder as a string. Any custom coder function
 // symbol must be resolvable via the runtime.GlobalSymbolResolver. The types must
 // be encodable.

Issue Time Tracking
---

Worklog Id: (was: 111385)
Time Spent: 8h 40m (was: 8.5h)
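The `EncodeGraphFn` and `DecodeGraphFn` pair in the diff above is a round trip: convert the function to a serializable reference, then base64-encode it to a string, and reverse both steps on decode. The sketch below shows only that round-trip shape. It swaps JSON in for the protobuf marshalling used by `protox`, and the `fnRef` type, `encodeRef`, and `decodeRef` are hypothetical names that are not part of the Beam SDK.

```go
package main

import (
	"encoding/base64"
	"encoding/json"
	"fmt"
)

// fnRef is a hypothetical stand-in for the serializable function
// reference (v1.Fn in the diff); the fields are illustrative only.
type fnRef struct {
	Name string `json:"name"`
	Opt  string `json:"opt,omitempty"`
}

// encodeRef marshals the reference and base64-encodes the bytes so the
// result can be carried as a plain string.
func encodeRef(ref fnRef) (string, error) {
	data, err := json.Marshal(ref)
	if err != nil {
		return "", err
	}
	return base64.StdEncoding.EncodeToString(data), nil
}

// decodeRef reverses encodeRef: base64-decode, then unmarshal.
func decodeRef(s string) (fnRef, error) {
	var ref fnRef
	data, err := base64.StdEncoding.DecodeString(s)
	if err != nil {
		return ref, err
	}
	if err := json.Unmarshal(data, &ref); err != nil {
		return ref, err
	}
	return ref, nil
}

func main() {
	enc, err := encodeRef(fnRef{Name: "main.sumFn"})
	if err != nil {
		panic(err)
	}
	dec, err := decodeRef(enc)
	if err != nil {
		panic(err)
	}
	fmt.Println(enc, dec.Name)
}
```

Both failure points on the decode side (bad base64, bad payload) surface as errors, which mirrors the two error checks in `DecodeGraphFn`.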
[jira] [Work logged] (BEAM-4276) Implement the portable lifted Combiner transforms in Go SDK
[ https://issues.apache.org/jira/browse/BEAM-4276?focusedWorklogId=111351&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111351 ]

ASF GitHub Bot logged work on BEAM-4276:

Author: ASF GitHub Bot
Created on: 12/Jun/18 23:49
Start Date: 12/Jun/18 23:49
Worklog Time Spent: 10m
Work Description: lostluck commented on issue #5620: [BEAM-4276] Surface graph.Fn encoding functions.
URL: https://github.com/apache/beam/pull/5620#issuecomment-396769557

R: @herohde PTAL Comments addressed.

Issue Time Tracking
---

Worklog Id: (was: 111351)
Time Spent: 8.5h (was: 8h 20m)
[jira] [Work logged] (BEAM-4276) Implement the portable lifted Combiner transforms in Go SDK
[ https://issues.apache.org/jira/browse/BEAM-4276?focusedWorklogId=111350&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111350 ]

ASF GitHub Bot logged work on BEAM-4276:

Author: ASF GitHub Bot
Created on: 12/Jun/18 23:48
Start Date: 12/Jun/18 23:48
Worklog Time Spent: 10m
Work Description: lostluck commented on a change in pull request #5620: [BEAM-4276] Surface graph.Fn encoding functions.
URL: https://github.com/apache/beam/pull/5620#discussion_r194921365

## File path: sdks/go/pkg/beam/core/runtime/graphx/user.go ##
@@ -80,6 +81,28 @@ func DecodeFn(data string) (reflectx.Func, error) {
 	return fn.Fn, nil
 }

+// EncodeGraphFn encodes a *graph.Fn as a string.
+func EncodeGraphFn(u *graph.Fn) (string, error) {

Review comment:
It's related to Combiner Lifting, so I've used that JIRA tag. It helps with writing a bundle-level precombine using existing CombineFns, which is what I'm using it for at present, since the Beam model/SDK doesn't yet support lifting combines through a CoGroupByKey. That's the root of it, anyway. I like thinking of it as enabling the writing of MetaStructs.

The main goal is for me to avoid writing my precombine helper one way now and then rewriting it later when there is official support for such a thing. If that feature does show up, I can simply use the same CombineFns over again. Thanks!

Issue Time Tracking
---

Worklog Id: (was: 111350)
Time Spent: 8h 20m (was: 8h 10m)
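The review comment above describes a bundle-level precombine that reuses existing CombineFns. A minimal sketch of that idea follows, assuming a CombineFn with the usual four methods specialised to `int` for brevity. The `combineFn` interface, `preCombiner` type, and `mergeBundles` helper are hypothetical and are not the author's helper or the Beam API.

```go
package main

import "fmt"

// combineFn mirrors the usual CombineFn method set, fixed to int values
// to keep the sketch short; it is a hypothetical interface.
type combineFn interface {
	CreateAccumulator() int
	AddInput(acc, in int) int
	MergeAccumulators(a, b int) int
	ExtractOutput(acc int) int
}

// sumFn is an existing combiner that the helper can reuse unchanged.
type sumFn struct{}

func (sumFn) CreateAccumulator() int         { return 0 }
func (sumFn) AddInput(acc, in int) int       { return acc + in }
func (sumFn) MergeAccumulators(a, b int) int { return a + b }
func (sumFn) ExtractOutput(acc int) int      { return acc }

type kv struct {
	Key   string
	Value int
}

// preCombiner keeps one accumulator per key for the current bundle.
type preCombiner struct {
	fn   combineFn
	accs map[string]int
}

func newPreCombiner(fn combineFn) *preCombiner {
	return &preCombiner{fn: fn, accs: make(map[string]int)}
}

// Process folds one element into its key's accumulator.
func (p *preCombiner) Process(e kv) {
	acc, ok := p.accs[e.Key]
	if !ok {
		acc = p.fn.CreateAccumulator()
	}
	p.accs[e.Key] = p.fn.AddInput(acc, e.Value)
}

// Finish returns the bundle's partial accumulators and resets state.
func (p *preCombiner) Finish() map[string]int {
	out := p.accs
	p.accs = make(map[string]int)
	return out
}

// mergeBundles merges partial accumulators from several bundles and
// extracts the final per-key outputs.
func mergeBundles(fn combineFn, partials ...map[string]int) map[string]int {
	merged := make(map[string]int)
	for _, part := range partials {
		for k, acc := range part {
			if prev, ok := merged[k]; ok {
				acc = fn.MergeAccumulators(prev, acc)
			}
			merged[k] = acc
		}
	}
	for k, acc := range merged {
		merged[k] = fn.ExtractOutput(acc)
	}
	return merged
}

func main() {
	p := newPreCombiner(sumFn{})
	for _, e := range []kv{{"a", 1}, {"b", 2}, {"a", 3}} {
		p.Process(e)
	}
	first := p.Finish()
	p.Process(kv{"a", 5})
	second := p.Finish()
	fmt.Println(mergeBundles(sumFn{}, first, second))
}
```

Because the helper only calls the four CombineFn methods, the same combiner could later be handed to an officially lifted combine without changes, which is the reuse the comment is after.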
[jira] [Work logged] (BEAM-4276) Implement the portable lifted Combiner transforms in Go SDK
[ https://issues.apache.org/jira/browse/BEAM-4276?focusedWorklogId=110642&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-110642 ]

ASF GitHub Bot logged work on BEAM-4276:

Author: ASF GitHub Bot
Created on: 11/Jun/18 15:15
Start Date: 11/Jun/18 15:15
Worklog Time Spent: 10m
Work Description: lukecwik closed pull request #5581: [BEAM-4276] Add missing scope line
URL: https://github.com/apache/beam/pull/5581

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/sdks/go/pkg/beam/combine.go b/sdks/go/pkg/beam/combine.go
index d4508c7379b..85d99464a1f 100644
--- a/sdks/go/pkg/beam/combine.go
+++ b/sdks/go/pkg/beam/combine.go
@@ -50,6 +50,7 @@ func TryCombine(s Scope, combinefn interface{}, col PCollection) (PCollection, e
 // for multiple reasons, notably that the combinefn is not valid or cannot be bound
 // -- due to type mismatch, say -- to the incoming PCollection.
 func TryCombinePerKey(s Scope, combinefn interface{}, col PCollection) (PCollection, error) {
+	s = s.Scope(graph.CombinePerKeyScope)
 	ValidateKVType(col)
 	col, err := TryGroupByKey(s, col)
 	if err != nil {

Issue Time Tracking
---

Worklog Id: (was: 110642)
Time Spent: 8h 10m (was: 8h)
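The one-line fix in the diff above places the GroupByKey and the Combine under a dedicated named scope. The sketch below illustrates, under stated assumptions, why that can matter: if a translator recognises the composite by its scope label, transforms left in the caller's scope are never identified as liftable. The `scope` and `edge` types, the `liftable` helper, and the label value `"CombinePerKey"` are hypothetical and are not taken from the Beam source.

```go
package main

import "fmt"

// combinePerKeyScope is an assumed label; the real value of
// graph.CombinePerKeyScope is not shown in the source.
const combinePerKeyScope = "CombinePerKey"

// scope is a hypothetical named scope with a link to its parent.
type scope struct {
	Label  string
	Parent *scope
}

// Scope returns a child scope, analogous in spirit to s.Scope(...)
// in the diff.
func (s *scope) Scope(label string) *scope {
	return &scope{Label: label, Parent: s}
}

// edge is a hypothetical graph edge tagged with the scope it was
// created in.
type edge struct {
	Op    string
	Scope *scope
}

// liftable returns the ops created directly inside a CombinePerKey
// scope, which is how this sketch identifies the composite.
func liftable(edges []edge) []string {
	var ops []string
	for _, e := range edges {
		if e.Scope != nil && e.Scope.Label == combinePerKeyScope {
			ops = append(ops, e.Op)
		}
	}
	return ops
}

func main() {
	root := &scope{Label: "root"}

	// With the scope line: GBK and Combine share a CombinePerKey scope.
	s := root.Scope(combinePerKeyScope)
	with := []edge{{"GroupByKey", s}, {"Combine", s}, {"ParDo", root}}

	// Without it: everything stays in the caller's scope.
	without := []edge{{"GroupByKey", root}, {"Combine", root}, {"ParDo", root}}

	fmt.Println(liftable(with), liftable(without)) // prints "[GroupByKey Combine] []"
}
```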
[jira] [Work logged] (BEAM-4276) Implement the portable lifted Combiner transforms in Go SDK
[ https://issues.apache.org/jira/browse/BEAM-4276?focusedWorklogId=110640&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-110640 ]

ASF GitHub Bot logged work on BEAM-4276:

Author: ASF GitHub Bot
Created on: 11/Jun/18 15:09
Start Date: 11/Jun/18 15:09
Worklog Time Spent: 10m
Work Description: lostluck commented on issue #5581: [BEAM-4276] Add missing scope line
URL: https://github.com/apache/beam/pull/5581#issuecomment-396278036

R: @jasonkuster Please merge. :) Now Luke is out sick from the weekend.

Issue Time Tracking
---

Worklog Id: (was: 110640)
Time Spent: 8h (was: 7h 50m)
[jira] [Work logged] (BEAM-4276) Implement the portable lifted Combiner transforms in Go SDK
[ https://issues.apache.org/jira/browse/BEAM-4276?focusedWorklogId=109879&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-109879 ]

ASF GitHub Bot logged work on BEAM-4276:

Author: ASF GitHub Bot
Created on: 07/Jun/18 21:52
Start Date: 07/Jun/18 21:52
Worklog Time Spent: 10m
Work Description: lostluck opened a new pull request #5581: [BEAM-4276] Add missing scope line
URL: https://github.com/apache/beam/pull/5581

Without this line, no transforms can get lifted. It was missed from the previous PR.

Issue Time Tracking
---

Worklog Id: (was: 109879)
Time Spent: 7h 50m (was: 7h 40m)
[jira] [Work logged] (BEAM-4276) Implement the portable lifted Combiner transforms in Go SDK
[ https://issues.apache.org/jira/browse/BEAM-4276?focusedWorklogId=109877&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-109877 ]

ASF GitHub Bot logged work on BEAM-4276:

Author: ASF GitHub Bot
Created on: 07/Jun/18 21:52
Start Date: 07/Jun/18 21:52
Worklog Time Spent: 10m
Work Description: lostluck commented on issue #5581: [BEAM-4276] Add missing scope line
URL: https://github.com/apache/beam/pull/5581#issuecomment-395577396

R: @lukecwik Please merge, thanks!

Issue Time Tracking
---

Worklog Id: (was: 109877)
Time Spent: 7.5h (was: 7h 20m)
[jira] [Work logged] (BEAM-4276) Implement the portable lifted Combiner transforms in Go SDK
[ https://issues.apache.org/jira/browse/BEAM-4276?focusedWorklogId=109878&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-109878 ]

ASF GitHub Bot logged work on BEAM-4276:

Author: ASF GitHub Bot
Created on: 07/Jun/18 21:52
Start Date: 07/Jun/18 21:52
Worklog Time Spent: 10m
Work Description: lostluck closed pull request #5581: [BEAM-4276] Add missing scope line
URL: https://github.com/apache/beam/pull/5581

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/sdks/go/pkg/beam/combine.go b/sdks/go/pkg/beam/combine.go
index d4508c7379b..85d99464a1f 100644
--- a/sdks/go/pkg/beam/combine.go
+++ b/sdks/go/pkg/beam/combine.go
@@ -50,6 +50,7 @@ func TryCombine(s Scope, combinefn interface{}, col PCollection) (PCollection, e
 // for multiple reasons, notably that the combinefn is not valid or cannot be bound
 // -- due to type mismatch, say -- to the incoming PCollection.
 func TryCombinePerKey(s Scope, combinefn interface{}, col PCollection) (PCollection, error) {
+	s = s.Scope(graph.CombinePerKeyScope)
 	ValidateKVType(col)
 	col, err := TryGroupByKey(s, col)
 	if err != nil {

Issue Time Tracking
---

Worklog Id: (was: 109878)
Time Spent: 7h 40m (was: 7.5h)
[jira] [Work logged] (BEAM-4276) Implement the portable lifted Combiner transforms in Go SDK
[ https://issues.apache.org/jira/browse/BEAM-4276?focusedWorklogId=109874=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-109874 ] ASF GitHub Bot logged work on BEAM-4276: Author: ASF GitHub Bot Created on: 07/Jun/18 21:48 Start Date: 07/Jun/18 21:48 Worklog Time Spent: 10m Work Description: lostluck commented on issue #5581: [BEAM-4276] Add missing scope line URL: https://github.com/apache/beam/pull/5581#issuecomment-395576275 R: @herohde Issue Time Tracking --- Worklog Id: (was: 109874) Time Spent: 7h 20m (was: 7h 10m)
[jira] [Work logged] (BEAM-4276) Implement the portable lifted Combiner transforms in Go SDK
[ https://issues.apache.org/jira/browse/BEAM-4276?focusedWorklogId=109872=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-109872 ] ASF GitHub Bot logged work on BEAM-4276: Author: ASF GitHub Bot Created on: 07/Jun/18 21:47 Start Date: 07/Jun/18 21:47 Worklog Time Spent: 10m Work Description: lostluck opened a new pull request #5581: [BEAM-4276] Add missing scope line URL: https://github.com/apache/beam/pull/5581 Without this line, no transforms can get lifted. It was missed from the previous PR. Issue Time Tracking --- Worklog Id: (was: 109872) Time Spent: 7h 10m (was: 7h)
[jira] [Work logged] (BEAM-4276) Implement the portable lifted Combiner transforms in Go SDK
[ https://issues.apache.org/jira/browse/BEAM-4276?focusedWorklogId=109196=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-109196 ] ASF GitHub Bot logged work on BEAM-4276: Author: ASF GitHub Bot Created on: 05/Jun/18 20:46 Start Date: 05/Jun/18 20:46 Worklog Time Spent: 10m Work Description: lukecwik closed pull request #5507: [BEAM-4276] Add combiner lifting support to Go SDK URL: https://github.com/apache/beam/pull/5507 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/sdks/go/pkg/beam/combine.go b/sdks/go/pkg/beam/combine.go
index b841d916450..d4508c7379b 100644
--- a/sdks/go/pkg/beam/combine.go
+++ b/sdks/go/pkg/beam/combine.go
@@ -19,6 +19,7 @@ import (
 	"fmt"

 	"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
 )

 // Combine inserts a global Combine transform into the pipeline. It
@@ -59,8 +60,16 @@ func TryCombinePerKey(s Scope, combinefn interface{}, col PCollection) (PCollect
 	if err != nil {
 		return PCollection{}, fmt.Errorf("invalid CombineFn: %v", err)
 	}
+	// This seems like the best place to infer the accumulator coder type, unless
+	// it's a universal type.
+	// We can get the fulltype from the return value of the mergeAccumulatorFn
+	// TODO(lostluck): 2018/05/28 Correctly infer universal type coder if necessary.
+	accumCoder, err := inferCoder(typex.New(fn.MergeAccumulatorsFn().Ret[0].T))
+	if err != nil {
+		return PCollection{}, fmt.Errorf("unable to infer CombineFn accumulator coder: %v", err)
+	}

-	edge, err := graph.NewCombine(s.real, s.scope, fn, col.n)
+	edge, err := graph.NewCombine(s.real, s.scope, fn, col.n, accumCoder)
 	if err != nil {
 		return PCollection{}, err
 	}
diff --git a/sdks/go/pkg/beam/core/graph/edge.go b/sdks/go/pkg/beam/core/graph/edge.go
index 4605470bab0..74eb94c26fa 100644
--- a/sdks/go/pkg/beam/core/graph/edge.go
+++ b/sdks/go/pkg/beam/core/graph/edge.go
@@ -136,21 +136,19 @@ type Payload struct {
 	Data []byte
 }

-// TODO(herohde) 5/24/2017: how should we represent/obtain the coder for Combine
-// accumulator types? Coder registry? Assume JSON?
-
 // MultiEdge represents a primitive data processing operation. Each non-user
 // code operation may be implemented by either the harness or the runner.
 type MultiEdge struct {
 	id     int
 	parent *Scope

-	Op        Opcode
-	DoFn      *DoFn      // ParDo
-	CombineFn *CombineFn // Combine
-	Value     []byte     // Impulse
-	Payload   *Payload   // External
-	WindowFn  *window.Fn // WindowInto
+	Op         Opcode
+	DoFn       *DoFn        // ParDo
+	CombineFn  *CombineFn   // Combine
+	AccumCoder *coder.Coder // Combine
+	Value      []byte       // Impulse
+	Payload    *Payload     // External
+	WindowFn   *window.Fn   // WindowInto

 	Input  []*Inbound
 	Output []*Outbound
@@ -317,9 +315,17 @@ func newDoFnNode(op Opcode, g *Graph, s *Scope, u *DoFn, in []*Node, typedefs ma
 	return edge, nil
 }

+// CombinePerKeyScope is the Go SDK canonical name for the combine composite
+// scope. With Beam Portability, "primitive" composite transforms like
+// combine have their URNs & payloads attached to a high level scope, with a
+// default representation beneath. The use of this const permits the
+// translation layer to confirm the SDK expects this combine to be liftable
+// by a runner and should set this scope's URN and Payload accordingly.
+const CombinePerKeyScope = "CombinePerKey"
+
 // NewCombine inserts a new Combine edge into the graph. Combines cannot have side
 // input.
-func NewCombine(g *Graph, s *Scope, u *CombineFn, in *Node) (*MultiEdge, error) {
+func NewCombine(g *Graph, s *Scope, u *CombineFn, in *Node, ac *coder.Coder) (*MultiEdge, error) {
 	inT := in.Type()
 	if !typex.IsCoGBK(inT) {
 		return nil, fmt.Errorf("combine requires CoGBK type: %v", inT)
@@ -380,6 +386,7 @@ func NewCombine(g *Graph, s *Scope, u *CombineFn, in *Node) (*MultiEdge, error)
 	edge := g.NewEdge(s)
 	edge.Op = Combine
 	edge.CombineFn = u
+	edge.AccumCoder = ac
 	edge.Input = []*Inbound{{Kind: kinds[0], From: in, Type: inbound[0]}}
 	for i := 0; i < len(out); i++ {
 		n := g.NewNode(out[i], in.WindowingStrategy(), in.Bounded())
diff --git a/sdks/go/pkg/beam/core/runtime/exec/combine.go b/sdks/go/pkg/beam/core/runtime/exec/combine.go
index
[jira] [Work logged] (BEAM-4276) Implement the portable lifted Combiner transforms in Go SDK
[ https://issues.apache.org/jira/browse/BEAM-4276?focusedWorklogId=109197=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-109197 ] ASF GitHub Bot logged work on BEAM-4276: Author: ASF GitHub Bot Created on: 05/Jun/18 20:46 Start Date: 05/Jun/18 20:46 Worklog Time Spent: 10m Work Description: lukecwik commented on issue #5507: [BEAM-4276] Add combiner lifting support to Go SDK URL: https://github.com/apache/beam/pull/5507#issuecomment-394854383 LGTM Issue Time Tracking --- Worklog Id: (was: 109197) Time Spent: 7h (was: 6h 50m)
[jira] [Work logged] (BEAM-4276) Implement the portable lifted Combiner transforms in Go SDK
[ https://issues.apache.org/jira/browse/BEAM-4276?focusedWorklogId=109190=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-109190 ] ASF GitHub Bot logged work on BEAM-4276: Author: ASF GitHub Bot Created on: 05/Jun/18 20:29 Start Date: 05/Jun/18 20:29 Worklog Time Spent: 10m Work Description: lostluck commented on issue #5507: [BEAM-4276] Add combiner lifting support to Go SDK URL: https://github.com/apache/beam/pull/5507#issuecomment-394849337 R: @lukecwik since Jason is out today. Please merge! Thanks! Issue Time Tracking --- Worklog Id: (was: 109190) Time Spent: 6h 40m (was: 6.5h)
[jira] [Work logged] (BEAM-4276) Implement the portable lifted Combiner transforms in Go SDK
[ https://issues.apache.org/jira/browse/BEAM-4276?focusedWorklogId=108899=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-108899 ] ASF GitHub Bot logged work on BEAM-4276: Author: ASF GitHub Bot Created on: 05/Jun/18 00:47 Start Date: 05/Jun/18 00:47 Worklog Time Spent: 10m Work Description: lostluck commented on issue #5507: [BEAM-4276] Add combiner lifting support to Go SDK URL: https://github.com/apache/beam/pull/5507#issuecomment-394544853 I rebased to head but otherwise the PR is unchanged. Issue Time Tracking --- Worklog Id: (was: 108899) Time Spent: 6.5h (was: 6h 20m)
[jira] [Work logged] (BEAM-4276) Implement the portable lifted Combiner transforms in Go SDK
[ https://issues.apache.org/jira/browse/BEAM-4276?focusedWorklogId=108815=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-108815 ] ASF GitHub Bot logged work on BEAM-4276: Author: ASF GitHub Bot Created on: 04/Jun/18 23:35 Start Date: 04/Jun/18 23:35 Worklog Time Spent: 10m Work Description: lostluck commented on issue #5507: [BEAM-4276] Add combiner lifting support to Go SDK URL: https://github.com/apache/beam/pull/5507#issuecomment-394533171 R: @jasonkuster This is a meaty Go CL, so please add any oversight as you see fit, and otherwise, merge at your discretion. Thank you! Issue Time Tracking --- Worklog Id: (was: 108815) Time Spent: 6h 20m (was: 6h 10m)
[jira] [Work logged] (BEAM-4276) Implement the portable lifted Combiner transforms in Go SDK
[ https://issues.apache.org/jira/browse/BEAM-4276?focusedWorklogId=108813=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-108813 ] ASF GitHub Bot logged work on BEAM-4276: Author: ASF GitHub Bot Created on: 04/Jun/18 23:33 Start Date: 04/Jun/18 23:33 Worklog Time Spent: 10m Work Description: lostluck commented on a change in pull request #5507: [BEAM-4276] Add combiner lifting support to Go SDK URL: https://github.com/apache/beam/pull/5507#discussion_r192909642

## File path: sdks/go/pkg/beam/core/runtime/exec/combine_test.go ##
@@ -148,7 +96,14 @@ func getCombineEdge(t *testing.T, cfn interface{}, ac *coder.Coder) *graph.Multi
 	}
 	g := graph.New()
-	inT := typex.NewCoGBK(typex.New(reflectx.Int), typex.New(reflectx.Int))
+	var vtype reflect.Type
+	if fn.AddInputFn() != nil {
+		// This makes the assumption that the AddInput function is unkeyed.

Review comment: As in it's unimplemented since it's a helper for a test, and none of the test cases exercise that at present. As implemented the CreateAccumulator and the AddInput functions are permitted to take a key as input.

Issue Time Tracking --- Worklog Id: (was: 108813) Time Spent: 6h 10m (was: 6h)
[jira] [Work logged] (BEAM-4276) Implement the portable lifted Combiner transforms in Go SDK
[ https://issues.apache.org/jira/browse/BEAM-4276?focusedWorklogId=108812=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-108812 ] ASF GitHub Bot logged work on BEAM-4276: Author: ASF GitHub Bot Created on: 04/Jun/18 23:31 Start Date: 04/Jun/18 23:31 Worklog Time Spent: 10m Work Description: lostluck commented on a change in pull request #5507: [BEAM-4276] Add combiner lifting support to Go SDK URL: https://github.com/apache/beam/pull/5507#discussion_r192909318

## File path: sdks/go/pkg/beam/core/runtime/exec/combine_test.go ##
@@ -60,9 +170,30 @@ func TestCombine(t *testing.T) {
 	if err := p.Down(context.Background()); err != nil {
 		t.Fatalf("down failed: %v", err)
 	}
+}
-	expected := makeKV(42, 21)
-	if !equalList(out.Elements, expected) {
-		t.Errorf("pardo(sumFn) = %v, want %v", extractKeyedValues(out.Elements...), extractKeyedValues(expected...))
+func mergeFn(a, b int) int {
+	return a + b
+}
+
+type MyCombine struct{}
+
+func (*MyCombine) AddInput(k int64, a int) int64 {
+	return k + int64(a)
+}
+
+func (*MyCombine) MergeAccumulators(a, b int64) int64 {
+	return a + b
+}
+
+func (*MyCombine) ExtractOutput(a int64) int {
+	return int(a)
+}
+

Review comment: Done.

Issue Time Tracking --- Worklog Id: (was: 108812) Time Spent: 6h (was: 5h 50m)
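The MyCombine test type in the review thread above has the three-method shape that makes a combine liftable: AddInput folds a value into an accumulator, MergeAccumulators joins partial accumulators, and ExtractOutput produces the final value. A minimal sketch of how those methods compose across bundles follows. The combineBundles driver is hypothetical and not Beam code, and it assumes the zero value of int64 is a valid initial accumulator because MyCombine defines no CreateAccumulator. The first AddInput parameter is named acc here for readability; the test code calls it k.

```go
package main

import "fmt"

// MyCombine mirrors the test type from the review thread.
type MyCombine struct{}

// AddInput folds one input value into the accumulator.
func (*MyCombine) AddInput(acc int64, v int) int64 { return acc + int64(v) }

// MergeAccumulators joins two partial accumulators.
func (*MyCombine) MergeAccumulators(a, b int64) int64 { return a + b }

// ExtractOutput converts the final accumulator into the output value.
func (*MyCombine) ExtractOutput(a int64) int { return int(a) }

// combineBundles is a hypothetical driver: each inner slice stands for the
// values of one key seen in one bundle. Values are pre-combined per bundle,
// the partial accumulators are merged, and the output is extracted once.
func combineBundles(c *MyCombine, bundles [][]int) int {
	var merged int64 // assumption: zero value is the empty accumulator
	for _, bundle := range bundles {
		var acc int64
		for _, v := range bundle {
			acc = c.AddInput(acc, v)
		}
		merged = c.MergeAccumulators(merged, acc)
	}
	return c.ExtractOutput(merged)
}

func main() {
	fmt.Println(combineBundles(&MyCombine{}, [][]int{{1, 2, 3}, {4, 5, 6}})) // prints 21
}
```

Only the per-bundle accumulators cross the shuffle boundary in a lifted combine, which is why the accumulator type (int64 here) needs its own coder, distinct from the input and output types.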
[jira] [Work logged] (BEAM-4276) Implement the portable lifted Combiner transforms in Go SDK
[ https://issues.apache.org/jira/browse/BEAM-4276?focusedWorklogId=108809=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-108809 ] ASF GitHub Bot logged work on BEAM-4276: Author: ASF GitHub Bot Created on: 04/Jun/18 23:24 Start Date: 04/Jun/18 23:24 Worklog Time Spent: 10m Work Description: wcn3 commented on a change in pull request #5507: [BEAM-4276] Add combiner lifting support to Go SDK URL: https://github.com/apache/beam/pull/5507#discussion_r192908195

## File path: sdks/go/pkg/beam/core/runtime/exec/combine_test.go ##
@@ -148,7 +96,14 @@ func getCombineEdge(t *testing.T, cfn interface{}, ac *coder.Coder) *graph.Multi
 	}
 	g := graph.New()
-	inT := typex.NewCoGBK(typex.New(reflectx.Int), typex.New(reflectx.Int))
+	var vtype reflect.Type
+	if fn.AddInputFn() != nil {
+		// This makes the assumption that the AddInput function is unkeyed.

Review comment: Not sure what the impact of this comment is. Are AddInput functions supposed to allow keys? That is, this comment indicates that the full contract is not implemented? Or does it mean an AddInput function should always be unkeyed, but we're not bothering to check?

Issue Time Tracking --- Worklog Id: (was: 108809) Time Spent: 5h 50m (was: 5h 40m)
[jira] [Work logged] (BEAM-4276) Implement the portable lifted Combiner transforms in Go SDK
[ https://issues.apache.org/jira/browse/BEAM-4276?focusedWorklogId=108808=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-108808 ] ASF GitHub Bot logged work on BEAM-4276: Author: ASF GitHub Bot Created on: 04/Jun/18 23:17 Start Date: 04/Jun/18 23:17 Worklog Time Spent: 10m Work Description: wcn3 commented on a change in pull request #5507: [BEAM-4276] Add combiner lifting support to Go SDK URL: https://github.com/apache/beam/pull/5507#discussion_r192907083

## File path: sdks/go/pkg/beam/core/runtime/exec/combine.go ##
@@ -42,10 +42,12 @@ type Combine struct {
 	err errorx.GuardedError
 }
+// ID returns the UnitID for this node.

Review comment: Cool. Just wanted a sanity check on the idea. I've been finding myself hitting the same pattern in other pipeline code. I was surprisingly successful extracting a base, and wanted to take a pass at this chunk of code. I'll open a JIRA for this.

Issue Time Tracking --- Worklog Id: (was: 108808) Time Spent: 5h 40m (was: 5.5h)
[jira] [Work logged] (BEAM-4276) Implement the portable lifted Combiner transforms in Go SDK
[ https://issues.apache.org/jira/browse/BEAM-4276?focusedWorklogId=108767=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-108767 ] ASF GitHub Bot logged work on BEAM-4276: Author: ASF GitHub Bot Created on: 04/Jun/18 21:26 Start Date: 04/Jun/18 21:26 Worklog Time Spent: 10m Work Description: lostluck commented on a change in pull request #5507: [BEAM-4276] Add combiner lifting support to Go SDK URL: https://github.com/apache/beam/pull/5507#discussion_r192885648

## File path: sdks/go/pkg/beam/core/runtime/graphx/translate.go ##
@@ -148,10 +148,58 @@ func (m *marshaller) addScopeTree(s *ScopeTree) string {
 		Inputs:  diff(in, out),
 		Outputs: diff(out, in),
 	}
+
+	m.updateIfCombineComposite(s, transform)
+
 	m.transforms[id] = transform
 	return id
 }
+// updateIfCombineComposite examines the scope tree and sets the PTransform Spec
+// to be a CombinePerKey with a CombinePayload if it's a liftable composite.
+// Beam Portability requires that composites

Review comment: Done.

Issue Time Tracking --- Worklog Id: (was: 108767) Time Spent: 4h 50m (was: 4h 40m)
[jira] [Work logged] (BEAM-4276) Implement the portable lifted Combiner transforms in Go SDK
[ https://issues.apache.org/jira/browse/BEAM-4276?focusedWorklogId=108768=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-108768 ] ASF GitHub Bot logged work on BEAM-4276: Author: ASF GitHub Bot Created on: 04/Jun/18 21:26 Start Date: 04/Jun/18 21:26 Worklog Time Spent: 10m Work Description: lostluck commented on a change in pull request #5507: [BEAM-4276] Add combiner lifting support to Go SDK URL: https://github.com/apache/beam/pull/5507#discussion_r192885675

## File path: sdks/go/pkg/beam/core/runtime/exec/combine.go ##
@@ -217,3 +224,162 @@ func (n *Combine) fail(err error) error {
 func (n *Combine) String() string {
 	return fmt.Sprintf("Combine[%v] Keyed:%v Out:%v", path.Base(n.Fn.Name()), n.UsesKey, n.Out.ID())
 }
+
+// The nodes below break apart the Combine into components to support
+// Combiner Lifting optimizations.
+
+// LiftedCombine is an executor for combining values before grouping by keys
+// for a lifted combine. Partially groups values by key within a bundle,
+// accumulating them in an in memory cache, before emitting them in the
+// FinishBundle step.
+type LiftedCombine struct {
+	*Combine
+
+	cache map[interface{}]FullValue
+}
+
+func (n *LiftedCombine) String() string {
+	return fmt.Sprintf("LiftedCombine[%v] Keyed:%v Out:%v", path.Base(n.Fn.Name()), n.UsesKey, n.Out.ID())
+}
+
+// StartBundle initializes the in memory cache of keys to accumulators.
+func (n *LiftedCombine) StartBundle(ctx context.Context, id string, data DataManager) error {
+	if err := n.Combine.StartBundle(ctx, id, data); err != nil {
+		return err
+	}
+	n.cache = make(map[interface{}]FullValue)
+	return nil
+}
+
+// ProcessElement takes a KV pair and combines values with the same into an accumulator,
+// caching them until the bundle is complete.
+func (n *LiftedCombine) ProcessElement(ctx context.Context, value FullValue, values ...ReStream) error {
+	if n.status != Active {
+		return fmt.Errorf("invalid status for precombine %v: %v", n.UID, n.status)
+	}
+
+	// Value is a KV so Elm & Elm2 are populated.
+	// Check the cache for an already present accumulator
+
+	afv, notfirst := n.cache[value.Elm]
+	var a interface{}
+	if notfirst {
+		a = afv.Elm2
+	} else {
+		b, err := n.newAccum(ctx, value.Elm)
+		if err != nil {
+			return n.fail(err)
+		}
+		a = b
+	}
+
+	a, err := n.addInput(ctx, a, value.Elm, value.Elm2, value.Timestamp, !notfirst)
+	if err != nil {
+		return n.fail(err)
+	}
+
+	// Cache the accumulator with the key
+	n.cache[value.Elm] = FullValue{Windows: value.Windows, Elm: value.Elm, Elm2: a, Timestamp: value.Timestamp}
+
+	return nil
+}
+
+// FinishBundle iterates through the cached key, accumulator pairs, and then

Review comment: Done.

Issue Time Tracking --- Worklog Id: (was: 108768) Time Spent: 5h (was: 4h 50m)
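The LiftedCombine node quoted above keeps a per-bundle cache from key to accumulator and, per its doc comment, emits the cached pairs in the FinishBundle step. A minimal, self-contained sketch of that pattern follows, assuming string keys and an integer sum as the combine. The preCombiner and kv types are hypothetical stand-ins for LiftedCombine and FullValue, and the sorting in FinishBundle exists only to make the sketch's output deterministic.

```go
package main

import (
	"fmt"
	"sort"
)

// kv is a hypothetical stand-in for a KV-shaped FullValue (Elm, Elm2).
type kv struct {
	Key   string
	Value int
}

// preCombiner is a hypothetical, simplified analogue of LiftedCombine:
// it accumulates values per key within one bundle.
type preCombiner struct {
	cache map[string]int
}

// StartBundle initializes the in-memory cache of keys to accumulators.
func (p *preCombiner) StartBundle() { p.cache = make(map[string]int) }

// ProcessElement folds the value into the accumulator cached for its key,
// creating a fresh (zero) accumulator the first time the key is seen.
func (p *preCombiner) ProcessElement(e kv) {
	acc, seen := p.cache[e.Key]
	if !seen {
		acc = 0 // plays the role of CreateAccumulator
	}
	p.cache[e.Key] = acc + e.Value // plays the role of AddInput
}

// FinishBundle emits one (key, accumulator) pair per cached key and drops
// the cache. Sorting by key is an addition for deterministic output.
func (p *preCombiner) FinishBundle() []kv {
	out := make([]kv, 0, len(p.cache))
	for k, acc := range p.cache {
		out = append(out, kv{Key: k, Value: acc})
	}
	sort.Slice(out, func(i, j int) bool { return out[i].Key < out[j].Key })
	p.cache = nil
	return out
}

func main() {
	p := &preCombiner{}
	p.StartBundle()
	for _, e := range []kv{{Key: "a", Value: 1}, {Key: "b", Value: 2}, {Key: "a", Value: 3}} {
		p.ProcessElement(e)
	}
	fmt.Println(p.FinishBundle()) // prints [{a 4} {b 2}]
}
```

Three input elements leave the bundle as two accumulators, which is the saving lifting is after: less data goes through the GroupByKey, at the cost of holding one accumulator per distinct key in memory until the bundle finishes.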
[jira] [Work logged] (BEAM-4276) Implement the portable lifted Combiner transforms in Go SDK
[ https://issues.apache.org/jira/browse/BEAM-4276?focusedWorklogId=108771=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-108771 ] ASF GitHub Bot logged work on BEAM-4276: Author: ASF GitHub Bot Created on: 04/Jun/18 21:26 Start Date: 04/Jun/18 21:26 Worklog Time Spent: 10m Work Description: lostluck commented on issue #5507: [BEAM-4276] Add combiner lifting support to Go SDK URL: https://github.com/apache/beam/pull/5507#issuecomment-394505228 PTAL @wcn3 Comments addressed and ready for another pass! Thanks! Issue Time Tracking --- Worklog Id: (was: 108771) Time Spent: 5.5h (was: 5h 20m)
[jira] [Work logged] (BEAM-4276) Implement the portable lifted Combiner transforms in Go SDK
[ https://issues.apache.org/jira/browse/BEAM-4276?focusedWorklogId=108770=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-108770 ] ASF GitHub Bot logged work on BEAM-4276: Author: ASF GitHub Bot Created on: 04/Jun/18 21:26 Start Date: 04/Jun/18 21:26 Worklog Time Spent: 10m Work Description: lostluck commented on a change in pull request #5507: [BEAM-4276] Add combiner lifting support to Go SDK URL: https://github.com/apache/beam/pull/5507#discussion_r192885741

## File path: sdks/go/pkg/beam/core/graph/edge.go ##
@@ -317,9 +315,17 @@ func newDoFnNode(op Opcode, g *Graph, s *Scope, u *DoFn, in []*Node, typedefs ma
 	return edge, nil
 }
+// CombinePerKeyScope is the canonical name for the scope containing combine

Review comment: Reduced it to just combine composite, since scope & composite transforms are technically the same thing as far as our graph translation is concerned.

Issue Time Tracking --- Worklog Id: (was: 108770) Time Spent: 5h 20m (was: 5h 10m)
[jira] [Work logged] (BEAM-4276) Implement the portable lifted Combiner transforms in Go SDK
[ https://issues.apache.org/jira/browse/BEAM-4276?focusedWorklogId=108769=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-108769 ]

ASF GitHub Bot logged work on BEAM-4276:

Author: ASF GitHub Bot
Created on: 04/Jun/18 21:26
Start Date: 04/Jun/18 21:26
Worklog Time Spent: 10m
Work Description: lostluck commented on a change in pull request #5507: [BEAM-4276] Add combiner lifting support to Go SDK
URL: https://github.com/apache/beam/pull/5507#discussion_r192885698

## File path: sdks/go/pkg/beam/core/graph/edge.go ##

@@ -317,9 +315,17 @@ func newDoFnNode(op Opcode, g *Graph, s *Scope, u *DoFn, in []*Node, typedefs ma
 	return edge, nil
 }
+// CombinePerKeyScope is the canonical name for the scope containing combine
+// composite. With Beam Portability, "primitive" composite transforms like
+// combine have their URNs & payloads attached to a high level scope, with a
+// default representation beneath. This const permits the translation layer

Review comment:
Done.

Issue Time Tracking
---
Worklog Id: (was: 108769)
Time Spent: 5h 10m (was: 5h)
[jira] [Work logged] (BEAM-4276) Implement the portable lifted Combiner transforms in Go SDK
[ https://issues.apache.org/jira/browse/BEAM-4276?focusedWorklogId=108765=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-108765 ]

ASF GitHub Bot logged work on BEAM-4276:

Author: ASF GitHub Bot
Created on: 04/Jun/18 21:25
Start Date: 04/Jun/18 21:25
Worklog Time Spent: 10m
Work Description: lostluck commented on a change in pull request #5507: [BEAM-4276] Add combiner lifting support to Go SDK
URL: https://github.com/apache/beam/pull/5507#discussion_r192885583

## File path: sdks/go/pkg/beam/core/runtime/exec/translate.go ##

@@ -34,9 +34,13 @@ import (
 	"github.com/golang/protobuf/ptypes"
 )
+// TODO(lostluck): 2018/05/28 Extract these from the canonical enums in beam_runner_api.proto

Review comment:
Done.

Issue Time Tracking
---
Worklog Id: (was: 108765)
Time Spent: 4.5h (was: 4h 20m)
[jira] [Work logged] (BEAM-4276) Implement the portable lifted Combiner transforms in Go SDK
[ https://issues.apache.org/jira/browse/BEAM-4276?focusedWorklogId=108762=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-108762 ]

ASF GitHub Bot logged work on BEAM-4276:

Author: ASF GitHub Bot
Created on: 04/Jun/18 21:25
Start Date: 04/Jun/18 21:25
Worklog Time Spent: 10m
Work Description: lostluck commented on a change in pull request #5507: [BEAM-4276] Add combiner lifting support to Go SDK
URL: https://github.com/apache/beam/pull/5507#discussion_r192885506

## File path: sdks/go/pkg/beam/core/runtime/exec/combine.go ##

@@ -42,10 +42,12 @@ type Combine struct {
 	err errorx.GuardedError
 }
+// ID returns the UnitID for this node.

Review comment:
I agree it's worthwhile, since everything has a UID, status and err. Most (if not all) have an Out and a Fn (if not a CombineFn).

The alternative to embedding would be to invoke them through an interface, since embeds have no transparent-to-the-code inheritance of field and method overrides (no Java-style subclassing). I think that's still slower than the embedded approach in terms of what gets compiled, due to the interface overhead.

Henning's main concern about using embedding was that it can be hard to follow the logic if the embedded class is in a different file, but it really depends on how much we're embedding. We can trivially handle removing ID() & String(), along with fail(), but the real trick would be meaningfully removing the duplicated state machine code that prefixes most of the methods. I would almost suggest moving those outside of the nodes themselves, but since they're getting called in the stack, they still need to be invoked per node, in order.

I vote we estimate how much code it would save, and whether that's worth it over the explicit duplication.

Issue Time Tracking
---
Worklog Id: (was: 108762)
Time Spent: 4h 10m (was: 4h)
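The embedding trade-off described in the review comment above can be illustrated with a small standalone program. This is only a sketch: `base`, `combineNode`, `namer` and `describe` are hypothetical names invented for the example and are not part of the Beam Go SDK. It shows that a method on an embedded struct is promoted to the outer type, but that code inside the embedded struct never sees a method "overridden" on the outer type, whereas an interface call does.

```go
package main

import "fmt"

// base is a hypothetical shared struct; it is not part of the Beam codebase.
type base struct {
	UID int
}

// ID is implemented once and promoted to every struct that embeds base.
func (b *base) ID() int { return b.UID }

// Name is the method the outer types would like to "override".
func (b *base) Name() string { return "base" }

// Describe calls b.Name(), which always resolves to base.Name: the embedded
// struct cannot see methods declared on the struct that embeds it.
func (b *base) Describe() string {
	return fmt.Sprintf("%s[%d]", b.Name(), b.ID())
}

// combineNode stands in for an executor node such as Combine.
type combineNode struct {
	base
}

// Name shadows base.Name, but only for calls made on a combineNode.
func (n *combineNode) Name() string { return "Combine" }

// namer is the interface alternative: shared code receives the outer value
// and dispatches dynamically.
type namer interface {
	Name() string
	ID() int
}

// describe is the interface-based counterpart of base.Describe.
func describe(n namer) string {
	return fmt.Sprintf("%s[%d]", n.Name(), n.ID())
}

func main() {
	n := &combineNode{base{UID: 7}}
	fmt.Println(n.ID())       // promoted method: 7
	fmt.Println(n.Describe()) // embedded code sees base.Name: base[7]
	fmt.Println(describe(n))  // interface dispatch sees combineNode.Name: Combine[7]
}
```

Under these assumptions, trivial accessors such as ID() move into the shared struct at no cost, while anything that must call back into node-specific behaviour still needs either an interface or per-node code.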
[jira] [Work logged] (BEAM-4276) Implement the portable lifted Combiner transforms in Go SDK
[ https://issues.apache.org/jira/browse/BEAM-4276?focusedWorklogId=108766=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-108766 ]

ASF GitHub Bot logged work on BEAM-4276:

Author: ASF GitHub Bot
Created on: 04/Jun/18 21:25
Start Date: 04/Jun/18 21:25
Worklog Time Spent: 10m
Work Description: lostluck commented on a change in pull request #5507: [BEAM-4276] Add combiner lifting support to Go SDK
URL: https://github.com/apache/beam/pull/5507#discussion_r192885621

## File path: sdks/go/pkg/beam/transforms/top/top.go ##

@@ -106,9 +106,9 @@ func validate(t typex.FullType, n int, less interface{}) {
 	funcx.MustSatisfy(less, funcx.Replace(sig, beam.TType, t.Type()))
 }
-// TODO(herohde) 5/25/2017: the accumulator should be serializable with a Coder.
-// We need a coder here, because the elements are generally code-able only. Until
-// then, we do not support combiner lifting.
+// TODO(herohde) 5/25/2017: BEAM- the accumulator should be serializable

Review comment:
Done

Issue Time Tracking
---
Worklog Id: (was: 108766)
Time Spent: 4h 40m (was: 4.5h)
[jira] [Work logged] (BEAM-4276) Implement the portable lifted Combiner transforms in Go SDK
[ https://issues.apache.org/jira/browse/BEAM-4276?focusedWorklogId=108764=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-108764 ]

ASF GitHub Bot logged work on BEAM-4276:

Author: ASF GitHub Bot
Created on: 04/Jun/18 21:25
Start Date: 04/Jun/18 21:25
Worklog Time Spent: 10m
Work Description: lostluck commented on a change in pull request #5507: [BEAM-4276] Add combiner lifting support to Go SDK
URL: https://github.com/apache/beam/pull/5507#discussion_r192885546

## File path: sdks/go/pkg/beam/core/runtime/exec/combine.go ##

@@ -217,3 +224,162 @@ func (n *Combine) fail(err error) error {
 func (n *Combine) String() string {
 	return fmt.Sprintf("Combine[%v] Keyed:%v Out:%v", path.Base(n.Fn.Name()), n.UsesKey, n.Out.ID())
 }
+
+// The nodes below break apart the Combine into components to support
+// Combiner Lifting optimizations.
+
+// LiftedCombine is an executor for combining values before grouping by keys
+// for a lifted combine. Partially groups values by key within a bundle,
+// accumulating them in an in memory cache, before emitting them in the
+// FinishBundle step.
+type LiftedCombine struct {
+	*Combine
+
+	cache map[interface{}]FullValue
+}
+
+func (n *LiftedCombine) String() string {
+	return fmt.Sprintf("LiftedCombine[%v] Keyed:%v Out:%v", path.Base(n.Fn.Name()), n.UsesKey, n.Out.ID())
+}
+
+// StartBundle initializes the in memory cache of keys to accumulators.
+func (n *LiftedCombine) StartBundle(ctx context.Context, id string, data DataManager) error {
+	if err := n.Combine.StartBundle(ctx, id, data); err != nil {
+		return err
+	}
+	n.cache = make(map[interface{}]FullValue)
+	return nil
+}
+
+// ProcessElement takes a KV pair and combines values with the same into an accumulator,
+// caching them until the bundle is complete.
+func (n *LiftedCombine) ProcessElement(ctx context.Context, value FullValue, values ...ReStream) error {
+	if n.status != Active {
+		return fmt.Errorf("invalid status for precombine %v: %v", n.UID, n.status)
+	}
+
+	// Value is a KV so Elm & Elm2 are populated.
+	// Check the cache for an already present accumulator
+
+	afv, notfirst := n.cache[value.Elm]
+	var a interface{}
+	if notfirst {
+		a = afv.Elm2
+	} else {
+		b, err := n.newAccum(ctx, value.Elm)
+		if err != nil {
+			return n.fail(err)
+		}
+		a = b
+	}
+
+	a, err := n.addInput(ctx, a, value.Elm, value.Elm2, value.Timestamp, !notfirst)
+	if err != nil {
+		return n.fail(err)
+	}
+
+	// Cache the accumulator with the key
+	n.cache[value.Elm] = FullValue{Windows: value.Windows, Elm: value.Elm, Elm2: a, Timestamp: value.Timestamp}

Review comment:
JIRA filed: https://issues.apache.org/jira/projects/BEAM/issues/BEAM-4468

Issue Time Tracking
---
Worklog Id: (was: 108764)
Time Spent: 4h 20m (was: 4h 10m)
[jira] [Work logged] (BEAM-4276) Implement the portable lifted Combiner transforms in Go SDK
[ https://issues.apache.org/jira/browse/BEAM-4276?focusedWorklogId=107856=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-107856 ]

ASF GitHub Bot logged work on BEAM-4276:

Author: ASF GitHub Bot
Created on: 31/May/18 20:10
Start Date: 31/May/18 20:10
Worklog Time Spent: 10m
Work Description: wcn3 commented on a change in pull request #5507: [BEAM-4276] Add combiner lifting support to Go SDK
URL: https://github.com/apache/beam/pull/5507#discussion_r192217992

## File path: sdks/go/pkg/beam/core/runtime/exec/combine.go ##

@@ -217,3 +224,162 @@ func (n *Combine) fail(err error) error {
 func (n *Combine) String() string {
 	return fmt.Sprintf("Combine[%v] Keyed:%v Out:%v", path.Base(n.Fn.Name()), n.UsesKey, n.Out.ID())
 }
+
+// The nodes below break apart the Combine into components to support
+// Combiner Lifting optimizations.
+
+// LiftedCombine is an executor for combining values before grouping by keys
+// for a lifted combine. Partially groups values by key within a bundle,
+// accumulating them in an in memory cache, before emitting them in the
+// FinishBundle step.
+type LiftedCombine struct {
+	*Combine
+
+	cache map[interface{}]FullValue
+}
+
+func (n *LiftedCombine) String() string {
+	return fmt.Sprintf("LiftedCombine[%v] Keyed:%v Out:%v", path.Base(n.Fn.Name()), n.UsesKey, n.Out.ID())
+}
+
+// StartBundle initializes the in memory cache of keys to accumulators.
+func (n *LiftedCombine) StartBundle(ctx context.Context, id string, data DataManager) error {
+	if err := n.Combine.StartBundle(ctx, id, data); err != nil {
+		return err
+	}
+	n.cache = make(map[interface{}]FullValue)
+	return nil
+}
+
+// ProcessElement takes a KV pair and combines values with the same into an accumulator,
+// caching them until the bundle is complete.
+func (n *LiftedCombine) ProcessElement(ctx context.Context, value FullValue, values ...ReStream) error {
+	if n.status != Active {
+		return fmt.Errorf("invalid status for precombine %v: %v", n.UID, n.status)
+	}
+
+	// Value is a KV so Elm & Elm2 are populated.
+	// Check the cache for an already present accumulator
+
+	afv, notfirst := n.cache[value.Elm]
+	var a interface{}
+	if notfirst {
+		a = afv.Elm2
+	} else {
+		b, err := n.newAccum(ctx, value.Elm)
+		if err != nil {
+			return n.fail(err)
+		}
+		a = b
+	}
+
+	a, err := n.addInput(ctx, a, value.Elm, value.Elm2, value.Timestamp, !notfirst)
+	if err != nil {
+		return n.fail(err)
+	}
+
+	// Cache the accumulator with the key
+	n.cache[value.Elm] = FullValue{Windows: value.Windows, Elm: value.Elm, Elm2: a, Timestamp: value.Timestamp}

Review comment:
I think it would be incredibly helpful if progress reporting could report the number of elements in a cache (as an SDK-specific counter (or standard runner counter?)). A data structure that grows without bounds, without some observability hook, concerns me. Not required for this review; filing a JIRA is fine.

Issue Time Tracking
---
Worklog Id: (was: 107856)
Time Spent: 2h 50m (was: 2h 40m)
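The observability hook asked for in the review comment above could look roughly like the following. This is a minimal sketch under stated assumptions: `observedCache` and `sizeReporter` are hypothetical names, nothing here is taken from the Beam Go SDK, and how a real SDK or runner counter would be registered is not specified anywhere in this thread.

```go
package main

import "fmt"

// sizeReporter is a hypothetical hook; a real SDK would wire this to its
// own metrics or progress-reporting mechanism.
type sizeReporter func(n int)

// observedCache maps keys to accumulators and reports its size whenever a
// new key is added or the cache is reset.
type observedCache struct {
	entries map[interface{}]interface{}
	report  sizeReporter
}

func newObservedCache(report sizeReporter) *observedCache {
	return &observedCache{
		entries: make(map[interface{}]interface{}),
		report:  report,
	}
}

// Get returns the accumulator cached for key, if any.
func (c *observedCache) Get(key interface{}) (interface{}, bool) {
	a, ok := c.entries[key]
	return a, ok
}

// Put stores the accumulator for key and reports the new size only when the
// key was not cached before.
func (c *observedCache) Put(key, accum interface{}) {
	_, existed := c.entries[key]
	c.entries[key] = accum
	if !existed {
		c.report(len(c.entries))
	}
}

// Reset drops all entries, as would happen at a bundle boundary.
func (c *observedCache) Reset() {
	c.entries = make(map[interface{}]interface{})
	c.report(0)
}

func main() {
	var sizes []int
	c := newObservedCache(func(n int) { sizes = append(sizes, n) })
	c.Put("a", 1)
	c.Put("b", 2)
	c.Put("a", 3) // existing key: size unchanged, nothing reported
	c.Reset()
	fmt.Println(sizes) // [1 2 0]
}
```

Reporting only on size changes keeps the hook off the hot path for repeated keys, which is the common case for a pre-combining cache.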
[jira] [Work logged] (BEAM-4276) Implement the portable lifted Combiner transforms in Go SDK
[ https://issues.apache.org/jira/browse/BEAM-4276?focusedWorklogId=107859=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-107859 ]

ASF GitHub Bot logged work on BEAM-4276:

Author: ASF GitHub Bot
Created on: 31/May/18 20:10
Start Date: 31/May/18 20:10
Worklog Time Spent: 10m
Work Description: wcn3 commented on a change in pull request #5507: [BEAM-4276] Add combiner lifting support to Go SDK
URL: https://github.com/apache/beam/pull/5507#discussion_r192210019

## File path: sdks/go/pkg/beam/core/runtime/exec/combine.go ##

@@ -42,10 +42,12 @@ type Combine struct {
 	err errorx.GuardedError
 }
+// ID returns the UnitID for this node.

Review comment:
Do we want to consider embedding a "base class" struct that provides these trivial behaviors? It reduces needless-looking code and would additionally improve the output of golint since these methods would have 1, not N, implementations. Not necessary to make this change for this PR, just wanted to consider it. Can open a JIRA if we agree it's worthwhile.

Issue Time Tracking
---
Worklog Id: (was: 107859)
Time Spent: 3h 20m (was: 3h 10m)
[jira] [Work logged] (BEAM-4276) Implement the portable lifted Combiner transforms in Go SDK
[ https://issues.apache.org/jira/browse/BEAM-4276?focusedWorklogId=107863=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-107863 ]

ASF GitHub Bot logged work on BEAM-4276:

Author: ASF GitHub Bot
Created on: 31/May/18 20:10
Start Date: 31/May/18 20:10
Worklog Time Spent: 10m
Work Description: wcn3 commented on a change in pull request #5507: [BEAM-4276] Add combiner lifting support to Go SDK
URL: https://github.com/apache/beam/pull/5507#discussion_r192209548

## File path: sdks/go/pkg/beam/core/graph/edge.go ##

@@ -317,9 +315,17 @@ func newDoFnNode(op Opcode, g *Graph, s *Scope, u *DoFn, in []*Node, typedefs ma
 	return edge, nil
 }
+// CombinePerKeyScope is the canonical name for the scope containing combine
+// composite. With Beam Portability, "primitive" composite transforms like
+// combine have their URNs & payloads attached to a high level scope, with a
+// default representation beneath. This const permits the translation layer

Review comment:
The use of this const...

Issue Time Tracking
---
Worklog Id: (was: 107863)
[jira] [Work logged] (BEAM-4276) Implement the portable lifted Combiner transforms in Go SDK
[ https://issues.apache.org/jira/browse/BEAM-4276?focusedWorklogId=107861=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-107861 ]

ASF GitHub Bot logged work on BEAM-4276:

Author: ASF GitHub Bot
Created on: 31/May/18 20:10
Start Date: 31/May/18 20:10
Worklog Time Spent: 10m
Work Description: wcn3 commented on a change in pull request #5507: [BEAM-4276] Add combiner lifting support to Go SDK
URL: https://github.com/apache/beam/pull/5507#discussion_r192220524

## File path: sdks/go/pkg/beam/core/runtime/exec/combine_test.go ##

@@ -60,9 +170,30 @@ func TestCombine(t *testing.T) {
 	if err := p.Down(context.Background()); err != nil {
 		t.Fatalf("down failed: %v", err)
 	}
+}
-	expected := makeKV(42, 21)
-	if !equalList(out.Elements, expected) {
-		t.Errorf("pardo(sumFn) = %v, want %v", extractKeyedValues(out.Elements...), extractKeyedValues(expected...))
+func mergeFn(a, b int) int {
+	return a + b
+}
+
+type MyCombine struct{}
+
+func (*MyCombine) AddInput(k int64, a int) int64 {
+	return k + int64(a)
+}
+
+func (*MyCombine) MergeAccumulators(a, b int64) int64 {
+	return a + b
+}
+
+func (*MyCombine) ExtractOutput(a int64) int {
+	return int(a)
+}
+

Review comment:
It'd be nice to have a more elaborate combiner with differentiated input, accumulator, and output types.

Issue Time Tracking
---
Worklog Id: (was: 107861)
Time Spent: 3h 40m (was: 3.5h)
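A combiner with three distinct types, as requested in the review comment above, might look like the sketch below. It is an illustration only and is not the test that was eventually added to the PR: `meanFn` and `meanAccum` are hypothetical names, the methods are called directly rather than through the Beam runtime, and `CreateAccumulator` is assumed here as the accumulator constructor alongside the `AddInput`, `MergeAccumulators` and `ExtractOutput` names used in the diff.

```go
package main

import "fmt"

// meanAccum is the accumulator type, distinct from both input and output.
type meanAccum struct {
	Sum   int64
	Count int64
}

// meanFn is a hypothetical combiner: int inputs, meanAccum accumulator,
// float64 output.
type meanFn struct{}

// CreateAccumulator returns the empty accumulator.
func (meanFn) CreateAccumulator() meanAccum { return meanAccum{} }

// AddInput folds one int input into the accumulator.
func (meanFn) AddInput(a meanAccum, v int) meanAccum {
	return meanAccum{Sum: a.Sum + int64(v), Count: a.Count + 1}
}

// MergeAccumulators combines two partial accumulators.
func (meanFn) MergeAccumulators(a, b meanAccum) meanAccum {
	return meanAccum{Sum: a.Sum + b.Sum, Count: a.Count + b.Count}
}

// ExtractOutput converts the final accumulator into the float64 result.
func (meanFn) ExtractOutput(a meanAccum) float64 {
	if a.Count == 0 {
		return 0
	}
	return float64(a.Sum) / float64(a.Count)
}

func main() {
	fn := meanFn{}

	// Two partial accumulators, as if built in two separate bundles.
	left := fn.CreateAccumulator()
	for _, v := range []int{1, 2, 3} {
		left = fn.AddInput(left, v)
	}
	right := fn.CreateAccumulator()
	for _, v := range []int{4, 6} {
		right = fn.AddInput(right, v)
	}

	merged := fn.MergeAccumulators(left, right)
	fmt.Println(fn.ExtractOutput(merged)) // 3.2
}
```

Because the three types differ, a mix-up between the lifted stages (for example, feeding raw inputs to the merge step) would fail to type-check instead of silently producing a number.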
[jira] [Work logged] (BEAM-4276) Implement the portable lifted Combiner transforms in Go SDK
[ https://issues.apache.org/jira/browse/BEAM-4276?focusedWorklogId=107864=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-107864 ]

ASF GitHub Bot logged work on BEAM-4276:

Author: ASF GitHub Bot
Created on: 31/May/18 20:10
Start Date: 31/May/18 20:10
Worklog Time Spent: 10m
Work Description: wcn3 commented on a change in pull request #5507: [BEAM-4276] Add combiner lifting support to Go SDK
URL: https://github.com/apache/beam/pull/5507#discussion_r192221943

## File path: sdks/go/pkg/beam/transforms/top/top.go ##

@@ -106,9 +106,9 @@ func validate(t typex.FullType, n int, less interface{}) {
 	funcx.MustSatisfy(less, funcx.Replace(sig, beam.TType, t.Type()))
 }
-// TODO(herohde) 5/25/2017: the accumulator should be serializable with a Coder.
-// We need a coder here, because the elements are generally code-able only. Until
-// then, we do not support combiner lifting.
+// TODO(herohde) 5/25/2017: BEAM- the accumulator should be serializable

Review comment:
Do we have a number to fill in here yet? It's been a year. :)

Issue Time Tracking
---
Worklog Id: (was: 107864)
Time Spent: 3h 50m (was: 3h 40m)
[jira] [Work logged] (BEAM-4276) Implement the portable lifted Combiner transforms in Go SDK
[ https://issues.apache.org/jira/browse/BEAM-4276?focusedWorklogId=107865=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-107865 ]

ASF GitHub Bot logged work on BEAM-4276:

Author: ASF GitHub Bot
Created on: 31/May/18 20:10
Start Date: 31/May/18 20:10
Worklog Time Spent: 10m
Work Description: wcn3 commented on a change in pull request #5507: [BEAM-4276] Add combiner lifting support to Go SDK
URL: https://github.com/apache/beam/pull/5507#discussion_r192220206

## File path: sdks/go/pkg/beam/core/runtime/exec/combine_test.go ##

@@ -60,9 +170,30 @@ func TestCombine(t *testing.T) {
 	if err := p.Down(context.Background()); err != nil {
 		t.Fatalf("down failed: %v", err)
 	}
+}
-	expected := makeKV(42, 21)
-	if !equalList(out.Elements, expected) {
-		t.Errorf("pardo(sumFn) = %v, want %v", extractKeyedValues(out.Elements...), extractKeyedValues(expected...))
+func mergeFn(a, b int) int {
+	return a + b
+}
+
+type MyCombine struct{}

Review comment:
Wouldn't hurt to add a comment indicating that int is the input type and int64 is the accumulator and output type. The test is an authoritative piece of code demonstrating the contract. That comment will help people write their own combiners more easily, I believe.

Issue Time Tracking
---
Worklog Id: (was: 107865)
Time Spent: 4h (was: 3h 50m)
[jira] [Work logged] (BEAM-4276) Implement the portable lifted Combiner transforms in Go SDK
[ https://issues.apache.org/jira/browse/BEAM-4276?focusedWorklogId=107860=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-107860 ]

ASF GitHub Bot logged work on BEAM-4276:

Author: ASF GitHub Bot
Created on: 31/May/18 20:10
Start Date: 31/May/18 20:10
Worklog Time Spent: 10m
Work Description: wcn3 commented on a change in pull request #5507: [BEAM-4276] Add combiner lifting support to Go SDK
URL: https://github.com/apache/beam/pull/5507#discussion_r192221679

## File path: sdks/go/pkg/beam/core/runtime/exec/translate.go ##

@@ -34,9 +34,13 @@ import (
 	"github.com/golang/protobuf/ptypes"
 )
+// TODO(lostluck): 2018/05/28 Extract these from the canonical enums in beam_runner_api.proto

Review comment:
I have a code snippet that does most of this. Let me flesh it out and send it to you.

Issue Time Tracking
---
Worklog Id: (was: 107860)
Time Spent: 3.5h (was: 3h 20m)
[jira] [Work logged] (BEAM-4276) Implement the portable lifted Combiner transforms in Go SDK
[ https://issues.apache.org/jira/browse/BEAM-4276?focusedWorklogId=107862=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-107862 ]

ASF GitHub Bot logged work on BEAM-4276:

Author: ASF GitHub Bot
Created on: 31/May/18 20:10
Start Date: 31/May/18 20:10
Worklog Time Spent: 10m
Work Description: wcn3 commented on a change in pull request #5507: [BEAM-4276] Add combiner lifting support to Go SDK
URL: https://github.com/apache/beam/pull/5507#discussion_r192217544

## File path: sdks/go/pkg/beam/core/runtime/exec/combine.go ##

@@ -217,3 +224,162 @@ func (n *Combine) fail(err error) error {
 func (n *Combine) String() string {
 	return fmt.Sprintf("Combine[%v] Keyed:%v Out:%v", path.Base(n.Fn.Name()), n.UsesKey, n.Out.ID())
 }
+
+// The nodes below break apart the Combine into components to support
+// Combiner Lifting optimizations.
+
+// LiftedCombine is an executor for combining values before grouping by keys
+// for a lifted combine. Partially groups values by key within a bundle,
+// accumulating them in an in memory cache, before emitting them in the
+// FinishBundle step.
+type LiftedCombine struct {
+	*Combine
+
+	cache map[interface{}]FullValue
+}
+
+func (n *LiftedCombine) String() string {
+	return fmt.Sprintf("LiftedCombine[%v] Keyed:%v Out:%v", path.Base(n.Fn.Name()), n.UsesKey, n.Out.ID())
+}
+
+// StartBundle initializes the in memory cache of keys to accumulators.
+func (n *LiftedCombine) StartBundle(ctx context.Context, id string, data DataManager) error {
+	if err := n.Combine.StartBundle(ctx, id, data); err != nil {
+		return err
+	}
+	n.cache = make(map[interface{}]FullValue)
+	return nil
+}
+
+// ProcessElement takes a KV pair and combines values with the same into an accumulator,
+// caching them until the bundle is complete.
+func (n *LiftedCombine) ProcessElement(ctx context.Context, value FullValue, values ...ReStream) error {
+	if n.status != Active {
+		return fmt.Errorf("invalid status for precombine %v: %v", n.UID, n.status)
+	}
+
+	// Value is a KV so Elm & Elm2 are populated.
+	// Check the cache for an already present accumulator
+
+	afv, notfirst := n.cache[value.Elm]
+	var a interface{}
+	if notfirst {
+		a = afv.Elm2
+	} else {
+		b, err := n.newAccum(ctx, value.Elm)
+		if err != nil {
+			return n.fail(err)
+		}
+		a = b
+	}
+
+	a, err := n.addInput(ctx, a, value.Elm, value.Elm2, value.Timestamp, !notfirst)
+	if err != nil {
+		return n.fail(err)
+	}
+
+	// Cache the accumulator with the key
+	n.cache[value.Elm] = FullValue{Windows: value.Windows, Elm: value.Elm, Elm2: a, Timestamp: value.Timestamp}
+
+	return nil
+}
+
+// FinishBundle iterates through the cached key, accumulator pairs, and then

Review comment:
cached (key, accumulator) pairs.

Issue Time Tracking
---
Worklog Id: (was: 107862)
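The bundle-scoped pre-combining that the quoted diff describes (cache an accumulator per key in ProcessElement, emit the (key, accumulator) pairs in FinishBundle) can be reduced to a toy program. This is a simplified sketch, not the PR's implementation: `preCombiner` and `kv` are hypothetical types, the accumulator is a plain int sum, windows and timestamps are ignored, and clearing the cache in FinishBundle is an assumption because the FinishBundle body is not shown in this thread.

```go
package main

import (
	"fmt"
	"sort"
)

// kv is a simplified stand-in for a keyed element.
type kv struct {
	Key   string
	Value int
}

// preCombiner sums values per key within a bundle and emits one
// (key, accumulator) pair per key when the bundle finishes.
type preCombiner struct {
	cache map[string]int
	emit  func(kv)
}

// StartBundle initializes the per-bundle cache.
func (p *preCombiner) StartBundle() {
	p.cache = make(map[string]int)
}

// ProcessElement folds the value into the accumulator cached for its key.
func (p *preCombiner) ProcessElement(e kv) {
	p.cache[e.Key] += e.Value
}

// FinishBundle emits every cached pair and releases the cache (the release
// is an assumption of this sketch).
func (p *preCombiner) FinishBundle() {
	keys := make([]string, 0, len(p.cache))
	for k := range p.cache {
		keys = append(keys, k)
	}
	sort.Strings(keys) // deterministic order, for the example only
	for _, k := range keys {
		p.emit(kv{Key: k, Value: p.cache[k]})
	}
	p.cache = nil
}

func main() {
	var out []kv
	p := &preCombiner{emit: func(e kv) { out = append(out, e) }}

	p.StartBundle()
	for _, e := range []kv{{"a", 1}, {"b", 10}, {"a", 2}, {"b", 20}, {"a", 3}} {
		p.ProcessElement(e)
	}
	p.FinishBundle()

	fmt.Println(out) // [{a 6} {b 30}]
}
```

Five input elements leave the stage as two pairs, which is the reduction in shuffled data that combiner lifting is meant to provide.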
[jira] [Work logged] (BEAM-4276) Implement the portable lifted Combiner transforms in Go SDK
[ https://issues.apache.org/jira/browse/BEAM-4276?focusedWorklogId=107858=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-107858 ]

ASF GitHub Bot logged work on BEAM-4276:

Author: ASF GitHub Bot
Created on: 31/May/18 20:10
Start Date: 31/May/18 20:10
Worklog Time Spent: 10m
Work Description: wcn3 commented on a change in pull request #5507: [BEAM-4276] Add combiner lifting support to Go SDK
URL: https://github.com/apache/beam/pull/5507#discussion_r192221853

## File path: sdks/go/pkg/beam/core/runtime/graphx/translate.go ##

@@ -148,10 +148,58 @@ func (m *marshaller) addScopeTree(s *ScopeTree) string {
 		Inputs:  diff(in, out),
 		Outputs: diff(out, in),
 	}
+
+	m.updateIfCombineComposite(s, transform)
+
+	m.transforms[id] = transform
 	return id
 }
+// updateIfCombineComposite examines the scope tree and sets the PTransform Spec
+// to be a CombinePerKey with a CombinePayload if it's a liftable composite.
+// Beam Portability requires that composites

Review comment:
Unfinished comment.

Issue Time Tracking
---
Worklog Id: (was: 107858)
Time Spent: 3h 10m (was: 3h)
[jira] [Work logged] (BEAM-4276) Implement the portable lifted Combiner transforms in Go SDK
[ https://issues.apache.org/jira/browse/BEAM-4276?focusedWorklogId=107857=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-107857 ] ASF GitHub Bot logged work on BEAM-4276: Author: ASF GitHub Bot Created on: 31/May/18 20:10 Start Date: 31/May/18 20:10 Worklog Time Spent: 10m Work Description: wcn3 commented on a change in pull request #5507: [BEAM-4276] Add combiner lifting support to Go SDK URL: https://github.com/apache/beam/pull/5507#discussion_r192209339

## File path: sdks/go/pkg/beam/core/graph/edge.go ##

@@ -317,9 +315,17 @@ func newDoFnNode(op Opcode, g *Graph, s *Scope, u *DoFn, in []*Node, typedefs ma
 	return edge, nil
 }

+// CombinePerKeyScope is the canonical name for the scope containing combine

Review comment: nit: scope-containing. I was reading this wondering how a Go SDK scope applied here. That's not the case, I believe.

Issue Time Tracking --- Worklog Id: (was: 107857) Time Spent: 3h (was: 2h 50m)
[jira] [Work logged] (BEAM-4276) Implement the portable lifted Combiner transforms in Go SDK
[ https://issues.apache.org/jira/browse/BEAM-4276?focusedWorklogId=107743=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-107743 ] ASF GitHub Bot logged work on BEAM-4276: Author: ASF GitHub Bot Created on: 31/May/18 16:29 Start Date: 31/May/18 16:29 Worklog Time Spent: 10m Work Description: lostluck commented on issue #5507: [BEAM-4276] Add combiner lifting support to Go SDK URL: https://github.com/apache/beam/pull/5507#issuecomment-393592119 R: @herohde @wcn3 Ping :) Issue Time Tracking --- Worklog Id: (was: 107743) Time Spent: 2h 40m (was: 2.5h)
[jira] [Work logged] (BEAM-4276) Implement the portable lifted Combiner transforms in Go SDK
[ https://issues.apache.org/jira/browse/BEAM-4276?focusedWorklogId=106936=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-106936 ] ASF GitHub Bot logged work on BEAM-4276: Author: ASF GitHub Bot Created on: 29/May/18 23:21 Start Date: 29/May/18 23:21 Worklog Time Spent: 10m Work Description: lostluck commented on issue #5507: Robert Burke Robert Burke [BEAM-4276] Add combiner lifting support to Go SDK URL: https://github.com/apache/beam/pull/5507#issuecomment-392979084 FYI: It has been verified that this implementation still works with the unchanged Go Direct runner and with the Dataflow runner. Dataflow however doesn't yet support lifting for portable pipelines. Issue Time Tracking --- Worklog Id: (was: 106936) Time Spent: 2.5h (was: 2h 20m)
[jira] [Work logged] (BEAM-4276) Implement the portable lifted Combiner transforms in Go SDK
[ https://issues.apache.org/jira/browse/BEAM-4276?focusedWorklogId=106932=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-106932 ] ASF GitHub Bot logged work on BEAM-4276: Author: ASF GitHub Bot Created on: 29/May/18 23:19 Start Date: 29/May/18 23:19 Worklog Time Spent: 10m Work Description: lostluck commented on issue #5507: Robert Burke Robert Burke [BEAM-4276] Add combiner lifting support to Go SDK URL: https://github.com/apache/beam/pull/5507#issuecomment-392978689 R: @herohde @wcn3 Issue Time Tracking --- Worklog Id: (was: 106932) Time Spent: 2h 20m (was: 2h 10m)
[jira] [Work logged] (BEAM-4276) Implement the portable lifted Combiner transforms in Go SDK
[ https://issues.apache.org/jira/browse/BEAM-4276?focusedWorklogId=106927=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-106927 ] ASF GitHub Bot logged work on BEAM-4276: Author: ASF GitHub Bot Created on: 29/May/18 23:18 Start Date: 29/May/18 23:18 Worklog Time Spent: 10m Work Description: lostluck opened a new pull request #5507: Robert Burke Robert Burke [BEAM-4276] Add combiner lifting support to Go SDK URL: https://github.com/apache/beam/pull/5507

Implements combiner lifting support in the Go SDK, for universal runners. Per https://s.apache.org/beam-runner-api-combine-model, the Go SDK puts the URN and CombinePayload on the composite, which still contains the basic unlifted combine implementation for runners that don't support the optimization.

There are a few TODOs, but I can lift them into JIRAs as necessary.

Of note is that the current Go implementation for Top uses an unencodable accumulator type (it contains []interface{}), so it cannot be lifted at present. However, when this case is detected, the SDK will simply not add the URN and payload, and will continue to present its existing representation as a vanilla ParDo to runners.

Issue Time Tracking --- Worklog Id: (was: 106927) Time Spent: 2h 10m (was: 2h)
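The fallback described in the pull request text above (no URN or payload when the accumulator type cannot be encoded) might look roughly like the sketch below. This is an illustration under stated assumptions, not the SDK's detection logic: `encodable`, `specURN`, `meanAccum` and `topAccum` are hypothetical names, "unencodable" is approximated as "contains an interface, func or channel somewhere in the type", and the URN constant is used here only as an illustrative label.

```go
package main

import (
	"fmt"
	"reflect"
)

// urnCombinePerKey is used purely as an illustrative label in this sketch.
const urnCombinePerKey = "beam:transform:combine_per_key:v1"

// meanAccum is a hypothetical accumulator made only of concrete types.
type meanAccum struct {
	Sum   float64
	Count int64
}

// topAccum is a hypothetical accumulator mirroring the Top case from the
// pull request text: it contains []interface{}.
type topAccum struct {
	Items []interface{}
}

// encodable reports whether t is built only from concrete, data-carrying
// types. This is an assumed approximation of "has an encodable coder".
// The seen map guards against looping forever on recursive types.
func encodable(t reflect.Type, seen map[reflect.Type]bool) bool {
	if seen[t] {
		return true
	}
	seen[t] = true
	switch t.Kind() {
	case reflect.Interface, reflect.Func, reflect.Chan, reflect.UnsafePointer:
		return false
	case reflect.Ptr, reflect.Slice, reflect.Array:
		return encodable(t.Elem(), seen)
	case reflect.Map:
		return encodable(t.Key(), seen) && encodable(t.Elem(), seen)
	case reflect.Struct:
		for i := 0; i < t.NumField(); i++ {
			if !encodable(t.Field(i).Type, seen) {
				return false
			}
		}
	}
	return true
}

// specURN returns the combine URN when the accumulator could be lifted, and
// the empty string when the composite should be left as a plain ParDo.
func specURN(accum interface{}) string {
	if accum == nil {
		return ""
	}
	if encodable(reflect.TypeOf(accum), map[reflect.Type]bool{}) {
		return urnCombinePerKey
	}
	return ""
}

func main() {
	fmt.Printf("mean: %q\n", specURN(meanAccum{}))
	fmt.Printf("top:  %q\n", specURN(topAccum{}))
}
```

The design point carried over from the description is that detection failure is not an error: the composite simply stays unannotated, so runners expand it as before.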
[jira] [Work logged] (BEAM-4276) Implement the portable lifted Combiner transforms in Go SDK
[ https://issues.apache.org/jira/browse/BEAM-4276?focusedWorklogId=103201=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-103201 ] ASF GitHub Bot logged work on BEAM-4276: Author: ASF GitHub Bot Created on: 18/May/18 01:39 Start Date: 18/May/18 01:39 Worklog Time Spent: 10m Work Description: jasonkuster closed pull request #5405: [BEAM-4276] Update generated Go SDK protobuf files. URL: https://github.com/apache/beam/pull/5405

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/sdks/go/pkg/beam/core/runtime/harness/session/session.pb.go b/sdks/go/pkg/beam/core/runtime/harness/session/session.pb.go
index 80487530a00..f697df8b123 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/session/session.pb.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/session/session.pb.go
@@ -152,9 +152,7 @@ func (m *Entry) String() string{ return proto.CompactTextString(m) }
 func (*Entry) ProtoMessage() {}
 func (*Entry) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{3} }

-type isEntry_Msg interface {
-	isEntry_Msg()
-}
+type isEntry_Msg interface{ isEntry_Msg() }

 type Entry_InstReq struct {
 	InstReq *org_apache_beam_model_fn_execution_v1.InstructionRequest `protobuf:"bytes,1000,opt,name=inst_req,json=instReq,oneof"`
diff --git a/sdks/go/pkg/beam/model/fnexecution_v1/beam_fn_api.pb.go b/sdks/go/pkg/beam/model/fnexecution_v1/beam_fn_api.pb.go
index 8e5b5bab25e..e3147f5e031 100644
--- a/sdks/go/pkg/beam/model/fnexecution_v1/beam_fn_api.pb.go
+++ b/sdks/go/pkg/beam/model/fnexecution_v1/beam_fn_api.pb.go
@@ -48,8 +48,8 @@ import fmt "fmt"
 import math "math"
 import org_apache_beam_model_pipeline_v1 "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
 import org_apache_beam_model_pipeline_v11 "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
-import google_protobuf1 "github.com/golang/protobuf/ptypes/timestamp"
-import google_protobuf2 "github.com/golang/protobuf/ptypes/wrappers"
+import google_protobuf2 "github.com/golang/protobuf/ptypes/timestamp"
+import google_protobuf3 "github.com/golang/protobuf/ptypes/wrappers"

 import (
 	context "golang.org/x/net/context"
@@ -218,9 +218,7 @@ func (m *InstructionRequest) String() string{ return proto.CompactTe
 func (*InstructionRequest) ProtoMessage() {}
 func (*InstructionRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} }

-type isInstructionRequest_Request interface {
-	isInstructionRequest_Request()
-}
+type isInstructionRequest_Request interface{ isInstructionRequest_Request() }

 type InstructionRequest_Register struct {
 	Register *RegisterRequest `protobuf:"bytes,1000,opt,name=register,oneof"`
@@ -421,9 +419,7 @@ func (m *InstructionResponse) String() string{ return proto.CompactT
 func (*InstructionResponse) ProtoMessage() {}
 func (*InstructionResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{3} }

-type isInstructionResponse_Response interface {
-	isInstructionResponse_Response()
-}
+type isInstructionResponse_Response interface{ isInstructionResponse_Response() }

 type InstructionResponse_Register struct {
 	Register *RegisterResponse `protobuf:"bytes,1000,opt,name=register,oneof"`
@@ -761,7 +757,7 @@ type BundleSplit_Application struct {
 	// represented by invoking this Application and its downstream applications.
 	// The sum of fraction_of_work between all primary_roots and residual_roots
 	// must add up to approximately 1.0.
-	FractionOfWork *google_protobuf2.DoubleValue `protobuf:"bytes,5,opt,name=fraction_of_work,json=fractionOfWork" json:"fraction_of_work,omitempty"`
+	FractionOfWork *google_protobuf3.DoubleValue `protobuf:"bytes,5,opt,name=fraction_of_work,json=fractionOfWork" json:"fraction_of_work,omitempty"`
 }

 func (m *BundleSplit_Application) Reset(){ *m = BundleSplit_Application{} }
@@ -797,7 +793,7 @@ func (m *BundleSplit_Application) GetOutputWatermarks() map[string]int64 {
 	return nil
 }

-func (m *BundleSplit_Application) GetFractionOfWork() *google_protobuf2.DoubleValue {
+func (m *BundleSplit_Application) GetFractionOfWork() *google_protobuf3.DoubleValue {
 	if m != nil {
 		return m.FractionOfWork
 	}
@@ -1081,9 +1077,7 @@ func (m *Metrics_User) String() string{ return proto.CompactTextStri
 func (*Metrics_User) ProtoMessage() {}
 func (*Metrics_User) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{11, 1} }

-type isMetrics_User_Data interface
[jira] [Work logged] (BEAM-4276) Implement the portable lifted Combiner transforms in Go SDK
[ https://issues.apache.org/jira/browse/BEAM-4276?focusedWorklogId=103200=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-103200 ] ASF GitHub Bot logged work on BEAM-4276: Author: ASF GitHub Bot Created on: 18/May/18 01:38 Start Date: 18/May/18 01:38 Worklog Time Spent: 10m Work Description: jasonkuster commented on issue #5405: [BEAM-4276] Update generated Go SDK protobuf files. URL: https://github.com/apache/beam/pull/5405#issuecomment-390067961 LGTM Issue Time Tracking --- Worklog Id: (was: 103200) Time Spent: 1h 50m (was: 1h 40m)
[jira] [Work logged] (BEAM-4276) Implement the portable lifted Combiner transforms in Go SDK
[ https://issues.apache.org/jira/browse/BEAM-4276?focusedWorklogId=103196=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-103196 ] ASF GitHub Bot logged work on BEAM-4276: Author: ASF GitHub Bot Created on: 18/May/18 00:57 Start Date: 18/May/18 00:57 Worklog Time Spent: 10m Work Description: lostluck commented on issue #5405: [BEAM-4276] Update generated Go SDK protobuf files. URL: https://github.com/apache/beam/pull/5405#issuecomment-390060684 R: @jasonkuster Please sir, may I have a merge? Issue Time Tracking --- Worklog Id: (was: 103196) Time Spent: 1h 40m (was: 1.5h)
[jira] [Work logged] (BEAM-4276) Implement the portable lifted Combiner transforms in Go SDK
[ https://issues.apache.org/jira/browse/BEAM-4276?focusedWorklogId=103193=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-103193 ] ASF GitHub Bot logged work on BEAM-4276: Author: ASF GitHub Bot Created on: 18/May/18 00:52 Start Date: 18/May/18 00:52 Worklog Time Spent: 10m Work Description: wcn3 commented on issue #5405: [BEAM-4276] Update generated Go SDK protobuf files. URL: https://github.com/apache/beam/pull/5405#issuecomment-390059746 The deltas here are a byproduct of using a new version of the go protoc plugin. This change is good. Issue Time Tracking --- Worklog Id: (was: 103193) Time Spent: 1.5h (was: 1h 20m)
[jira] [Work logged] (BEAM-4276) Implement the portable lifted Combiner transforms in Go SDK
[ https://issues.apache.org/jira/browse/BEAM-4276?focusedWorklogId=103188=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-103188 ] ASF GitHub Bot logged work on BEAM-4276: Author: ASF GitHub Bot Created on: 18/May/18 00:34 Start Date: 18/May/18 00:34 Worklog Time Spent: 10m Work Description: lostluck commented on issue #5402: [BEAM-4276] Update generated Go SDK protobuf files. URL: https://github.com/apache/beam/pull/5402#issuecomment-390056562 Closing since it conflates updating the protos with updating the version of protoc-gen-go used. Moving work to https://github.com/apache/beam/pull/5405 Issue Time Tracking --- Worklog Id: (was: 103188) Time Spent: 1h 10m (was: 1h)
[jira] [Work logged] (BEAM-4276) Implement the portable lifted Combiner transforms in Go SDK
[ https://issues.apache.org/jira/browse/BEAM-4276?focusedWorklogId=103187=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-103187 ] ASF GitHub Bot logged work on BEAM-4276: Author: ASF GitHub Bot Created on: 18/May/18 00:24 Start Date: 18/May/18 00:24 Worklog Time Spent: 10m Work Description: lostluck commented on issue #5405: [BEAM-4276] Update generated Go SDK protobuf files. URL: https://github.com/apache/beam/pull/5405#issuecomment-390055090 R: @herohde @wcn3 Issue Time Tracking --- Worklog Id: (was: 103187) Time Spent: 1h (was: 50m)
[jira] [Work logged] (BEAM-4276) Implement the portable lifted Combiner transforms in Go SDK
[ https://issues.apache.org/jira/browse/BEAM-4276?focusedWorklogId=103185=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-103185 ] ASF GitHub Bot logged work on BEAM-4276: Author: ASF GitHub Bot Created on: 18/May/18 00:21 Start Date: 18/May/18 00:21 Worklog Time Spent: 10m Work Description: lostluck opened a new pull request #5405: [BEAM-4276] Update generated Go SDK protobuf files. URL: https://github.com/apache/beam/pull/5405 Issue Time Tracking --- Worklog Id: (was: 103185) Time Spent: 50m (was: 40m)
[jira] [Work logged] (BEAM-4276) Implement the portable lifted Combiner transforms in Go SDK
[ https://issues.apache.org/jira/browse/BEAM-4276?focusedWorklogId=103155=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-103155 ] ASF GitHub Bot logged work on BEAM-4276: Author: ASF GitHub Bot Created on: 17/May/18 22:47 Start Date: 17/May/18 22:47 Worklog Time Spent: 10m Work Description: lostluck commented on issue #5402: [BEAM-4276] Update generated Go SDK protobuf files. URL: https://github.com/apache/beam/pull/5402#issuecomment-390037264 Run Go PreCommit Issue Time Tracking --- Worklog Id: (was: 103155) Time Spent: 40m (was: 0.5h)
[jira] [Work logged] (BEAM-4276) Implement the portable lifted Combiner transforms in Go SDK
[ https://issues.apache.org/jira/browse/BEAM-4276?focusedWorklogId=103147=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-103147 ] ASF GitHub Bot logged work on BEAM-4276: Author: ASF GitHub Bot Created on: 17/May/18 22:23 Start Date: 17/May/18 22:23 Worklog Time Spent: 10m Work Description: lostluck commented on issue #5402: [BEAM-4276] Update generated Go SDK protobuf files. URL: https://github.com/apache/beam/pull/5402#issuecomment-390032855

This is due to the [go protobuf changes](https://groups.google.com/forum/#!topic/protobuf/N-elvFu4dFM) landing, which changes internal details. I've re-generated the lockfile. Apparently the only way to do that AFAICT is to delete it, and then re-add it.

    rm sdks/go/gogradle.lock
    ./gradlew :beam-sdks-go:lock

If there's a better way, then I'd love to know, but if that's confirmed as the right way, I'll also update the BUILD.md file with the info.

Issue Time Tracking --- Worklog Id: (was: 103147) Time Spent: 0.5h (was: 20m)
[jira] [Work logged] (BEAM-4276) Implement the portable lifted Combiner transforms in Go SDK
[ https://issues.apache.org/jira/browse/BEAM-4276?focusedWorklogId=103122=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-103122 ] ASF GitHub Bot logged work on BEAM-4276: Author: ASF GitHub Bot Created on: 17/May/18 21:24 Start Date: 17/May/18 21:24 Worklog Time Spent: 10m Work Description: lostluck commented on issue #5402: [BEAM-4276] Update generated Go SDK protobuf files. URL: https://github.com/apache/beam/pull/5402#issuecomment-390016662 R: @herohde @wcn3 Issue Time Tracking --- Worklog Id: (was: 103122) Time Spent: 20m (was: 10m)
[jira] [Work logged] (BEAM-4276) Implement the portable lifted Combiner transforms in Go SDK
[ https://issues.apache.org/jira/browse/BEAM-4276?focusedWorklogId=103121=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-103121 ] ASF GitHub Bot logged work on BEAM-4276: Author: ASF GitHub Bot Created on: 17/May/18 21:23 Start Date: 17/May/18 21:23 Worklog Time Spent: 10m Work Description: lostluck opened a new pull request #5402: [BEAM-4276] Update generated Go SDK protobuf files. URL: https://github.com/apache/beam/pull/5402 Followed the sdks/go/pkg/beam/model instructions. Necessary pre-req to implementing portability combiner lifting in the Go SDK. Issue Time Tracking --- Worklog Id: (was: 103121) Time Spent: 10m Remaining Estimate: 0h