Re: [PR] [Prism] Support BundleFinalization DoFn parameter [beam]

2024-09-21 Thread via GitHub


lostluck merged PR #32425:
URL: https://github.com/apache/beam/pull/32425


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [Prism] Support BundleFinalization DoFn parameter [beam]

2024-09-20 Thread via GitHub


lostluck commented on PR #32425:
URL: https://github.com/apache/beam/pull/32425#issuecomment-2364756851

   Agreed. Please go ahead 





Re: [PR] [Prism] Support BundleFinalization DoFn parameter [beam]

2024-09-20 Thread via GitHub


damondouglas commented on PR #32425:
URL: https://github.com/apache/beam/pull/32425#issuecomment-2364749648

   @lostluck The codecov/patch report flags lines that didn't seem sensible to 
write tests for. Should I just go ahead and merge this, do you think?





Re: [PR] [Prism] Support BundleFinalization DoFn parameter [beam]

2024-09-20 Thread via GitHub


codecov[bot] commented on PR #32425:
URL: https://github.com/apache/beam/pull/32425#issuecomment-2364742305

   ## [Codecov](https://app.codecov.io/gh/apache/beam/pull/32425?dropdown=coverage&src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) Report
   Attention: Patch coverage is `26.31579%` with `14 lines` in your changes missing coverage. Please review.
   > Project coverage is 57.32%. Comparing base [(`2a7755b`)](https://app.codecov.io/gh/apache/beam/commit/2a7755b0f2cc4c64e74e5a54c8923f7b19ac836c?dropdown=coverage&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) to head [(`2690975`)](https://app.codecov.io/gh/apache/beam/commit/2690975d9ae7d85c78f18e099e9b022c409c266a?dropdown=coverage&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache).
   > Report is 84 commits behind head on master.
   
   | [Files with missing lines](https://app.codecov.io/gh/apache/beam/pull/32425?dropdown=coverage&src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Patch % | Lines |
   |---|---|---|
   | [...o/pkg/beam/runners/prism/internal/worker/bundle.go](https://app.codecov.io/gh/apache/beam/pull/32425?src=pr&el=tree&filepath=sdks%2Fgo%2Fpkg%2Fbeam%2Frunners%2Fprism%2Finternal%2Fworker%2Fbundle.go&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-c2Rrcy9nby9wa2cvYmVhbS9ydW5uZXJzL3ByaXNtL2ludGVybmFsL3dvcmtlci9idW5kbGUuZ28=) | 0.00% | [9 Missing :warning: ](https://app.codecov.io/gh/apache/beam/pull/32425?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) |
   | [sdks/go/pkg/beam/runners/prism/internal/stage.go](https://app.codecov.io/gh/apache/beam/pull/32425?src=pr&el=tree&filepath=sdks%2Fgo%2Fpkg%2Fbeam%2Frunners%2Fprism%2Finternal%2Fstage.go&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-c2Rrcy9nby9wa2cvYmVhbS9ydW5uZXJzL3ByaXNtL2ludGVybmFsL3N0YWdlLmdv) | 42.85% | [3 Missing and 1 partial :warning: ](https://app.codecov.io/gh/apache/beam/pull/32425?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) |
   | [sdks/go/pkg/beam/core/runtime/graphx/translate.go](https://app.codecov.io/gh/apache/beam/pull/32425?src=pr&el=tree&filepath=sdks%2Fgo%2Fpkg%2Fbeam%2Fcore%2Fruntime%2Fgraphx%2Ftranslate.go&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-c2Rrcy9nby9wa2cvYmVhbS9jb3JlL3J1bnRpbWUvZ3JhcGh4L3RyYW5zbGF0ZS5nbw==) | 0.00% | [1 Missing :warning: ](https://app.codecov.io/gh/apache/beam/pull/32425?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) |
   
   Additional details and impacted files
   
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #32425      +/-   ##
   
   - Coverage   57.33%   57.32%   -0.01%
     Complexity   1474     1474
   
     Files         964      965       +1
     Lines      153301   153467     +166
     Branches     1076     1076
   
   + Hits        87896    87981      +85
   - Misses      63201    63281      +80
   - Partials     2204     2205       +1
   ```
   
   | [Flag](https://app.codecov.io/gh/apache/beam/pull/32425/flags?src=pr&el=flags&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Coverage Δ | |
   |---|---|---|
   | [go](https://app.codecov.io/gh/apache/beam/pull/32425/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `34.65% <26.31%> (+<0.01%)` | :arrow_up: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   [:umbrella: View full report in Codecov by Sentry](https://app.codecov.io/gh/apache/beam/pull/32425?dropdown=coverage&src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache).
   
   :loudspeaker: Have feedback on the report? [Share it here](https://about.codecov.io/codecov-pr-comment-feedback/?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache).
   



Re: [PR] [Prism] Support BundleFinalization DoFn parameter [beam]

2024-09-19 Thread via GitHub


lostluck commented on code in PR #32425:
URL: https://github.com/apache/beam/pull/32425#discussion_r1767636349


##
sdks/go/test/integration/primitives/pardo.go:
##
@@ -192,3 +196,90 @@ func emitPipelineOptions(_ []byte, emit func(string)) {
emit(fmt.Sprintf("%s: %s", "B", beam.PipelineOptions.Get("B")))
emit(fmt.Sprintf("%s: %s", "C", beam.PipelineOptions.Get("C")))
 }
+
+var CountInvokeBundleFinalizer atomic.Int32
+
+const (
+   BundleFinalizerStart   = 1
+   BundleFinalizerProcess = 2
+   BundleFinalizerFinish  = 4
+)
+
+// ParDoProcessElementBundleFinalizer creates a beam.Pipeline with a beam.ParDo0 that processes a DoFn with a
+// beam.BundleFinalization in its ProcessElement method.
+func ParDoProcessElementBundleFinalizer() *beam.Pipeline {

Review Comment:
   Consider having the scope passed into this function *instead* of 
returning a Pipeline.
   
   e.g.
   
   ```
   func ParDoProcessElementBundleFinalizer(s beam.Scope) {
       imp := beam.Impulse(s)
       beam.ParDo0(s, &processElemBundleFinalizer{}, imp)
   }
   ```
   
   This cuts the boilerplate.
   
   It also makes it easier to integrate with the Prism test suites, and thus 
ensure we get explicit code coverage over the Prism code you've written.
   
   e.g. That's how we do the (increasingly misnamed) unimplemented_test.go file.
   
   
https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/runners/prism/internal/unimplemented_test.go#L74
   
   So you could add the "Finalization" tests to the TestImplemented suite.
   DO NOT refactor that test file to fix it; that's an orthogonal change that 
should be made separately.
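
A minimal sketch of the scope-passing pattern described in this comment, using stand-in types rather than the real Beam API. `Scope`, `pipelineBuilder`, `buildAll`, and the two builder functions are hypothetical names invented for illustration; only the DoFn type names come from the diff. The point is that when each builder takes a scope, a single table-driven suite can own the pipeline setup for every case.

```go
package main

import "fmt"

// Scope is a hypothetical stand-in for beam.Scope; it only records
// the names of the transforms added to it.
type Scope struct {
	transforms []string
}

// Add records a transform name on the scope.
func (s *Scope) Add(name string) { s.transforms = append(s.transforms, name) }

// pipelineBuilder mirrors the suggested signature: the caller owns the
// pipeline and hands in a scope, so the builder carries no setup code.
type pipelineBuilder func(s *Scope)

// processElementFinalizer is a hypothetical builder for one test case.
func processElementFinalizer(s *Scope) {
	s.Add("Impulse")
	s.Add("ParDo0(processElemBundleFinalizer)")
}

// finishBundleFinalizer is a hypothetical builder for another test case.
func finishBundleFinalizer(s *Scope) {
	s.Add("Impulse")
	s.Add("ParDo0(finalizerInFinishBundle)")
}

// buildAll plays the role of a table-driven test suite: it creates one
// scope per case and lets each builder populate it.
func buildAll(cases map[string]pipelineBuilder) map[string][]string {
	out := make(map[string][]string, len(cases))
	for name, build := range cases {
		s := &Scope{}
		build(s)
		out[name] = s.transforms
	}
	return out
}

func main() {
	got := buildAll(map[string]pipelineBuilder{
		"InProcessElement": processElementFinalizer,
		"InFinishBundle":   finishBundleFinalizer,
	})
	for name, transforms := range got {
		fmt.Println(name, transforms)
	}
}
```

With this shape, adding a case to a suite is one map entry, and the suite decides which runner executes the resulting pipeline.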



##
sdks/go/test/integration/primitives/pardo.go:
##
@@ -192,3 +196,90 @@ func emitPipelineOptions(_ []byte, emit func(string)) {
emit(fmt.Sprintf("%s: %s", "B", beam.PipelineOptions.Get("B")))
emit(fmt.Sprintf("%s: %s", "C", beam.PipelineOptions.Get("C")))
 }
+
+var CountInvokeBundleFinalizer atomic.Int32
+
+const (
+   BundleFinalizerStart   = 1
+   BundleFinalizerProcess = 2
+   BundleFinalizerFinish  = 4
+)
+
+// ParDoProcessElementBundleFinalizer creates a beam.Pipeline with a beam.ParDo0 that processes a DoFn with a
+// beam.BundleFinalization in its ProcessElement method.
+func ParDoProcessElementBundleFinalizer() *beam.Pipeline {

Review Comment:
   Naturally, the same for the other test cases in this file.
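
For readers following the quoted diff, here is one plausible way the `CountInvokeBundleFinalizer` counter and the `BundleFinalizer*` constants could be used. This is an assumption, not taken from the PR: it supposes each finalizer callback adds its own flag value exactly once, so the final total identifies which callbacks ran. `counter` and `runCallbacks` are hypothetical names.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// Flag values copied from the diff; each is a distinct bit.
const (
	BundleFinalizerStart   = 1
	BundleFinalizerProcess = 2
	BundleFinalizerFinish  = 4
)

// counter stands in for CountInvokeBundleFinalizer.
var counter atomic.Int32

// runCallbacks simulates finalizer callbacks firing: it resets the
// counter, adds each flag once, and returns the resulting total.
// Assumption: every callback runs at most once per test.
func runCallbacks(flags ...int32) int32 {
	counter.Store(0)
	for _, f := range flags {
		counter.Add(f)
	}
	return counter.Load()
}

func main() {
	got := runCallbacks(BundleFinalizerStart, BundleFinalizerProcess, BundleFinalizerFinish)
	want := int32(BundleFinalizerStart | BundleFinalizerProcess | BundleFinalizerFinish)
	fmt.Println("all callbacks ran:", got == want)
}
```

Because the values are distinct powers of two, any subset of callbacks produces a unique total, which lets a test assert on a single integer.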



##
sdks/go/test/integration/primitives/pardo.go:
##
@@ -18,10 +18,11 @@ package primitives
 import (
"flag"
"fmt"
-
"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
+   "sync/atomic"
+   "time"

Review Comment:
   Please put the standard library imports in a separate group at the top of 
the imports. For some reason the break was dropped, and the grouping changed.






Re: [PR] [Prism] Support BundleFinalization DoFn parameter [beam]

2024-09-17 Thread via GitHub


lostluck commented on code in PR #32425:
URL: https://github.com/apache/beam/pull/32425#discussion_r1764226783


##
sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go:
##
@@ -206,6 +206,20 @@ func (b *B) Cleanup(wk *W) {
wk.mu.Unlock()
 }
 
+func (b *B) Finalize(ctx context.Context, wk *W) (*fnpb.FinalizeBundleResponse, error) {
+   resp := wk.sendInstruction(ctx, &fnpb.InstructionRequest{
+       Request: &fnpb.InstructionRequest_FinalizeBundle{
+           FinalizeBundle: &fnpb.FinalizeBundleRequest{
+               InstructionId: b.InstID,
+           },
+       },
+   })
+   if resp.GetError() != "" {
+       return nil, fmt.Errorf("finalize[%v] error from SDK: %v", b.InstID, resp.GetError())
+   }
+   return resp.GetFinalizeBundle(), nil

Review Comment:
   This is and will likely always be an empty message, and since this is 
internal code, I'd rather simply not return the empty message at this time, 
than speculate that we might want to return the message in the future if it 
gains fields.
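
A minimal sketch of the error-only shape suggested here, assuming stand-in types: `instructionResponse`, `bundle`, and the `send` callback are hypothetical and replace the real `fnpb` messages and worker plumbing. Only the error message format is taken from the diff.

```go
package main

import "fmt"

// instructionResponse is a hypothetical stand-in for the SDK's
// response; only the error string matters for this sketch.
type instructionResponse struct {
	Error string
}

// bundle is a hypothetical stand-in for the worker's bundle type.
type bundle struct {
	InstID string
}

// finalize reports failure through the error alone. The (currently
// empty) finalize response is dropped rather than returned.
func (b *bundle) finalize(send func(instID string) instructionResponse) error {
	resp := send(b.InstID)
	if resp.Error != "" {
		return fmt.Errorf("finalize[%v] error from SDK: %v", b.InstID, resp.Error)
	}
	return nil
}

func main() {
	b := &bundle{InstID: "inst001"}
	ok := func(string) instructionResponse { return instructionResponse{} }
	bad := func(string) instructionResponse { return instructionResponse{Error: "boom"} }
	fmt.Println(b.finalize(ok))
	fmt.Println(b.finalize(bad))
}
```

Callers then write `if err := b.finalize(...); err != nil` with no discarded first return value.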



##
sdks/go/pkg/beam/runners/prism/internal/stage.go:
##
@@ -278,6 +280,14 @@ progress:
slog.Debug("returned empty residual application", "bundle", rb, slog.Int("numResiduals", l), slog.String("pcollection", s.primaryInput))
}
em.PersistBundle(rb, s.OutputsToCoders, b.OutputData, s.inputInfo, residuals)
+   if s.finalize {
+       _, err := b.Finalize(ctx, wk)
+       if err != nil {
+           slog.Debug("SDK Error from bundle finalization", "bundle", rb, "error", err.Error())
+           panic(err)

Review Comment:
   At present the only reason the tests are failing is due to this panic.
   
   I'd make the slog output here be an error instead of debug, since it could 
be useful to users, and a finalizing failure isn't expected to fail the job.






Re: [PR] [Prism] Support BundleFinalization DoFn parameter [beam]

2024-09-11 Thread via GitHub


damondouglas commented on code in PR #32425:
URL: https://github.com/apache/beam/pull/32425#discussion_r1755259045


##
sdks/go/test/integration/primitives/pardo.go:
##
@@ -32,6 +33,9 @@ func init() {
register.Function3x2(asymJoinFn)
register.Function5x0(splitByName)
register.Function2x0(emitPipelineOptions)
+   beam.RegisterDoFn(reflect.TypeOf((*processElemBundleFinalizer)(nil)).Elem())
+   beam.RegisterDoFn(reflect.TypeOf((*finalizerInFinishBundle)(nil)).Elem())
+   beam.RegisterDoFn(reflect.TypeOf((*finalizerInAll)(nil)).Elem())

Review Comment:
   I refactored to use the register funcs. Meanwhile, I converted the PR to 
draft. I will explore how to filter out the tests for the docker execution.






Re: [PR] [Prism] Support BundleFinalization DoFn parameter [beam]

2024-09-11 Thread via GitHub


lostluck commented on code in PR #32425:
URL: https://github.com/apache/beam/pull/32425#discussion_r1754885112


##
sdks/go/test/integration/primitives/pardo.go:
##
@@ -32,6 +33,9 @@ func init() {
register.Function3x2(asymJoinFn)
register.Function5x0(splitByName)
register.Function2x0(emitPipelineOptions)
+   beam.RegisterDoFn(reflect.TypeOf((*processElemBundleFinalizer)(nil)).Elem())
+   beam.RegisterDoFn(reflect.TypeOf((*finalizerInFinishBundle)(nil)).Elem())
+   beam.RegisterDoFn(reflect.TypeOf((*finalizerInAll)(nil)).Elem())

Review Comment:
   Seems reasonable. 
   
   The big trick is we probably need to filter out this test from Docker mode 
execution generally, and from the Python Portable runner.
   
   We don't have a way to filter for Docker execution, but we do have a filter 
list for the portable runner in integration.go.
   
   Searching for `--- FAIL` finds the failure in the logs.
   
   ```
   github.com/apache/beam/sdks/v2/go/test/integration/primitives.TestParDoBundleFinalizer.func1 not found. Register DoFns and functions with the the beam/register package.
   --- FAIL: TestParDoBundleFinalizer/InFinishBundle (0.04s)
   ```






Re: [PR] [Prism] Support BundleFinalization DoFn parameter [beam]

2024-09-11 Thread via GitHub


damondouglas commented on code in PR #32425:
URL: https://github.com/apache/beam/pull/32425#discussion_r1754676620


##
sdks/go/test/integration/primitives/pardo.go:
##
@@ -32,6 +33,9 @@ func init() {
register.Function3x2(asymJoinFn)
register.Function5x0(splitByName)
register.Function2x0(emitPipelineOptions)
+   beam.RegisterDoFn(reflect.TypeOf((*processElemBundleFinalizer)(nil)).Elem())
+   beam.RegisterDoFn(reflect.TypeOf((*finalizerInFinishBundle)(nil)).Elem())
+   beam.RegisterDoFn(reflect.TypeOf((*finalizerInAll)(nil)).Elem())

Review Comment:
   This might be what is causing the check failures.






Re: [PR] [Prism] Support BundleFinalization DoFn parameter [beam]

2024-09-10 Thread via GitHub


damondouglas commented on PR #32425:
URL: https://github.com/apache/beam/pull/32425#issuecomment-2342440673

   R: @lostluck 





Re: [PR] [Prism] Support BundleFinalization DoFn parameter [beam]

2024-09-10 Thread via GitHub


github-actions[bot] commented on PR #32425:
URL: https://github.com/apache/beam/pull/32425#issuecomment-2342441803

   Stopping reviewer notifications for this pull request: review requested by 
someone other than the bot, ceding control. If you'd like to restart, comment 
`assign set of reviewers`

