Re: [PR] [Prism] Support BundleFinalization DoFn parameter [beam]
lostluck merged PR #32425: URL: https://github.com/apache/beam/pull/32425 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [Prism] Support BundleFinalization DoFn parameter [beam]
lostluck commented on PR #32425: URL: https://github.com/apache/beam/pull/32425#issuecomment-2364756851 Agreed. Please go ahead
Re: [PR] [Prism] Support BundleFinalization DoFn parameter [beam]
damondouglas commented on PR #32425: URL: https://github.com/apache/beam/pull/32425#issuecomment-2364749648 @lostluck The codecov/patch report flags lines that didn't seem sensible to write tests for. Should I just go ahead and merge this, do you think?
Re: [PR] [Prism] Support BundleFinalization DoFn parameter [beam]
codecov[bot] commented on PR #32425: URL: https://github.com/apache/beam/pull/32425#issuecomment-2364742305

## [Codecov](https://app.codecov.io/gh/apache/beam/pull/32425?dropdown=coverage&src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) Report

Attention: Patch coverage is `26.31579%` with `14 lines` in your changes missing coverage. Please review.

> Project coverage is 57.32%. Comparing base [(`2a7755b`)](https://app.codecov.io/gh/apache/beam/commit/2a7755b0f2cc4c64e74e5a54c8923f7b19ac836c?dropdown=coverage&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) to head [(`2690975`)](https://app.codecov.io/gh/apache/beam/commit/2690975d9ae7d85c78f18e099e9b022c409c266a?dropdown=coverage&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache).
> Report is 84 commits behind head on master.

| [Files with missing lines](https://app.codecov.io/gh/apache/beam/pull/32425?dropdown=coverage&src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Patch % | Lines |
|---|---|---|
| [...o/pkg/beam/runners/prism/internal/worker/bundle.go](https://app.codecov.io/gh/apache/beam/pull/32425?src=pr&el=tree&filepath=sdks%2Fgo%2Fpkg%2Fbeam%2Frunners%2Fprism%2Finternal%2Fworker%2Fbundle.go&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-c2Rrcy9nby9wa2cvYmVhbS9ydW5uZXJzL3ByaXNtL2ludGVybmFsL3dvcmtlci9idW5kbGUuZ28=) | 0.00% | [9 Missing :warning: ](https://app.codecov.io/gh/apache/beam/pull/32425?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) |
| [sdks/go/pkg/beam/runners/prism/internal/stage.go](https://app.codecov.io/gh/apache/beam/pull/32425?src=pr&el=tree&filepath=sdks%2Fgo%2Fpkg%2Fbeam%2Frunners%2Fprism%2Finternal%2Fstage.go&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-c2Rrcy9nby9wa2cvYmVhbS9ydW5uZXJzL3ByaXNtL2ludGVybmFsL3N0YWdlLmdv) | 42.85% | [3 Missing and 1 partial :warning: ](https://app.codecov.io/gh/apache/beam/pull/32425?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) |
| [sdks/go/pkg/beam/core/runtime/graphx/translate.go](https://app.codecov.io/gh/apache/beam/pull/32425?src=pr&el=tree&filepath=sdks%2Fgo%2Fpkg%2Fbeam%2Fcore%2Fruntime%2Fgraphx%2Ftranslate.go&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-c2Rrcy9nby9wa2cvYmVhbS9jb3JlL3J1bnRpbWUvZ3JhcGh4L3RyYW5zbGF0ZS5nbw==) | 0.00% | [1 Missing :warning: ](https://app.codecov.io/gh/apache/beam/pull/32425?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) |

Additional details and impacted files

```diff
@@            Coverage Diff             @@
##           master   #32425      +/-   ##
- Coverage   57.33%   57.32%   -0.01%
  Complexity   1474     1474
  Files         964      965       +1
  Lines      153301   153467     +166
  Branches     1076     1076
+ Hits        87896    87981      +85
- Misses      63201    63281      +80
- Partials     2204     2205       +1
```

| [Flag](https://app.codecov.io/gh/apache/beam/pull/32425/flags?src=pr&el=flags&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Coverage Δ | |
|---|---|---|
| [go](https://app.codecov.io/gh/apache/beam/pull/32425/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `34.65% <26.31%> (+<0.01%)` | :arrow_up: |

Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#carryforward-flags-in-the-pull-request-comment) to find out more.

[:umbrella: View full report in Codecov by Sentry](https://app.codecov.io/gh/apache/beam/pull/32425?dropdown=coverage&src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache).
:loudspeaker: Have feedback on the report? [Share it here](https://about.codecov.io/codecov-pr-comment-feedback/?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache).
Re: [PR] [Prism] Support BundleFinalization DoFn parameter [beam]
lostluck commented on code in PR #32425: URL: https://github.com/apache/beam/pull/32425#discussion_r1767636349

## sdks/go/test/integration/primitives/pardo.go: ##
@@ -192,3 +196,90 @@ func emitPipelineOptions(_ []byte, emit func(string)) {
 	emit(fmt.Sprintf("%s: %s", "B", beam.PipelineOptions.Get("B")))
 	emit(fmt.Sprintf("%s: %s", "C", beam.PipelineOptions.Get("C")))
 }
+
+var CountInvokeBundleFinalizer atomic.Int32
+
+const (
+	BundleFinalizerStart = 1
+	BundleFinalizerProcess = 2
+	BundleFinalizerFinish = 4
+)
+
+// ParDoProcessElementBundleFinalizer creates a beam.Pipeline with a beam.ParDo0 that processes a DoFn with a
+// beam.BundleFinalization in its ProcessElement method.
+func ParDoProcessElementBundleFinalizer() *beam.Pipeline {

Review Comment: Consider having the scope passed into this function *instead* of returning a Pipeline, e.g.

```
func ParDoProcessElementBundleFinalizer(s beam.Scope) {
	imp := beam.Impulse(s)
	beam.ParDo0(s, &processElemBundleFinalizer{}, imp)
}
```

This cuts the boilerplate. It also makes the tests easier to integrate with the Prism test suites, and thus ensures we get explicit code coverage over the prism code you've written. That's how we do the (increasingly mis-named) unimplemented_test.go file: https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/runners/prism/internal/unimplemented_test.go#L74

So you could add the "Finalization" tests to the TestImplemented suite. DO NOT refactor that test file to fix it; that's an orthogonal change that should be made separately.

## sdks/go/test/integration/primitives/pardo.go: ##
@@ -192,3 +196,90 @@ func emitPipelineOptions(_ []byte, emit func(string)) {
 	emit(fmt.Sprintf("%s: %s", "B", beam.PipelineOptions.Get("B")))
 	emit(fmt.Sprintf("%s: %s", "C", beam.PipelineOptions.Get("C")))
 }
+
+var CountInvokeBundleFinalizer atomic.Int32
+
+const (
+	BundleFinalizerStart = 1
+	BundleFinalizerProcess = 2
+	BundleFinalizerFinish = 4
+)
+
+// ParDoProcessElementBundleFinalizer creates a beam.Pipeline with a beam.ParDo0 that processes a DoFn with a
+// beam.BundleFinalization in its ProcessElement method.
+func ParDoProcessElementBundleFinalizer() *beam.Pipeline {

Review Comment: Naturally, the same applies to the other test cases in this file.

## sdks/go/test/integration/primitives/pardo.go: ##
@@ -18,10 +18,11 @@ package primitives
 import (
 	"flag"
 	"fmt"
-
 	"github.com/apache/beam/sdks/v2/go/pkg/beam"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
+	"sync/atomic"
+	"time"

Review Comment: Please put the standard library imports in a separate group at the top of the imports. For some reason the break was dropped, and the grouping changed.
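The three constants in the hunk above are powers of two (1, 2, 4), which suggests they are meant to be combined as bit flags in `CountInvokeBundleFinalizer`. A minimal sketch of that pattern follows, assuming each finalizer callback adds its own flag exactly once. The `counter` variable and the `record` and `fired` helpers are hypothetical names for illustration and are not part of the PR.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// Flag values copied from the hunk; each one occupies its own bit.
const (
	BundleFinalizerStart   = 1
	BundleFinalizerProcess = 2
	BundleFinalizerFinish  = 4
)

// counter stands in for CountInvokeBundleFinalizer (hypothetical name).
var counter atomic.Int32

// record is a hypothetical helper: a finalizer callback would call it
// once to mark that it ran. Add is safe to call from several goroutines.
func record(flag int32) { counter.Add(flag) }

// fired reports whether the given flag has been recorded. It assumes each
// flag is added at most once; repeated adds would carry into other bits.
func fired(flag int32) bool { return counter.Load()&flag != 0 }

func main() {
	record(BundleFinalizerStart)
	record(BundleFinalizerFinish)
	fmt.Println(counter.Load())                // 5
	fmt.Println(fired(BundleFinalizerProcess)) // false
}
```

A test could then compare the final value against the sum of the flags it expects, which identifies exactly which callbacks ran rather than only how many.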
Re: [PR] [Prism] Support BundleFinalization DoFn parameter [beam]
lostluck commented on code in PR #32425: URL: https://github.com/apache/beam/pull/32425#discussion_r1764226783

## sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go: ##
@@ -206,6 +206,20 @@ func (b *B) Cleanup(wk *W) {
 	wk.mu.Unlock()
 }
+func (b *B) Finalize(ctx context.Context, wk *W) (*fnpb.FinalizeBundleResponse, error) {
+	resp := wk.sendInstruction(ctx, &fnpb.InstructionRequest{
+		Request: &fnpb.InstructionRequest_FinalizeBundle{
+			FinalizeBundle: &fnpb.FinalizeBundleRequest{
+				InstructionId: b.InstID,
+			},
+		},
+	})
+	if resp.GetError() != "" {
+		return nil, fmt.Errorf("finalize[%v] error from SDK: %v", b.InstID, resp.GetError())
+	}
+	return resp.GetFinalizeBundle(), nil

Review Comment: This is and will likely always be an empty message, and since this is internal code, I'd rather simply not return the empty message at this time, than speculate that we might want to return the message in the future if it gains fields.

## sdks/go/pkg/beam/runners/prism/internal/stage.go: ##
@@ -278,6 +280,14 @@ progress:
 		slog.Debug("returned empty residual application", "bundle", rb, slog.Int("numResiduals", l), slog.String("pcollection", s.primaryInput))
 	}
 	em.PersistBundle(rb, s.OutputsToCoders, b.OutputData, s.inputInfo, residuals)
+	if s.finalize {
+		_, err := b.Finalize(ctx, wk)
+		if err != nil {
+			slog.Debug("SDK Error from bundle finalization", "bundle", rb, "error", err.Error())
+			panic(err)

Review Comment: At present the only reason the tests are failing is due to this panic. I'd make the slog output here be an error instead of debug, since it could be useful to users, and a finalizing failure isn't expected to fail the job.
Re: [PR] [Prism] Support BundleFinalization DoFn parameter [beam]
damondouglas commented on code in PR #32425: URL: https://github.com/apache/beam/pull/32425#discussion_r1755259045

## sdks/go/test/integration/primitives/pardo.go: ##
@@ -32,6 +33,9 @@ func init() {
 	register.Function3x2(asymJoinFn)
 	register.Function5x0(splitByName)
 	register.Function2x0(emitPipelineOptions)
+	beam.RegisterDoFn(reflect.TypeOf((*processElemBundleFinalizer)(nil)).Elem())
+	beam.RegisterDoFn(reflect.TypeOf((*finalizerInFinishBundle)(nil)).Elem())
+	beam.RegisterDoFn(reflect.TypeOf((*finalizerInAll)(nil)).Elem())

Review Comment: I refactored to use the register funcs. Meanwhile, I converted the PR to a draft. I will explore how to filter out the tests for the Docker execution.
Re: [PR] [Prism] Support BundleFinalization DoFn parameter [beam]
lostluck commented on code in PR #32425: URL: https://github.com/apache/beam/pull/32425#discussion_r1754885112

## sdks/go/test/integration/primitives/pardo.go: ##
@@ -32,6 +33,9 @@ func init() {
 	register.Function3x2(asymJoinFn)
 	register.Function5x0(splitByName)
 	register.Function2x0(emitPipelineOptions)
+	beam.RegisterDoFn(reflect.TypeOf((*processElemBundleFinalizer)(nil)).Elem())
+	beam.RegisterDoFn(reflect.TypeOf((*finalizerInFinishBundle)(nil)).Elem())
+	beam.RegisterDoFn(reflect.TypeOf((*finalizerInAll)(nil)).Elem())

Review Comment: Seems reasonable. The big trick is that we probably need to filter this test out of Docker mode execution generally, and out of the Python Portable runner. We don't have a mechanism for filtering in Docker execution, but we do have one for the portable runner: the filter list in integration.go.

Searching for `--- FAIL` finds the failure in the logs.

```
github.com/apache/beam/sdks/v2/go/test/integration/primitives.TestParDoBundleFinalizer.func1 not found. Register DoFns and functions with the the beam/register package.
--- FAIL: TestParDoBundleFinalizer/InFinishBundle (0.04s)
```
Re: [PR] [Prism] Support BundleFinalization DoFn parameter [beam]
damondouglas commented on code in PR #32425: URL: https://github.com/apache/beam/pull/32425#discussion_r1754676620

## sdks/go/test/integration/primitives/pardo.go: ##
@@ -32,6 +33,9 @@ func init() {
 	register.Function3x2(asymJoinFn)
 	register.Function5x0(splitByName)
 	register.Function2x0(emitPipelineOptions)
+	beam.RegisterDoFn(reflect.TypeOf((*processElemBundleFinalizer)(nil)).Elem())
+	beam.RegisterDoFn(reflect.TypeOf((*finalizerInFinishBundle)(nil)).Elem())
+	beam.RegisterDoFn(reflect.TypeOf((*finalizerInAll)(nil)).Elem())

Review Comment: This might be what is causing the check failures.
Re: [PR] [Prism] Support BundleFinalization DoFn parameter [beam]
damondouglas commented on PR #32425: URL: https://github.com/apache/beam/pull/32425#issuecomment-2342440673 R: @lostluck
Re: [PR] [Prism] Support BundleFinalization DoFn parameter [beam]
github-actions[bot] commented on PR #32425: URL: https://github.com/apache/beam/pull/32425#issuecomment-2342441803 Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control. If you'd like to restart, comment `assign set of reviewers`