This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 50ca9e306a7 [prism] Add dynamic channel & sub element splits. (#26484)
50ca9e306a7 is described below

commit 50ca9e306a7f788486724936e1b68451eff41aff
Author: Robert Burke <lostl...@users.noreply.github.com>
AuthorDate: Tue May 2 12:14:53 2023 -0700

    [prism] Add dynamic channel & sub element splits. (#26484)
    
    * Add channel split separations.
    
    * Use channel leases for parallel processing.
    
    * move data wait to chan + atomic
    
    * Progress and split on execute goroutine.
    
    * Convert metrics tests to short ids.
    
    * Add Provision handler w/ capabilities.
    
    * Add artifact validation to run_rc_validation (#26407)
    
    * Add artifact validation to run_rc_validation
    
    * config file for artifacts
    
    * use java bom contents for validation
    
    * add pip_pre = True
    
    * Update tox.ini
    
    * Remove out of date experimental warning on SDFs (#26450)
    
    * [ToBF] Refinement 26.04.23 (#26428)
    
    * content tree rebuilds on sdk change
    
    * expandable parent node widget
    
    * comment fix
    
    * comment fix
    
    ---------
    
    Co-authored-by: darkhan.nausharipov <darkhan.naushari...@kzn.akvelon.com>
    
    * Allow implicit flattening for yaml inputs. (#26423)
    
    * Add field annotations for high-priority Syndeo schema transforms (#26384)
    
    add changes
    
    * [Go SDK] Timers with new datalayer (#26101)
    
    * added timer package
    
    * add timer changes and merged with rebo's pr
    
    * timer fired in stateful
    
    * error setting new timer in ontimer
    
    * looping timers work
    
    * send fv instead of bytes
    
    * changes to coder/pardo
    
    * works for all cases, only cleanup left
    
    * remove comments and validate onTimer
    
    * generic coder for user key
    
    * fixes coder end to end
    
    * remove logs
    
    * add unit test and refactor
    
    * add docs
    
    * new example
    
    * fix static lint
    
    * support emitters
    
    * allow input col of CoGBK as well
    
    * unit tests, periodic impulse, minor refactor
    
    * update PipelineTimer interface, minor refactor, doc comment for example
    
    * add warn message
    
    * single edge timer coder, rm kv coder check, cache encoder,decoder
    
    * Update chromedriver-binary requirement in /sdks/python
    
    Updates the requirements on 
[chromedriver-binary](https://github.com/danielkaiser/python-chromedriver-binary)
 to permit the latest version.
    - [Release 
notes](https://github.com/danielkaiser/python-chromedriver-binary/releases)
    - 
[Commits](https://github.com/danielkaiser/python-chromedriver-binary/compare/v100.0.4896.20.0...v113.0.5672.24.0)
    
    ---
    updated-dependencies:
    - dependency-name: chromedriver-binary
      dependency-type: direct:development
    ...
    
    Signed-off-by: dependabot[bot] <supp...@github.com>
    
    * Automation: Tour of Beam infrastructure deployment (#25793)
    
    * Added Terraform scripts for TOB infra
    
    * ToB Frontend related updates
    
    * Update settings.gradle.kts
    
    * Deleted redundant file and minor README change
    
    * Addressing comments in the PR
    
    * Added newline at the end of variables.tf file
    
    * Update README.md
    
    * Updates related to Tour of Beam infrastructure
    
    * Update locals.tf
    
    * Output.tf updates
    
    * Update output.tf
    
    * Updates
    
    * Update main.tf
    
    * Updates to cloudfunctions_bucket variable
    
    * service_account_id changes
    
    * Update main.tf
    
    * Update README.md
    
    * Update README.md
    
    * Update README.md
    
    * Update README.md
    
    * Update README.md
    
    * Bulk update of terraform scripts
    
    * Update README.md
    
    * Update README.md
    
    * Datastore_namespace updates
    
    * Update README.md
    
    * Update README.md
    
    * Update README.md
    
    * Update README.md
    
    * Update main.tf
    
    * Update README.md
    
    * Update README.md
    
    * Update README.md
    
    * Some minor TF updates
    
    * Update README.md
    
    * Modify batch IT to use count instead of hash (#26327)
    
    * Modify batch IT to use count instead of hash
    
    * remove unused variable
    
    * run spotless, update pipeline state checking
    
    * Update timeout to 45 minutes
    
    * revert timeout, add additional counter to try and pinpoint missing records
    
    * add a log to notify ranges used when workers restart
    
    * change counts from metrics to combiners
    
    * add a window to streaming test
    
    * move the passert to the correct place
    
    * Remove extra counter, apply spotless
    
    * add additional metric to KafkaWriter
    
    * Remove debugging metrics
    
    * verify pipeline is not failed
    
    * remove extra newline
    
    * Revert "Modify batch IT to use count instead of hash (#26327)" (#26466)
    
    This reverts commit 9903b2f1e937beebdbb73be7a4e2eda760c0a169.
    
    * Bump Java Dataflow container images (#26459)
    
    * keep retrying mass_comment until it has started all jobs (#26457)
    
    * keep retrying mass_comment until it has started all jobs
    
    * fix lookups
    
    * Add driverJars parameter to JdbcIO. (#25824)
    
    This change allows users to use driver jars saved in GCS. With this change, 
Dataflow templates will be able to migrate to JdbcIO instead of DynamicJdbcIO.
    
    * [Roll Fwd PR] Rename _namespace to _get_display_data_namespace"" (#26470)
    
    * Update parquetio and textio to work with -beam_strict (#26469)
    
    * use wheel sdk location for PostCommit_Py_Examples (#26473)
    
    * Move back the timeout of Python PostCommit to 4h
    
    * Minor fix on Python PostCommit description strings
    
    * Add recent postcommits to jenkins README
    
    * More user-friendly providers.
    
    * Add yaml preprocessing phases.
    
    * Add flexible windowing syntax to yaml.
    
    * Implement flatten in terms of preprocessor phase.
    
    This composes better with windowing.
    
    * Reword SQL note.
    
    * Make linter happy.
    
    * Survive errors in size estimation in MongoDbIO
    
    * Fix jdbc xlang schema type mismatch (#26480)
    
    * Fix jdbc xlang schema type mismatch
    
    * Also fix fetch_size type mismatch
    
    * Add new fields in the end
    
    * [Python] Add saved_weights example to tf notebook (#26472)
    
    * add saved_weights example to tf notebook
    
    * add description
    
    * updated text blocks
    
    * Update examples/notebooks/beam-ml/run_inference_tensorflow.ipynb
    
    Co-authored-by: Rebecca Szper <98840847+rsz...@users.noreply.github.com>
    
    * Update examples/notebooks/beam-ml/run_inference_tensorflow.ipynb
    
    Co-authored-by: Rebecca Szper <98840847+rsz...@users.noreply.github.com>
    
    ---------
    
    Co-authored-by: Rebecca Szper <98840847+rsz...@users.noreply.github.com>
    
    * Revert "Revert "Modify batch IT to use count instead of hash (#26327)" 
(#26466)" (#26467)
    
    This reverts commit 5c676687a1faecf7166481e684792d26f7626289.
    
    * Bump github.com/tetratelabs/wazero from 1.0.3 to 1.1.0 in /sdks (#26486)
    
    Bumps 
[github.com/tetratelabs/wazero](https://github.com/tetratelabs/wazero) from 
1.0.3 to 1.1.0.
    - [Release notes](https://github.com/tetratelabs/wazero/releases)
    - [Commits](https://github.com/tetratelabs/wazero/compare/v1.0.3...v1.1.0)
    
    ---
    updated-dependencies:
    - dependency-name: github.com/tetratelabs/wazero
      dependency-type: direct:production
      update-type: version-update:semver-minor
    ...
    
    Signed-off-by: dependabot[bot] <supp...@github.com>
    Co-authored-by: dependabot[bot] 
<49699333+dependabot[bot]@users.noreply.github.com>
    
    * tour of beam integration tests (#25925)
    
    * Integration test to load the default example of the default SDK and 
change the example (#24730) (#24729)
    
    * Fix formatting and README (#24730)
    
    * Support collection v1.17.0 (#24730)
    
    * LoadingIndicator on changing examples, remove duplicating licenses (#24730)
    
    * Add a missing license header (#24730)
    
    * Integration test for changing SDK and running code (#24779) (#382)
    
    * Integration test for changing SDK and running code (#24779)
    
    * Rename an integration test (#24779)
    
    * Use enum to switch SDK in integration test (#24779)
    
    * Find SDK in a dropdown by key (#24779)
    
    * Add a TODO (#24779)
    
    * Fix exports (#24779)
    
    * Issue24779 integration changing sdk from 24370 (#387)
    
    * Integration test for changing SDK and running code (#24779)
    
    * Rename an integration test (#24779)
    
    * Use enum to switch SDK in integration test (#24779)
    
    * Find SDK in a dropdown by key (#24779)
    
    * Add a TODO (#24779)
    
    * Fix exports (#24779)
    
    * Integration tests miscellaneous UI (#383)
    
    * miscellaneous ui integration tests
    
    * reverted pubspec.lock
    
    * gradle tasks ordered alphabetically
    
    * integration tests refactoring
    
    * clean code
    
    * integration tests miscellaneous ui fix pr
    
    * rename method
    
    * added layout adaptivity
    
    * A minor cleanup (#24779)
    
    Co-authored-by: Dmitry Repin <mr.mal...@gmail.com>
    
    * integration tests run and editing
    
    * example selector test
    
    * minor fixes
    
    * rat
    
    * fix pr
    
    * minor
    
    * minor
    
    * rat
    
    * integration test finder written
    
    * integration test minor fixes
    
    * minor fixes
    
    * removed comment
    
    * minor fixes
    
    * playground integration tests minor fixes
    
    * integration test pumpAnSettleNoException
    
    * integration test shortcut refactor
    
    * integration test another changing shortcuts running
    
    * upgrade to flutter 3.7.1
    
    * workaround comment
    
    * playground frontend updated major versions
    
    * issues 25329 25331 25336
    
    * 25329 extract connectivity extension to separate file
    
    * Upgrade Flutter to 3.7.3 in integration tests (#24730)
    
    * Fix integration test (#24730)
    
    * fix cors issue and added mouse scroll to tags
    
    * Upgrade Flutter in Dockerfile (#24720)
    
    * sorting moved to model
    
    * sorting moved to model
    
    * sorting moved to model
    
    * bugs fix
    
    * issue 25278
    
    * fix pr
    
    * quotes fix in en.yaml
    
    * Fix not loading default example (#25528)
    
    * fix compile error
    
    * Refactor output tabs, test embedded playground (#25136) (#439)
    
    * Refactor output tabs, test embedded playground (#25136)
    
    * Clean up (#25136)
    
    * Change example paths to IDs in integration tests
    
    * issue25640 tob ci
    
    * fix tob ci
    
    * rename ci process
    
    * test add new line to main
    
    * test add new line to main
    
    * commented unit test run
    
    * issue25640 changed server path
    
    * issue25640 tests on welcome page
    
    * deleted config.g.dart
    
    * issue25640 pr fixes
    
    * Update .github/workflows/tour_of_beam_frontend_test.yml
    
    Co-authored-by: alexeyinkin <l...@inkin.ru>
    
    * Update 
learning/tour-of-beam/frontend/integration_test/welcome_page_test.dart
    
    Co-authored-by: alexeyinkin <l...@inkin.ru>
    
    * Improve tests (#25640)
    
    * issue25640 tour page tests
    
    * pr fix
    
    * removed import
    
    * pr fix
    
    * fix test
    
    * 25640 fixed pubspec.lock
    
    * issue25640 fix readme
    
    * updated readme
    
    * issue25640 fixed after master merge
    
    * issue25483 ToB pipeline options
    
    * removed unnecessary variable
    
    * pr fix
    
    * Update learning/tour-of-beam/frontend/assets/translations/en.yaml
    
    Co-authored-by: alexeyinkin <l...@inkin.ru>
    
    * playground hides when snippet does not exist
    
    * pipeline options extracted to playground components
    
    * issue25483 pipeline options
    
    * added errors handling, fix pr
    
    * refactoring
    
    * Revert "refactoring"
    
    This reverts commit 1540961fe7b66673f70f56632947cdd988c3a0b7.
    
    * removed unnecessary constants
    
    * playground controller in tour notifier becomes nullable
    
    * playground controller returned to non nullable in tour notifier
    
    * playground controller actions
    
    * removed unnecessary code
    
    * tob scaffold wrapped with animated builder
    
    * minor fixes
    
    * partially fixed tests
    
    * Upgrade flutter_code_editor to v0.2.19 (#25640)
    
    * Replace output SelectableText with a CodeField instance (#25640)
    
    * Trigger ToB integration tests (#25640)
    
    * Clean up (#25640)
    
    * Enable manual workflow runs for Playground and ToB integration tests 
(#25640)
    
    ---------
    
    Co-authored-by: Alexey Inkin <alexey.in...@akvelon.com>
    Co-authored-by: alexeyinkin <l...@inkin.ru>
    
    * Eliminate nullness errors from MongoDbIO
    
    * Move provisioned options outside of harness.Main (#26476)
    
    Co-authored-by: lostluck <13907733+lostl...@users.noreply.github.com>
    
    * Add SDF adjacent element test
    
    * Consolidate residual processing
    
    * fix race condition on split boundaries
    
    * deflake fixed window combine tests
    
    * Add comments to job functions.
    
    * Make int64check per window.
    
    * Update sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go
    
    Co-authored-by: Ritesh Ghorse <riteshgho...@gmail.com>
    
    ---------
    
    Signed-off-by: dependabot[bot] <supp...@github.com>
    Co-authored-by: lostluck <13907733+lostl...@users.noreply.github.com>
    Co-authored-by: Danny McCormick <dannymccorm...@google.com>
    Co-authored-by: Anand Inguva <anandinguv...@gmail.com>
    Co-authored-by: Anand Inguva <34158215+ananding...@users.noreply.github.com>
    Co-authored-by: Darkhan Nausharipov 
<31556582+naushari...@users.noreply.github.com>
    Co-authored-by: darkhan.nausharipov <darkhan.naushari...@kzn.akvelon.com>
    Co-authored-by: Robert Bradshaw <rober...@gmail.com>
    Co-authored-by: Andrei Gurau <andreigu...@google.com>
    Co-authored-by: Ritesh Ghorse <riteshgho...@gmail.com>
    Co-authored-by: dependabot[bot] 
<49699333+dependabot[bot]@users.noreply.github.com>
    Co-authored-by: ruslan-ikhsan 
<114978215+ruslan-ikh...@users.noreply.github.com>
    Co-authored-by: johnjcasey <95318300+johnjca...@users.noreply.github.com>
    Co-authored-by: Yi Hu <ya...@google.com>
    Co-authored-by: Pranav Bhandari <bhandari.prana...@gmail.com>
    Co-authored-by: Svetak Sundhar <svetaksund...@google.com>
    Co-authored-by: Jeremy Edwards <jerem...@gmail.com>
    Co-authored-by: Kenneth Knowles <k...@google.com>
    Co-authored-by: Rebecca Szper <98840847+rsz...@users.noreply.github.com>
    Co-authored-by: Dmitry Repin <mr.mal...@gmail.com>
    Co-authored-by: Alexey Inkin <alexey.in...@akvelon.com>
    Co-authored-by: alexeyinkin <l...@inkin.ru>
---
 .../prism/internal/engine/elementmanager.go        |  86 ++++++++++++----
 sdks/go/pkg/beam/runners/prism/internal/execute.go |  11 +-
 .../beam/runners/prism/internal/execute_test.go    |   3 +
 .../beam/runners/prism/internal/jobservices/job.go |  21 +++-
 .../runners/prism/internal/jobservices/metrics.go  |  88 ++++++++++++----
 .../prism/internal/jobservices/metrics_test.go     |  73 ++++++++-----
 .../beam/runners/prism/internal/separate_test.go   |  79 ++++++++++++--
 sdks/go/pkg/beam/runners/prism/internal/stage.go   |  82 +++++++++++++--
 .../pkg/beam/runners/prism/internal/testdofns.go   |  24 +++--
 .../beam/runners/prism/internal/testdofns_test.go  |   3 +-
 .../pkg/beam/runners/prism/internal/urns/urns.go   |  21 ++--
 .../beam/runners/prism/internal/worker/bundle.go   |  79 +++++++++++---
 .../runners/prism/internal/worker/bundle_test.go   |   2 +-
 .../beam/runners/prism/internal/worker/worker.go   | 114 +++++++++++++++++++--
 .../runners/prism/internal/worker/worker_test.go   |   6 +-
 15 files changed, 552 insertions(+), 140 deletions(-)

diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go 
b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
index aeabc81b812..89fececea10 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
@@ -276,6 +276,34 @@ func (em *ElementManager) InputForBundle(rb RunBundle, 
info PColInfo) [][]byte {
        return es.ToData(info)
 }
 
+// reElementResiduals extracts the windowed value header from residual bytes, 
and explodes them
+// back out to their windows.
+func reElementResiduals(residuals [][]byte, inputInfo PColInfo, rb RunBundle) 
[]element {
+       var unprocessedElements []element
+       for _, residual := range residuals {
+               buf := bytes.NewBuffer(residual)
+               ws, et, pn, err := 
exec.DecodeWindowedValueHeader(inputInfo.WDec, buf)
+               if err != nil {
+                       if err == io.EOF {
+                               break
+                       }
+                       slog.Error("reElementResiduals: error decoding residual 
header", err, "bundle", rb)
+                       panic("error decoding residual header")
+               }
+
+               for _, w := range ws {
+                       unprocessedElements = append(unprocessedElements,
+                               element{
+                                       window:    w,
+                                       timestamp: et,
+                                       pane:      pn,
+                                       elmBytes:  buf.Bytes(),
+                               })
+               }
+       }
+       return unprocessedElements
+}
+
 // PersistBundle uses the tentative bundle output to update the watermarks for 
the stage.
 // Each stage has two monotonically increasing watermarks, the input 
watermark, and the output
 // watermark.
@@ -330,28 +358,7 @@ func (em *ElementManager) PersistBundle(rb RunBundle, 
col2Coders map[string]PCol
        }
 
        // Return unprocessed to this stage's pending
-       var unprocessedElements []element
-       for _, residual := range residuals {
-               buf := bytes.NewBuffer(residual)
-               ws, et, pn, err := 
exec.DecodeWindowedValueHeader(inputInfo.WDec, buf)
-               if err != nil {
-                       if err == io.EOF {
-                               break
-                       }
-                       slog.Error("PersistBundle: error decoding residual 
header", err, "bundle", rb)
-                       panic("error decoding residual header")
-               }
-
-               for _, w := range ws {
-                       unprocessedElements = append(unprocessedElements,
-                               element{
-                                       window:    w,
-                                       timestamp: et,
-                                       pane:      pn,
-                                       elmBytes:  buf.Bytes(),
-                               })
-               }
-       }
+       unprocessedElements := reElementResiduals(residuals, inputInfo, rb)
        // Add unprocessed back to the pending stack.
        if len(unprocessedElements) > 0 {
                em.pendingElements.Add(len(unprocessedElements))
@@ -379,6 +386,21 @@ func (em *ElementManager) PersistBundle(rb RunBundle, 
col2Coders map[string]PCol
        em.addRefreshAndClearBundle(stage.ID, rb.BundleID)
 }
 
+// ReturnResiduals is called after a successful split, so the remaining work
+// can be re-assigned to a new bundle.
+func (em *ElementManager) ReturnResiduals(rb RunBundle, firstRsIndex int, 
inputInfo PColInfo, residuals [][]byte) {
+       stage := em.stages[rb.StageID]
+
+       stage.splitBundle(rb, firstRsIndex)
+       unprocessedElements := reElementResiduals(residuals, inputInfo, rb)
+       if len(unprocessedElements) > 0 {
+               slog.Debug("ReturnResiduals: unprocessed elements", "bundle", 
rb, "count", len(unprocessedElements))
+               em.pendingElements.Add(len(unprocessedElements))
+               stage.AddPending(unprocessedElements)
+       }
+       em.addRefreshes(singleSet(rb.StageID))
+}
+
 func (em *ElementManager) addRefreshes(stages set[string]) {
        em.refreshCond.L.Lock()
        defer em.refreshCond.L.Unlock()
@@ -439,6 +461,10 @@ func (s set[K]) merge(o set[K]) {
        }
 }
 
+func singleSet[T comparable](v T) set[T] {
+       return set[T]{v: struct{}{}}
+}
+
 // stageState is the internal watermark and input tracking for a stage.
 type stageState struct {
        ID        string
@@ -569,6 +595,22 @@ func (ss *stageState) startBundle(watermark mtime.Time, 
genBundID func() string)
        return bundID, true
 }
 
+func (ss *stageState) splitBundle(rb RunBundle, firstResidual int) {
+       ss.mu.Lock()
+       defer ss.mu.Unlock()
+
+       es := ss.inprogress[rb.BundleID]
+       slog.Debug("split elements", "bundle", rb, "elem count", len(es.es), 
"res", firstResidual)
+
+       prim := es.es[:firstResidual]
+       res := es.es[firstResidual:]
+
+       es.es = prim
+       ss.pending = append(ss.pending, res...)
+       heap.Init(&ss.pending)
+       ss.inprogress[rb.BundleID] = es
+}
+
 // minimumPendingTimestamp returns the minimum pending timestamp from all 
pending elements,
 // including in progress ones.
 //
diff --git a/sdks/go/pkg/beam/runners/prism/internal/execute.go 
b/sdks/go/pkg/beam/runners/prism/internal/execute.go
index 2329a43d214..13c8b2b127c 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/execute.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/execute.go
@@ -102,7 +102,6 @@ func externalEnvironment(ctx context.Context, ep 
*pipepb.ExternalPayload, wk *wo
        endpoint := &pipepb.ApiServiceDescriptor{
                Url: wk.Endpoint(),
        }
-
        pool.StartWorker(ctx, &fnpb.StartWorkerRequest{
                WorkerId:          wk.ID,
                ControlEndpoint:   endpoint,
@@ -274,10 +273,16 @@ func executePipeline(ctx context.Context, wk *worker.W, j 
*jobservices.Job) {
                em.Impulse(id)
        }
 
+       // Use a channel to limit max parallelism for the pipeline.
+       maxParallelism := make(chan struct{}, 8)
        // Execute stages here
        for rb := range em.Bundles(ctx, wk.NextInst) {
-               s := stages[rb.StageID]
-               s.Execute(j, wk, comps, em, rb)
+               maxParallelism <- struct{}{}
+               go func(rb engine.RunBundle) {
+                       defer func() { <-maxParallelism }()
+                       s := stages[rb.StageID]
+                       s.Execute(j, wk, comps, em, rb)
+               }(rb)
        }
        slog.Info("pipeline done!", slog.String("job", j.String()))
 }
diff --git a/sdks/go/pkg/beam/runners/prism/internal/execute_test.go 
b/sdks/go/pkg/beam/runners/prism/internal/execute_test.go
index de7247486bb..aac5d97abfb 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/execute_test.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/execute_test.go
@@ -128,6 +128,9 @@ func TestRunner_Pipelines(t *testing.T) {
                                qr := pr.Metrics().Query(func(sr 
metrics.SingleResult) bool {
                                        return sr.Name() == "sunk"
                                })
+                               if len(qr.Counters()) == 0 {
+                                       t.Fatal("no metrics, expected one.")
+                               }
                                if got, want := qr.Counters()[0].Committed, 
int64(73); got != want {
                                        t.Errorf("pr.Metrics.Query(Name = 
\"sunk\")).Committed = %v, want %v", got, want)
                                }
diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go 
b/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go
index 95b1ce12af9..5b8e786ac6f 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go
@@ -39,7 +39,7 @@ import (
        "google.golang.org/protobuf/types/known/structpb"
 )
 
-var capabilities = map[string]struct{}{
+var supportedRequirements = map[string]struct{}{
        urns.RequirementSplittableDoFn: {},
 }
 
@@ -48,13 +48,13 @@ var capabilities = map[string]struct{}{
 func isSupported(requirements []string) error {
        var unsupported []string
        for _, req := range requirements {
-               if _, ok := capabilities[req]; !ok {
+               if _, ok := supportedRequirements[req]; !ok {
                        unsupported = append(unsupported, req)
                }
        }
        if len(unsupported) > 0 {
                sort.Strings(unsupported)
-               return fmt.Errorf("local runner doesn't support the following 
required features: %v", strings.Join(unsupported, ","))
+               return fmt.Errorf("prism runner doesn't support the following 
required features: %v", strings.Join(unsupported, ","))
        }
        return nil
 }
@@ -81,8 +81,19 @@ type Job struct {
        metrics metricsStore
 }
 
-func (j *Job) ContributeMetrics(payloads *fnpb.ProcessBundleResponse) {
-       j.metrics.ContributeMetrics(payloads)
+// ContributeTentativeMetrics returns the datachannel read index, and any 
unknown monitoring short ids.
+func (j *Job) ContributeTentativeMetrics(payloads 
*fnpb.ProcessBundleProgressResponse) (int64, []string) {
+       return j.metrics.ContributeTentativeMetrics(payloads)
+}
+
+// ContributeFinalMetrics returns any unknown monitoring short ids.
+func (j *Job) ContributeFinalMetrics(payloads *fnpb.ProcessBundleResponse) 
[]string {
+       return j.metrics.ContributeFinalMetrics(payloads)
+}
+
+// AddMetricShortIDs populates metric short IDs with their metadata.
+func (j *Job) AddMetricShortIDs(ids *fnpb.MonitoringInfosMetadataResponse) {
+       j.metrics.AddShortIDs(ids)
 }
 
 func (j *Job) String() string {
diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics.go 
b/sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics.go
index 39936bae72f..6db16191de9 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics.go
@@ -449,46 +449,92 @@ func (k apiRequestLatenciesKey) Labels() 
map[string]string {
 
 type metricsStore struct {
        mu     sync.Mutex
-       accums map[metricKey]metricAccumulator
+       accums [2]map[metricKey]metricAccumulator
+
+       shortIDsToKeys      map[string]metricKey
+       unprocessedPayloads [2]map[string][]byte
 }
 
-func (m *metricsStore) ContributeMetrics(payloads *fnpb.ProcessBundleResponse) 
{
+func (m *metricsStore) AddShortIDs(resp *fnpb.MonitoringInfosMetadataResponse) 
{
        m.mu.Lock()
        defer m.mu.Unlock()
-       if m.accums == nil {
-               m.accums = map[metricKey]metricAccumulator{}
+
+       if m.shortIDsToKeys == nil {
+               m.shortIDsToKeys = map[string]metricKey{}
        }
-       // Old and busted.
-       mons := payloads.GetMonitoringInfos()
-       for _, mon := range mons {
-               urn := mon.GetUrn()
+
+       mis := resp.GetMonitoringInfo()
+       for short, mi := range mis {
+               urn := mi.GetUrn()
                ops, ok := mUrn2Ops[urn]
                if !ok {
                        slog.Debug("unknown metrics urn", slog.String("urn", 
urn))
                        continue
                }
-               key := ops.keyFn(urn, mon.GetLabels())
-               a, ok := m.accums[key]
+               key := ops.keyFn(urn, mi.GetLabels())
+               m.shortIDsToKeys[short] = key
+       }
+       for d, payloads := range m.unprocessedPayloads {
+               m.contributeMetrics(durability(d), payloads)
+               m.unprocessedPayloads[d] = nil
+       }
+}
+
+func (m *metricsStore) contributeMetrics(d durability, mdata 
map[string][]byte) (int64, []string) {
+       readIndex := int64(-1)
+       if m.accums[d] == nil {
+               m.accums[d] = map[metricKey]metricAccumulator{}
+       }
+       if m.unprocessedPayloads[d] == nil {
+               m.unprocessedPayloads[d] = map[string][]byte{}
+       }
+       accums := m.accums[d]
+       var missingShortIDs []string
+       for short, payload := range mdata {
+               key, ok := m.shortIDsToKeys[short]
                if !ok {
+                       missingShortIDs = append(missingShortIDs, short)
+                       m.unprocessedPayloads[d][short] = payload
+                       continue
+               }
+               a, ok := accums[key]
+               if !ok || d == tentative {
+                       ops, ok := mUrn2Ops[key.Urn()]
+                       if !ok {
+                               slog.Debug("unknown metrics urn", 
slog.String("urn", key.Urn()))
+                               continue
+                       }
                        a = ops.newAccum()
                }
-               if err := a.accumulate(mon.GetPayload()); err != nil {
-                       panic(fmt.Sprintf("error decoding metrics %v: 
%+v\n\t%+v", urn, key, a))
+               if err := a.accumulate(payload); err != nil {
+                       panic(fmt.Sprintf("error decoding metrics %v: 
%+v\n\t%+v", key.Urn(), key, a))
+               }
+               accums[key] = a
+               if key.Urn() == "beam:metric:data_channel:read_index:v1" {
+                       readIndex = a.(*sumInt64).sum
                }
-               m.accums[key] = a
        }
-       // New hotness.
-       mdata := payloads.GetMonitoringData()
-       _ = mdata
+       return readIndex, missingShortIDs
+}
+
+func (m *metricsStore) ContributeTentativeMetrics(payloads 
*fnpb.ProcessBundleProgressResponse) (int64, []string) {
+       m.mu.Lock()
+       defer m.mu.Unlock()
+       return m.contributeMetrics(tentative, payloads.GetMonitoringData())
+}
+
+func (m *metricsStore) ContributeFinalMetrics(payloads 
*fnpb.ProcessBundleResponse) []string {
+       m.mu.Lock()
+       defer m.mu.Unlock()
+       _, unknownIDs := m.contributeMetrics(committed, 
payloads.GetMonitoringData())
+       return unknownIDs
 }
 
 func (m *metricsStore) Results(d durability) []*pipepb.MonitoringInfo {
-       // We don't gather tentative metrics yet.
-       if d == tentative {
-               return nil
-       }
+       m.mu.Lock()
+       defer m.mu.Unlock()
        infos := make([]*pipepb.MonitoringInfo, 0, len(m.accums))
-       for key, accum := range m.accums {
+       for key, accum := range m.accums[d] {
                infos = append(infos, accum.toProto(key))
        }
        return infos
diff --git 
a/sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics_test.go 
b/sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics_test.go
index e0346731f30..bb3ef243ce6 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics_test.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics_test.go
@@ -33,7 +33,7 @@ import (
 var metSpecs = (pipepb.MonitoringInfoSpecs_Enum)(0).Descriptor().Values()
 
 // makeInfo generates dummy Monitoring infos from a spec.
-func makeInfo(enum pipepb.MonitoringInfoSpecs_Enum, payload []byte) 
*pipepb.MonitoringInfo {
+func makeInfo(enum pipepb.MonitoringInfoSpecs_Enum) *pipepb.MonitoringInfo {
        spec := 
proto.GetExtension(metSpecs.ByNumber(protoreflect.EnumNumber(enum)).Options(), 
pipepb.E_MonitoringInfoSpec).(*pipepb.MonitoringInfoSpec)
 
        labels := map[string]string{}
@@ -41,13 +41,18 @@ func makeInfo(enum pipepb.MonitoringInfoSpecs_Enum, payload 
[]byte) *pipepb.Moni
                labels[l] = l
        }
        return &pipepb.MonitoringInfo{
-               Urn:     spec.GetUrn(),
-               Type:    spec.GetType(),
-               Labels:  labels,
-               Payload: payload,
+               Urn:    spec.GetUrn(),
+               Type:   spec.GetType(),
+               Labels: labels,
        }
 }
 
+func makeInfoWBytes(enum pipepb.MonitoringInfoSpecs_Enum, payload []byte) 
*pipepb.MonitoringInfo {
+       info := makeInfo(enum)
+       info.Payload = payload
+       return info
+}
+
 // This test validates that multiple contributions are correctly summed up and 
accumulated.
 func Test_metricsStore_ContributeMetrics(t *testing.T) {
 
@@ -70,45 +75,58 @@ func Test_metricsStore_ContributeMetrics(t *testing.T) {
                name string
 
                // TODO convert input to non-legacy metrics once we support, 
and then delete these.
-               input [][]*pipepb.MonitoringInfo
+               input    []map[string][]byte
+               shortIDs map[string]*pipepb.MonitoringInfo
 
                want []*pipepb.MonitoringInfo
        }{
                {
                        name: "int64Sum",
-                       input: [][]*pipepb.MonitoringInfo{
-                               
{makeInfo(pipepb.MonitoringInfoSpecs_USER_SUM_INT64, []byte{3})},
-                               
{makeInfo(pipepb.MonitoringInfoSpecs_USER_SUM_INT64, []byte{5})},
+                       input: []map[string][]byte{
+                               {"a": []byte{3}},
+                               {"a": []byte{5}},
+                       },
+                       shortIDs: map[string]*pipepb.MonitoringInfo{
+                               "a": 
makeInfo(pipepb.MonitoringInfoSpecs_USER_SUM_INT64),
                        },
                        want: []*pipepb.MonitoringInfo{
-                               
makeInfo(pipepb.MonitoringInfoSpecs_USER_SUM_INT64, []byte{8}),
+                               
makeInfoWBytes(pipepb.MonitoringInfoSpecs_USER_SUM_INT64, []byte{8}),
                        },
                }, {
                        name: "float64Sum",
-                       input: [][]*pipepb.MonitoringInfo{
-                               
{makeInfo(pipepb.MonitoringInfoSpecs_USER_SUM_DOUBLE, doubleBytes(3.14))},
-                               
{makeInfo(pipepb.MonitoringInfoSpecs_USER_SUM_DOUBLE, doubleBytes(1.06))},
+                       input: []map[string][]byte{
+                               {"a": doubleBytes(3.14)},
+                               {"a": doubleBytes(1.06)},
+                       },
+                       shortIDs: map[string]*pipepb.MonitoringInfo{
+                               "a": 
makeInfo(pipepb.MonitoringInfoSpecs_USER_SUM_DOUBLE),
                        },
                        want: []*pipepb.MonitoringInfo{
-                               
makeInfo(pipepb.MonitoringInfoSpecs_USER_SUM_DOUBLE, doubleBytes(4.20)),
+                               
makeInfoWBytes(pipepb.MonitoringInfoSpecs_USER_SUM_DOUBLE, doubleBytes(4.20)),
                        },
                }, {
                        name: "progress",
-                       input: [][]*pipepb.MonitoringInfo{
-                               
{makeInfo(pipepb.MonitoringInfoSpecs_WORK_REMAINING, progress(1, 2.2, 78))},
-                               
{makeInfo(pipepb.MonitoringInfoSpecs_WORK_REMAINING, progress(0, 7.8, 22))},
+                       input: []map[string][]byte{
+                               {"a": progress(1, 2.2, 78)},
+                               {"a": progress(0, 7.8, 22)},
+                       },
+                       shortIDs: map[string]*pipepb.MonitoringInfo{
+                               "a": 
makeInfo(pipepb.MonitoringInfoSpecs_WORK_REMAINING),
                        },
                        want: []*pipepb.MonitoringInfo{
-                               
makeInfo(pipepb.MonitoringInfoSpecs_WORK_REMAINING, progress(0, 7.8, 22)),
+                               
makeInfoWBytes(pipepb.MonitoringInfoSpecs_WORK_REMAINING, progress(0, 7.8, 22)),
                        },
                }, {
                        name: "int64Distribution",
-                       input: [][]*pipepb.MonitoringInfo{
-                               
{makeInfo(pipepb.MonitoringInfoSpecs_USER_DISTRIBUTION_INT64, []byte{1, 2, 2, 
2})},
-                               
{makeInfo(pipepb.MonitoringInfoSpecs_USER_DISTRIBUTION_INT64, []byte{3, 17, 5, 
7})},
+                       input: []map[string][]byte{
+                               {"a": []byte{1, 2, 2, 2}},
+                               {"a": []byte{3, 17, 5, 7}},
+                       },
+                       shortIDs: map[string]*pipepb.MonitoringInfo{
+                               "a": 
makeInfo(pipepb.MonitoringInfoSpecs_USER_DISTRIBUTION_INT64),
                        },
                        want: []*pipepb.MonitoringInfo{
-                               
makeInfo(pipepb.MonitoringInfoSpecs_USER_DISTRIBUTION_INT64, []byte{4, 19, 2, 
7}),
+                               
makeInfoWBytes(pipepb.MonitoringInfoSpecs_USER_DISTRIBUTION_INT64, []byte{4, 
19, 2, 7}),
                        },
                },
        }
@@ -117,11 +135,14 @@ func Test_metricsStore_ContributeMetrics(t *testing.T) {
                t.Run(test.name, func(t *testing.T) {
                        ms := metricsStore{}
 
+                       ms.AddShortIDs(&fnpb.MonitoringInfosMetadataResponse{
+                               MonitoringInfo: test.shortIDs,
+                       })
+
                        for _, payload := range test.input {
-                               resp := &fnpb.ProcessBundleResponse{
-                                       MonitoringInfos: payload,
-                               }
-                               ms.ContributeMetrics(resp)
+                               
ms.ContributeFinalMetrics(&fnpb.ProcessBundleResponse{
+                                       MonitoringData: payload,
+                               })
                        }
 
                        got := ms.Results(committed)
diff --git a/sdks/go/pkg/beam/runners/prism/internal/separate_test.go 
b/sdks/go/pkg/beam/runners/prism/internal/separate_test.go
index 2e96651bfe9..a234d6470a4 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/separate_test.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/separate_test.go
@@ -93,7 +93,7 @@ func TestSeparation(t *testing.T) {
                                out := beam.ParDo(s, &sepHarnessSdfStream{
                                        Base: sepHarnessBase{
                                                WatcherID:         
ws.newWatcher(3),
-                                               Sleep:             time.Second,
+                                               Sleep:             10 * 
time.Millisecond,
                                                IsSentinelEncoded: 
beam.EncodedFunc{Fn: reflectx.MakeFunc(allSentinel)},
                                                LocalService:      
ws.serviceAddress,
                                        },
@@ -107,7 +107,7 @@ func TestSeparation(t *testing.T) {
                                count := 10
                                imp := beam.Impulse(s)
                                out := beam.ParDo(s, &singleStepSdfStream{
-                                       Sleep:    time.Second,
+                                       Sleep:    10 * time.Millisecond,
                                        RestSize: int64(count),
                                }, imp)
                                passert.Count(s, out, "global stepped num 
ints", count)
@@ -121,7 +121,7 @@ func TestSeparation(t *testing.T) {
                                count := int(elms / mod)
                                imp := beam.Impulse(s)
                                out := beam.ParDo(s, &eventtimeSDFStream{
-                                       Sleep:    time.Second,
+                                       Sleep:    10 * time.Millisecond,
                                        RestSize: int64(elms),
                                        Mod:      int64(mod),
                                        Fixed:    1,
@@ -135,12 +135,42 @@ func TestSeparation(t *testing.T) {
                                gsum := beam.WindowInto(s, 
window.NewGlobalWindows(), sum)
                                passert.Count(s, gsum, "total sums", count)
                        },
+               }, {
+                       name: "ChannelSplit",
+                       pipeline: func(s beam.Scope) {
+                               count := 10
+                               imp := beam.Impulse(s)
+                               ints := beam.ParDo(s, emitTenFn, imp)
+                               out := beam.ParDo(s, &sepHarness{
+                                       Base: sepHarnessBase{
+                                               WatcherID:         
ws.newWatcher(3),
+                                               Sleep:             100 * 
time.Millisecond,
+                                               IsSentinelEncoded: 
beam.EncodedFunc{Fn: reflectx.MakeFunc(threeSentinel)},
+                                               LocalService:      
ws.serviceAddress,
+                                       },
+                               }, ints)
+                               out = beam.ParDo(s, toInt, out)
+                               passert.Sum(s, out, "sum ints", count, 55)
+                       },
+               }, {
+                       name: "SDFSplit_adjacent_positions",
+                       pipeline: func(s beam.Scope) {
+                               count := 10
+                               imp := beam.Impulse(s)
+                               out := beam.ParDo(s, &sepHarnessSdf{
+                                       Base: sepHarnessBase{
+                                               WatcherID:         
ws.newWatcher(3),
+                                               Sleep:             100 * 
time.Millisecond,
+                                               IsSentinelEncoded: 
beam.EncodedFunc{Fn: reflectx.MakeFunc(closeSentinel)},
+                                               LocalService:      
ws.serviceAddress,
+                                       },
+                                       RestSize: int64(count),
+                               }, imp)
+                               passert.Count(s, out, "total elements", count)
+                       },
                },
        }
 
-       // TODO: Channel Splits
-       // TODO: SubElement/dynamic splits.
-
        for _, test := range tests {
                t.Run(test.name, func(t *testing.T) {
                        p, s := beam.NewPipelineWithRoot()
@@ -156,8 +186,27 @@ func TestSeparation(t *testing.T) {
        }
 }
 
+func init() {
+       register.Function2x0(emitTenFn)
+       register.Function2x0(toInt)
+       register.Emitter1[int64]()
+       register.Emitter1[int]()
+}
+
+func emitTenFn(_ []byte, emit func(int64)) {
+       for i := int64(1); i <= 10; i++ {
+               emit(i)
+       }
+}
+
+func toInt(v int64, emit func(int)) {
+       emit(int(v))
+}
+
 func init() {
        register.Function1x1(allSentinel)
+       register.Function1x1(threeSentinel)
+       register.Function2x1(closeSentinel)
 }
 
 // allSentinel indicates that all elements are sentinels.
@@ -165,6 +214,22 @@ func allSentinel(v beam.T) bool {
        return true
 }
 
+// threeSentinel indicates that every positive element that's a multiple of 3 is a sentinel.
+func threeSentinel(v beam.T) bool {
+       i := v.(int64)
+       return i > 0 && i%3 == 0
+}
+
+// closeSentinel indicates adjacent positions 3,4,5 are sentinels.
+func closeSentinel(i int64, _ beam.T) bool {
+       switch i {
+       case 3, 4, 5:
+               return true
+       default:
+               return false
+       }
+}
+
 // Watcher is an instance of the counters.
 type watcher struct {
        id                         int
@@ -304,7 +369,7 @@ func (fn *sepHarnessBase) setup() error {
        // Check if there's already a local channel for this id, and if not
        // start a watcher goroutine to poll and unblock the harness when
        // the expected number of sentinels is reached.
-       if _, ok := sepWaitMap[fn.WatcherID]; !ok {
+       if _, ok := sepWaitMap[fn.WatcherID]; ok {
                return nil
        }
        // We need a channel to block on for this watcherID
diff --git a/sdks/go/pkg/beam/runners/prism/internal/stage.go 
b/sdks/go/pkg/beam/runners/prism/internal/stage.go
index 7be62611d42..c5a10cae7b4 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/stage.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/stage.go
@@ -20,6 +20,7 @@ import (
        "context"
        "fmt"
        "io"
+       "time"
 
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec"
@@ -63,16 +64,22 @@ func (s *stage) Execute(j *jobservices.Job, wk *worker.W, 
comps *pipepb.Componen
        slog.Debug("Execute: starting bundle", "bundle", rb, slog.String("tid", 
tid))
 
        var b *worker.B
-       var send bool
        inputData := em.InputForBundle(rb, s.inputInfo)
+       var dataReady <-chan struct{}
        switch s.envID {
        case "": // Runner Transforms
                // Runner transforms are processed immeadiately.
                b = s.exe.ExecuteTransform(tid, comps.GetTransforms()[tid], 
comps, rb.Watermark, inputData)
                b.InstID = rb.BundleID
                slog.Debug("Execute: runner transform", "bundle", rb, 
slog.String("tid", tid))
+
+               // Do some accounting for the fake bundle.
+               b.Resp = make(chan *fnpb.ProcessBundleResponse, 1)
+               close(b.Resp) // Closed so the final receive on Resp yields nil without blocking.
+               closed := make(chan struct{})
+               close(closed)
+               dataReady = closed
        case wk.ID:
-               send = true
                b = &worker.B{
                        PBDID:  s.ID,
                        InstID: rb.BundleID,
@@ -88,25 +95,78 @@ func (s *stage) Execute(j *jobservices.Job, wk *worker.W, 
comps *pipepb.Componen
                b.Init()
 
                s.prepareSides(b, s.transforms[0], rb.Watermark)
+
+               slog.Debug("Execute: processing", "bundle", rb)
+               defer b.Cleanup(wk)
+               dataReady = b.ProcessOn(wk)
        default:
                err := fmt.Errorf("unknown environment[%v]", s.envID)
                slog.Error("Execute", err)
                panic(err)
        }
 
-       if send {
-               slog.Debug("Execute: processing", "bundle", rb)
-               b.ProcessOn(wk) // Blocks until finished.
+       // Progress + split loop.
+       previousIndex := int64(-2)
+       var splitsDone bool
+       progTick := time.NewTicker(100 * time.Millisecond)
+progress:
+       for {
+               select {
+               case <-dataReady:
+                       progTick.Stop()
+                       break progress // exit progress loop on close.
+               case <-progTick.C:
+                       resp := b.Progress(wk)
+                       index, unknownIDs := j.ContributeTentativeMetrics(resp)
+                       if len(unknownIDs) > 0 {
+                               md := wk.MonitoringMetadata(unknownIDs)
+                               j.AddMetricShortIDs(md)
+                       }
+                       slog.Debug("progress report", "bundle", rb, "index", 
index)
+                       // Progress for the bundle hasn't advanced. Try 
splitting.
+                       if previousIndex == index && !splitsDone {
+                               sr := b.Split(wk, 0.5 /* fraction of remainder 
*/, nil /* allowed splits */)
+                               if sr.GetChannelSplits() == nil {
+                                       slog.Warn("split failed", "bundle", rb)
+                                       splitsDone = true
+                                       continue progress
+                               }
+                               // TODO sort out rescheduling primary Roots on 
bundle failure.
+                               var residualData [][]byte
+                               for _, rr := range sr.GetResidualRoots() {
+                                       ba := rr.GetApplication()
+                                       residualData = append(residualData, 
ba.GetElement())
+                                       if len(ba.GetElement()) == 0 {
+                                               slog.LogAttrs(context.TODO(), 
slog.LevelError, "returned empty residual application", slog.Any("bundle", rb))
+                                               panic("sdk returned empty 
residual application")
+                                       }
+                                       // TODO what happens to output 
watermarks on splits?
+                               }
+                               if len(sr.GetChannelSplits()) != 1 {
+                                       slog.Warn("received non-single channel 
split", "bundle", rb)
+                               }
+                               cs := sr.GetChannelSplits()[0]
+                               fr := cs.GetFirstResidualElement()
+                               // The first residual can be after the end of 
data, so filter out those cases.
+                               if len(b.InputData) >= int(fr) {
+                                       b.InputData = b.InputData[:int(fr)]
+                                       em.ReturnResiduals(rb, int(fr), 
s.inputInfo, residualData)
+                               }
+                       } else {
+                               previousIndex = index
+                       }
+               }
        }
        // Tentative Data is ready, commit it to the main datastore.
        slog.Debug("Execute: commiting data", "bundle", rb, 
slog.Any("outputsWithData", maps.Keys(b.OutputData.Raw)), slog.Any("outputs", 
maps.Keys(s.OutputsToCoders)))
 
-       resp := &fnpb.ProcessBundleResponse{}
-       if send {
-               resp = <-b.Resp
-               // Tally metrics immeadiately so they're available before
-               // pipeline termination.
-               j.ContributeMetrics(resp)
+       resp := <-b.Resp
+       // Tally metrics immediately so they're available before
+       // pipeline termination.
+       unknownIDs := j.ContributeFinalMetrics(resp)
+       if len(unknownIDs) > 0 {
+               md := wk.MonitoringMetadata(unknownIDs)
+               j.AddMetricShortIDs(md)
        }
        // TODO handle side input data properly.
        wk.D.Commit(b.OutputData)
diff --git a/sdks/go/pkg/beam/runners/prism/internal/testdofns.go 
b/sdks/go/pkg/beam/runners/prism/internal/testdofns.go
index 4aa07a46c6f..6e7a9d27aee 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/testdofns.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/testdofns.go
@@ -115,7 +115,7 @@ func dofn3x1(sum int64, iter1, iter2 func(*int64) bool, 
emit func(int64)) {
        emit(sum)
 }
 
-// int64Check validates that within a single bundle,
+// int64Check validates that within a single bundle, for each window,
 // we received the expected int64 values & sends them downstream.
 //
 // Invalid pattern for general testing, as it will fail
@@ -123,20 +123,28 @@ func dofn3x1(sum int64, iter1, iter2 func(*int64) bool, 
emit func(int64)) {
 type int64Check struct {
        Name string
        Want []int
-       got  []int
+       got  map[beam.Window][]int
 }
 
-func (fn *int64Check) ProcessElement(v int64, _ func(int64)) {
-       fn.got = append(fn.got, int(v))
+func (fn *int64Check) StartBundle(_ func(int64)) error {
+       fn.got = map[beam.Window][]int{}
+       return nil
+}
+
+func (fn *int64Check) ProcessElement(w beam.Window, v int64, _ func(int64)) {
+       fn.got[w] = append(fn.got[w], int(v))
 }
 
 func (fn *int64Check) FinishBundle(_ func(int64)) error {
-       sort.Ints(fn.got)
        sort.Ints(fn.Want)
-       if d := cmp.Diff(fn.Want, fn.got); d != "" {
-               return fmt.Errorf("int64Check[%v] (-want, +got): %v", fn.Name, 
d)
+       // Check for each window individually.
+       for _, vs := range fn.got {
+               sort.Ints(vs)
+               if d := cmp.Diff(fn.Want, vs); d != "" {
+                       return fmt.Errorf("int64Check[%v] (-want, +got): %v", 
fn.Name, d)
+               }
+               // Clear for subsequent calls.
        }
-       // Clear for subsequent calls.
        fn.got = nil
        return nil
 }
diff --git a/sdks/go/pkg/beam/runners/prism/internal/testdofns_test.go 
b/sdks/go/pkg/beam/runners/prism/internal/testdofns_test.go
index 3596c40f0dc..5ca4eb9fd4c 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/testdofns_test.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/testdofns_test.go
@@ -16,6 +16,7 @@
 package internal
 
 import (
+       "github.com/apache/beam/sdks/v2/go/pkg/beam"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/register"
 )
@@ -40,7 +41,7 @@ func init() {
        register.Function2x0(dofnKV2)
        register.Function3x0(dofnGBK)
        register.Function3x0(dofnGBK2)
-       register.DoFn2x0[int64, func(int64)]((*int64Check)(nil))
+       register.DoFn3x0[beam.Window, int64, func(int64)]((*int64Check)(nil))
        register.DoFn2x0[string, func(string)]((*stringCheck)(nil))
        register.Function2x0(dofnKV3)
        register.Function3x0(dofnGBK3)
diff --git a/sdks/go/pkg/beam/runners/prism/internal/urns/urns.go 
b/sdks/go/pkg/beam/runners/prism/internal/urns/urns.go
index 035ab3c0727..40b15ee5d05 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/urns/urns.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/urns/urns.go
@@ -42,14 +42,15 @@ func quickUrn[Enum protoEnum](v Enum) string {
 }
 
 var (
-       ptUrn   = toUrn[pipepb.StandardPTransforms_Primitives]()
-       ctUrn   = toUrn[pipepb.StandardPTransforms_Composites]()
-       cmbtUrn = toUrn[pipepb.StandardPTransforms_CombineComponents]()
-       sdfUrn  = toUrn[pipepb.StandardPTransforms_SplittableParDoComponents]()
-       siUrn   = toUrn[pipepb.StandardSideInputTypes_Enum]()
-       cdrUrn  = toUrn[pipepb.StandardCoders_Enum]()
-       reqUrn  = toUrn[pipepb.StandardRequirements_Enum]()
-       envUrn  = toUrn[pipepb.StandardEnvironments_Environments]()
+       ptUrn      = toUrn[pipepb.StandardPTransforms_Primitives]()
+       ctUrn      = toUrn[pipepb.StandardPTransforms_Composites]()
+       cmbtUrn    = toUrn[pipepb.StandardPTransforms_CombineComponents]()
+       sdfUrn     = 
toUrn[pipepb.StandardPTransforms_SplittableParDoComponents]()
+       siUrn      = toUrn[pipepb.StandardSideInputTypes_Enum]()
+       cdrUrn     = toUrn[pipepb.StandardCoders_Enum]()
+       reqUrn     = toUrn[pipepb.StandardRequirements_Enum]()
+       runProcUrn = toUrn[pipepb.StandardRunnerProtocols_Enum]()
+       envUrn     = toUrn[pipepb.StandardEnvironments_Environments]()
 )
 
 var (
@@ -120,6 +121,10 @@ var (
        RequirementStatefulProcessing = 
reqUrn(pipepb.StandardRequirements_REQUIRES_STATEFUL_PROCESSING)
        RequirementTimeSortedInput    = 
reqUrn(pipepb.StandardRequirements_REQUIRES_TIME_SORTED_INPUT)
 
+       // Capabilities
+       CapabilityMonitoringInfoShortIDs           = 
runProcUrn(pipepb.StandardRunnerProtocols_MONITORING_INFO_SHORT_IDS)
+       CapabilityControlResponseElementsEmbedding = 
runProcUrn(pipepb.StandardRunnerProtocols_CONTROL_RESPONSE_ELEMENTS_EMBEDDING)
+
        // Environment types
        EnvDocker   = envUrn(pipepb.StandardEnvironments_DOCKER)
        EnvProcess  = envUrn(pipepb.StandardEnvironments_PROCESS)
diff --git a/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go 
b/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go
index f6fbf1293f4..77f7094a97e 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go
@@ -16,7 +16,7 @@
 package worker
 
 import (
-       "sync"
+       "sync/atomic"
 
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
        fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
@@ -40,19 +40,22 @@ type B struct {
        // MultiMapSideInputData is a map from transformID, to inputID, to 
window, to data key, to data values.
        MultiMapSideInputData 
map[string]map[string]map[typex.Window]map[string][][]byte
 
-       // OutputCount is the number of data outputs this bundle has.
+       // OutputCount is the number of data or timer outputs this bundle has.
        // We need to see this many closed data channels before the bundle is 
complete.
        OutputCount int
-       // dataWait is how we determine if a bundle is finished, by waiting for 
each of
+       // DataWait is how we determine if a bundle is finished, by waiting for 
each of
        // a Bundle's DataSinks to produce their last output.
        // After this point we can "commit" the bundle's output for downstream 
use.
-       dataWait   sync.WaitGroup
+       DataWait   chan struct{}
+       dataSema   atomic.Int32
        OutputData engine.TentativeData
-       Resp       chan *fnpb.ProcessBundleResponse
 
-       SinkToPCollection map[string]string
+       // TODO move response channel to an atomic and an additional
+       // block on the DataWait channel, to allow progress & splits for
+       // no output DoFns.
+       Resp chan *fnpb.ProcessBundleResponse
 
-       // TODO: Metrics for this bundle, can be handled after the fact.
+       SinkToPCollection map[string]string
 }
 
 // Init initializes the bundle's internal state for waiting on all
@@ -60,27 +63,43 @@ type B struct {
 func (b *B) Init() {
        // We need to see final data signals that match the number of
        // outputs the stage this bundle executes posesses
-       b.dataWait.Add(b.OutputCount)
+       b.dataSema.Store(int32(b.OutputCount))
+       b.DataWait = make(chan struct{})
+       if b.OutputCount == 0 {
+               close(b.DataWait) // Can happen if there are no outputs for the 
bundle.
+       }
        b.Resp = make(chan *fnpb.ProcessBundleResponse, 1)
 }
 
+// DataDone indicates a final element has been received from a Data or Timer 
output.
+func (b *B) DataDone() {
+       sema := b.dataSema.Add(-1)
+       if sema == 0 {
+               close(b.DataWait)
+       }
+}
+
 func (b *B) LogValue() slog.Value {
        return slog.GroupValue(
                slog.String("ID", b.InstID),
                slog.String("stage", b.PBDID))
 }
 
-// ProcessOn executes the given bundle on the given W, blocking
-// until all data is complete.
+func (b *B) Respond(resp *fnpb.InstructionResponse) {
+       b.Resp <- resp.GetProcessBundle()
+}
+
+// ProcessOn executes the given bundle on the given W.
+// The returned channel is closed once all expected data is returned.
 //
 // Assumes the bundle is initialized (all maps are non-nil, and data waitgroup 
is set, response channel initialized)
 // Assumes the bundle descriptor is already registered with the W.
 //
 // While this method mostly manipulates a W, putting it on a B avoids mixing 
the workers
 // public GRPC APIs up with local calls.
-func (b *B) ProcessOn(wk *W) {
+func (b *B) ProcessOn(wk *W) <-chan struct{} {
        wk.mu.Lock()
-       wk.bundles[b.InstID] = b
+       wk.activeInstructions[b.InstID] = b
        wk.mu.Unlock()
 
        slog.Debug("processing", "bundle", b, "worker", wk)
@@ -108,7 +127,39 @@ func (b *B) ProcessOn(wk *W) {
                        },
                }
        }
+       return b.DataWait
+}
+
+// Cleanup unregisters the bundle from the worker.
+func (b *B) Cleanup(wk *W) {
+       wk.mu.Lock()
+       delete(wk.activeInstructions, b.InstID)
+       wk.mu.Unlock()
+}
+
+func (b *B) Progress(wk *W) *fnpb.ProcessBundleProgressResponse {
+       return wk.sendInstruction(&fnpb.InstructionRequest{
+               Request: &fnpb.InstructionRequest_ProcessBundleProgress{
+                       ProcessBundleProgress: 
&fnpb.ProcessBundleProgressRequest{
+                               InstructionId: b.InstID,
+                       },
+               },
+       }).GetProcessBundleProgress()
+}
 
-       slog.Debug("waiting on data", "bundle", b)
-       b.dataWait.Wait() // Wait until data is ready.
+func (b *B) Split(wk *W, fraction float64, allowedSplits []int64) 
*fnpb.ProcessBundleSplitResponse {
+       return wk.sendInstruction(&fnpb.InstructionRequest{
+               Request: &fnpb.InstructionRequest_ProcessBundleSplit{
+                       ProcessBundleSplit: &fnpb.ProcessBundleSplitRequest{
+                               InstructionId: b.InstID,
+                               DesiredSplits: 
map[string]*fnpb.ProcessBundleSplitRequest_DesiredSplit{
+                                       b.InputTransformID: {
+                                               FractionOfRemainder:    
fraction,
+                                               AllowedSplitPoints:     
allowedSplits,
+                                               EstimatedInputElements: 
int64(len(b.InputData)),
+                                       },
+                               },
+                       },
+               },
+       }).GetProcessBundleSplit()
 }
diff --git a/sdks/go/pkg/beam/runners/prism/internal/worker/bundle_test.go 
b/sdks/go/pkg/beam/runners/prism/internal/worker/bundle_test.go
index 154306c3f6b..bfef6873426 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/worker/bundle_test.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/worker/bundle_test.go
@@ -36,7 +36,7 @@ func TestBundle_ProcessOn(t *testing.T) {
                b.ProcessOn(wk)
                completed.Done()
        }()
-       b.dataWait.Done()
+       b.DataDone()
        gotData := <-wk.DataReqs
        if got, want := gotData.GetData()[0].GetData(), []byte{1, 2, 3}; 
!bytes.EqualFold(got, want) {
                t.Errorf("ProcessOn(): data not sent; got %v, want %v", got, 
want)
diff --git a/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go 
b/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go
index 324cc83154a..04118f14735 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go
@@ -26,12 +26,15 @@ import (
        "sync"
        "sync/atomic"
 
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
        fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
+       pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
        
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/engine"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/urns"
        "golang.org/x/exp/slog"
        "google.golang.org/grpc"
        "google.golang.org/grpc/codes"
@@ -47,6 +50,7 @@ type W struct {
        fnpb.UnimplementedBeamFnDataServer
        fnpb.UnimplementedBeamFnStateServer
        fnpb.UnimplementedBeamFnLoggingServer
+       fnpb.UnimplementedProvisionServiceServer
 
        ID string
 
@@ -60,13 +64,17 @@ type W struct {
        InstReqs chan *fnpb.InstructionRequest
        DataReqs chan *fnpb.Elements
 
-       mu          sync.Mutex
-       bundles     map[string]*B                            // Bundles keyed 
by InstructionID
-       Descriptors map[string]*fnpb.ProcessBundleDescriptor // Stages keyed by 
PBDID
+       mu                 sync.Mutex
+       activeInstructions map[string]controlResponder              // Active 
instructions keyed by InstructionID
+       Descriptors        map[string]*fnpb.ProcessBundleDescriptor // Stages 
keyed by PBDID
 
        D *DataService
 }
 
+type controlResponder interface {
+       Respond(*fnpb.InstructionResponse)
+}
+
 // New starts the worker server components of FnAPI Execution.
 func New(id string) *W {
        lis, err := net.Listen("tcp", ":0")
@@ -82,8 +90,8 @@ func New(id string) *W {
                InstReqs: make(chan *fnpb.InstructionRequest, 10),
                DataReqs: make(chan *fnpb.Elements, 10),
 
-               bundles:     make(map[string]*B),
-               Descriptors: make(map[string]*fnpb.ProcessBundleDescriptor),
+               activeInstructions: make(map[string]controlResponder),
+               Descriptors:        
make(map[string]*fnpb.ProcessBundleDescriptor),
 
                D: &DataService{},
        }
@@ -136,6 +144,32 @@ func (wk *W) NextStage() string {
 // TODO set logging level.
 var minsev = fnpb.LogEntry_Severity_DEBUG
 
+func (wk *W) GetProvisionInfo(_ context.Context, _ 
*fnpb.GetProvisionInfoRequest) (*fnpb.GetProvisionInfoResponse, error) {
+       endpoint := &pipepb.ApiServiceDescriptor{
+               Url: wk.Endpoint(),
+       }
+       resp := &fnpb.GetProvisionInfoResponse{
+               Info: &fnpb.ProvisionInfo{
+                       // TODO: Add the job's Pipeline options
+                       // TODO: Include runner capabilities with the per job 
configuration.
+                       RunnerCapabilities: []string{
+                               urns.CapabilityMonitoringInfoShortIDs,
+                       },
+                       LoggingEndpoint:  endpoint,
+                       ControlEndpoint:  endpoint,
+                       ArtifactEndpoint: endpoint,
+                       // TODO add this job's RetrievalToken
+                       // TODO add this job's artifact Dependencies
+
+                       Metadata: map[string]string{
+                               "runner":         "prism",
+                               "runner_version": core.SdkVersion,
+                       },
+               },
+       }
+       return resp, nil
+}
+
 // Logging relates SDK worker messages back to the job that spawned them.
 // Messages are received from the SDK,
 func (wk *W) Logging(stream fnpb.BeamFnLogging_LoggingServer) error {
@@ -217,14 +251,14 @@ func (wk *W) Control(ctrl 
fnpb.BeamFnControl_ControlServer) error {
 
                        // TODO: Do more than assume these are 
ProcessBundleResponses.
                        wk.mu.Lock()
-                       if b, ok := wk.bundles[resp.GetInstructionId()]; ok {
+                       if b, ok := 
wk.activeInstructions[resp.GetInstructionId()]; ok {
                                // TODO. Better pipeline error handling.
                                if resp.Error != "" {
                                        slog.LogAttrs(context.TODO(), 
slog.LevelError, "ctrl.Recv pipeline error",
                                                slog.String("error", 
resp.GetError()))
                                        panic(resp.GetError())
                                }
-                               b.Resp <- resp.GetProcessBundle()
+                               b.Respond(resp)
                        } else {
                                slog.Debug("ctrl.Recv: %v", resp)
                        }
@@ -264,11 +298,13 @@ func (wk *W) Data(data fnpb.BeamFnData_DataServer) error {
                        }
                        wk.mu.Lock()
                        for _, d := range resp.GetData() {
-                               b, ok := wk.bundles[d.GetInstructionId()]
+                               cr, ok := 
wk.activeInstructions[d.GetInstructionId()]
                                if !ok {
                                        slog.Info("data.Recv for unknown 
bundle", "response", resp)
                                        continue
                                }
+                               // Received data is always for an active 
ProcessBundle instruction
+                               b := cr.(*B)
                                colID := b.SinkToPCollection[d.GetTransformId()]
 
                                // There might not be data, eg. for side 
inputs, so we need to reconcile this elsewhere for
@@ -277,7 +313,7 @@ func (wk *W) Data(data fnpb.BeamFnData_DataServer) error {
                                        b.OutputData.WriteData(colID, 
d.GetData())
                                }
                                if d.GetIsLast() {
-                                       b.dataWait.Done()
+                                       b.DataDone()
                                }
                        }
                        wk.mu.Unlock()
@@ -320,7 +356,11 @@ func (wk *W) State(state fnpb.BeamFnState_StateServer) 
error {
                        switch req.GetRequest().(type) {
                        case *fnpb.StateRequest_Get:
                                // TODO: move data handling to be pcollection 
based.
-                               b := wk.bundles[req.GetInstructionId()]
+
+                               // State requests are always for an active 
ProcessBundle instruction
+                               wk.mu.Lock()
+                               b := 
wk.activeInstructions[req.GetInstructionId()].(*B)
+                               wk.mu.Unlock()
                                key := req.GetStateKey()
                                slog.Debug("StateRequest_Get", 
prototext.Format(req), "bundle", b)
 
@@ -399,15 +439,67 @@ func (wk *W) State(state fnpb.BeamFnState_StateServer) 
error {
        return nil
 }
 
+var chanResponderPool = sync.Pool{
+       New: func() any {
+               return &chanResponder{make(chan *fnpb.InstructionResponse, 1)}
+       },
+}
+
+type chanResponder struct {
+       Resp chan *fnpb.InstructionResponse
+}
+
+func (cr *chanResponder) Respond(resp *fnpb.InstructionResponse) {
+       cr.Resp <- resp
+}
+
+// sendInstruction is a helper for sending a single instruction request to the worker,
+// blocking until the response returns.
+func (wk *W) sendInstruction(req *fnpb.InstructionRequest) 
*fnpb.InstructionResponse {
+       cr := chanResponderPool.Get().(*chanResponder)
+       progInst := wk.NextInst()
+       wk.mu.Lock()
+       wk.activeInstructions[progInst] = cr
+       wk.mu.Unlock()
+
+       defer func() {
+               wk.mu.Lock()
+               delete(wk.activeInstructions, progInst)
+               wk.mu.Unlock()
+               chanResponderPool.Put(cr)
+       }()
+
+       req.InstructionId = progInst
+
+       // Send the instruction request to the SDK worker.
+       wk.InstReqs <- req
+       // Protos are safe as nil, so just return directly.
+       return <-cr.Resp
+}
+
+// MonitoringMetadata is a convenience method to request the metadata for 
monitoring shortIDs.
+func (wk *W) MonitoringMetadata(unknownIDs []string) 
*fnpb.MonitoringInfosMetadataResponse {
+       return wk.sendInstruction(&fnpb.InstructionRequest{
+               Request: &fnpb.InstructionRequest_MonitoringInfos{
+                       MonitoringInfos: &fnpb.MonitoringInfosMetadataRequest{
+                               MonitoringInfoId: unknownIDs,
+                       },
+               },
+       }).GetMonitoringInfos()
+}
+
 // DataService is slated to be deleted in favour of stage based state
 // management for side inputs.
 type DataService struct {
+       mu sync.Mutex
        // TODO actually quick process the data to windows here as well.
        raw map[string][][]byte
 }
 
 // Commit tentative data to the datastore.
 func (d *DataService) Commit(tent engine.TentativeData) {
+       d.mu.Lock()
+       defer d.mu.Unlock()
        if d.raw == nil {
                d.raw = map[string][][]byte{}
        }
@@ -418,5 +510,7 @@ func (d *DataService) Commit(tent engine.TentativeData) {
 
 // GetAllData is a hack for Side Inputs until watermarks are sorted out.
 func (d *DataService) GetAllData(colID string) [][]byte {
+       d.mu.Lock()
+       defer d.mu.Unlock()
        return d.raw[colID]
 }
diff --git a/sdks/go/pkg/beam/runners/prism/internal/worker/worker_test.go 
b/sdks/go/pkg/beam/runners/prism/internal/worker/worker_test.go
index 29b3fab92d6..14b879fe242 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/worker/worker_test.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/worker/worker_test.go
@@ -145,7 +145,7 @@ func TestWorker_Control_HappyPath(t *testing.T) {
 
        b := &B{}
        b.Init()
-       wk.bundles[instID] = b
+       wk.activeInstructions[instID] = b
        b.ProcessOn(wk)
 
        ctrlStream.Send(&fnpb.InstructionResponse{
@@ -187,7 +187,7 @@ func TestWorker_Data_HappyPath(t *testing.T) {
                OutputCount: 1,
        }
        b.Init()
-       wk.bundles[instID] = b
+       wk.activeInstructions[instID] = b
 
        var wg sync.WaitGroup
        wg.Add(1)
@@ -235,7 +235,7 @@ func TestWorker_State_Iterable(t *testing.T) {
        }
 
        instID := wk.NextInst()
-       wk.bundles[instID] = &B{
+       wk.activeInstructions[instID] = &B{
                IterableSideInputData: 
map[string]map[string]map[typex.Window][][]byte{
                        "transformID": {
                                "i1": {

Reply via email to