This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
     new 50ca9e306a7 [prism] Add dynamic channel & sub element splits. (#26484)
50ca9e306a7 is described below

commit 50ca9e306a7f788486724936e1b68451eff41aff
Author: Robert Burke <lostl...@users.noreply.github.com>
AuthorDate: Tue May 2 12:14:53 2023 -0700

    [prism] Add dynamic channel & sub element splits. (#26484)

* Add channel split separations.
* Use channel leases for parallel processing.
* move data wait to chan + atomic
* Progress and split on execute goroutine.
* Convert metrics tests to short ids.
* Add Provision handler w/ capabilities.
* Add artifact validation to run_rc_validation (#26407)
* Add artifact validation to run_rc_validation
* config file for artifacts
* use java bom contents for validation
* add pip_pre = True
* Update tox.ini
* Remove out of date experimental warning on SDFs (#26450)
* [ToBF] Refinement 26.04.23 (#26428)
* content tree rebuilds on sdk change
* expandable parent node widget
* comment fix
* comment fix
---------
Co-authored-by: darkhan.nausharipov <darkhan.naushari...@kzn.akvelon.com>
* Allow implicit flattening for yaml inputs. (#26423)
* Add field annotations for high-priority Syndeo schema transforms (#26384)
add changes
* [Go SDK] Timers with new datalayer (#26101)
* added timer package
* add timer changes and merged with rebo's pr
* timer fired in stateful
* error setting new timer in ontimer
* looping timers work
* send fv instead of bytes
* changes to coder/pardo
* works for all cases, only cleanup left
* remove comments and validate onTimer
* generic coder for user key
* fixes coder end to end
* remove logs
* add unit test and refactor
* add docs
* new example
* fix static lint
* support emitters
* allow input col of CoGBK as well
* unit tests, periodic impulse, minor refactor
* update PipelineTimer interface, minor refactor, doc comment for example
* add warn message
* single edge timer coder, rm kv coder check, cache encoder,decoder
* Update chromedriver-binary requirement in /sdks/python
Updates the requirements on [chromedriver-binary](https://github.com/danielkaiser/python-chromedriver-binary) to permit the latest version.
- [Release notes](https://github.com/danielkaiser/python-chromedriver-binary/releases)
- [Commits](https://github.com/danielkaiser/python-chromedriver-binary/compare/v100.0.4896.20.0...v113.0.5672.24.0)
---
updated-dependencies:
- dependency-name: chromedriver-binary
  dependency-type: direct:development
...
Signed-off-by: dependabot[bot] <supp...@github.com>
* Automation: Tour of Beam infrastructure deployment (#25793)
* Added Terraform scripts for TOB infra
* ToB Frontend related updates
* Update settings.gradle.kts
* Deleted redundant file and minor README change
* Addressing comments in the PR
* Added newline at the end of variables.tf file
* Update README.md
* Updates related to Tour of Beam infrastructure
* Update locals.tf
* Output.tf updates
* Update output.tf
* Updates
* Update main.tf
* Updates to cloudfunctions_bucket variable
* service_account_id changes
* Update main.tf
* Update README.md
* Update README.md
* Update README.md
* Update README.md
* Update README.md
* Bulk update of terraform scripts
* Update README.md
* Update README.md
* Datastore_namespace updates
* Update README.md
* Update README.md
* Update README.md
* Update README.md
* Update main.tf
* Update README.md
* Update README.md
* Update README.md
* Some minor TF updates
* Update README.md
* Modify batch IT to use count instead of hash (#26327)
* Modify batch IT to use count instead of hash
* remove unused variable
* run spotless, update pipeline state checking
* Update timeout to 45 minutes
* revert timeout, add additional counter to try and pinpoint missing records
* add a log to notify ranges used when workers restart
* change counts from metrics to combiners
* add a window to streaming test
* move the passert to the correct place
* Remove extra counter, apply spotless
* add additional metric to KafkaWriter
* Remove debugging metrics
* verify pipeline is not failed
* remove extra newline
* Revert "Modify batch IT to use count instead of hash (#26327)" (#26466)
This reverts commit 9903b2f1e937beebdbb73be7a4e2eda760c0a169.
* Bump Java Dataflow container images (#26459)
* keep retrying mass_comment until it has started all jobs (#26457)
* keep retrying mass_comment until it has started all jobs
* fix lookups
* Add driverJars parameter to JdbcIO. (#25824)
This change allows users to use driver jars saved in GCS. With this change, Dataflow templates will be able to migrate to JdbcIO instead of DynamicJdbcIO.
* [Roll Fwd PR] Rename _namespace to _get_display_data_namespace"" (#26470)
* Update parquetio and textio to work with -beam_strict (#26469)
* use wheel sdk location for PostCommit_Py_Examples (#26473)
* Move back the timeout of Python PostCommit to 4h
* Minor fix on Python PostCommit description strings
* Add recent postcommits to jenkins README
* More user-friendly providers.
* Add yaml preprocessing phases.
* Add flexible windowing syntax to yaml.
* Implement flatten in terms of preprocessor phase.
This composes better with windowing.
* Reword SQL note.
* Make linter happy.
* Survive errors in size estimation in MongoDbIO
* Fix jdbc xlang schema type mismatch (#26480)
* Fix jdbc xlang schema type mismatch
* Also fix fetch_size type mismatch
* Add new fields in the end
* [Python] Add saved_weights example to tf notebook (#26472)
* add saved_weights example to tf notebook
* add description
* updated text blocks
* Update examples/notebooks/beam-ml/run_inference_tensorflow.ipynb
Co-authored-by: Rebecca Szper <98840847+rsz...@users.noreply.github.com>
* Update examples/notebooks/beam-ml/run_inference_tensorflow.ipynb
Co-authored-by: Rebecca Szper <98840847+rsz...@users.noreply.github.com>
---------
Co-authored-by: Rebecca Szper <98840847+rsz...@users.noreply.github.com>
* Revert "Revert "Modify batch IT to use count instead of hash (#26327)" (#26466)" (#26467)
This reverts commit 5c676687a1faecf7166481e684792d26f7626289.
* Bump github.com/tetratelabs/wazero from 1.0.3 to 1.1.0 in /sdks (#26486)
Bumps [github.com/tetratelabs/wazero](https://github.com/tetratelabs/wazero) from 1.0.3 to 1.1.0.
- [Release notes](https://github.com/tetratelabs/wazero/releases)
- [Commits](https://github.com/tetratelabs/wazero/compare/v1.0.3...v1.1.0)
---
updated-dependencies:
- dependency-name: github.com/tetratelabs/wazero
  dependency-type: direct:production
  update-type: version-update:semver-minor
...
Signed-off-by: dependabot[bot] <supp...@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
* tour of beam integration tests (#25925)
* Integration test to load the default example of the default SDK and change the example (#24730) (#24729)
* Fix formatting and README (#24730)
* Support collection v1.17.0 (#24730)
* LoadingIndicator on changing examples, remove duplicating licenses (#24730)
* Add a missing license header (#24730)
* Integration test for changing SDK and running code (#24779) (#382)
* Integration test for changing SDK and running code (#24779)
* Rename an integration test (#24779)
* Use enum to switch SDK in integration test (#24779)
* Find SDK in a dropdown by key (#24779)
* Add a TODO (#24779)
* Fix exports (#24779)
* Issue24779 integration changing sdk from 24370 (#387)
* Integration test for changing SDK and running code (#24779)
* Rename an integration test (#24779)
* Use enum to switch SDK in integration test (#24779)
* Find SDK in a dropdown by key (#24779)
* Add a TODO (#24779)
* Fix exports (#24779)
* Integration tests miscellaneous UI (#383)
* miscellaneous ui integration tests
* reverted pubspec.lock
* gradle tasks ordered alphabetically
* integration tests refactoring
* clean code
* integration tests miscellaneous ui fix pr
* rename method
* added layout adaptivity
* A minor cleanup (#24779)
Co-authored-by: Dmitry Repin <mr.mal...@gmail.com>
* integration tests run and editing
* example selector test
* minor fixes
* rat
* fix pr
* minor
* minor
* rat
* integration test finder written
* integration test minor fixes
* minor fixes
* removed comment
* minor fixes
* playground integration tests minor fixes
* integration test pumpAnSettleNoException
* integration test shortcut refactor
* integration test another changing shortcuts running
* upgrade to flutter 3.7.1
* workaround comment
* playground frontend updated major versions
* issues 25329 25331 25336
* 25329 extract connectivity extension to separate file
* Upgrade Flutter to 3.7.3 in integration tests (#24730)
* Fix integration test (#24730)
* fix cors issue and added mouse scroll to tags
* Upgrade Flutter in Dockerfile (#24720)
* sorting moved to model
* sorting moved to model
* sorting moved to model
* bugs fix
* issue 25278
* fix pr
* quites fix in en.yaml
* Fix not loading default example (#25528)
* fix compile error
* Refactor output tabs, test embedded playground (#25136) (#439)
* Refactor output tabs, test embedded playground (#25136)
* Clean up (#25136)
* Change example paths to IDs in integration tests
* issue25640 tob ci
* fix tob ci
* rename ci process
* test add new line to main
* test add new line to main
* commented unit test run
* issue25640 changed server path
* issue25640 tests on welcome page
* deleted config.g.dart
* issue25640 pr fixes
* Update .github/workflows/tour_of_beam_frontend_test.yml
Co-authored-by: alexeyinkin <l...@inkin.ru>
* Update learning/tour-of-beam/frontend/integration_test/welcome_page_test.dart
Co-authored-by: alexeyinkin <l...@inkin.ru>
* Improve tests (#25640)
* issue25640 tour page tests
* pr fix
* removed import
* pr fix
* fix test
* 25640 fixed pubspec.lock
* issue25640 fix readme
* updated readme
* issue25640 fixed after master merge
* issue25483 ToB pipeline options
* removed unnecessary variable
* pr fix
* Update learning/tour-of-beam/frontend/assets/translations/en.yaml
Co-authored-by: alexeyinkin <l...@inkin.ru>
* playground hides when snippet does not exist
* pipeline options extracted to playground components
* issue25483 pipeline options
* added errors handling, fix pr
* refactoring
* Revert "refactoring"
This reverts commit 1540961fe7b66673f70f56632947cdd988c3a0b7.
* removed unnecessary constants
* playground controller in tour notifier becomes nullable
* playground controller returned to non nullable in tour notifier
* playground controller actions
* removed unnecessary code
* tob scaffold wrapped with animated builder
* minor fixes
* partially fixed tests
* Upgrade flutter_code_editor to v0.2.19 (#25640)
* Replace output SelectableText with a CodeField instance (#25640)
* Trigger ToB integration tests (#25640)
* Clean up (#25640)
* Enable manual workflow runs for Playground and ToB integration tests (#25640)
---------
Co-authored-by: Alexey Inkin <alexey.in...@akvelon.com>
Co-authored-by: alexeyinkin <l...@inkin.ru>
* Eliminate nullness errors from MongoDbIO
* Move provisioned options outside of harness.Main (#26476)
Co-authored-by: lostluck <13907733+lostl...@users.noreply.github.com>
* Add SDF adjacent element test
* Consolidate residual processing
* fix race condition on split boundaries
* deflake fixed window combine tests
* Add comments to job functions.
* Make int64check per window.
* Update sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go
Co-authored-by: Ritesh Ghorse <riteshgho...@gmail.com>
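A note on the headline change: when a bundle's data-channel read index stalls, the new progress loop requests a channel split at half the remaining work (FractionOfRemainder 0.5), and the SDK answers with a first-residual element index. The arithmetic behind such a request looks roughly like the sketch below; splitIndex is an illustrative helper, not a function in this commit, and a real SDK would also honor allowed split points.

    package main

    import "fmt"

    // splitIndex picks the first residual element for a channel split,
    // given how many elements are buffered, how many are already
    // consumed, and the runner's requested fraction of the remainder.
    func splitIndex(readIndex, totalElements int64, fraction float64) int64 {
    	remainder := totalElements - readIndex
    	keep := int64(fraction * float64(remainder))
    	if keep < 1 {
    		keep = 1 // never split before the element currently in flight
    	}
    	return readIndex + keep
    }

    func main() {
    	// 10 buffered elements, 2 consumed, fraction 0.5:
    	// the primary keeps elements [0, 6), the residuals are [6, 10).
    	fmt.Println(splitIndex(2, 10, 0.5)) // 6
    }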
---------

Signed-off-by: dependabot[bot] <supp...@github.com>
Co-authored-by: lostluck <13907733+lostl...@users.noreply.github.com>
Co-authored-by: Danny McCormick <dannymccorm...@google.com>
Co-authored-by: Anand Inguva <anandinguv...@gmail.com>
Co-authored-by: Anand Inguva <34158215+ananding...@users.noreply.github.com>
Co-authored-by: Darkhan Nausharipov <31556582+naushari...@users.noreply.github.com>
Co-authored-by: darkhan.nausharipov <darkhan.naushari...@kzn.akvelon.com>
Co-authored-by: Robert Bradshaw <rober...@gmail.com>
Co-authored-by: Andrei Gurau <andreigu...@google.com>
Co-authored-by: Ritesh Ghorse <riteshgho...@gmail.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: ruslan-ikhsan <114978215+ruslan-ikh...@users.noreply.github.com>
Co-authored-by: johnjcasey <95318300+johnjca...@users.noreply.github.com>
Co-authored-by: Yi Hu <ya...@google.com>
Co-authored-by: Pranav Bhandari <bhandari.prana...@gmail.com>
Co-authored-by: Svetak Sundhar <svetaksund...@google.com>
Co-authored-by: Jeremy Edwards <jerem...@gmail.com>
Co-authored-by: Kenneth Knowles <k...@google.com>
Co-authored-by: Rebecca Szper <98840847+rsz...@users.noreply.github.com>
Co-authored-by: Dmitry Repin <mr.mal...@gmail.com>
Co-authored-by: Alexey Inkin <alexey.in...@akvelon.com>
Co-authored-by: alexeyinkin <l...@inkin.ru>
---
 .../prism/internal/engine/elementmanager.go        |  86 ++++++++++++----
 sdks/go/pkg/beam/runners/prism/internal/execute.go |  11 +-
 .../beam/runners/prism/internal/execute_test.go    |   3 +
 .../beam/runners/prism/internal/jobservices/job.go |  21 +++-
 .../runners/prism/internal/jobservices/metrics.go  |  88 ++++++++++++----
 .../prism/internal/jobservices/metrics_test.go     |  73 ++++++++-----
 .../beam/runners/prism/internal/separate_test.go   |  79 ++++++++++++--
 sdks/go/pkg/beam/runners/prism/internal/stage.go   |  82 +++++++++++++--
 .../pkg/beam/runners/prism/internal/testdofns.go   |  24 +++--
 .../beam/runners/prism/internal/testdofns_test.go  |   3 +-
 .../pkg/beam/runners/prism/internal/urns/urns.go   |  21 ++--
 .../beam/runners/prism/internal/worker/bundle.go   |  79 +++++++++++---
 .../runners/prism/internal/worker/bundle_test.go   |   2 +-
 .../beam/runners/prism/internal/worker/worker.go   | 114 +++++++++++++++++++--
 .../runners/prism/internal/worker/worker_test.go   |   6 +-
 15 files changed, 552 insertions(+), 140 deletions(-)

diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
index aeabc81b812..89fececea10 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
@@ -276,6 +276,34 @@ func (em *ElementManager) InputForBundle(rb RunBundle, info PColInfo) [][]byte {
 	return es.ToData(info)
 }
 
+// reElementResiduals extracts the windowed value header from residual bytes, and explodes them
+// back out to their windows.
+func reElementResiduals(residuals [][]byte, inputInfo PColInfo, rb RunBundle) []element {
+	var unprocessedElements []element
+	for _, residual := range residuals {
+		buf := bytes.NewBuffer(residual)
+		ws, et, pn, err := exec.DecodeWindowedValueHeader(inputInfo.WDec, buf)
+		if err != nil {
+			if err == io.EOF {
+				break
+			}
+			slog.Error("reElementResiduals: error decoding residual header", err, "bundle", rb)
+			panic("error decoding residual header")
+		}
+
+		for _, w := range ws {
+			unprocessedElements = append(unprocessedElements,
+				element{
+					window:    w,
+					timestamp: et,
+					pane:      pn,
+					elmBytes:  buf.Bytes(),
+				})
+		}
+	}
+	return unprocessedElements
+}
+
 // PersistBundle uses the tentative bundle output to update the watermarks for the stage.
 // Each stage has two monotonically increasing watermarks, the input watermark, and the output
 // watermark.
@@ -330,28 +358,7 @@ func (em *ElementManager) PersistBundle(rb RunBundle, col2Coders map[string]PCol
 	}
 
 	// Return unprocessed to this stage's pending
-	var unprocessedElements []element
-	for _, residual := range residuals {
-		buf := bytes.NewBuffer(residual)
-		ws, et, pn, err := exec.DecodeWindowedValueHeader(inputInfo.WDec, buf)
-		if err != nil {
-			if err == io.EOF {
-				break
-			}
-			slog.Error("PersistBundle: error decoding residual header", err, "bundle", rb)
-			panic("error decoding residual header")
-		}
-
-		for _, w := range ws {
-			unprocessedElements = append(unprocessedElements,
-				element{
-					window:    w,
-					timestamp: et,
-					pane:      pn,
-					elmBytes:  buf.Bytes(),
-				})
-		}
-	}
+	unprocessedElements := reElementResiduals(residuals, inputInfo, rb)
 	// Add unprocessed back to the pending stack.
 	if len(unprocessedElements) > 0 {
 		em.pendingElements.Add(len(unprocessedElements))
@@ -379,6 +386,21 @@ func (em *ElementManager) PersistBundle(rb RunBundle, col2Coders map[string]PCol
 	em.addRefreshAndClearBundle(stage.ID, rb.BundleID)
 }
 
+// ReturnResiduals is called after a successful split, so the remaining work
+// can be re-assigned to a new bundle.
+func (em *ElementManager) ReturnResiduals(rb RunBundle, firstRsIndex int, inputInfo PColInfo, residuals [][]byte) {
+	stage := em.stages[rb.StageID]
+
+	stage.splitBundle(rb, firstRsIndex)
+	unprocessedElements := reElementResiduals(residuals, inputInfo, rb)
+	if len(unprocessedElements) > 0 {
+		slog.Debug("ReturnResiduals: unprocessed elements", "bundle", rb, "count", len(unprocessedElements))
+		em.pendingElements.Add(len(unprocessedElements))
+		stage.AddPending(unprocessedElements)
+	}
+	em.addRefreshes(singleSet(rb.StageID))
+}
+
 func (em *ElementManager) addRefreshes(stages set[string]) {
 	em.refreshCond.L.Lock()
 	defer em.refreshCond.L.Unlock()
@@ -439,6 +461,10 @@ func (s set[K]) merge(o set[K]) {
 	}
 }
 
+func singleSet[T comparable](v T) set[T] {
+	return set[T]{v: struct{}{}}
+}
+
 // stageState is the internal watermark and input tracking for a stage.
 type stageState struct {
 	ID string
@@ -569,6 +595,22 @@ func (ss *stageState) startBundle(watermark mtime.Time, genBundID func() string)
 	return bundID, true
 }
 
+func (ss *stageState) splitBundle(rb RunBundle, firstResidual int) {
+	ss.mu.Lock()
+	defer ss.mu.Unlock()
+
+	es := ss.inprogress[rb.BundleID]
+	slog.Debug("split elements", "bundle", rb, "elem count", len(es.es), "res", firstResidual)
+
+	prim := es.es[:firstResidual]
+	res := es.es[firstResidual:]
+
+	es.es = prim
+	ss.pending = append(ss.pending, res...)
+ heap.Init(&ss.pending) + ss.inprogress[rb.BundleID] = es +} + // minimumPendingTimestamp returns the minimum pending timestamp from all pending elements, // including in progress ones. // diff --git a/sdks/go/pkg/beam/runners/prism/internal/execute.go b/sdks/go/pkg/beam/runners/prism/internal/execute.go index 2329a43d214..13c8b2b127c 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/execute.go +++ b/sdks/go/pkg/beam/runners/prism/internal/execute.go @@ -102,7 +102,6 @@ func externalEnvironment(ctx context.Context, ep *pipepb.ExternalPayload, wk *wo endpoint := &pipepb.ApiServiceDescriptor{ Url: wk.Endpoint(), } - pool.StartWorker(ctx, &fnpb.StartWorkerRequest{ WorkerId: wk.ID, ControlEndpoint: endpoint, @@ -274,10 +273,16 @@ func executePipeline(ctx context.Context, wk *worker.W, j *jobservices.Job) { em.Impulse(id) } + // Use a channel to limit max parallelism for the pipeline. + maxParallelism := make(chan struct{}, 8) // Execute stages here for rb := range em.Bundles(ctx, wk.NextInst) { - s := stages[rb.StageID] - s.Execute(j, wk, comps, em, rb) + maxParallelism <- struct{}{} + go func(rb engine.RunBundle) { + defer func() { <-maxParallelism }() + s := stages[rb.StageID] + s.Execute(j, wk, comps, em, rb) + }(rb) } slog.Info("pipeline done!", slog.String("job", j.String())) } diff --git a/sdks/go/pkg/beam/runners/prism/internal/execute_test.go b/sdks/go/pkg/beam/runners/prism/internal/execute_test.go index de7247486bb..aac5d97abfb 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/execute_test.go +++ b/sdks/go/pkg/beam/runners/prism/internal/execute_test.go @@ -128,6 +128,9 @@ func TestRunner_Pipelines(t *testing.T) { qr := pr.Metrics().Query(func(sr metrics.SingleResult) bool { return sr.Name() == "sunk" }) + if len(qr.Counters()) == 0 { + t.Fatal("no metrics, expected one.") + } if got, want := qr.Counters()[0].Committed, int64(73); got != want { t.Errorf("pr.Metrics.Query(Name = \"sunk\")).Committed = %v, want %v", got, want) } diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go index 95b1ce12af9..5b8e786ac6f 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go +++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go @@ -39,7 +39,7 @@ import ( "google.golang.org/protobuf/types/known/structpb" ) -var capabilities = map[string]struct{}{ +var supportedRequirements = map[string]struct{}{ urns.RequirementSplittableDoFn: {}, } @@ -48,13 +48,13 @@ var capabilities = map[string]struct{}{ func isSupported(requirements []string) error { var unsupported []string for _, req := range requirements { - if _, ok := capabilities[req]; !ok { + if _, ok := supportedRequirements[req]; !ok { unsupported = append(unsupported, req) } } if len(unsupported) > 0 { sort.Strings(unsupported) - return fmt.Errorf("local runner doesn't support the following required features: %v", strings.Join(unsupported, ",")) + return fmt.Errorf("prism runner doesn't support the following required features: %v", strings.Join(unsupported, ",")) } return nil } @@ -81,8 +81,19 @@ type Job struct { metrics metricsStore } -func (j *Job) ContributeMetrics(payloads *fnpb.ProcessBundleResponse) { - j.metrics.ContributeMetrics(payloads) +// ContributeTentativeMetrics returns the datachannel read index, and any unknown monitoring short ids. 
+func (j *Job) ContributeTentativeMetrics(payloads *fnpb.ProcessBundleProgressResponse) (int64, []string) { + return j.metrics.ContributeTentativeMetrics(payloads) +} + +// ContributeFinalMetrics returns any unknown monitoring short ids. +func (j *Job) ContributeFinalMetrics(payloads *fnpb.ProcessBundleResponse) []string { + return j.metrics.ContributeFinalMetrics(payloads) +} + +// AddMetricShortIDs populates metric short IDs with their metadata. +func (j *Job) AddMetricShortIDs(ids *fnpb.MonitoringInfosMetadataResponse) { + j.metrics.AddShortIDs(ids) } func (j *Job) String() string { diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics.go index 39936bae72f..6db16191de9 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics.go +++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics.go @@ -449,46 +449,92 @@ func (k apiRequestLatenciesKey) Labels() map[string]string { type metricsStore struct { mu sync.Mutex - accums map[metricKey]metricAccumulator + accums [2]map[metricKey]metricAccumulator + + shortIDsToKeys map[string]metricKey + unprocessedPayloads [2]map[string][]byte } -func (m *metricsStore) ContributeMetrics(payloads *fnpb.ProcessBundleResponse) { +func (m *metricsStore) AddShortIDs(resp *fnpb.MonitoringInfosMetadataResponse) { m.mu.Lock() defer m.mu.Unlock() - if m.accums == nil { - m.accums = map[metricKey]metricAccumulator{} + + if m.shortIDsToKeys == nil { + m.shortIDsToKeys = map[string]metricKey{} } - // Old and busted. - mons := payloads.GetMonitoringInfos() - for _, mon := range mons { - urn := mon.GetUrn() + + mis := resp.GetMonitoringInfo() + for short, mi := range mis { + urn := mi.GetUrn() ops, ok := mUrn2Ops[urn] if !ok { slog.Debug("unknown metrics urn", slog.String("urn", urn)) continue } - key := ops.keyFn(urn, mon.GetLabels()) - a, ok := m.accums[key] + key := ops.keyFn(urn, mi.GetLabels()) + m.shortIDsToKeys[short] = key + } + for d, payloads := range m.unprocessedPayloads { + m.contributeMetrics(durability(d), payloads) + m.unprocessedPayloads[d] = nil + } +} + +func (m *metricsStore) contributeMetrics(d durability, mdata map[string][]byte) (int64, []string) { + readIndex := int64(-1) + if m.accums[d] == nil { + m.accums[d] = map[metricKey]metricAccumulator{} + } + if m.unprocessedPayloads[d] == nil { + m.unprocessedPayloads[d] = map[string][]byte{} + } + accums := m.accums[d] + var missingShortIDs []string + for short, payload := range mdata { + key, ok := m.shortIDsToKeys[short] if !ok { + missingShortIDs = append(missingShortIDs, short) + m.unprocessedPayloads[d][short] = payload + continue + } + a, ok := accums[key] + if !ok || d == tentative { + ops, ok := mUrn2Ops[key.Urn()] + if !ok { + slog.Debug("unknown metrics urn", slog.String("urn", key.Urn())) + continue + } a = ops.newAccum() } - if err := a.accumulate(mon.GetPayload()); err != nil { - panic(fmt.Sprintf("error decoding metrics %v: %+v\n\t%+v", urn, key, a)) + if err := a.accumulate(payload); err != nil { + panic(fmt.Sprintf("error decoding metrics %v: %+v\n\t%+v", key.Urn(), key, a)) + } + accums[key] = a + if key.Urn() == "beam:metric:data_channel:read_index:v1" { + readIndex = a.(*sumInt64).sum } - m.accums[key] = a } - // New hotness. 
- mdata := payloads.GetMonitoringData() - _ = mdata + return readIndex, missingShortIDs +} + +func (m *metricsStore) ContributeTentativeMetrics(payloads *fnpb.ProcessBundleProgressResponse) (int64, []string) { + m.mu.Lock() + defer m.mu.Unlock() + return m.contributeMetrics(tentative, payloads.GetMonitoringData()) +} + +func (m *metricsStore) ContributeFinalMetrics(payloads *fnpb.ProcessBundleResponse) []string { + m.mu.Lock() + defer m.mu.Unlock() + _, unknownIDs := m.contributeMetrics(committed, payloads.GetMonitoringData()) + return unknownIDs } func (m *metricsStore) Results(d durability) []*pipepb.MonitoringInfo { - // We don't gather tentative metrics yet. - if d == tentative { - return nil - } + m.mu.Lock() + defer m.mu.Unlock() infos := make([]*pipepb.MonitoringInfo, 0, len(m.accums)) - for key, accum := range m.accums { + for key, accum := range m.accums[d] { infos = append(infos, accum.toProto(key)) } return infos diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics_test.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics_test.go index e0346731f30..bb3ef243ce6 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics_test.go +++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics_test.go @@ -33,7 +33,7 @@ import ( var metSpecs = (pipepb.MonitoringInfoSpecs_Enum)(0).Descriptor().Values() // makeInfo generates dummy Monitoring infos from a spec. -func makeInfo(enum pipepb.MonitoringInfoSpecs_Enum, payload []byte) *pipepb.MonitoringInfo { +func makeInfo(enum pipepb.MonitoringInfoSpecs_Enum) *pipepb.MonitoringInfo { spec := proto.GetExtension(metSpecs.ByNumber(protoreflect.EnumNumber(enum)).Options(), pipepb.E_MonitoringInfoSpec).(*pipepb.MonitoringInfoSpec) labels := map[string]string{} @@ -41,13 +41,18 @@ func makeInfo(enum pipepb.MonitoringInfoSpecs_Enum, payload []byte) *pipepb.Moni labels[l] = l } return &pipepb.MonitoringInfo{ - Urn: spec.GetUrn(), - Type: spec.GetType(), - Labels: labels, - Payload: payload, + Urn: spec.GetUrn(), + Type: spec.GetType(), + Labels: labels, } } +func makeInfoWBytes(enum pipepb.MonitoringInfoSpecs_Enum, payload []byte) *pipepb.MonitoringInfo { + info := makeInfo(enum) + info.Payload = payload + return info +} + // This test validates that multiple contributions are correctly summed up and accumulated. func Test_metricsStore_ContributeMetrics(t *testing.T) { @@ -70,45 +75,58 @@ func Test_metricsStore_ContributeMetrics(t *testing.T) { name string // TODO convert input to non-legacy metrics once we support, and then delete these. 
- input [][]*pipepb.MonitoringInfo + input []map[string][]byte + shortIDs map[string]*pipepb.MonitoringInfo want []*pipepb.MonitoringInfo }{ { name: "int64Sum", - input: [][]*pipepb.MonitoringInfo{ - {makeInfo(pipepb.MonitoringInfoSpecs_USER_SUM_INT64, []byte{3})}, - {makeInfo(pipepb.MonitoringInfoSpecs_USER_SUM_INT64, []byte{5})}, + input: []map[string][]byte{ + {"a": []byte{3}}, + {"a": []byte{5}}, + }, + shortIDs: map[string]*pipepb.MonitoringInfo{ + "a": makeInfo(pipepb.MonitoringInfoSpecs_USER_SUM_INT64), }, want: []*pipepb.MonitoringInfo{ - makeInfo(pipepb.MonitoringInfoSpecs_USER_SUM_INT64, []byte{8}), + makeInfoWBytes(pipepb.MonitoringInfoSpecs_USER_SUM_INT64, []byte{8}), }, }, { name: "float64Sum", - input: [][]*pipepb.MonitoringInfo{ - {makeInfo(pipepb.MonitoringInfoSpecs_USER_SUM_DOUBLE, doubleBytes(3.14))}, - {makeInfo(pipepb.MonitoringInfoSpecs_USER_SUM_DOUBLE, doubleBytes(1.06))}, + input: []map[string][]byte{ + {"a": doubleBytes(3.14)}, + {"a": doubleBytes(1.06)}, + }, + shortIDs: map[string]*pipepb.MonitoringInfo{ + "a": makeInfo(pipepb.MonitoringInfoSpecs_USER_SUM_DOUBLE), }, want: []*pipepb.MonitoringInfo{ - makeInfo(pipepb.MonitoringInfoSpecs_USER_SUM_DOUBLE, doubleBytes(4.20)), + makeInfoWBytes(pipepb.MonitoringInfoSpecs_USER_SUM_DOUBLE, doubleBytes(4.20)), }, }, { name: "progress", - input: [][]*pipepb.MonitoringInfo{ - {makeInfo(pipepb.MonitoringInfoSpecs_WORK_REMAINING, progress(1, 2.2, 78))}, - {makeInfo(pipepb.MonitoringInfoSpecs_WORK_REMAINING, progress(0, 7.8, 22))}, + input: []map[string][]byte{ + {"a": progress(1, 2.2, 78)}, + {"a": progress(0, 7.8, 22)}, + }, + shortIDs: map[string]*pipepb.MonitoringInfo{ + "a": makeInfo(pipepb.MonitoringInfoSpecs_WORK_REMAINING), }, want: []*pipepb.MonitoringInfo{ - makeInfo(pipepb.MonitoringInfoSpecs_WORK_REMAINING, progress(0, 7.8, 22)), + makeInfoWBytes(pipepb.MonitoringInfoSpecs_WORK_REMAINING, progress(0, 7.8, 22)), }, }, { name: "int64Distribution", - input: [][]*pipepb.MonitoringInfo{ - {makeInfo(pipepb.MonitoringInfoSpecs_USER_DISTRIBUTION_INT64, []byte{1, 2, 2, 2})}, - {makeInfo(pipepb.MonitoringInfoSpecs_USER_DISTRIBUTION_INT64, []byte{3, 17, 5, 7})}, + input: []map[string][]byte{ + {"a": []byte{1, 2, 2, 2}}, + {"a": []byte{3, 17, 5, 7}}, + }, + shortIDs: map[string]*pipepb.MonitoringInfo{ + "a": makeInfo(pipepb.MonitoringInfoSpecs_USER_DISTRIBUTION_INT64), }, want: []*pipepb.MonitoringInfo{ - makeInfo(pipepb.MonitoringInfoSpecs_USER_DISTRIBUTION_INT64, []byte{4, 19, 2, 7}), + makeInfoWBytes(pipepb.MonitoringInfoSpecs_USER_DISTRIBUTION_INT64, []byte{4, 19, 2, 7}), }, }, } @@ -117,11 +135,14 @@ func Test_metricsStore_ContributeMetrics(t *testing.T) { t.Run(test.name, func(t *testing.T) { ms := metricsStore{} + ms.AddShortIDs(&fnpb.MonitoringInfosMetadataResponse{ + MonitoringInfo: test.shortIDs, + }) + for _, payload := range test.input { - resp := &fnpb.ProcessBundleResponse{ - MonitoringInfos: payload, - } - ms.ContributeMetrics(resp) + ms.ContributeFinalMetrics(&fnpb.ProcessBundleResponse{ + MonitoringData: payload, + }) } got := ms.Results(committed) diff --git a/sdks/go/pkg/beam/runners/prism/internal/separate_test.go b/sdks/go/pkg/beam/runners/prism/internal/separate_test.go index 2e96651bfe9..a234d6470a4 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/separate_test.go +++ b/sdks/go/pkg/beam/runners/prism/internal/separate_test.go @@ -93,7 +93,7 @@ func TestSeparation(t *testing.T) { out := beam.ParDo(s, &sepHarnessSdfStream{ Base: sepHarnessBase{ WatcherID: ws.newWatcher(3), - Sleep: time.Second, + Sleep: 
10 * time.Millisecond, IsSentinelEncoded: beam.EncodedFunc{Fn: reflectx.MakeFunc(allSentinel)}, LocalService: ws.serviceAddress, }, @@ -107,7 +107,7 @@ func TestSeparation(t *testing.T) { count := 10 imp := beam.Impulse(s) out := beam.ParDo(s, &singleStepSdfStream{ - Sleep: time.Second, + Sleep: 10 * time.Millisecond, RestSize: int64(count), }, imp) passert.Count(s, out, "global stepped num ints", count) @@ -121,7 +121,7 @@ func TestSeparation(t *testing.T) { count := int(elms / mod) imp := beam.Impulse(s) out := beam.ParDo(s, &eventtimeSDFStream{ - Sleep: time.Second, + Sleep: 10 * time.Millisecond, RestSize: int64(elms), Mod: int64(mod), Fixed: 1, @@ -135,12 +135,42 @@ func TestSeparation(t *testing.T) { gsum := beam.WindowInto(s, window.NewGlobalWindows(), sum) passert.Count(s, gsum, "total sums", count) }, + }, { + name: "ChannelSplit", + pipeline: func(s beam.Scope) { + count := 10 + imp := beam.Impulse(s) + ints := beam.ParDo(s, emitTenFn, imp) + out := beam.ParDo(s, &sepHarness{ + Base: sepHarnessBase{ + WatcherID: ws.newWatcher(3), + Sleep: 100 * time.Millisecond, + IsSentinelEncoded: beam.EncodedFunc{Fn: reflectx.MakeFunc(threeSentinel)}, + LocalService: ws.serviceAddress, + }, + }, ints) + out = beam.ParDo(s, toInt, out) + passert.Sum(s, out, "sum ints", count, 55) + }, + }, { + name: "SDFSplit_adjacent_positions", + pipeline: func(s beam.Scope) { + count := 10 + imp := beam.Impulse(s) + out := beam.ParDo(s, &sepHarnessSdf{ + Base: sepHarnessBase{ + WatcherID: ws.newWatcher(3), + Sleep: 100 * time.Millisecond, + IsSentinelEncoded: beam.EncodedFunc{Fn: reflectx.MakeFunc(closeSentinel)}, + LocalService: ws.serviceAddress, + }, + RestSize: int64(count), + }, imp) + passert.Count(s, out, "total elements", count) + }, }, } - // TODO: Channel Splits - // TODO: SubElement/dynamic splits. - for _, test := range tests { t.Run(test.name, func(t *testing.T) { p, s := beam.NewPipelineWithRoot() @@ -156,8 +186,27 @@ func TestSeparation(t *testing.T) { } } +func init() { + register.Function2x0(emitTenFn) + register.Function2x0(toInt) + register.Emitter1[int64]() + register.Emitter1[int]() +} + +func emitTenFn(_ []byte, emit func(int64)) { + for i := int64(1); i <= 10; i++ { + emit(i) + } +} + +func toInt(v int64, emit func(int)) { + emit(int(v)) +} + func init() { register.Function1x1(allSentinel) + register.Function1x1(threeSentinel) + register.Function2x1(closeSentinel) } // allSentinel indicates that all elements are sentinels. @@ -165,6 +214,22 @@ func allSentinel(v beam.T) bool { return true } +// threeSentinel indicates that every element that's mod 3 == 0 is a sentinel. +func threeSentinel(v beam.T) bool { + i := v.(int64) + return i > 0 && i%3 == 0 +} + +// closeSentinel indicates adjacent positions 3,4,5 are sentinels. +func closeSentinel(i int64, _ beam.T) bool { + switch i { + case 3, 4, 5: + return true + default: + return false + } +} + // Watcher is an instance of the counters. type watcher struct { id int @@ -304,7 +369,7 @@ func (fn *sepHarnessBase) setup() error { // Check if there's already a local channel for this id, and if not // start a watcher goroutine to poll and unblock the harness when // the expected number of sentinels is reached. 
-	if _, ok := sepWaitMap[fn.WatcherID]; !ok {
+	if _, ok := sepWaitMap[fn.WatcherID]; ok {
+		return nil
+	}
 	// We need a channel to block on for this watcherID
diff --git a/sdks/go/pkg/beam/runners/prism/internal/stage.go b/sdks/go/pkg/beam/runners/prism/internal/stage.go
index 7be62611d42..c5a10cae7b4 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/stage.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/stage.go
@@ -20,6 +20,7 @@ import (
 	"context"
 	"fmt"
 	"io"
+	"time"
 
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec"
@@ -63,16 +64,22 @@ func (s *stage) Execute(j *jobservices.Job, wk *worker.W, comps *pipepb.Componen
 	slog.Debug("Execute: starting bundle", "bundle", rb, slog.String("tid", tid))
 
 	var b *worker.B
-	var send bool
 	inputData := em.InputForBundle(rb, s.inputInfo)
+	var dataReady <-chan struct{}
 	switch s.envID {
 	case "": // Runner Transforms
 		// Runner transforms are processed immeadiately.
 		b = s.exe.ExecuteTransform(tid, comps.GetTransforms()[tid], comps, rb.Watermark, inputData)
 		b.InstID = rb.BundleID
 		slog.Debug("Execute: runner transform", "bundle", rb, slog.String("tid", tid))
+
+		// Do some accounting for the fake bundle.
+		b.Resp = make(chan *fnpb.ProcessBundleResponse, 1)
+		close(b.Resp) // To unblock the final response read below.
+		closed := make(chan struct{})
+		close(closed)
+		dataReady = closed
 	case wk.ID:
-		send = true
 		b = &worker.B{
 			PBDID:  s.ID,
 			InstID: rb.BundleID,
@@ -88,25 +95,78 @@ func (s *stage) Execute(j *jobservices.Job, wk *worker.W, comps *pipepb.Componen
 		b.Init()
 
 		s.prepareSides(b, s.transforms[0], rb.Watermark)
+
+		slog.Debug("Execute: processing", "bundle", rb)
+		defer b.Cleanup(wk)
+		dataReady = b.ProcessOn(wk)
 	default:
 		err := fmt.Errorf("unknown environment[%v]", s.envID)
 		slog.Error("Execute", err)
 		panic(err)
 	}
-	if send {
-		slog.Debug("Execute: processing", "bundle", rb)
-		b.ProcessOn(wk) // Blocks until finished.
+
+	// Progress + split loop.
+	previousIndex := int64(-2)
+	var splitsDone bool
+	progTick := time.NewTicker(100 * time.Millisecond)
+progress:
+	for {
+		select {
+		case <-dataReady:
+			progTick.Stop()
+			break progress // exit progress loop on close.
+		case <-progTick.C:
+			resp := b.Progress(wk)
+			index, unknownIDs := j.ContributeTentativeMetrics(resp)
+			if len(unknownIDs) > 0 {
+				md := wk.MonitoringMetadata(unknownIDs)
+				j.AddMetricShortIDs(md)
+			}
+			slog.Debug("progress report", "bundle", rb, "index", index)
+			// Progress for the bundle hasn't advanced. Try splitting.
+			if previousIndex == index && !splitsDone {
+				sr := b.Split(wk, 0.5 /* fraction of remainder */, nil /* allowed splits */)
+				if sr.GetChannelSplits() == nil {
+					slog.Warn("split failed", "bundle", rb)
+					splitsDone = true
+					continue progress
+				}
+				// TODO sort out rescheduling primary Roots on bundle failure.
+				var residualData [][]byte
+				for _, rr := range sr.GetResidualRoots() {
+					ba := rr.GetApplication()
+					residualData = append(residualData, ba.GetElement())
+					if len(ba.GetElement()) == 0 {
+						slog.LogAttrs(context.TODO(), slog.LevelError, "returned empty residual application", slog.Any("bundle", rb))
+						panic("sdk returned empty residual application")
+					}
+					// TODO what happens to output watermarks on splits?
+				}
+				if len(sr.GetChannelSplits()) != 1 {
+					slog.Warn("received non-single channel split", "bundle", rb)
+				}
+				cs := sr.GetChannelSplits()[0]
+				fr := cs.GetFirstResidualElement()
+				// The first residual can be after the end of data, so filter out those cases.
+				if len(b.InputData) >= int(fr) {
+					b.InputData = b.InputData[:int(fr)]
+					em.ReturnResiduals(rb, int(fr), s.inputInfo, residualData)
+				}
+			} else {
+				previousIndex = index
+			}
+		}
+	}
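// (Editor's sketch — illustrative, not part of the diff.) The progress loop
// above reduces to this supervision pattern: poll for progress on a ticker,
// and if the read index hasn't advanced between ticks, ask the SDK to split.
// progressFn and splitFn stand in for b.Progress and b.Split; they are
// assumptions of the sketch, not Beam APIs. (Simplified to a single split
// attempt; the real loop may split again while progress stays stalled.)
//
//	func supervise(done <-chan struct{}, progressFn func() int64, splitFn func()) {
//		tick := time.NewTicker(100 * time.Millisecond)
//		defer tick.Stop()
//		prev := int64(-2) // sentinel no real read index can match
//		split := false
//		for {
//			select {
//			case <-done:
//				return
//			case <-tick.C:
//				if idx := progressFn(); idx == prev && !split {
//					split = true
//					splitFn()
//				} else {
//					prev = idx
//				}
//			}
//		}
//	}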
 
 	// Tentative Data is ready, commit it to the main datastore.
 	slog.Debug("Execute: commiting data", "bundle", rb, slog.Any("outputsWithData", maps.Keys(b.OutputData.Raw)), slog.Any("outputs", maps.Keys(s.OutputsToCoders)))
 
-	resp := &fnpb.ProcessBundleResponse{}
-	if send {
-		resp = <-b.Resp
-		// Tally metrics immeadiately so they're available before
-		// pipeline termination.
-		j.ContributeMetrics(resp)
+	resp := <-b.Resp
+	// Tally metrics immediately so they're available before
+	// pipeline termination.
+	unknownIDs := j.ContributeFinalMetrics(resp)
+	if len(unknownIDs) > 0 {
+		md := wk.MonitoringMetadata(unknownIDs)
+		j.AddMetricShortIDs(md)
 	}
 	// TODO handle side input data properly.
 	wk.D.Commit(b.OutputData)
diff --git a/sdks/go/pkg/beam/runners/prism/internal/testdofns.go b/sdks/go/pkg/beam/runners/prism/internal/testdofns.go
index 4aa07a46c6f..6e7a9d27aee 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/testdofns.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/testdofns.go
@@ -115,7 +115,7 @@ func dofn3x1(sum int64, iter1, iter2 func(*int64) bool, emit func(int64)) {
 	emit(sum)
 }
 
-// int64Check validates that within a single bundle,
+// int64Check validates that within a single bundle, for each window,
 // we received the expected int64 values & sends them downstream.
 //
 // Invalid pattern for general testing, as it will fail
@@ -123,20 +123,28 @@ func dofn3x1(sum int64, iter1, iter2 func(*int64) bool, emit func(int64)) {
 type int64Check struct {
 	Name string
 	Want []int
-	got  []int
+	got  map[beam.Window][]int
 }
 
-func (fn *int64Check) ProcessElement(v int64, _ func(int64)) {
-	fn.got = append(fn.got, int(v))
+func (fn *int64Check) StartBundle(_ func(int64)) error {
+	fn.got = map[beam.Window][]int{}
+	return nil
+}
+
+func (fn *int64Check) ProcessElement(w beam.Window, v int64, _ func(int64)) {
+	fn.got[w] = append(fn.got[w], int(v))
 }
 
 func (fn *int64Check) FinishBundle(_ func(int64)) error {
-	sort.Ints(fn.got)
 	sort.Ints(fn.Want)
-	if d := cmp.Diff(fn.Want, fn.got); d != "" {
-		return fmt.Errorf("int64Check[%v] (-want, +got): %v", fn.Name, d)
+	// Check for each window individually.
+	for _, vs := range fn.got {
+		sort.Ints(vs)
+		if d := cmp.Diff(fn.Want, vs); d != "" {
+			return fmt.Errorf("int64Check[%v] (-want, +got): %v", fn.Name, d)
+		}
+	}
 	// Clear for subsequent calls.
fn.got = nil return nil } diff --git a/sdks/go/pkg/beam/runners/prism/internal/testdofns_test.go b/sdks/go/pkg/beam/runners/prism/internal/testdofns_test.go index 3596c40f0dc..5ca4eb9fd4c 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/testdofns_test.go +++ b/sdks/go/pkg/beam/runners/prism/internal/testdofns_test.go @@ -16,6 +16,7 @@ package internal import ( + "github.com/apache/beam/sdks/v2/go/pkg/beam" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf" "github.com/apache/beam/sdks/v2/go/pkg/beam/register" ) @@ -40,7 +41,7 @@ func init() { register.Function2x0(dofnKV2) register.Function3x0(dofnGBK) register.Function3x0(dofnGBK2) - register.DoFn2x0[int64, func(int64)]((*int64Check)(nil)) + register.DoFn3x0[beam.Window, int64, func(int64)]((*int64Check)(nil)) register.DoFn2x0[string, func(string)]((*stringCheck)(nil)) register.Function2x0(dofnKV3) register.Function3x0(dofnGBK3) diff --git a/sdks/go/pkg/beam/runners/prism/internal/urns/urns.go b/sdks/go/pkg/beam/runners/prism/internal/urns/urns.go index 035ab3c0727..40b15ee5d05 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/urns/urns.go +++ b/sdks/go/pkg/beam/runners/prism/internal/urns/urns.go @@ -42,14 +42,15 @@ func quickUrn[Enum protoEnum](v Enum) string { } var ( - ptUrn = toUrn[pipepb.StandardPTransforms_Primitives]() - ctUrn = toUrn[pipepb.StandardPTransforms_Composites]() - cmbtUrn = toUrn[pipepb.StandardPTransforms_CombineComponents]() - sdfUrn = toUrn[pipepb.StandardPTransforms_SplittableParDoComponents]() - siUrn = toUrn[pipepb.StandardSideInputTypes_Enum]() - cdrUrn = toUrn[pipepb.StandardCoders_Enum]() - reqUrn = toUrn[pipepb.StandardRequirements_Enum]() - envUrn = toUrn[pipepb.StandardEnvironments_Environments]() + ptUrn = toUrn[pipepb.StandardPTransforms_Primitives]() + ctUrn = toUrn[pipepb.StandardPTransforms_Composites]() + cmbtUrn = toUrn[pipepb.StandardPTransforms_CombineComponents]() + sdfUrn = toUrn[pipepb.StandardPTransforms_SplittableParDoComponents]() + siUrn = toUrn[pipepb.StandardSideInputTypes_Enum]() + cdrUrn = toUrn[pipepb.StandardCoders_Enum]() + reqUrn = toUrn[pipepb.StandardRequirements_Enum]() + runProcUrn = toUrn[pipepb.StandardRunnerProtocols_Enum]() + envUrn = toUrn[pipepb.StandardEnvironments_Environments]() ) var ( @@ -120,6 +121,10 @@ var ( RequirementStatefulProcessing = reqUrn(pipepb.StandardRequirements_REQUIRES_STATEFUL_PROCESSING) RequirementTimeSortedInput = reqUrn(pipepb.StandardRequirements_REQUIRES_TIME_SORTED_INPUT) + // Capabilities + CapabilityMonitoringInfoShortIDs = runProcUrn(pipepb.StandardRunnerProtocols_MONITORING_INFO_SHORT_IDS) + CapabilityControlResponseElementsEmbedding = runProcUrn(pipepb.StandardRunnerProtocols_CONTROL_RESPONSE_ELEMENTS_EMBEDDING) + // Environment types EnvDocker = envUrn(pipepb.StandardEnvironments_DOCKER) EnvProcess = envUrn(pipepb.StandardEnvironments_PROCESS) diff --git a/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go b/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go index f6fbf1293f4..77f7094a97e 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go +++ b/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go @@ -16,7 +16,7 @@ package worker import ( - "sync" + "sync/atomic" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex" fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1" @@ -40,19 +40,22 @@ type B struct { // MultiMapSideInputData is a map from transformID, to inputID, to window, to data key, to data values. 
MultiMapSideInputData map[string]map[string]map[typex.Window]map[string][][]byte - // OutputCount is the number of data outputs this bundle has. + // OutputCount is the number of data or timer outputs this bundle has. // We need to see this many closed data channels before the bundle is complete. OutputCount int - // dataWait is how we determine if a bundle is finished, by waiting for each of + // DataWait is how we determine if a bundle is finished, by waiting for each of // a Bundle's DataSinks to produce their last output. // After this point we can "commit" the bundle's output for downstream use. - dataWait sync.WaitGroup + DataWait chan struct{} + dataSema atomic.Int32 OutputData engine.TentativeData - Resp chan *fnpb.ProcessBundleResponse - SinkToPCollection map[string]string + // TODO move response channel to an atomic and an additional + // block on the DataWait channel, to allow progress & splits for + // no output DoFns. + Resp chan *fnpb.ProcessBundleResponse - // TODO: Metrics for this bundle, can be handled after the fact. + SinkToPCollection map[string]string } // Init initializes the bundle's internal state for waiting on all @@ -60,27 +63,43 @@ type B struct { func (b *B) Init() { // We need to see final data signals that match the number of // outputs the stage this bundle executes posesses - b.dataWait.Add(b.OutputCount) + b.dataSema.Store(int32(b.OutputCount)) + b.DataWait = make(chan struct{}) + if b.OutputCount == 0 { + close(b.DataWait) // Can happen if there are no outputs for the bundle. + } b.Resp = make(chan *fnpb.ProcessBundleResponse, 1) } +// DataDone indicates a final element has been received from a Data or Timer output. +func (b *B) DataDone() { + sema := b.dataSema.Add(-1) + if sema == 0 { + close(b.DataWait) + } +} + func (b *B) LogValue() slog.Value { return slog.GroupValue( slog.String("ID", b.InstID), slog.String("stage", b.PBDID)) } -// ProcessOn executes the given bundle on the given W, blocking -// until all data is complete. +func (b *B) Respond(resp *fnpb.InstructionResponse) { + b.Resp <- resp.GetProcessBundle() +} + +// ProcessOn executes the given bundle on the given W. +// The returned channel is closed once all expected data is returned. // // Assumes the bundle is initialized (all maps are non-nil, and data waitgroup is set, response channel initialized) // Assumes the bundle descriptor is already registered with the W. // // While this method mostly manipulates a W, putting it on a B avoids mixing the workers // public GRPC APIs up with local calls. -func (b *B) ProcessOn(wk *W) { +func (b *B) ProcessOn(wk *W) <-chan struct{} { wk.mu.Lock() - wk.bundles[b.InstID] = b + wk.activeInstructions[b.InstID] = b wk.mu.Unlock() slog.Debug("processing", "bundle", b, "worker", wk) @@ -108,7 +127,39 @@ func (b *B) ProcessOn(wk *W) { }, } } + return b.DataWait +} + +// Cleanup unregisters the bundle from the worker. +func (b *B) Cleanup(wk *W) { + wk.mu.Lock() + delete(wk.activeInstructions, b.InstID) + wk.mu.Unlock() +} + +func (b *B) Progress(wk *W) *fnpb.ProcessBundleProgressResponse { + return wk.sendInstruction(&fnpb.InstructionRequest{ + Request: &fnpb.InstructionRequest_ProcessBundleProgress{ + ProcessBundleProgress: &fnpb.ProcessBundleProgressRequest{ + InstructionId: b.InstID, + }, + }, + }).GetProcessBundleProgress() +} - slog.Debug("waiting on data", "bundle", b) - b.dataWait.Wait() // Wait until data is ready. 
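// (Editor's sketch — illustrative, not part of the diff.) DataDone/DataWait
// above form a countdown latch: an atomic counter that closes a channel when
// it reaches zero, letting any number of goroutines select on completion
// instead of blocking in a sync.WaitGroup. A minimal standalone version,
// assuming Go 1.19+ for sync/atomic.Int32:
//
//	type latch struct {
//		remaining atomic.Int32
//		done      chan struct{}
//	}
//
//	func newLatch(n int32) *latch {
//		l := &latch{done: make(chan struct{})}
//		l.remaining.Store(n)
//		if n == 0 {
//			close(l.done) // nothing to wait for
//		}
//		return l
//	}
//
//	func (l *latch) countDown() {
//		if l.remaining.Add(-1) == 0 {
//			close(l.done)
//		}
//	}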
+func (b *B) Split(wk *W, fraction float64, allowedSplits []int64) *fnpb.ProcessBundleSplitResponse { + return wk.sendInstruction(&fnpb.InstructionRequest{ + Request: &fnpb.InstructionRequest_ProcessBundleSplit{ + ProcessBundleSplit: &fnpb.ProcessBundleSplitRequest{ + InstructionId: b.InstID, + DesiredSplits: map[string]*fnpb.ProcessBundleSplitRequest_DesiredSplit{ + b.InputTransformID: { + FractionOfRemainder: fraction, + AllowedSplitPoints: allowedSplits, + EstimatedInputElements: int64(len(b.InputData)), + }, + }, + }, + }, + }).GetProcessBundleSplit() } diff --git a/sdks/go/pkg/beam/runners/prism/internal/worker/bundle_test.go b/sdks/go/pkg/beam/runners/prism/internal/worker/bundle_test.go index 154306c3f6b..bfef6873426 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/worker/bundle_test.go +++ b/sdks/go/pkg/beam/runners/prism/internal/worker/bundle_test.go @@ -36,7 +36,7 @@ func TestBundle_ProcessOn(t *testing.T) { b.ProcessOn(wk) completed.Done() }() - b.dataWait.Done() + b.DataDone() gotData := <-wk.DataReqs if got, want := gotData.GetData()[0].GetData(), []byte{1, 2, 3}; !bytes.EqualFold(got, want) { t.Errorf("ProcessOn(): data not sent; got %v, want %v", got, want) diff --git a/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go b/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go index 324cc83154a..04118f14735 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go +++ b/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go @@ -26,12 +26,15 @@ import ( "sync" "sync/atomic" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex" fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1" + pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1" "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/engine" + "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/urns" "golang.org/x/exp/slog" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -47,6 +50,7 @@ type W struct { fnpb.UnimplementedBeamFnDataServer fnpb.UnimplementedBeamFnStateServer fnpb.UnimplementedBeamFnLoggingServer + fnpb.UnimplementedProvisionServiceServer ID string @@ -60,13 +64,17 @@ type W struct { InstReqs chan *fnpb.InstructionRequest DataReqs chan *fnpb.Elements - mu sync.Mutex - bundles map[string]*B // Bundles keyed by InstructionID - Descriptors map[string]*fnpb.ProcessBundleDescriptor // Stages keyed by PBDID + mu sync.Mutex + activeInstructions map[string]controlResponder // Active instructions keyed by InstructionID + Descriptors map[string]*fnpb.ProcessBundleDescriptor // Stages keyed by PBDID D *DataService } +type controlResponder interface { + Respond(*fnpb.InstructionResponse) +} + // New starts the worker server components of FnAPI Execution. func New(id string) *W { lis, err := net.Listen("tcp", ":0") @@ -82,8 +90,8 @@ func New(id string) *W { InstReqs: make(chan *fnpb.InstructionRequest, 10), DataReqs: make(chan *fnpb.Elements, 10), - bundles: make(map[string]*B), - Descriptors: make(map[string]*fnpb.ProcessBundleDescriptor), + activeInstructions: make(map[string]controlResponder), + Descriptors: make(map[string]*fnpb.ProcessBundleDescriptor), D: &DataService{}, } @@ -136,6 +144,32 @@ func (wk *W) NextStage() string { // TODO set logging level. 
var minsev = fnpb.LogEntry_Severity_DEBUG +func (wk *W) GetProvisionInfo(_ context.Context, _ *fnpb.GetProvisionInfoRequest) (*fnpb.GetProvisionInfoResponse, error) { + endpoint := &pipepb.ApiServiceDescriptor{ + Url: wk.Endpoint(), + } + resp := &fnpb.GetProvisionInfoResponse{ + Info: &fnpb.ProvisionInfo{ + // TODO: Add the job's Pipeline options + // TODO: Include runner capabilities with the per job configuration. + RunnerCapabilities: []string{ + urns.CapabilityMonitoringInfoShortIDs, + }, + LoggingEndpoint: endpoint, + ControlEndpoint: endpoint, + ArtifactEndpoint: endpoint, + // TODO add this job's RetrievalToken + // TODO add this job's artifact Dependencies + + Metadata: map[string]string{ + "runner": "prism", + "runner_version": core.SdkVersion, + }, + }, + } + return resp, nil +} + // Logging relates SDK worker messages back to the job that spawned them. // Messages are received from the SDK, func (wk *W) Logging(stream fnpb.BeamFnLogging_LoggingServer) error { @@ -217,14 +251,14 @@ func (wk *W) Control(ctrl fnpb.BeamFnControl_ControlServer) error { // TODO: Do more than assume these are ProcessBundleResponses. wk.mu.Lock() - if b, ok := wk.bundles[resp.GetInstructionId()]; ok { + if b, ok := wk.activeInstructions[resp.GetInstructionId()]; ok { // TODO. Better pipeline error handling. if resp.Error != "" { slog.LogAttrs(context.TODO(), slog.LevelError, "ctrl.Recv pipeline error", slog.String("error", resp.GetError())) panic(resp.GetError()) } - b.Resp <- resp.GetProcessBundle() + b.Respond(resp) } else { slog.Debug("ctrl.Recv: %v", resp) } @@ -264,11 +298,13 @@ func (wk *W) Data(data fnpb.BeamFnData_DataServer) error { } wk.mu.Lock() for _, d := range resp.GetData() { - b, ok := wk.bundles[d.GetInstructionId()] + cr, ok := wk.activeInstructions[d.GetInstructionId()] if !ok { slog.Info("data.Recv for unknown bundle", "response", resp) continue } + // Received data is always for an active ProcessBundle instruction + b := cr.(*B) colID := b.SinkToPCollection[d.GetTransformId()] // There might not be data, eg. for side inputs, so we need to reconcile this elsewhere for @@ -277,7 +313,7 @@ func (wk *W) Data(data fnpb.BeamFnData_DataServer) error { b.OutputData.WriteData(colID, d.GetData()) } if d.GetIsLast() { - b.dataWait.Done() + b.DataDone() } } wk.mu.Unlock() @@ -320,7 +356,11 @@ func (wk *W) State(state fnpb.BeamFnState_StateServer) error { switch req.GetRequest().(type) { case *fnpb.StateRequest_Get: // TODO: move data handling to be pcollection based. - b := wk.bundles[req.GetInstructionId()] + + // State requests are always for an active ProcessBundle instruction + wk.mu.Lock() + b := wk.activeInstructions[req.GetInstructionId()].(*B) + wk.mu.Unlock() key := req.GetStateKey() slog.Debug("StateRequest_Get", prototext.Format(req), "bundle", b) @@ -399,15 +439,67 @@ func (wk *W) State(state fnpb.BeamFnState_StateServer) error { return nil } +var chanResponderPool = sync.Pool{ + New: func() any { + return &chanResponder{make(chan *fnpb.InstructionResponse, 1)} + }, +} + +type chanResponder struct { + Resp chan *fnpb.InstructionResponse +} + +func (cr *chanResponder) Respond(resp *fnpb.InstructionResponse) { + cr.Resp <- resp +} + +// sendInstruction is a helper for creating and sending worker single RPCs, blocking +// until the response returns. 
+func (wk *W) sendInstruction(req *fnpb.InstructionRequest) *fnpb.InstructionResponse { + cr := chanResponderPool.Get().(*chanResponder) + progInst := wk.NextInst() + wk.mu.Lock() + wk.activeInstructions[progInst] = cr + wk.mu.Unlock() + + defer func() { + wk.mu.Lock() + delete(wk.activeInstructions, progInst) + wk.mu.Unlock() + chanResponderPool.Put(cr) + }() + + req.InstructionId = progInst + + // Tell the SDK to start processing the bundle. + wk.InstReqs <- req + // Protos are safe as nil, so just return directly. + return <-cr.Resp +} + +// MonitoringMetadata is a convenience method to request the metadata for monitoring shortIDs. +func (wk *W) MonitoringMetadata(unknownIDs []string) *fnpb.MonitoringInfosMetadataResponse { + return wk.sendInstruction(&fnpb.InstructionRequest{ + Request: &fnpb.InstructionRequest_MonitoringInfos{ + MonitoringInfos: &fnpb.MonitoringInfosMetadataRequest{ + MonitoringInfoId: unknownIDs, + }, + }, + }).GetMonitoringInfos() +} + // DataService is slated to be deleted in favour of stage based state // management for side inputs. type DataService struct { + mu sync.Mutex // TODO actually quick process the data to windows here as well. raw map[string][][]byte } // Commit tentative data to the datastore. func (d *DataService) Commit(tent engine.TentativeData) { + d.mu.Lock() + defer d.mu.Unlock() if d.raw == nil { d.raw = map[string][][]byte{} } @@ -418,5 +510,7 @@ func (d *DataService) Commit(tent engine.TentativeData) { // GetAllData is a hack for Side Inputs until watermarks are sorted out. func (d *DataService) GetAllData(colID string) [][]byte { + d.mu.Lock() + defer d.mu.Unlock() return d.raw[colID] } diff --git a/sdks/go/pkg/beam/runners/prism/internal/worker/worker_test.go b/sdks/go/pkg/beam/runners/prism/internal/worker/worker_test.go index 29b3fab92d6..14b879fe242 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/worker/worker_test.go +++ b/sdks/go/pkg/beam/runners/prism/internal/worker/worker_test.go @@ -145,7 +145,7 @@ func TestWorker_Control_HappyPath(t *testing.T) { b := &B{} b.Init() - wk.bundles[instID] = b + wk.activeInstructions[instID] = b b.ProcessOn(wk) ctrlStream.Send(&fnpb.InstructionResponse{ @@ -187,7 +187,7 @@ func TestWorker_Data_HappyPath(t *testing.T) { OutputCount: 1, } b.Init() - wk.bundles[instID] = b + wk.activeInstructions[instID] = b var wg sync.WaitGroup wg.Add(1) @@ -235,7 +235,7 @@ func TestWorker_State_Iterable(t *testing.T) { } instID := wk.NextInst() - wk.bundles[instID] = &B{ + wk.activeInstructions[instID] = &B{ IterableSideInputData: map[string]map[string]map[typex.Window][][]byte{ "transformID": { "i1": {