This is an automated email from the ASF dual-hosted git repository. lostluck pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 0af670d4616 [BEAM-14173] Fix Go Loadtests on Dataflow & partial fix for Flink (#17554) 0af670d4616 is described below commit 0af670d46168c5cb182fed1f7e56d3fde1841cd1 Author: Robert Burke <lostl...@users.noreply.github.com> AuthorDate: Thu May 5 10:02:22 2022 -0700 [BEAM-14173] Fix Go Loadtests on Dataflow & partial fix for Flink (#17554) --- .test-infra/jenkins/README.md | 2 +- .../jenkins/job_LoadTests_Combine_Go.groovy | 8 ++++++++ .test-infra/jenkins/job_LoadTests_GBK_Go.groovy | 16 ++++++++++++++++ .test-infra/jenkins/job_LoadTests_ParDo_Go.groovy | 10 ++++++++++ .../jenkins/job_LoadTests_SideInput_Go.groovy | 6 ++++++ .test-infra/jenkins/job_LoadTests_coGBK_Go.groovy | 10 ++++++++++ CHANGES.md | 7 +++---- sdks/go/pkg/beam/io/synthetic/source.go | 22 +++++++++++----------- sdks/go/test/load/sideinput/sideinput.go | 9 ++++----- 9 files changed, 69 insertions(+), 21 deletions(-) diff --git a/.test-infra/jenkins/README.md b/.test-infra/jenkins/README.md index 131d2dde0f5..02a616950f9 100644 --- a/.test-infra/jenkins/README.md +++ b/.test-infra/jenkins/README.md @@ -166,7 +166,7 @@ Beam Jenkins overview page: [link](https://ci-beam.apache.org/) | beam_LoadTests_Go_GBK_Flink_Batch | [cron](https://ci-beam.apache.org/job/beam_LoadTests_Go_GBK_Flink_Batch/), [phrase](https://ci-beam.apache.org/job/beam_LoadTests_Go_GBK_Flink_Batch_PR/) | `Run Load Tests Go GBK Flink Batch` | [![Build Status](https://ci-beam.apache.org/job/beam_LoadTests_Go_GBK_Flink_Batch/badge/icon)](https://ci-beam.apache.org/job/beam_LoadTests_Go_GBK_Flink_Batch/) | | beam_LoadTests_Go_ParDo_Dataflow_Batch | [cron](https://ci-beam.apache.org/job/beam_LoadTests_Go_ParDo_Dataflow_Batch/), [phrase](https://ci-beam.apache.org/job/beam_LoadTests_Go_ParDo_Dataflow_Batch_PR/) | `Run Load Tests Go ParDo Dataflow Batch` | [![Build Status](https://ci-beam.apache.org/job/beam_LoadTests_Go_ParDo_Dataflow_Batch/badge/icon)](https://ci-beam.apache.org/job/beam_LoadTests_Go_ParDo_Dataflow_Batch/) | | beam_LoadTests_Go_ParDo_Flink_Batch | [cron](https://ci-beam.apache.org/job/beam_LoadTests_Go_ParDo_Flink_Batch/), [phrase](https://ci-beam.apache.org/job/beam_LoadTests_Go_ParDo_Flink_Batch_PR/) | `Run Load Tests Go ParDo Flink Batch` | [![Build Status](https://ci-beam.apache.org/job/beam_LoadTests_Go_ParDo_Flink_Batch/badge/icon)](https://ci-beam.apache.org/job/beam_LoadTests_Go_ParDo_Flink_Batch/) | -| beam_LoadTests_Go_SideInput_Dataflow_Batch | [cron](https://ci-beam.apache.org/job/beam_LoadTests_Go_SideInput_Dataflow_Batch/), [phrase](https://ci-beam.apache.org/job/beam_LoadTests_Go_SideInput_Dataflow_Batch_PR/) | `Run Load Tests Go SideInput Dataflow Batch suite` | [![Build Status](https://ci-beam.apache.org/job/beam_LoadTests_Go_SideInput_Dataflow_Batch/badge/icon)](https://ci-beam.apache.org/job/beam_LoadTests_Go_SideInput_Dataflow_Batch/) | +| beam_LoadTests_Go_SideInput_Dataflow_Batch | [cron](https://ci-beam.apache.org/job/beam_LoadTests_Go_SideInput_Dataflow_Batch/), [phrase](https://ci-beam.apache.org/job/beam_LoadTests_Go_SideInput_Dataflow_Batch_PR/) | `Run Load Tests Go SideInput Dataflow Batch` | [![Build Status](https://ci-beam.apache.org/job/beam_LoadTests_Go_SideInput_Dataflow_Batch/badge/icon)](https://ci-beam.apache.org/job/beam_LoadTests_Go_SideInput_Dataflow_Batch/) | | beam_LoadTests_Go_SideInput_Flink_Batch | [cron](https://ci-beam.apache.org/job/beam_LoadTests_Go_SideInput_Flink_Batch/), [phrase](https://ci-beam.apache.org/job/beam_LoadTests_Go_SideInput_Flink_Batch_PR/) | `Run Load Tests Go SideInput Flink Batch` | [![Build Status](https://ci-beam.apache.org/job/beam_LoadTests_Go_SideInput_Flink_Batch/badge/icon)](https://ci-beam.apache.org/job/beam_LoadTests_Go_SideInput_Flink_Batch/) | | beam_Java_LoadTests_Smoke | [phrase](https://ci-beam.apache.org/job/beam_Java_LoadTests_Smoke_PR/) | `Run Java Load Tests Smoke` | | | beam_LoadTests_Java_CoGBK_Dataflow_Batch | [cron](https://ci-beam.apache.org/job/beam_LoadTests_Java_CoGBK_Dataflow_Batch/), [phrase](https://ci-beam.apache.org/job/beam_LoadTests_Java_CoGBK_Dataflow_Batch_PR/) | `Run Load Tests Java CoGBK Dataflow Batch` | [![Build Status](https://ci-beam.apache.org/job/beam_LoadTests_Java_CoGBK_Dataflow_Batch/badge/icon)](https://ci-beam.apache.org/job/beam_LoadTests_Java_CoGBK_Dataflow_Batch/) | diff --git a/.test-infra/jenkins/job_LoadTests_Combine_Go.groovy b/.test-infra/jenkins/job_LoadTests_Combine_Go.groovy index d1204a50f48..7e4ca7b284e 100644 --- a/.test-infra/jenkins/job_LoadTests_Combine_Go.groovy +++ b/.test-infra/jenkins/job_LoadTests_Combine_Go.groovy @@ -22,6 +22,8 @@ import LoadTestsBuilder as loadTestsBuilder import PhraseTriggeringPostCommitBuilder import InfluxDBCredentialsHelper +import static LoadTestsBuilder.DOCKER_CONTAINER_REGISTRY + String now = new Date().format('MMddHHmmss', TimeZone.getTimeZone('UTC')) @@ -47,6 +49,8 @@ def batchScenarios = { top_count : 20, num_workers : 5, autoscaling_algorithm: 'NONE', + environment_type : 'DOCKER', + environment_config : "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest", ] ], [ @@ -69,6 +73,8 @@ def batchScenarios = { top_count : 20, num_workers : 16, autoscaling_algorithm: 'NONE', + environment_type : 'DOCKER', + environment_config : "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest", ] ], [ @@ -91,6 +97,8 @@ def batchScenarios = { top_count : 20, num_workers : 16, autoscaling_algorithm: 'NONE', + environment_type : 'DOCKER', + environment_config : "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest", ] ], ].each { test -> test.pipelineOptions.putAll(additionalPipelineArgs) } diff --git a/.test-infra/jenkins/job_LoadTests_GBK_Go.groovy b/.test-infra/jenkins/job_LoadTests_GBK_Go.groovy index 8e3f26c5cda..559baf6c899 100644 --- a/.test-infra/jenkins/job_LoadTests_GBK_Go.groovy +++ b/.test-infra/jenkins/job_LoadTests_GBK_Go.groovy @@ -22,6 +22,8 @@ import LoadTestsBuilder as loadTestsBuilder import PhraseTriggeringPostCommitBuilder import InfluxDBCredentialsHelper +import static LoadTestsBuilder.DOCKER_CONTAINER_REGISTRY + String now = new Date().format('MMddHHmmss', TimeZone.getTimeZone('UTC')) def batchScenarios = { @@ -46,6 +48,8 @@ def batchScenarios = { fanout : 1, num_workers : 5, autoscaling_algorithm: 'NONE', + environment_type : 'DOCKER', + environment_config : "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest", ] ], [ @@ -68,6 +72,8 @@ def batchScenarios = { fanout : 1, num_workers : 5, autoscaling_algorithm: 'NONE', + environment_type : 'DOCKER', + environment_config : "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest", ] ], [ @@ -90,6 +96,8 @@ def batchScenarios = { fanout : 1, num_workers : 5, autoscaling_algorithm: 'NONE', + environment_type : 'DOCKER', + environment_config : "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest", ] ], [ @@ -112,6 +120,8 @@ def batchScenarios = { fanout : 4, num_workers : 16, autoscaling_algorithm: 'NONE', + environment_type : 'DOCKER', + environment_config : "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest", ] ], [ @@ -134,6 +144,8 @@ def batchScenarios = { fanout : 8, num_workers : 16, autoscaling_algorithm: 'NONE', + environment_type : 'DOCKER', + environment_config : "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest", ] ], [ @@ -158,6 +170,8 @@ def batchScenarios = { fanout : 1, num_workers : 5, autoscaling_algorithm: 'NONE', + environment_type : 'DOCKER', + environment_config : "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest", ] ], [ @@ -182,6 +196,8 @@ def batchScenarios = { fanout : 1, num_workers : 5, autoscaling_algorithm: 'NONE', + environment_type : 'DOCKER', + environment_config : "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest", ] ], ] diff --git a/.test-infra/jenkins/job_LoadTests_ParDo_Go.groovy b/.test-infra/jenkins/job_LoadTests_ParDo_Go.groovy index a03b9bb0b08..f1f6b40be74 100644 --- a/.test-infra/jenkins/job_LoadTests_ParDo_Go.groovy +++ b/.test-infra/jenkins/job_LoadTests_ParDo_Go.groovy @@ -22,6 +22,8 @@ import LoadTestsBuilder as loadTestsBuilder import PhraseTriggeringPostCommitBuilder import InfluxDBCredentialsHelper +import static LoadTestsBuilder.DOCKER_CONTAINER_REGISTRY + String now = new Date().format("MMddHHmmss", TimeZone.getTimeZone('UTC')) @@ -48,6 +50,8 @@ def batchScenarios = { number_of_counters : 0, num_workers : 5, autoscaling_algorithm: 'NONE', + environment_type : 'DOCKER', + environment_config : "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest", ] ], [ @@ -71,6 +75,8 @@ def batchScenarios = { number_of_counters : 0, num_workers : 5, autoscaling_algorithm: 'NONE', + environment_type : 'DOCKER', + environment_config : "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest", ] ], [ @@ -94,6 +100,8 @@ def batchScenarios = { number_of_counters : 1, num_workers : 5, autoscaling_algorithm: 'NONE', + environment_type : 'DOCKER', + environment_config : "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest", ] ], [ @@ -117,6 +125,8 @@ def batchScenarios = { number_of_counters : 1, num_workers : 5, autoscaling_algorithm: 'NONE', + environment_type : 'DOCKER', + environment_config : "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest", ] ], ].each { test -> test.pipelineOptions.putAll(additionalPipelineArgs) } diff --git a/.test-infra/jenkins/job_LoadTests_SideInput_Go.groovy b/.test-infra/jenkins/job_LoadTests_SideInput_Go.groovy index 16e8618b430..225bbc79998 100644 --- a/.test-infra/jenkins/job_LoadTests_SideInput_Go.groovy +++ b/.test-infra/jenkins/job_LoadTests_SideInput_Go.groovy @@ -22,6 +22,8 @@ import LoadTestsBuilder as loadTestsBuilder import PhraseTriggeringPostCommitBuilder import InfluxDBCredentialsHelper +import static LoadTestsBuilder.DOCKER_CONTAINER_REGISTRY + String now = new Date().format('MMddHHmmss', TimeZone.getTimeZone('UTC')) def batchScenarios = { @@ -45,6 +47,8 @@ def batchScenarios = { access_percentage: 1, num_workers : 10, autoscaling_algorithm: 'NONE', + environment_type : 'DOCKER', + environment_config : "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest", ] ], [ @@ -65,6 +69,8 @@ def batchScenarios = { '"value_size": 900}\'', num_workers : 10, autoscaling_algorithm: 'NONE', + environment_type : 'DOCKER', + environment_config : "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest", ] ] ] diff --git a/.test-infra/jenkins/job_LoadTests_coGBK_Go.groovy b/.test-infra/jenkins/job_LoadTests_coGBK_Go.groovy index 09072b6197d..6218fa416d8 100644 --- a/.test-infra/jenkins/job_LoadTests_coGBK_Go.groovy +++ b/.test-infra/jenkins/job_LoadTests_coGBK_Go.groovy @@ -22,6 +22,8 @@ import LoadTestsBuilder as loadTestsBuilder import PhraseTriggeringPostCommitBuilder import InfluxDBCredentialsHelper +import static LoadTestsBuilder.DOCKER_CONTAINER_REGISTRY + String now = new Date().format("MMddHHmmss", TimeZone.getTimeZone('UTC')) def batchScenarios = { @@ -53,6 +55,8 @@ def batchScenarios = { iterations : 1, num_workers : 5, autoscaling_algorithm: 'NONE', + environment_type : 'DOCKER', + environment_config : "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest", ] ], [ @@ -82,6 +86,8 @@ def batchScenarios = { iterations : 1, num_workers : 5, autoscaling_algorithm: 'NONE', + environment_type : 'DOCKER', + environment_config : "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest", ] ], [ @@ -111,6 +117,8 @@ def batchScenarios = { iterations : 4, num_workers : 5, autoscaling_algorithm: 'NONE', + environment_type : 'DOCKER', + environment_config : "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest", ] ], [ @@ -140,6 +148,8 @@ def batchScenarios = { iterations : 4, num_workers : 5, autoscaling_algorithm: 'NONE', + environment_type : 'DOCKER', + environment_config : "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest", ] ], ] diff --git a/CHANGES.md b/CHANGES.md index 5f7aea3f86b..605ca0e87ab 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -44,12 +44,11 @@ ## Bugfixes * Fixed X (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)). -* Fixed Java expansion service to allow specific files to stage ([BEAM-14160](https://issues.apache.org/jira/browse/BEAM-14160)). - ## Known Issues * ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)). --> + # [2.40.0] - Unreleased ## Highlights @@ -68,6 +67,7 @@ ## Breaking Changes * X behavior was changed ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)). +* synthetic.SourceConfig field types have changed to int64 from int for better compatibility with Flink's use of Logical types in Schemas (Go) ([BEAM-14173](https://issues.apache.org/jira/browse/BEAM-14173)) ## Deprecations @@ -82,8 +82,7 @@ * ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)). - -# [2.39.0] - Unreleased +# [2.39.0] - Unreleased, Cut ## Highlights diff --git a/sdks/go/pkg/beam/io/synthetic/source.go b/sdks/go/pkg/beam/io/synthetic/source.go index 75304091d27..822c416f696 100644 --- a/sdks/go/pkg/beam/io/synthetic/source.go +++ b/sdks/go/pkg/beam/io/synthetic/source.go @@ -191,7 +191,7 @@ func DefaultSourceConfig() *SourceConfigBuilder { // Valid values are in the range of [1, ...] and the default value is 1. Values // of 0 (and below) are invalid as they result in sources that emit no elements. func (b *SourceConfigBuilder) NumElements(val int) *SourceConfigBuilder { - b.cfg.NumElements = val + b.cfg.NumElements = int64(val) return b } @@ -210,7 +210,7 @@ func (b *SourceConfigBuilder) NumElements(val int) *SourceConfigBuilder { // of 0 (and below) are invalid as they would result in dropping elements that // are expected to be emitted. func (b *SourceConfigBuilder) InitialSplits(val int) *SourceConfigBuilder { - b.cfg.InitialSplits = val + b.cfg.InitialSplits = int64(val) return b } @@ -219,7 +219,7 @@ func (b *SourceConfigBuilder) InitialSplits(val int) *SourceConfigBuilder { // // Valid values are in the range of [1, ...] and the default value is 8. func (b *SourceConfigBuilder) KeySize(val int) *SourceConfigBuilder { - b.cfg.KeySize = val + b.cfg.KeySize = int64(val) return b } @@ -228,7 +228,7 @@ func (b *SourceConfigBuilder) KeySize(val int) *SourceConfigBuilder { // // Valid values are in the range of [1, ...] and the default value is 8. func (b *SourceConfigBuilder) ValueSize(val int) *SourceConfigBuilder { - b.cfg.ValueSize = val + b.cfg.ValueSize = int64(val) return b } @@ -237,7 +237,7 @@ func (b *SourceConfigBuilder) ValueSize(val int) *SourceConfigBuilder { // // Valid values are in the range of [0, ...] and the default value is 0. func (b *SourceConfigBuilder) NumHotKeys(val int) *SourceConfigBuilder { - b.cfg.NumHotKeys = val + b.cfg.NumHotKeys = int64(val) return b } @@ -299,10 +299,10 @@ func (b *SourceConfigBuilder) BuildFromJSON(jsonData []byte) SourceConfig { // synthetic source. It should be created via a SourceConfigBuilder, not by // directly initializing it (the fields are public to allow encoding). type SourceConfig struct { - NumElements int `json:"num_records"` - InitialSplits int `json:"initial_splits"` - KeySize int `json:"key_size"` - ValueSize int `json:"value_size"` - NumHotKeys int `json:"num_hot_keys"` - HotKeyFraction float64 `json:"hot_key_fraction"` + NumElements int64 `json:"num_records" beam:"num_records"` + InitialSplits int64 `json:"initial_splits" beam:"initial_splits"` + KeySize int64 `json:"key_size" beam:"key_size"` + ValueSize int64 `json:"value_size" beam:"value_size"` + NumHotKeys int64 `json:"num_hot_keys" beam:"num_hot_keys"` + HotKeyFraction float64 `json:"hot_key_fraction" beam:"hot_key_fraction"` } diff --git a/sdks/go/test/load/sideinput/sideinput.go b/sdks/go/test/load/sideinput/sideinput.go index fe4f4527075..6f7cb6f2d41 100644 --- a/sdks/go/test/load/sideinput/sideinput.go +++ b/sdks/go/test/load/sideinput/sideinput.go @@ -52,13 +52,12 @@ func parseSyntheticConfig() synthetic.SourceConfig { } type doFn struct { - ElementsToAccess int + ElementsToAccess int64 } func (fn *doFn) ProcessElement(_ []byte, values func(*[]byte, *[]byte) bool, emit func([]byte, []byte)) { - var key []byte - var value []byte - i := 0 + var key, value []byte + var i int64 for values(&key, &value) { if i >= fn.ElementsToAccess { break @@ -75,7 +74,7 @@ func main() { p, s := beam.NewPipelineWithRoot() syntheticConfig := parseSyntheticConfig() - elementsToAccess := syntheticConfig.NumElements * *accessPercentage / 100 + elementsToAccess := syntheticConfig.NumElements * int64(*accessPercentage/100) src := synthetic.SourceSingle(s, syntheticConfig) src = beam.ParDo(s, &load.RuntimeMonitor{}, src)