Re: [PR] start to refactor persistence layer to prepare for direct path [beam]
m-trieu commented on code in PR #30265:
URL: https://github.com/apache/beam/pull/30265#discussion_r1501363890
## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
@@ -462,42 +446,89 @@ public void run() {
     LOG.debug("maxWorkItemCommitBytes: {}", maxWorkItemCommitBytes);
   }
-  private static WindmillServerStub createWindmillServerStub(
+  public static StreamingDataflowWorker fromOptions(StreamingDataflowWorkerOptions options) {
+    ConcurrentMap computationMap = new ConcurrentHashMap<>();
+    long clientId = clientIdGenerator.nextLong();
+    return new StreamingDataflowWorker(
+        createWindmillServerStub(
+            options,
+            clientId,
+            new WorkHeartbeatResponseProcessor(
Review Comment:
fixed
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Wire error handling into PubSubIO and add initial tests [beam]
Naireen commented on code in PR #30372:
URL: https://github.com/apache/beam/pull/30372#discussion_r1501342511
## sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PreparePubsubWriteDoFn.java:
@@ -125,18 +140,42 @@ public void process(
       @Timestamp Instant ts,
       BoundedWindow window,
       PaneInfo paneInfo,
-      OutputReceiver o) {
+      MultiOutputReceiver o)
+      throws Exception {
     ValueInSingleWindow valueInSingleWindow =
         ValueInSingleWindow.of(element, ts, window, paneInfo);
-    PubsubMessage message = formatFunction.apply(valueInSingleWindow);
+    PubsubMessage message;
+    try {
+      message = formatFunction.apply(valueInSingleWindow);
+    } catch (Exception e) {
+      badRecordRouter.route(
+          o,
+          element,
+          inputCoder,
+          e,
+          "Failed to serialize PubSub message with provided format function");
+      return;
+    }
     if (topicFunction != null) {
-      message = message.withTopic(topicFunction.apply(valueInSingleWindow).asPath());
+      try {
+        message = message.withTopic(topicFunction.apply(valueInSingleWindow).asPath());
+      } catch (Exception e) {
+        badRecordRouter.route(
+            o, element, inputCoder, e, "Failed to determine PubSub topic using topic function");
Review Comment:
Should we just add valueInSingleWindow into the error message here so it's easier to see what went wrong?
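The reviewer's suggestion (put the offending value into the error message) can be sketched outside Beam as a plain Python model: each step that can fail is wrapped in try/except, and the failure is routed to a bad-record list whose description also carries `repr()` of the input. The names `prepare_message`, `format_fn`, `topic_fn`, and `BadRecord` are hypothetical stand-ins for `formatFunction`, `topicFunction`, and `badRecordRouter.route(...)`; this is not Beam's `BadRecordRouter` API.

```python
from dataclasses import dataclass
from typing import Any, Callable, List, Optional


@dataclass
class BadRecord:
    """Hypothetical stand-in for what a bad-record router would emit."""
    element: Any
    error: Exception
    description: str


def prepare_message(
    value: Any,
    format_fn: Callable[[Any], dict],
    topic_fn: Optional[Callable[[Any], str]],
    bad_records: List[BadRecord],
) -> Optional[dict]:
    """Formats `value` into a message dict, or routes the failure and returns None."""
    try:
        message = format_fn(value)
    except Exception as e:
        # Include the offending value so the bad record is easy to diagnose.
        bad_records.append(BadRecord(
            value, e,
            f"Failed to serialize PubSub message with provided format function: {value!r}"))
        return None
    if topic_fn is not None:
        try:
            message = {**message, "topic": topic_fn(value)}
        except Exception as e:
            bad_records.append(BadRecord(
                value, e,
                f"Failed to determine PubSub topic using topic function: {value!r}"))
            return None
    return message
```

One caveat on this design: embedding `repr(value)` can make error records large or expose payload contents, so a real implementation might want to truncate it.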
Re: [PR] Wire error handling into PubSubIO and add initial tests [beam]
Naireen commented on code in PR #30372:
URL: https://github.com/apache/beam/pull/30372#discussion_r1501341983
## sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java:
@@ -1089,57 +1130,73 @@ public PCollection expand(PBegin input) {
           getNeedsMessageId(),
           getNeedsOrderingKey());
-      PCollection read;
       PCollection preParse = input.apply(source);
       TypeDescriptor typeDescriptor = new TypeDescriptor() {};
-      if (getDeadLetterTopicProvider() == null) {
+      PCollection read;
+      if (getDeadLetterTopicProvider() == null
+          && (getBadRecordRouter() instanceof ThrowingBadRecordRouter)) {
         read = preParse.apply(MapElements.into(typeDescriptor).via(getParseFn()));
       } else {
+        // parse PubSub messages, separating out execptions
Review Comment:
nit: exceptions
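The branch in this hunk takes a fast path (a plain `MapElements` with the parse function) only when there is no dead-letter topic and the bad-record router simply rethrows; otherwise parsing has to separate failures from successes. A minimal plain-Python sketch of that choice, using hypothetical names (`parse_messages`, `throw_on_error`) rather than PubsubIO's real API:

```python
import json
from typing import Any, Callable, Iterable, List, Tuple


def parse_messages(
    payloads: Iterable[bytes],
    parse_fn: Callable[[bytes], Any],
    throw_on_error: bool,
) -> Tuple[List[Any], List[Tuple[bytes, str]]]:
    """Parses payloads and returns (parsed, failures)."""
    parsed: List[Any] = []
    failures: List[Tuple[bytes, str]] = []
    if throw_on_error:
        # Fast path: no error routing configured, so any exception propagates.
        return [parse_fn(p) for p in payloads], failures
    for p in payloads:
        try:
            parsed.append(parse_fn(p))
        except Exception as e:
            # Error-separating path: keep the raw payload and the error for routing.
            failures.append((p, f"{type(e).__name__}: {e}"))
    return parsed, failures
```

Keeping the fast path avoids per-element try/except bookkeeping when nobody has asked for failed records to be captured.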
Re: [I] [Bug]: Pytest fails on a pipeline when importing from apache_beam.io.gcp.bigquery [beam]
github-actions[bot] closed issue #30392: [Bug]: Pytest fails on a pipeline when importing from apache_beam.io.gcp.bigquery
URL: https://github.com/apache/beam/issues/30392
Re: [I] [Bug]: Pytest fails on a pipeline when importing from apache_beam.io.gcp.bigquery [beam]
mitchej123 commented on issue #30392:
URL: https://github.com/apache/beam/issues/30392#issuecomment-1962227841
.close-issue
Re: [PR] Fix invalid escape sequence '\#' [beam]
tvalentyn merged PR #30393:
URL: https://github.com/apache/beam/pull/30393
Re: [PR] Fix invalid escape sequence '\#' [beam]
tvalentyn closed pull request #30393: Fix invalid escape sequence '\#'
URL: https://github.com/apache/beam/pull/30393
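For context on this fix: in a regular (non-raw) Python string literal, `\#` is not a recognized escape sequence, so Python keeps the backslash but emits a warning (`DeprecationWarning` before Python 3.12, `SyntaxWarning` from 3.12 on). The usual fixes are a raw string or an escaped backslash, both of which produce the same two characters. A small sketch that checks this with `compile()`; the helper name `escape_warnings` is illustrative only:

```python
import warnings


def escape_warnings(source: str) -> list:
    """Compiles `source` and returns the categories of any warnings raised."""
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        compile(source, "<example>", "exec")
    return [w.category for w in caught]


# Source text containing the invalid escape sequence \# in a regular literal.
bad_source = 'pattern = "\\#"\n'
# Two equivalent fixes: a raw string literal, or an escaped backslash.
raw_fix = 'pattern = r"\\#"\n'
escaped_fix = 'pattern = "\\\\#"\n'
```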
Re: [PR] start to refactor persistence layer to prepare for direct path [beam]
m-trieu commented on code in PR #30265:
URL: https://github.com/apache/beam/pull/30265#discussion_r1501305913
## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
@@ -462,42 +446,89 @@ public void run() {
     LOG.debug("maxWorkItemCommitBytes: {}", maxWorkItemCommitBytes);
   }
-  private static WindmillServerStub createWindmillServerStub(
+  public static StreamingDataflowWorker fromOptions(StreamingDataflowWorkerOptions options) {
+    ConcurrentMap computationMap = new ConcurrentHashMap<>();
+    long clientId = clientIdGenerator.nextLong();
+    return new StreamingDataflowWorker(
+        createWindmillServerStub(
+            options,
+            clientId,
+            new WorkHeartbeatResponseProcessor(
Review Comment:
Figured it out: it looks like the work isn't being populated in the computation map. Will fix it, thanks!
Re: [PR] start to refactor persistence layer to prepare for direct path [beam]
m-trieu commented on code in PR #30265:
URL: https://github.com/apache/beam/pull/30265#discussion_r1501297099
## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
@@ -462,42 +446,89 @@ public void run() {
     LOG.debug("maxWorkItemCommitBytes: {}", maxWorkItemCommitBytes);
   }
-  private static WindmillServerStub createWindmillServerStub(
+  public static StreamingDataflowWorker fromOptions(StreamingDataflowWorkerOptions options) {
+    ConcurrentMap computationMap = new ConcurrentHashMap<>();
+    long clientId = clientIdGenerator.nextLong();
+    return new StreamingDataflowWorker(
+        createWindmillServerStub(
+            options,
+            clientId,
+            new WorkHeartbeatResponseProcessor(
Review Comment:
It gets set up here in FakeWindmillServer:
![image](https://github.com/apache/beam/assets/29557027/6701f28a-37d6-4a95-aa3d-6632e0d05fb9)
Re: [PR] start to refactor persistence layer to prepare for direct path [beam]
m-trieu commented on code in PR #30265:
URL: https://github.com/apache/beam/pull/30265#discussion_r1501295248
## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
@@ -462,42 +446,89 @@ public void run() {
     LOG.debug("maxWorkItemCommitBytes: {}", maxWorkItemCommitBytes);
   }
-  private static WindmillServerStub createWindmillServerStub(
+  public static StreamingDataflowWorker fromOptions(StreamingDataflowWorkerOptions options) {
+    ConcurrentMap computationMap = new ConcurrentHashMap<>();
+    long clientId = clientIdGenerator.nextLong();
+    return new StreamingDataflowWorker(
+        createWindmillServerStub(
+            options,
+            clientId,
+            new WorkHeartbeatResponseProcessor(
Review Comment:
It gets set up here in FakeWindmillServer.
Re: [PR] [Python] Vertex AI Feature Store enrichment handler [beam]
github-actions[bot] commented on PR #30388:
URL: https://github.com/apache/beam/pull/30388#issuecomment-1962165882
Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment `assign set of reviewers`
Re: [PR] Fix invalid escape sequence '\#' [beam]
github-actions[bot] commented on PR #30393:
URL: https://github.com/apache/beam/pull/30393#issuecomment-1962089721
Assigning reviewers. If you would like to opt out of this review, comment `assign to next reviewer`:
R: @tvalentyn for label python.
R: @ahmedabu98 for label io.
Available commands:
- `stop reviewer notifications` - opt out of the automated review tooling
- `remind me after tests pass` - tag the comment author after tests pass
- `waiting on author` - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)
The PR bot will only process comments in the main thread (not review comments).
Re: [PR] [flink] #30402 restore upgradability [beam]
robertwb commented on code in PR #30403:
URL: https://github.com/apache/beam/pull/30403#discussion_r1501236860
## runners/core-java/build.gradle:
@@ -46,6 +46,8 @@ dependencies {
   implementation library.java.joda_time
   implementation library.java.vendored_grpc_1_60_1
   implementation library.java.slf4j_api
+  implementation library.java.jackson_core
+  implementation library.java.jackson_databind
Review Comment:
Ugh... @kennknowles would be in a better position to answer this. If we can't place it in runners-core itself, we could re-introduce runners-core-construction with this as the sole class if needed.
Re: [I] [Bug]: Beam uses a version of the org.json:json that has a Category X license [beam]
Abacn closed issue #30404: [Bug]: Beam uses a version of the org.json:json that has a Category X license
URL: https://github.com/apache/beam/issues/30404
Re: [I] Performance Regression or Improvement: pytorch_image_classification_benchmarks-resnet152-GPU-mean_inference_batch_latency_micro_secs:mean_inference_batch_latency_micro_secs [beam]
github-actions[bot] commented on issue #27986:
URL: https://github.com/apache/beam/issues/27986#issuecomment-1962071521
Performance change found in the test: `pytorch_image_classification_benchmarks-resnet152-GPU-mean_inference_batch_latency_micro_secs` for the metric: `mean_inference_batch_latency_micro_secs`.
For more information on how to triage the alerts, please look at the `Triage performance alert issues` section of the [README](https://github.com/apache/beam/tree/master/sdks/python/apache_beam/testing/analyzers/README.md#triage-performance-alert-issues).
`Test description:` Pytorch image classification on 50k images of size 224 x 224 with resnet 152 with Tesla T4 GPU.
Test link - https://github.com/apache/beam/blob/42d0a6e3564d8b9c5d912428a6de18fb22a13ac1/.test-infra/jenkins/job_InferenceBenchmarkTests_Python.groovy#L151
Test dashboard - http://metrics.beam.apache.org/d/ZpS8Uf44z/python-ml-runinference-benchmarks?from=now-90d=now=2
```
timestamp: Fri Feb 23 06:53:41 2024, metric_value: 4560157.85
timestamp: Wed Feb 21 07:12:25 2024, metric_value: 4883101.24
timestamp: Tue Feb 20 06:53:46 2024, metric_value: 5398728.80 < Anomaly
timestamp: Sun Feb 18 06:41:31 2024, metric_value: 3060535.64
timestamp: Sat Feb 17 06:52:20 2024, metric_value: 2565324.57
timestamp: Fri Feb 16 06:55:15 2024, metric_value: 3789397.17
timestamp: Thu Feb 15 06:58:53 2024, metric_value: 4431068.78
timestamp: Wed Feb 14 06:50:20 2024, metric_value: 3481961.73
timestamp: Mon Feb 12 06:42:22 2024, metric_value: 3489616.96
timestamp: Sat Feb 10 06:45:00 2024, metric_value: 3158261.25
timestamp: Fri Feb 9 06:47:02 2024, metric_value: 2655775.48
timestamp: Thu Feb 8 06:47:05 2024, metric_value: 2885917.85
timestamp: Wed Feb 7 06:52:04 2024, metric_value: 3010852.66
```
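To put the flagged point in perspective, the values listed above can be compared against the median of the earlier runs. This is only a back-of-the-envelope check on the numbers in this alert, not the detection method the Beam performance analyzer uses; the helper name `relative_change` is illustrative.

```python
from statistics import median

# mean_inference_batch_latency_micro_secs values from the alert, oldest first
# (Feb 7 .. Feb 18), followed by the point flagged as an anomaly (Feb 20).
baseline_us = [
    3010852.66, 2885917.85, 2655775.48, 3158261.25, 3489616.96,
    3481961.73, 4431068.78, 3789397.17, 2565324.57, 3060535.64,
]
flagged_us = 5398728.80


def relative_change(baseline, value):
    """Ratio of `value` to the median of `baseline` (1.0 means no change)."""
    return value / median(baseline)


ratio = relative_change(baseline_us, flagged_us)
print(f"baseline median: {median(baseline_us) / 1e6:.2f} s per batch")
print(f"flagged value:   {flagged_us / 1e6:.2f} s per batch ({ratio:.2f}x)")
```

On these numbers the flagged run is roughly 1.74x the median of the preceding runs (about 5.40 s versus 3.11 s per batch). Note that the earlier runs themselves vary widely (about 2.57 s to 4.43 s), which is why a single-point comparison like this is only a rough guide.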
Re: [PR] Use BeamModulePlugin org.json version in extensions/ml [beam]
Abacn merged PR #30406:
URL: https://github.com/apache/beam/pull/30406
Re: [PR] [flink] #30402 restore upgradability [beam]
je-ik commented on code in PR #30403:
URL: https://github.com/apache/beam/pull/30403#discussion_r1501221936
## runners/core-java/build.gradle:
@@ -46,6 +46,8 @@ dependencies {
   implementation library.java.joda_time
   implementation library.java.vendored_grpc_1_60_1
   implementation library.java.slf4j_api
+  implementation library.java.jackson_core
+  implementation library.java.jackson_databind
Review Comment:
@robertwb this is unfortunate. Do we have an API surface check ensuring this does not leak anywhere? In the old construction-java and in sdk-core this is shaded. I can shade it, but runners-core is currently "plain", so that could be an argument against placing it in runners-core. I'm not familiar enough with the details of the build system to make the decision myself.
Re: [PR] [yaml] Add Beam YAML Examples and Getting started docs [beam]
Polber commented on code in PR #30003:
URL: https://github.com/apache/beam/pull/30003#discussion_r1501217877
## sdks/python/apache_beam/yaml/examples/wordcount_minimal.yaml:
@@ -0,0 +1,75 @@
+# coding=utf-8
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the# Row(output='License'); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an# Row(output='AS IS' BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This examples reads from a public file stores on Google Cloud. This
+# requires authenticating with Google Cloud, or setting the file in
+#`ReadFromText` to a local file.
+#
+# To set up Application Default Credentials,
+# see https://cloud.google.com/docs/authentication/external/set-up-adc for more
+# information
+#
+# This pipeline reads in a text file, maps all words to a value of "1", sums
Review Comment:
@robertwb I refactored the example a bit so that it follows the logic more semantically. It also outputs `Row(word=..., count=...)` instead of `Row(output="word: count")`. Let me know what you think.
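For comparison, the logic the refactored example describes (read lines, split them into words, map each word to 1, sum per word, emit `Row(word=..., count=...)`) can be sketched in plain Python. This is not the YAML pipeline and uses no Beam APIs: `Row` here is a local namedtuple standing in for Beam's schema rows, and the word-splitting regex is an assumption.

```python
import re
from collections import namedtuple
from typing import Iterable, List

# Local stand-in for a Beam schema row with fields `word` and `count`.
Row = namedtuple("Row", ["word", "count"])


def word_count(lines: Iterable[str]) -> List[Row]:
    """Splits lines into words, pairs each word with 1, and sums per word."""
    totals = {}
    for line in lines:
        # Assumed tokenization: runs of ASCII letters and apostrophes.
        for word in re.findall(r"[A-Za-z']+", line):
            totals[word] = totals.get(word, 0) + 1  # "map to 1, then sum"
    return [Row(word=w, count=c) for w, c in sorted(totals.items())]
```

Emitting two named fields rather than a pre-formatted `"word: count"` string keeps the count numeric, so downstream steps can still filter or aggregate on it.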
Re: [PR] Update beam-2.54.0.md about v2 default. [beam]
robertwb merged PR #30411:
URL: https://github.com/apache/beam/pull/30411
[PR] Update beam-2.54.0.md about v2 default. [beam]
robertwb opened a new pull request, #30411:
URL: https://github.com/apache/beam/pull/30411
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
- [ ] Mention the appropriate issue in your description (for example: `addresses #123`), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment `fixes #` instead.
- [ ] Update `CHANGES.md` with noteworthy changes.
- [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://github.com/apache/beam/blob/master/CONTRIBUTING.md#make-the-reviewers-job-easier).
To check the build health, please visit [https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md](https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md)
GitHub Actions Tests Status (on master branch)
[![Build python source distribution and wheels](https://github.com/apache/beam/workflows/Build%20python%20source%20distribution%20and%20wheels/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Build+python+source+distribution+and+wheels%22+branch%3Amaster+event%3Aschedule)
[![Python tests](https://github.com/apache/beam/workflows/Python%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Python+Tests%22+branch%3Amaster+event%3Aschedule)
[![Java tests](https://github.com/apache/beam/workflows/Java%20Tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Java+Tests%22+branch%3Amaster+event%3Aschedule)
[![Go tests](https://github.com/apache/beam/workflows/Go%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Go+tests%22+branch%3Amaster+event%3Aschedule)
See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more information about GitHub Actions CI or the [workflows README](https://github.com/apache/beam/blob/master/.github/workflows/README.md) to see a list of phrases to trigger workflows.
Re: [PR] Ensure flatten windows match [beam]
damccorm commented on code in PR #30410:
URL: https://github.com/apache/beam/pull/30410#discussion_r1501185496
## sdks/python/apache_beam/transforms/core.py:
@@ -3683,8 +3683,14 @@ def _extract_input_pvalues(self, pvalueish):
     return pvalueish, pvalueish
   def expand(self, pcolls):
+    windowing = self.get_windowing(pcolls)
     for pcoll in pcolls:
       self._check_pcollection(pcoll)
+      if pcoll.windowing != windowing:
Review Comment:
Yeah, I think that's a good idea. It turns out our test asserts actually do this already for some weird reasons, so I'm sure others do as well. That's what is causing CI to fail: https://github.com/apache/beam/blob/20675c860f46f3f4abce061a6b490166ca68df0f/sdks/python/apache_beam/testing/util.py#L286C7-L286C22
Re: [PR] Skip deleting empty file in case of no shards and skip_if_empty [beam]
riteshghorse commented on PR #30409:
URL: https://github.com/apache/beam/pull/30409#issuecomment-1962011172
That makes sense. I think it would confuse users if the file still existed even though the PCollection was empty.
Re: [I] [Bug]: beam.io.WriteToText deletes existing file even if skip_if_empty=True [beam]
riteshghorse commented on issue #27926:
URL: https://github.com/apache/beam/issues/27926#issuecomment-1962010219
Oh yes, that makes sense. I didn't think of that. This could confuse users. Added a doc comment instead.
Re: [PR] [yaml] Add Beam YAML Examples and Getting started docs [beam]
Polber commented on code in PR #30003:
URL: https://github.com/apache/beam/pull/30003#discussion_r1501174577
## sdks/python/apache_beam/yaml/examples/README.md:
@@ -0,0 +1,48 @@
+# Examples Catalog
+
+
+* [Examples Catalog](#examples-catalog)
+  * [Wordcount](#wordcount)
+  * [Transforms](#transforms)
+    * [Element-wise](#element-wise)
+    * [Aggregation](#aggregation)
+
+
+This module contains a series of Beam YAML code samples that can be run using
+the command:
+```
+python -m apache_beam.yaml.main --pipeline_spec_file=/path/to/example.yaml
+```
+
+## Wordcount
+A good starting place is the [Wordcount](wordcount_minimal.yaml) example under
+the root example directory.
+This example reads in a text file, splits the text on each word, groups by each
+word, and counts the occurrence of each word. This is a classic example used in
+the other SDK's and shows off many of the functionalities of Beam YAML.
+
+## Transforms
+
+Examples in this directory show off the various built-in transforms of the Beam
+YAML framework.
+
+### Element-wise
+These examples leverage the built-in mapping transforms including `MapToFields`,
+`Filter` and `Explode`. More information can be found about mapping transforms
+[here](../docs/yaml_mapping.md).
Review Comment:
I pointed to the UDF section, since that is where MapToFields lives.
Re: [PR] Ensure flatten windows match [beam]
liferoad commented on code in PR #30410:
URL: https://github.com/apache/beam/pull/30410#discussion_r1501172737
## sdks/python/apache_beam/transforms/core.py:
@@ -3683,8 +3683,14 @@ def _extract_input_pvalues(self, pvalueish):
     return pvalueish, pvalueish
   def expand(self, pcolls):
+    windowing = self.get_windowing(pcolls)
     for pcoll in pcolls:
       self._check_pcollection(pcoll)
+      if pcoll.windowing != windowing:
Review Comment:
Shall we just log the error, since raising it could break some users' existing jobs?
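The two options being weighed here (raise on mismatched Flatten windowing vs. only log it) can be sketched as a strictness switch. Windowing is modeled as a plain string, and `check_flatten_windowing` is a hypothetical helper, not the code in `core.py`; comparing against the first input's windowing is also an assumption of this sketch.

```python
import logging
from typing import Sequence

_LOGGER = logging.getLogger(__name__)


def check_flatten_windowing(windowings: Sequence[str], strict: bool) -> bool:
    """Checks that every Flatten input uses the first input's windowing.

    Windowing is modeled as a plain string here (assumption). Returns True
    when all inputs match. On a mismatch, raises ValueError if `strict`,
    otherwise logs a warning and returns False so existing jobs keep running.
    """
    if not windowings:
        return True
    expected = windowings[0]
    mismatched = [w for w in windowings[1:] if w != expected]
    if not mismatched:
        return True
    message = (
        f"Flatten inputs have mismatched windowing: expected {expected!r}, "
        f"got {mismatched!r}")
    if strict:
        raise ValueError(message)
    _LOGGER.warning(message)
    return False
```

Running with `strict=False` corresponds to the "log only" suggestion; flipping the flag later would turn the same warning into a hard failure without changing the check itself.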
Re: [PR] Skip deleting empty file in case of no shards and skip_if_empty [beam]
damccorm commented on PR #30409:
URL: https://github.com/apache/beam/pull/30409#issuecomment-1961994691
I don't think we should actually change the behavior here. I'd vote we just document the parameter a little better. I left a couple of comments in the issue this is responding to.
Re: [I] [Bug]: beam.io.WriteToText deletes existing file even if skip_if_empty=True [beam]
damccorm commented on issue #27926:
URL: https://github.com/apache/beam/issues/27926#issuecomment-1961994381
I believe this is also consistent across our Java/Python implementations.
Re: [I] [Bug]: beam.io.WriteToText deletes existing file even if skip_if_empty=True [beam]
damccorm commented on issue #27926:
URL: https://github.com/apache/beam/issues/27926#issuecomment-1961993269
I'm not sure if this is actually a bug. `skip_if_empty` is a parameter controlling whether we write files or not (from the pydoc: "Don’t write any shards if the PCollection is empty."). In general, this transform assumes that the destination is empty, or it will clear it to be empty. It's not actually clear to me that `skip_if_empty` should impact our deletion behavior, though; if I'm writing an empty PCollection, I would expect the result in the destination to be either no file or an empty file, depending on the parameter. If the end contents were the same as before, that would (to me) indicate that an identical PCollection was received and rewritten to the file. Basically, deleting the file is the only way we can be certain that the PCollection was empty. Because of this, I'm hesitant to change the meaning of this parameter (note that this would also be mildly breaking).
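The semantics described in this comment can be modeled with a toy file sink: existing output for the prefix is cleared first, and `skip_if_empty` only decides whether an empty shard is written afterwards. This is a simplified, stdlib-only sketch of the behavior under discussion; the `write_text` helper and the `<prefix>-NNNNN-of-NNNNN` shard naming are assumptions, not Beam's `WriteToText` implementation.

```python
import glob
import os
from typing import Iterable, List


def write_text(prefix: str, lines: Iterable[str], skip_if_empty: bool) -> List[str]:
    """Toy sink: clears old shards for `prefix`, then writes at most one shard."""
    # Assumed shard naming: <prefix>-NNNNN-of-NNNNN.
    for old in glob.glob(glob.escape(prefix) + "-*-of-*"):
        os.remove(old)  # the destination is treated as "made empty" first
    lines = list(lines)
    if not lines and skip_if_empty:
        return []  # no shard written, but previous output is already gone
    path = f"{prefix}-00000-of-00001"
    with open(path, "w") as f:
        f.writelines(line + "\n" for line in lines)
    return [path]
```

With an empty input, the end state is either no file (`skip_if_empty=True`) or an empty file (`skip_if_empty=False`); in neither case does stale output from an earlier run survive, which is the property the comment argues for.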
Re: [PR] [yaml] Add Beam YAML Examples and Getting started docs [beam]
robertwb commented on code in PR #30003: URL: https://github.com/apache/beam/pull/30003#discussion_r1501143404 ## sdks/python/apache_beam/yaml/examples/README.md: ## @@ -0,0 +1,48 @@ +# Examples Catalog + + +* [Examples Catalog](#examples-catalog) + * [Wordcount](#wordcount) + * [Transforms](#transforms) +* [Element-wise](#element-wise) +* [Aggregation](#aggregation) + + +This module contains a series of Beam YAML code samples that can be run using +the command: +``` +python -m apache_beam.yaml.main --pipeline_spec_file=/path/to/example.yaml +``` + +## Wordcount +A good starting place is the [Wordcount](wordcount_minimal.yaml) example under +the root example directory. +This example reads in a text file, splits the text on each word, groups by each +word, and counts the occurrence of each word. This is a classic example used in +the other SDK's and shows off many of the functionalities of Beam YAML. + +## Transforms + +Examples in this directory show off the various built-in transforms of the Beam +YAML framework. + +### Element-wise +These examples leverage the built-in mapping transforms including `MapToFields`, +`Filter` and `Explode`. More information can be found about mapping transforms +[here](../docs/yaml_mapping.md). Review Comment: Now that they're live, let's point to the official docs on https://beam.apache.org/documentation/sdks/yaml/ ## sdks/python/apache_beam/yaml/examples/wordcount_minimal.yaml: ## @@ -0,0 +1,75 @@ +# coding=utf-8 +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the# Row(output='License'); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an# Row(output='AS IS' BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# This examples reads from a public file stores on Google Cloud. This +# requires authenticating with Google Cloud, or setting the file in +#`ReadFromText` to a local file. +# +# To set up Application Default Credentials, +# see https://cloud.google.com/docs/authentication/external/set-up-adc for more +# information +# +# This pipeline reads in a text file, maps all words to a value of "1", sums Review Comment: Perhaps intersperse these comments with the code itself? ## sdks/python/apache_beam/yaml/examples/README.md: ## @@ -0,0 +1,48 @@ +# Examples Catalog + + +* [Examples Catalog](#examples-catalog) + * [Wordcount](#wordcount) + * [Transforms](#transforms) +* [Element-wise](#element-wise) +* [Aggregation](#aggregation) + + +This module contains a series of Beam YAML code samples that can be run using +the command: +``` +python -m apache_beam.yaml.main --pipeline_spec_file=/path/to/example.yaml +``` + +## Wordcount +A good starting place is the [Wordcount](wordcount_minimal.yaml) example under +the root example directory. +This example reads in a text file, splits the text on each word, groups by each +word, and counts the occurrence of each word. This is a classic example used in +the other SDK's and shows off many of the functionalities of Beam YAML. + +## Transforms + +Examples in this directory show off the various built-in transforms of the Beam +YAML framework. + +### Element-wise +These examples leverage the built-in mapping transforms including `MapToFields`, +`Filter` and `Explode`. 
More information can be found about mapping transforms +[here](../docs/yaml_mapping.md). + +### Aggregation +These examples leverage the built-in `Combine` transform for performing simple +aggregations including sum, mean, count, etc. + +These examples are experimental and require that +`yaml_experimental_features: Combine` be specified under the `options` tag, or +by passing `--yaml_experimental_features=Combine` to the command to run the +pipeline. i.e. +``` +python -m apache_beam.yaml.main \ + --pipeline_spec_file=/path/to/example.yaml \ + --yaml_experimental_features=Combine +``` +More information can be found about aggregation transforms +[here](../docs/yaml_combine.md). Review Comment: https://beam.apache.org/documentation/sdks/yaml-combine/
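For readers new to the Wordcount example described in this README (split text into words, map each word to 1, group by word, sum), here is a plain-Python sketch of the same computation. It does not use Beam or Beam YAML; the `count_words` helper and its regex tokenizer are illustrative assumptions, not what `wordcount_minimal.yaml` actually does.

```python
import re
from collections import defaultdict


def count_words(lines):
    """Count word occurrences across lines of text.

    Hypothetical helper mirroring the split / map-to-1 / group-and-sum steps;
    the tokenizer regex is an assumption.
    """
    # Split: break every line into words.
    words = (w for line in lines for w in re.findall(r"[A-Za-z']+", line))
    # Map: pair each word with the value 1.
    pairs = ((w, 1) for w in words)
    # Group by word and sum the ones.
    totals = defaultdict(int)
    for word, one in pairs:
        totals[word] += one
    return dict(totals)


print(count_words(["the cat sat", "the hat"]))
```

In a Beam pipeline the group-and-sum step runs in parallel across workers; the dictionary here only stands in for it.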
Re: [PR] Implementing lull reporting at bundle level processing [beam]
arvindram03 commented on PR #29882: URL: https://github.com/apache/beam/pull/29882#issuecomment-1961960410 Seems like an unrelated failure
Re: [PR] Implement ordered list state for FnApi. [beam]
robertwb commented on code in PR #30317: URL: https://github.com/apache/beam/pull/30317#discussion_r1501142124 ## model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto: ## @@ -1075,6 +,42 @@ message StateClearRequest {} // A response to clear state. message StateClearResponse {} Review Comment: Thanks. IIUC, append is still correct, as there's a bag (with possibly multiple items) assigned to every point in the ordered space. (It's a lot like MultiMap with the ability to read ranges in order rather than just do point lookups, though +1 to not mixing the two.) Agree on caching--there are more clever things we can do here in the future, but we can punt that to future work.
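The comment above describes ordered list state as a bag of possibly several values at every point in the ordered space, readable by range in order (unlike MultiMap's point lookups). That reading can be modelled in a few lines. This is an in-memory sketch only; `OrderedListState`, `append` and `read_range` are hypothetical names and say nothing about the actual Fn API proto messages.

```python
import bisect
from collections import defaultdict


class OrderedListState:
    """Toy in-memory model: each sort key maps to a bag of values.

    Hypothetical class for illustration; not the Beam Fn API.
    """

    def __init__(self):
        self._keys = []                 # sorted, distinct sort keys
        self._bags = defaultdict(list)  # sort key -> bag of values

    def append(self, sort_key, value):
        # Appending at an existing key grows its bag instead of replacing it.
        if sort_key not in self._bags:
            bisect.insort(self._keys, sort_key)
        self._bags[sort_key].append(value)

    def read_range(self, start, end):
        """Return (sort_key, value) pairs with start <= sort_key < end, in order."""
        lo = bisect.bisect_left(self._keys, start)
        hi = bisect.bisect_left(self._keys, end)
        return [(k, v) for k in self._keys[lo:hi] for v in self._bags[k]]


state = OrderedListState()
state.append(5, "b")
state.append(1, "a")
state.append(5, "c")
print(state.read_range(0, 10))
```

Appending a second value at sort key 5 adds to that key's bag rather than overwriting it, which is why append semantics still fit this model.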
Re: [PR] Skip deleting empty file in case of no shards and skip_if_empty [beam]
github-actions[bot] commented on PR #30409: URL: https://github.com/apache/beam/pull/30409#issuecomment-1961955890 Assigning reviewers. If you would like to opt out of this review, comment `assign to next reviewer`: R: @damccorm for label python. R: @johnjcasey for label io. Available commands: - `stop reviewer notifications` - opt out of the automated review tooling - `remind me after tests pass` - tag the comment author after tests pass - `waiting on author` - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers) The PR bot will only process comments in the main thread (not review comments).
Re: [PR] Ensure flatten windows match [beam]
github-actions[bot] commented on PR #30410: URL: https://github.com/apache/beam/pull/30410#issuecomment-1961955832 Assigning reviewers. If you would like to opt out of this review, comment `assign to next reviewer`: R: @liferoad for label python.
Re: [PR] Refactor commit logic out of StreamingDataflowWorker [beam]
scwhittle commented on code in PR #30312: URL: https://github.com/apache/beam/pull/30312#discussion_r1500963561 ## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/WindmillStreamPoolCloseableStreamFactory.java: ## @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.windmill.client; + +import java.util.function.Supplier; +import org.joda.time.Duration; + +/** + * Closeable {@link WindmillStream} factory that uses a {@link WindmillStreamPool} to create and + * release streams. + */ +public final class WindmillStreamPoolCloseableStreamFactory +implements Supplier> { + private static final int NUM_COMMIT_STREAMS = 1; + private static final Duration COMMIT_STREAM_TIMEOUT = Duration.standardMinutes(1); + + private final WindmillStreamPool streamPool; + + public WindmillStreamPoolCloseableStreamFactory(Supplier streamFactory) { +this.streamPool = +WindmillStreamPool.create(NUM_COMMIT_STREAMS, COMMIT_STREAM_TIMEOUT, streamFactory); Review Comment: can we just change StreamPool to have a method returning ClosableStreams? Not sure we need this new class. 
This class also hard-codes parameters that we might want to change for different stream pools.
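The review suggests giving the stream pool itself a method that hands out closeable streams, with pool parameters supplied by the caller rather than hard-coded constants. The following sketch shows that shape, written in Python for brevity. `StreamPool` and `closeable_stream` are hypothetical names, the round-robin assignment is an assumption, the Windmill stream timeout is omitted, and the class is not thread-safe.

```python
import contextlib
import itertools


class StreamPool:
    """Hypothetical pool whose size is a constructor argument, not a constant."""

    def __init__(self, num_streams, stream_factory):
        if num_streams < 1:
            raise ValueError("num_streams must be at least 1")
        self._streams = [stream_factory() for _ in range(num_streams)]
        self._indices = itertools.cycle(range(num_streams))  # round-robin
        self._holds = [0] * num_streams  # outstanding borrowers per stream

    @contextlib.contextmanager
    def closeable_stream(self):
        """Lend a pooled stream; it is released when the `with` block exits."""
        index = next(self._indices)
        self._holds[index] += 1
        try:
            yield self._streams[index]
        finally:
            self._holds[index] -= 1  # release even if the caller raised

    def outstanding(self):
        """Number of streams currently lent out."""
        return sum(self._holds)
```

A commit path would then use `with pool.closeable_stream() as stream: ...`, and a different caller could build its own pool with different sizing.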
Re: [PR] [flink] #30402 restore upgradability [beam]
je-ik commented on code in PR #30403: URL: https://github.com/apache/beam/pull/30403#discussion_r1501138097 ## sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java: ## @@ -143,6 +143,9 @@ */ public abstract class PTransform implements Serializable /* See the note above */, HasDisplayData { + + private static final long serialVersionUID = 3383862966597863311L; Review Comment: The problem is that Flink needs to serialize the UnboundedSource (after split) into state to support legacy Read. I'm not sure what the correct path to fixing this is: we could use Kryo, but UnboundedSource actually is `Serializable`. How does Dataflow serialize the source? I suppose the same applies to ReadViaSDF as well.
Re: [PR] [YAML] Fix MapToFields error output type inference [beam]
robertwb merged PR #30378: URL: https://github.com/apache/beam/pull/30378
Re: [PR] [flink] #30402 restore upgradability [beam]
robertwb commented on code in PR #30403: URL: https://github.com/apache/beam/pull/30403#discussion_r1501133515 ## sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java: ## @@ -143,6 +143,9 @@ */ public abstract class PTransform implements Serializable /* See the note above */, HasDisplayData { + + private static final long serialVersionUID = 3383862966597863311L; Review Comment: Ack. Hopefully we can clean this up in the future.
Re: [PR] [flink] #30402 restore upgradability [beam]
je-ik commented on PR #30403: URL: https://github.com/apache/beam/pull/30403#issuecomment-1961911653 Makes sense. I removed the sdk.util version. PTAL.
[PR] Ensure flatten windows match [beam]
damccorm opened a new pull request, #30410: URL: https://github.com/apache/beam/pull/30410 When these windows don't match, it can create unexpected behavior downstream; it doesn't really make sense to flatten different windows. Fixes #22903 Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily: - [ ] Mention the appropriate issue in your description (for example: `addresses #123`), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment `fixes #` instead. - [ ] Update `CHANGES.md` with noteworthy changes. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf). See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://github.com/apache/beam/blob/master/CONTRIBUTING.md#make-the-reviewers-job-easier). 
To check the build health, please visit [https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md](https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md) GitHub Actions Tests Status (on master branch) [![Build python source distribution and wheels](https://github.com/apache/beam/workflows/Build%20python%20source%20distribution%20and%20wheels/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Build+python+source+distribution+and+wheels%22+branch%3Amaster+event%3Aschedule) [![Python tests](https://github.com/apache/beam/workflows/Python%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Python+Tests%22+branch%3Amaster+event%3Aschedule) [![Java tests](https://github.com/apache/beam/workflows/Java%20Tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Java+Tests%22+branch%3Amaster+event%3Aschedule) [![Go tests](https://github.com/apache/beam/workflows/Go%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Go+tests%22+branch%3Amaster+event%3Aschedule) See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more information about GitHub Actions CI or the [workflows README](https://github.com/apache/beam/blob/master/.github/workflows/README.md) to see a list of phrases to trigger workflows.
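The PR description argues that flattening inputs with different windowing makes little sense and can surprise downstream transforms. The sketch below shows the kind of up-front check that implies, assuming windowing can be compared for equality. `WindowingSpec` and `check_flatten_inputs` are hypothetical stand-ins, not the Beam Python SDK's classes or the PR's actual change.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class WindowingSpec:
    """Hypothetical, comparable description of a PCollection's windowing."""
    window_fn: str           # e.g. "FixedWindows" or "GlobalWindows"
    size_seconds: int = 0    # window size; unused for the global window


def check_flatten_inputs(input_windowings):
    """Return the shared windowing of all inputs, or raise if they differ."""
    distinct = set(input_windowings)
    if len(distinct) > 1:
        raise ValueError(
            "Inputs to Flatten must have matching windowing, got: "
            + ", ".join(sorted(repr(w) for w in distinct)))
    # With zero inputs there is nothing to compare, so return None.
    return next(iter(distinct), None)
```

Failing at construction time surfaces the mismatch where it is introduced, rather than as unexpected behavior in a later transform.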
[PR] Skip deleting empty file in case of no shards and skip_if_empty [beam]
riteshghorse opened a new pull request, #30409: URL: https://github.com/apache/beam/pull/30409 Fixes #27926 When skip_if_empty is set to true and no shards are present, don't delete the existing files.
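The behavior described in the PR (with `skip_if_empty` set and no shards written, leave existing output files alone) can be summarized as a small decision function. This sketch assumes that finalization otherwise removes existing outputs before committing the new shards. `plan_deletions` is a hypothetical helper, not code from Beam's file sinks.

```python
def plan_deletions(existing_outputs, written_shards, skip_if_empty):
    """Return the existing output files that finalization should delete.

    Hypothetical model: assumes new shards normally replace existing outputs.
    """
    if skip_if_empty and not written_shards:
        # Nothing new was written and empty output is skipped, so keep the
        # files that are already there instead of leaving no output at all.
        return []
    return sorted(existing_outputs)
```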
Re: [PR] Implementing lull reporting at bundle level processing [beam]
arvindram03 commented on PR #29882: URL: https://github.com/apache/beam/pull/29882#issuecomment-1961885912 The tests are green and the PR is ready to be merged.
Re: [PR] Implementing lull reporting at bundle level processing [beam]
arvindram03 closed pull request #29882: Implementing lull reporting at bundle level processing URL: https://github.com/apache/beam/pull/29882
Re: [PR] Remove the CSP-blocked iframes pointing outside of apache.org [beam]
tvalentyn merged PR #30408: URL: https://github.com/apache/beam/pull/30408
Re: [PR] Remove the CSP-blocked iframes pointing outside of apache.org [beam]
github-actions[bot] commented on PR #30408: URL: https://github.com/apache/beam/pull/30408#issuecomment-1961792250 Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control
Re: [PR] Remove the CSP-blocked iframes pointing outside of apache.org [beam]
tvalentyn commented on PR #30408: URL: https://github.com/apache/beam/pull/30408#issuecomment-1961790669 R: @svetakvsundhar
[PR] Implementing lull reporting at bundle level processing [beam]
arvindram03 opened a new pull request, #29882: URL: https://github.com/apache/beam/pull/29882 Implementing lull reporting for the Dataflow worker at the bundle processing level. We dump a stack trace when the bundle processing time exceeds 10 minutes. As part of this, we log the step names and the time spent in each step to help users debug stuck jobs.
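The PR description outlines the mechanism: once a bundle has been processing for more than 10 minutes, dump a stack trace and log each step's name with the time spent in it. Below is a rough Python sketch of such a report; the Dataflow worker itself is Java. `lull_report` and its `step_seconds` input are hypothetical. For simplicity the sketch formats the calling thread's stack, whereas a real monitor would capture the stack of the thread that is processing the bundle.

```python
import traceback

LULL_THRESHOLD_SECONDS = 10 * 60  # 10 minutes, per the PR description


def lull_report(elapsed_seconds, step_seconds,
                threshold_seconds=LULL_THRESHOLD_SECONDS):
    """Build a lull report if the bundle has run past the threshold, else None.

    step_seconds maps a step name to seconds spent in it (hypothetical input).
    """
    if elapsed_seconds <= threshold_seconds:
        return None
    lines = [f"Bundle processing has taken {elapsed_seconds:.0f}s "
             f"(threshold {threshold_seconds}s). Time per step:"]
    # Slowest steps first, so the likely culprit is at the top.
    for step, secs in sorted(step_seconds.items(),
                             key=lambda item: item[1], reverse=True):
        lines.append(f"  {step}: {secs:.1f}s")
    lines.append("Stack trace:")
    lines.append("".join(traceback.format_stack()).rstrip())
    return "\n".join(lines)
```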
Re: [PR] Fix invalid escape sequence '\#' [beam]
mitchej123 commented on PR #30393: URL: https://github.com/apache/beam/pull/30393#issuecomment-1961784671 Run preCommitPyCoverage
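The PR title refers to an invalid escape sequence. In a regular Python string literal, `\#` is not a recognized escape sequence, so CPython keeps the backslash but warns about it (a DeprecationWarning before Python 3.12, a SyntaxWarning from 3.12 on). Which file the PR touches is not shown here. The snippet below only illustrates the two usual fixes, a raw string or a doubled backslash, both of which produce the same two characters.

```python
import re

RAW = r"\#"       # raw string: backslash is kept literally, no warning
ESCAPED = "\\#"   # doubled backslash: same two characters, no warning

print(RAW == ESCAPED, len(RAW))  # True 2
# In a regular expression, an escaped '#' matches a literal '#'.
print(bool(re.fullmatch(RAW, "#")))  # True
```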
Re: [PR] Implementing lull reporting at bundle level processing [beam]
arvindram03 closed pull request #29882: Implementing lull reporting at bundle level processing URL: https://github.com/apache/beam/pull/29882
Re: [I] [Bug]: Frames to Beam playground no longer render on Beam website. [beam]
tvalentyn commented on issue #30394: URL: https://github.com/apache/beam/issues/30394#issuecomment-1961763069 Root cause of the behavior change: https://github.com/apache/infrastructure-p6/commit/361cb2462de24d8699c807f42a03f9f9c38c
[PR] Remove the CSP-blocked iframes pointing outside of apache.org [beam]
tvalentyn opened a new pull request, #30408: URL: https://github.com/apache/beam/pull/30408 Staged URL: TBD
Re: [PR] Use BeamModulePlugin org.json version in extensions/ml [beam]
github-actions[bot] commented on PR #30406: URL: https://github.com/apache/beam/pull/30406#issuecomment-1961751506 Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control
Re: [PR] Use BeamModulePlugin org.json version in extensions/ml [beam]
Abacn commented on PR #30406: URL: https://github.com/apache/beam/pull/30406#issuecomment-1961749902 R: @damondouglas @damccorm
Re: [PR] Create PubSubIO Load test [beam]
Abacn merged PR #30286: URL: https://github.com/apache/beam/pull/30286
Re: [PR] Create PubSubIO Load test [beam]
Abacn commented on code in PR #30286: URL: https://github.com/apache/beam/pull/30286#discussion_r1500991496 ## it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/pubsub/PubSubIOLT.java: ## @@ -0,0 +1,475 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.it.gcp.pubsub; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.cloud.Timestamp; +import com.google.protobuf.ByteString; +import com.google.pubsub.v1.SubscriptionName; +import com.google.pubsub.v1.TopicName; +import java.io.IOException; +import java.io.Serializable; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.text.ParseException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.Objects; +import java.util.UUID; +import org.apache.beam.it.common.PipelineLauncher; +import org.apache.beam.it.common.PipelineOperator; +import org.apache.beam.it.common.TestProperties; +import org.apache.beam.it.common.utils.ResourceManagerUtils; +import org.apache.beam.it.gcp.IOLoadTestBase; +import org.apache.beam.runners.direct.DirectOptions; +import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.Primitive; +import org.apache.beam.sdk.io.Read; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubOptions; +import org.apache.beam.sdk.io.synthetic.SyntheticSourceOptions; +import org.apache.beam.sdk.io.synthetic.SyntheticUnboundedSource; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.testing.TestPipelineOptions; +import org.apache.beam.sdk.testutils.NamedTestResult; +import org.apache.beam.sdk.testutils.metrics.IOITMetrics; +import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.KV; 
+import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +/** PubSubIO performance tests. */ +public class PubSubIOLT extends IOLoadTestBase { + + private static final int NUMBER_OF_BUNDLES_FOR_LOCAL = 10; + private static final int NUMBER_OF_BUNDLES_FOR_MEDIUM_AND_LARGE = 20; + private static final String READ_ELEMENT_METRIC_NAME = "read_count"; + private static final String MAP_RECORDS_STEP_NAME = "Map records"; + private static final String WRITE_TO_PUBSUB_STEP_NAME = "Write to PubSub"; + private static final Map TEST_CONFIGS_PRESET; + private static TopicName topicName; + private static String testConfigName; + private static Configuration configuration; + private static SubscriptionName subscription; + private static InfluxDBSettings influxDBSettings; + private static PubsubResourceManager resourceManager; + + @Rule public transient TestPipeline writePipeline = TestPipeline.create(); + @Rule public transient TestPipeline readPipeline = TestPipeline.create(); + + static { +try { + TEST_CONFIGS_PRESET = + ImmutableMap.of( + "local", + PubSubIOLT.Configuration.fromJsonString( + "{\"numRecords\":200,\"valueSizeBytes\":1000,\"pipelineTimeout\":7,\"runner\":\"DirectRunner\",\"numWorkers\":1}", + PubSubIOLT.Configuration.class), // 0.2 MB + "medium", + PubSubIOLT.Configuration.fromJsonString( +
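The `// 0.2 MB` annotation on the `local` preset follows from multiplying `numRecords` by `valueSizeBytes` (200 × 1000 bytes = 200,000 bytes). The small Python helper below sanity-checks that arithmetic for a preset's JSON. `total_payload_mb` is hypothetical, assumes decimal megabytes, and ignores per-message overhead such as attributes.

```python
import json

LOCAL_PRESET = ('{"numRecords":200,"valueSizeBytes":1000,"pipelineTimeout":7,'
                '"runner":"DirectRunner","numWorkers":1}')


def total_payload_mb(preset_json):
    """Approximate payload of a load-test preset in decimal megabytes."""
    preset = json.loads(preset_json)
    return preset["numRecords"] * preset["valueSizeBytes"] / 1_000_000


print(total_payload_mb(LOCAL_PRESET))  # 0.2
```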
Re: [PR] Update .htaccess [beam]
tvalentyn commented on PR #30407: URL: https://github.com/apache/beam/pull/30407#issuecomment-1961745638 Actually, removing these frames may be a more appropriate course of action.
Re: [PR] Update .htaccess [beam]
tvalentyn closed pull request #30407: Update .htaccess URL: https://github.com/apache/beam/pull/30407
Re: [PR] Update .htaccess [beam]
svetakvsundhar commented on PR #30407: URL: https://github.com/apache/beam/pull/30407#issuecomment-1961733250 Ah, I think it's addressing the below; sounds good. https://github.com/apache/beam/assets/26037657/c0052f9f-7b2d-45ff-9f43-3b7168192d56
Re: [PR] Update .htaccess [beam]
svetakvsundhar commented on PR #30407: URL: https://github.com/apache/beam/pull/30407#issuecomment-1961729904 Not sure I fully understand: which page is this change for? Also, could you link the staging website?
Re: [PR] Use BeamModulePlugin org.json version in extensions/ml [beam]
github-actions[bot] commented on PR #30406: URL: https://github.com/apache/beam/pull/30406#issuecomment-1961728571 Assigning reviewers. If you would like to opt out of this review, comment `assign to next reviewer`: R: @bvolpato for label java. Available commands: - `stop reviewer notifications` - opt out of the automated review tooling - `remind me after tests pass` - tag the comment author after tests pass - `waiting on author` - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers) The PR bot will only process comments in the main thread (not review comments).
Re: [PR] Update .htaccess [beam]
github-actions[bot] commented on PR #30407: URL: https://github.com/apache/beam/pull/30407#issuecomment-1961714536 Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control
Re: [PR] Update .htaccess [beam]
tvalentyn commented on PR #30407: URL: https://github.com/apache/beam/pull/30407#issuecomment-1961710980 R: @svetakvsundhar
[PR] Update .htaccess [beam]
tvalentyn opened a new pull request, #30407: URL: https://github.com/apache/beam/pull/30407 Also allowlist Google Calendar. Follow-up for #30394.
Re: [PR] Add DirectCommitWorkStream for direct path [beam]
scwhittle commented on code in PR #30255: URL: https://github.com/apache/beam/pull/30255#discussion_r1500931469 ## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/WindmillStream.java: ## @@ -77,14 +79,19 @@ interface CommitWorkStream extends WindmillStream { * onDone will be called with the status of the commit. */ boolean commitWorkItem( -String computation, -Windmill.WorkItemCommitRequest request, -Consumer onDone); +String computation, WorkItemCommitRequest request, Consumer onDone); /** Flushes any pending work items to the wire. */ void flush(); } + @ThreadSafe + interface AsyncCommitWorkStream extends CommitWorkStream { +void queueCommit(Commit commit); Review Comment: Add comments to the methods. In particular, is this required not to block? ## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectCommitWorkStream.java: ## @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.dataflow.worker.windmill.client.grpc; + +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; +import java.util.function.Function; +import javax.annotation.Nullable; +import org.apache.beam.runners.dataflow.worker.streaming.Commit; +import org.apache.beam.runners.dataflow.worker.streaming.WeightedBoundedQueue; +import org.apache.beam.runners.dataflow.worker.streaming.Work; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill.JobHeader; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill.StreamingCommitResponse; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill.StreamingCommitWorkRequest; +import org.apache.beam.runners.dataflow.worker.windmill.client.AbstractWindmillStream; +import org.apache.beam.runners.dataflow.worker.windmill.client.CompleteCommit; +import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.AsyncCommitWorkStream; +import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.observers.StreamObserverFactory; +import org.apache.beam.runners.dataflow.worker.windmill.client.throttling.ThrottleTimer; +import org.apache.beam.sdk.util.BackOff; +import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.StreamObserver; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Implementation of {@link + * org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.CommitWorkStream} that + * manages its own commit queue, and asynchronously CommitWork RPCs to Streaming 
Engine as callers + * queue commits on the internal queue. + * + * Callers should call {@link #queueCommit(Commit)} when work is ready to be committed. + */ +public class GrpcDirectCommitWorkStream extends GrpcCommitWorkStream Review Comment: Could we use composition instead of inheritance? It seems like the functionality is just added on top of the existing stream and could be a separate object containing a stream that it delegates to. https://en.wikipedia.org/wiki/Composition_over_inheritance#:~:text=Composition%20over%20inheritance%20(or%20composite,from%20a%20base%20or%20parent It seems like https://github.com/apache/beam/pull/30312 is ending up there, with a committer object that is wrapping the stream. Perhaps we should get that one in first and then integrate that object for the direct path.
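The composition suggestion in this review comment can be sketched as a committer that holds a stream rather than extending one. This is a minimal, hypothetical sketch: `SimpleCommitStream`, `QueuingCommitter`, and `drainOnce` are illustrative names, not Beam APIs, and it assumes (as a simplification) that `commitWorkItem` returning `false` means the current batch is full and the caller should flush. `queueCommit` only enqueues, so in this sketch it never blocks the caller.

```java
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.function.Consumer;

// Hypothetical, simplified stand-in for a commit stream; NOT the Beam CommitWorkStream interface.
interface SimpleCommitStream {
  // Assumed semantics: returns false when the current batch is full and the caller should flush.
  boolean commitWorkItem(String computation, String request, Consumer<Boolean> onDone);

  void flush();
}

// Hypothetical committer that wraps (has-a) a stream instead of extending it (is-a).
final class QueuingCommitter {
  private static final class PendingCommit {
    final String computation;
    final String request;

    PendingCommit(String computation, String request) {
      this.computation = computation;
      this.request = request;
    }
  }

  private final SimpleCommitStream stream;
  private final ConcurrentLinkedDeque<PendingCommit> queue = new ConcurrentLinkedDeque<>();

  QueuingCommitter(SimpleCommitStream stream) {
    this.stream = stream;
  }

  // Non-blocking: callers only enqueue; the stream is touched later by drainOnce.
  void queueCommit(String computation, String request) {
    queue.addLast(new PendingCommit(computation, request));
  }

  // Moves queued commits onto the delegate stream, then flushes. Returns how many were accepted.
  int drainOnce(Consumer<Boolean> onDone) {
    int accepted = 0;
    PendingCommit next;
    while ((next = queue.pollFirst()) != null) {
      if (!stream.commitWorkItem(next.computation, next.request, onDone)) {
        stream.flush(); // batch full: send it, then retry this commit once
        if (!stream.commitWorkItem(next.computation, next.request, onDone)) {
          queue.addFirst(next); // still rejected: keep it for the next drain
          break;
        }
      }
      accepted++;
    }
    stream.flush();
    return accepted;
  }
}
```

Because the committer only depends on the small stream interface, the same object could wrap either the existing dispatcher stream or a direct-path stream without a new subclass.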
Re: [PR] start to refactor persistence layer to prepare for direct path [beam]
scwhittle commented on code in PR #30265: URL: https://github.com/apache/beam/pull/30265#discussion_r1500946427 ## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java: ## @@ -462,42 +446,89 @@ public void run() { LOG.debug("maxWorkItemCommitBytes: {}", maxWorkItemCommitBytes); } - private static WindmillServerStub createWindmillServerStub( + public static StreamingDataflowWorker fromOptions(StreamingDataflowWorkerOptions options) { +ConcurrentMap computationMap = new ConcurrentHashMap<>(); +long clientId = clientIdGenerator.nextLong(); +return new StreamingDataflowWorker( +createWindmillServerStub( +options, +clientId, +new WorkHeartbeatResponseProcessor( Review Comment: I don't see where this is set up if the forTesting method is used. Are you sure this isn't the cause of the test failure?
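One way to address the reviewer's concern is to route every factory method through a single constructor that performs the wiring, so a test factory cannot silently skip it. A minimal sketch with hypothetical names (`SketchWorker`, `HeartbeatProcessorSketch`); the real `StreamingDataflowWorker` takes many more dependencies, and this is not necessarily how the PR resolved the issue.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for a heartbeat processor that needs the shared computation map.
final class HeartbeatProcessorSketch {
  private final Map<String, String> computations;

  HeartbeatProcessorSketch(Map<String, String> computations) {
    this.computations = computations; // shares the map, so later additions are visible
  }

  boolean knows(String computationId) {
    return computations.containsKey(computationId);
  }
}

// Hypothetical worker: both factories funnel through one constructor that does the wiring.
final class SketchWorker {
  private final ConcurrentMap<String, String> computationMap;
  private final HeartbeatProcessorSketch heartbeatProcessor;

  private SketchWorker(ConcurrentMap<String, String> computationMap) {
    this.computationMap = computationMap;
    this.heartbeatProcessor = new HeartbeatProcessorSketch(computationMap);
  }

  // Production-style factory (option parameters omitted in this sketch).
  static SketchWorker fromOptions() {
    return new SketchWorker(new ConcurrentHashMap<>());
  }

  // Test factory may inject state, but cannot bypass the wiring in the constructor.
  static SketchWorker forTesting(ConcurrentMap<String, String> computationMap) {
    return new SketchWorker(computationMap);
  }

  void addComputation(String computationId, String config) {
    computationMap.put(computationId, config);
  }

  HeartbeatProcessorSketch heartbeatProcessor() {
    return heartbeatProcessor;
  }
}
```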
Re: [PR] Implementing lull reporting at bundle level processing [beam]
scwhittle commented on PR #29882: URL: https://github.com/apache/beam/pull/29882#issuecomment-1961654189 Just made the final edit (replace anyOf with allOf instead of a separate allOf) instead of another round-trip. Will merge once tests pass.
[PR] Use BeamModulePlugin org.json version in extensions/ml [beam]
Abacn opened a new pull request, #30406: URL: https://github.com/apache/beam/pull/30406 Fixes #30404 **Please** add a meaningful description for your change here Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily: - [ ] Mention the appropriate issue in your description (for example: `addresses #123`), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment `fixes #` instead. - [ ] Update `CHANGES.md` with noteworthy changes. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf). See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://github.com/apache/beam/blob/master/CONTRIBUTING.md#make-the-reviewers-job-easier). To check the build health, please visit [https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md](https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md) GitHub Actions Tests Status (on master branch) [![Build python source distribution and wheels](https://github.com/apache/beam/workflows/Build%20python%20source%20distribution%20and%20wheels/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Build+python+source+distribution+and+wheels%22+branch%3Amaster+event%3Aschedule) [![Python tests](https://github.com/apache/beam/workflows/Python%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Python+Tests%22+branch%3Amaster+event%3Aschedule) [![Java tests](https://github.com/apache/beam/workflows/Java%20Tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Java+Tests%22+branch%3Amaster+event%3Aschedule) [![Go 
tests](https://github.com/apache/beam/workflows/Go%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Go+tests%22+branch%3Amaster+event%3Aschedule) See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more information about GitHub Actions CI or the [workflows README](https://github.com/apache/beam/blob/master/.github/workflows/README.md) to see a list of phrases to trigger workflows.
Re: [PR] Update description.md [beam]
lostluck merged PR #30401: URL: https://github.com/apache/beam/pull/30401
Re: [PR] Implement ordered list state for FnApi. [beam]
shunping commented on code in PR #30317: URL: https://github.com/apache/beam/pull/30317#discussion_r1500882918 ## sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/FakeBeamFnStateClient.java: ## @@ -174,6 +215,124 @@ public CompletableFuture handle(StateRequest.Builder requestBuild response = StateResponse.newBuilder().setAppend(StateAppendResponse.getDefaultInstance()); break; + case ORDERED_LIST_GET: +{ + long start = request.getOrderedListGet().getRange().getStart(); + long end = request.getOrderedListGet().getRange().getEnd(); + + KvCoder coder = KvCoder.of(VarLongCoder.of(), VarIntCoder.of()); + long sortKey = start; + int index = 0; + if (request.getOrderedListGet().getContinuationToken().size() > 0) { +try { + // The continuation format here is the sort key (long) followed by an index (int) + KV cursor = + coder.decode(request.getOrderedListGet().getContinuationToken().newInput()); + sortKey = cursor.getKey(); + index = cursor.getValue(); +} catch (IOException e) { + throw new RuntimeException(e); +} + } + + ByteString continuationToken; + ByteString returnBlock = ByteString.EMPTY; + ; + try { +if (sortKey < start || sortKey >= end) { + throw new IndexOutOfBoundsException("sort key out of range"); +} + +NavigableSet subset = +orderedListKeys +.getOrDefault(request.getStateKey(), new TreeSet<>()) +.subSet(sortKey, true, end, false); + +// get the effective sort key currently, can throw NoSuchElementException +Long nextSortKey = subset.first(); + +StateKey.Builder keyBuilder = request.getStateKey().toBuilder(); + keyBuilder.getOrderedListUserStateBuilder().setSortKey(nextSortKey); +List byteStrings = +data.getOrDefault(keyBuilder.build(), Collections.singletonList(ByteString.EMPTY)); + +// get the block specified in continuation token, can throw IndexOutOfBoundsException +returnBlock = byteStrings.get(index); + +if (byteStrings.size() > index + 1) { + // more blocks from this sort key + index += 1; +} else { + // finish navigating the current 
sort key and need to find the next one, + // can throw NoSuchElementException + nextSortKey = subset.tailSet(nextSortKey, false).first(); + index = 0; +} + +ByteStringOutputStream outputStream = new ByteStringOutputStream(); +try { + KV cursor = KV.of(nextSortKey, index); + coder.encode(cursor, outputStream); +} catch (IOException e) { + throw new RuntimeException(e); +} +continuationToken = outputStream.toByteString(); + } catch (NoSuchElementException | IndexOutOfBoundsException e) { +continuationToken = ByteString.EMPTY; + } + response = + StateResponse.newBuilder() + .setOrderedListGet( + OrderedListStateGetResponse.newBuilder() + .setData(returnBlock) + .setContinuationToken(continuationToken)); +} +break; + + case ORDERED_LIST_UPDATE: +for (OrderedListRange r : request.getOrderedListUpdate().getDeletesList()) { + List keysToRemove = + new ArrayList<>( + orderedListKeys + .getOrDefault(request.getStateKey(), new TreeSet<>()) + .subSet(r.getStart(), true, r.getEnd(), false)); + + for (Long l : keysToRemove) { +StateKey.Builder keyBuilder = request.getStateKey().toBuilder(); +keyBuilder.getOrderedListUserStateBuilder().setSortKey(l); +data.remove(keyBuilder.build()); +orderedListKeys.get(request.getStateKey()).remove(l); + } +} + +for (OrderedListEntry e : request.getOrderedListUpdate().getInsertsList()) { + StateKey.Builder keyBuilder = request.getStateKey().toBuilder(); + keyBuilder.getOrderedListUserStateBuilder().setSortKey(e.getSortKey()); + + ByteStringOutputStream outStream = new ByteStringOutputStream(); + + try { +InstantCoder.of().encode(Instant.ofEpochMilli(e.getSortKey()), outStream); + } catch (IOException ex) { +throw new RuntimeException(ex); + } + // In the response, the value encoded bytes are placed before the timestamp encoded bytes. Review Comment: ACK. I found a better way to do this. Specifically, I can reuse
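The paging in the fake client's `ORDERED_LIST_GET` branch can be modeled without Beam types: the continuation token is a cursor of (sort key, block index), and an empty token means the requested range is exhausted. A hypothetical sketch (`OrderedListPager`, `Cursor`, and `Page` are illustrative names; a `null` cursor stands in for an empty `ByteString` token, and blocks are `String`s rather than `ByteString`s).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical model of paging through an ordered list with a (sortKey, index) cursor.
final class OrderedListPager {
  // Plays the role of the KV<Long, Integer> continuation token.
  static final class Cursor {
    final long sortKey;
    final int index;

    Cursor(long sortKey, int index) {
      this.sortKey = sortKey;
      this.index = index;
    }
  }

  static final class Page {
    final String block; // "" when nothing is found
    final Cursor next; // null stands in for an empty continuation token

    Page(String block, Cursor next) {
      this.block = block;
      this.next = next;
    }
  }

  // Each sort key may hold several blocks, like the fake client's data map.
  private final TreeMap<Long, List<String>> data = new TreeMap<>();

  void append(long sortKey, String block) {
    data.computeIfAbsent(sortKey, k -> new ArrayList<>()).add(block);
  }

  // Returns one block from [start, end), resuming from `token` when it is non-null.
  Page get(long start, long end, Cursor token) {
    long sortKey = token == null ? start : token.sortKey;
    int index = token == null ? 0 : token.index;
    if (sortKey < start || sortKey >= end) {
      return new Page("", null);
    }
    NavigableMap<Long, List<String>> subset = data.subMap(sortKey, true, end, false);
    Map.Entry<Long, List<String>> current = subset.firstEntry();
    if (current == null || index >= current.getValue().size()) {
      return new Page("", null);
    }
    String block = current.getValue().get(index);
    if (index + 1 < current.getValue().size()) {
      // More blocks remain under the same sort key.
      return new Page(block, new Cursor(current.getKey(), index + 1));
    }
    // Move to the next sort key inside the range, if any.
    Long nextKey = subset.higherKey(current.getKey());
    return new Page(block, nextKey == null ? null : new Cursor(nextKey, 0));
  }
}
```

Returning the next cursor instead of keeping per-reader state means each call is independent, which is the property a continuation token is meant to provide.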
Re: [I] [Task]: Use GCP-BOM to manage google cloud dependencies in sdks/java/extensions/ml [beam]
Abacn commented on issue #30405: URL: https://github.com/apache/beam/issues/30405#issuecomment-1961617421 There is at least one known breaking change from google-cloud-recommendations-ai that needs to be resolved: https://github.com/googleapis/google-cloud-java/commit/7c866001585563cd673b0bb33a567fd549cc483d changed the return type of predictionServiceClient.predict.iterateAll from PredictionResult to Entry.
Re: [PR] Implement ordered list state for FnApi. [beam]
shunping commented on code in PR #30317: URL: https://github.com/apache/beam/pull/30317#discussion_r1500882918 ## sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/FakeBeamFnStateClient.java: ## @@ -174,6 +215,124 @@ public CompletableFuture handle(StateRequest.Builder requestBuild response = StateResponse.newBuilder().setAppend(StateAppendResponse.getDefaultInstance()); break; + case ORDERED_LIST_GET: +{ + long start = request.getOrderedListGet().getRange().getStart(); + long end = request.getOrderedListGet().getRange().getEnd(); + + KvCoder coder = KvCoder.of(VarLongCoder.of(), VarIntCoder.of()); + long sortKey = start; + int index = 0; + if (request.getOrderedListGet().getContinuationToken().size() > 0) { +try { + // The continuation format here is the sort key (long) followed by an index (int) + KV cursor = + coder.decode(request.getOrderedListGet().getContinuationToken().newInput()); + sortKey = cursor.getKey(); + index = cursor.getValue(); +} catch (IOException e) { + throw new RuntimeException(e); +} + } + + ByteString continuationToken; + ByteString returnBlock = ByteString.EMPTY; + ; + try { +if (sortKey < start || sortKey >= end) { + throw new IndexOutOfBoundsException("sort key out of range"); +} + +NavigableSet subset = +orderedListKeys +.getOrDefault(request.getStateKey(), new TreeSet<>()) +.subSet(sortKey, true, end, false); + +// get the effective sort key currently, can throw NoSuchElementException +Long nextSortKey = subset.first(); + +StateKey.Builder keyBuilder = request.getStateKey().toBuilder(); + keyBuilder.getOrderedListUserStateBuilder().setSortKey(nextSortKey); +List byteStrings = +data.getOrDefault(keyBuilder.build(), Collections.singletonList(ByteString.EMPTY)); + +// get the block specified in continuation token, can throw IndexOutOfBoundsException +returnBlock = byteStrings.get(index); + +if (byteStrings.size() > index + 1) { + // more blocks from this sort key + index += 1; +} else { + // finish navigating the current 
sort key and need to find the next one, + // can throw NoSuchElementException + nextSortKey = subset.tailSet(nextSortKey, false).first(); + index = 0; +} + +ByteStringOutputStream outputStream = new ByteStringOutputStream(); +try { + KV cursor = KV.of(nextSortKey, index); + coder.encode(cursor, outputStream); +} catch (IOException e) { + throw new RuntimeException(e); +} +continuationToken = outputStream.toByteString(); + } catch (NoSuchElementException | IndexOutOfBoundsException e) { +continuationToken = ByteString.EMPTY; + } + response = + StateResponse.newBuilder() + .setOrderedListGet( + OrderedListStateGetResponse.newBuilder() + .setData(returnBlock) + .setContinuationToken(continuationToken)); +} +break; + + case ORDERED_LIST_UPDATE: +for (OrderedListRange r : request.getOrderedListUpdate().getDeletesList()) { + List keysToRemove = + new ArrayList<>( + orderedListKeys + .getOrDefault(request.getStateKey(), new TreeSet<>()) + .subSet(r.getStart(), true, r.getEnd(), false)); + + for (Long l : keysToRemove) { +StateKey.Builder keyBuilder = request.getStateKey().toBuilder(); +keyBuilder.getOrderedListUserStateBuilder().setSortKey(l); +data.remove(keyBuilder.build()); +orderedListKeys.get(request.getStateKey()).remove(l); + } +} + +for (OrderedListEntry e : request.getOrderedListUpdate().getInsertsList()) { + StateKey.Builder keyBuilder = request.getStateKey().toBuilder(); + keyBuilder.getOrderedListUserStateBuilder().setSortKey(e.getSortKey()); + + ByteStringOutputStream outStream = new ByteStringOutputStream(); + + try { +InstantCoder.of().encode(Instant.ofEpochMilli(e.getSortKey()), outStream); + } catch (IOException ex) { +throw new RuntimeException(ex); + } + // In the response, the value encoded bytes are placed before the timestamp encoded bytes. Review Comment: ACK. I found a better way to avoid the decoding overhead in the fake client
Re: [PR] Implement ordered list state for FnApi. [beam]
shunping commented on code in PR #30317: URL: https://github.com/apache/beam/pull/30317#discussion_r1500878413 ## model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto: ## @@ -1075,6 +,42 @@ message StateClearRequest {} // A response to clear state. message StateClearResponse {} Review Comment: Sounds good! Thanks a lot for the input, Kenn. I will go ahead with the changes to re-use Get/Append/Clear for ordered list then.
Re: [PR] Implement ordered list state for FnApi. [beam]
shunping commented on code in PR #30317: URL: https://github.com/apache/beam/pull/30317#discussion_r1500872501 ## model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto: ## @@ -1075,6 +,42 @@ message StateClearRequest {} // A response to clear state. message StateClearResponse {} +// A message describes a sort key range [start, end). +message OrderedListRange { + int64 start = 1; + int64 end = 2; +} + +// A data entry which is tagged with a sort key. +message OrderedListEntry { + int64 sort_key = 1; + bytes data = 2; +} + +// This request will fetch an ordered list with a sort key range. If the +// timestamp range is not specified, the runner should use [MIN_INT64, +// MAX_INT64) by default. +message OrderedListStateGetRequest { + bytes continuation_token = 1; + OrderedListRange range = 2; +} + +// A response to the get state request for an ordered list. +message OrderedListStateGetResponse { + bytes continuation_token = 1; + bytes data = 2; +} + +// A request to update an ordered list +message OrderedListStateUpdateRequest { + // when the request is processed, deletes should always happen before inserts. Review Comment: Yep. I am planning to add an addendum to the original design doc to summarize the decisions we make here. We should have that after this round of review completes.
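The proto comments define `OrderedListRange` as the half-open interval `[start, end)` and default an unspecified range to `[MIN_INT64, MAX_INT64)`. A small sketch of that filtering rule over an in-memory `TreeMap` follows; the class and helper names are hypothetical and the map stands in for whatever storage a runner actually uses. One consequence of a half-open default is that an entry stored at exactly `Long.MAX_VALUE` is never returned by a default read.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical illustration of half-open [start, end) sort key ranges. */
public class OrderedListRangeDemo {

  /** Values whose sort key k satisfies start <= k < end, in sort key order. */
  static List<String> read(NavigableMap<Long, String> list, long start, long end) {
    if (start >= end) {
      return new ArrayList<>(); // empty (or inverted) range
    }
    return new ArrayList<>(list.subMap(start, true, end, false).values());
  }

  /** Default used when no range is given: [Long.MIN_VALUE, Long.MAX_VALUE). */
  static List<String> readAll(NavigableMap<Long, String> list) {
    return read(list, Long.MIN_VALUE, Long.MAX_VALUE);
  }

  public static void main(String[] args) {
    NavigableMap<Long, String> list = new TreeMap<>();
    list.put(Long.MIN_VALUE, "min");
    list.put(10L, "ten");
    list.put(20L, "twenty");
    list.put(Long.MAX_VALUE, "max");

    System.out.println(read(list, 10L, 20L)); // [ten]
    System.out.println(readAll(list)); // [min, ten, twenty]
  }
}
```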
Re: [PR] Implement ordered list state for FnApi. [beam]
kennknowles commented on code in PR #30317: URL: https://github.com/apache/beam/pull/30317#discussion_r1500851162 ## model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto: ## @@ -1075,6 +,42 @@ message StateClearRequest {} // A response to clear state. message StateClearResponse {} Review Comment: OK, my responses are evolving now that I've read the whole code change and re-read the doc. - I see specialized requests are broken out anyhow, so that's fine. - Including everything needed for caching in the state key is good for raw request caching, so re-using Get is good. Though there are perhaps smarter ways to cache that won't benefit from this. - "append" is still a fine method for adding things to ordered list state, but it isn't important and the name is misleading (as it is for bags and multimaps, since they are not ordered, so anyhow it is the same here and we might as well keep the incorrect naming) - Obviously `clear` is fine ## model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto: ## @@ -1075,6 +,42 @@ message StateClearRequest {} // A response to clear state. message StateClearResponse {} +// A message describes a sort key range [start, end). +message OrderedListRange { + int64 start = 1; + int64 end = 2; +} + +// A data entry which is tagged with a sort key. +message OrderedListEntry { + int64 sort_key = 1; + bytes data = 2; +} + +// This request will fetch an ordered list with a sort key range. If the +// timestamp range is not specified, the runner should use [MIN_INT64, +// MAX_INT64) by default. +message OrderedListStateGetRequest { + bytes continuation_token = 1; + OrderedListRange range = 2; +} + +// A response to the get state request for an ordered list.
+message OrderedListStateGetResponse { + bytes continuation_token = 1; + bytes data = 2; +} + +// A request to update an ordered list +message OrderedListStateUpdateRequest { + // when the request is processed, deletes should always happen before inserts. Review Comment: It would be helpful to outline the pros and cons of little decisions like this in the design doc, and note which one was chosen and why. For example, one benefit to splitting the requests is to avoid ordering issues. We would have to spec that either the inserts or the deletes happen first, even though they are in one request together. It is a bit confusing. And then if you want them in the other order, you still have to make two requests, each with an empty field. Also note whether there is an efficiency consideration.
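The ordering concern can be made concrete. Under the draft rule that deletes are applied before inserts within one `OrderedListStateUpdateRequest`, an insert whose sort key falls inside a deleted range survives; applying the same two operations in the opposite order drops it. A sketch with an in-memory `TreeMap` follows; `applyUpdate`, `applyInsertsFirst` and the `Range` class are hypothetical helpers for illustration, not part of the Beam API.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical model of one update request that carries both deletes and inserts. */
public class OrderedListUpdateDemo {

  /** A half-open sort key range [start, end), mirroring OrderedListRange. */
  static final class Range {
    final long start;
    final long end;

    Range(long start, long end) {
      this.start = start;
      this.end = end;
    }
  }

  /** Deletes first, then inserts: the order stated in the draft proto comment. */
  static void applyUpdate(TreeMap<Long, String> list, List<Range> deletes, Map<Long, String> inserts) {
    for (Range r : deletes) {
      if (r.start < r.end) {
        // The subMap view is backed by the list, so clearing it removes keys in [start, end).
        list.subMap(r.start, true, r.end, false).clear();
      }
    }
    list.putAll(inserts);
  }

  /** The opposite order, shown only for comparison. */
  static void applyInsertsFirst(TreeMap<Long, String> list, List<Range> deletes, Map<Long, String> inserts) {
    list.putAll(inserts);
    for (Range r : deletes) {
      if (r.start < r.end) {
        list.subMap(r.start, true, r.end, false).clear();
      }
    }
  }

  public static void main(String[] args) {
    List<Range> deletes = List.of(new Range(0L, 10L));
    Map<Long, String> inserts = Map.of(5L, "new");

    TreeMap<Long, String> a = new TreeMap<>(Map.of(3L, "old"));
    applyUpdate(a, deletes, inserts);
    System.out.println(a); // {5=new}

    TreeMap<Long, String> b = new TreeMap<>(Map.of(3L, "old"));
    applyInsertsFirst(b, deletes, inserts);
    System.out.println(b); // {}
  }
}
```

A client that wants the inserts-then-deletes outcome under the deletes-first rule has to send two requests, each with one field empty, which is the awkwardness described in the comment.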
Re: [PR] Implement ordered list state for FnApi. [beam]
kennknowles commented on code in PR #30317: URL: https://github.com/apache/beam/pull/30317#discussion_r1500844153 ## model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto: ## @@ -1075,6 +,42 @@ message StateClearRequest {} // A response to clear state. message StateClearResponse {} Review Comment: The one counterpoint is that a state key is used for caching, as long as the spec is that a state key is deterministic in what it returns. So it has value for `get`.
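To illustrate the caching point: if everything that determines a `get` response (for an ordered list, the sort key range and the continuation token) is part of the key, and the key is deterministic in what it returns, then responses can be cached on that key and dropped whenever the underlying state is written. A minimal sketch follows, assuming string state ids and string responses; the `StateGetCache` class and its invalidate-everything-for-this-state strategy are hypothetical and do not describe how the Beam Java harness actually caches state.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.Supplier;

/** Hypothetical cache of get responses, keyed on everything that determines the response. */
public class StateGetCache {

  /** Cache key: logical state id plus the request parameters carried alongside it. */
  static final class CacheKey {
    final String stateId;
    final long rangeStart;
    final long rangeEnd;
    final String continuationToken;

    CacheKey(String stateId, long rangeStart, long rangeEnd, String continuationToken) {
      this.stateId = stateId;
      this.rangeStart = rangeStart;
      this.rangeEnd = rangeEnd;
      this.continuationToken = continuationToken;
    }

    @Override
    public boolean equals(Object o) {
      if (!(o instanceof CacheKey)) {
        return false;
      }
      CacheKey other = (CacheKey) o;
      return stateId.equals(other.stateId)
          && rangeStart == other.rangeStart
          && rangeEnd == other.rangeEnd
          && continuationToken.equals(other.continuationToken);
    }

    @Override
    public int hashCode() {
      return Objects.hash(stateId, rangeStart, rangeEnd, continuationToken);
    }
  }

  private final Map<CacheKey, String> responses = new HashMap<>();
  int misses = 0;

  /** Returns the cached response, calling the runner only on a miss. */
  String get(CacheKey key, Supplier<String> fetchFromRunner) {
    return responses.computeIfAbsent(
        key,
        k -> {
          misses++;
          return fetchFromRunner.get();
        });
  }

  /** A write (append or clear) invalidates every cached range and page of that state. */
  void invalidate(String stateId) {
    responses.keySet().removeIf(k -> k.stateId.equals(stateId));
  }

  public static void main(String[] args) {
    StateGetCache cache = new StateGetCache();
    CacheKey key = new CacheKey("orderedList", 0L, 100L, "");
    System.out.println(cache.get(key, () -> "v1")); // v1 (miss)
    System.out.println(cache.get(key, () -> "v2")); // v1 (hit)
    cache.invalidate("orderedList");
    System.out.println(cache.get(key, () -> "v3")); // v3 (miss after invalidation)
    System.out.println(cache.misses); // 2
  }
}
```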
Re: [PR] Implement ordered list state for FnApi. [beam]
kennknowles commented on code in PR #30317: URL: https://github.com/apache/beam/pull/30317#discussion_r1500839373 ## model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto: ## @@ -1075,6 +,42 @@ message StateClearRequest {} // A response to clear state. message StateClearResponse {} Review Comment: I still agree with my comment there: "it looks like a 'do everything' request/response protocol and the key holds all the information." That comment may seem to be about style, but it is really about transparency, debuggability, and readability. I view whatever "consistency" we have amongst the state requests as coincidence at best and a mistake at worst. _The_ reason that different kinds of state exist is because they support different methods. There is a reason that the only method they have in common in Java (where it originated) is `clear()`. Apparently the design decision was to model `methodA, methodB, methodC` as `oneBigMethod("actually do A"), oneBigMethod("actually do B"), oneBigMethod("actually do C")`. I just think that is not as good as being straightforward.
Re: [PR] Duet AI Prompts: Apache Beam Runners [beam]
damccorm merged PR #30346: URL: https://github.com/apache/beam/pull/30346
Re: [PR] Allow frame references to Beam Playground. [beam]
svetakvsundhar commented on PR #30391: URL: https://github.com/apache/beam/pull/30391#issuecomment-1961295491 Looks like it didn't fix the issue: https://beam.apache.org/documentation/transforms/java/elementwise/pardo/
Re: [PR] Create PubSubIO Load test [beam]
akashorabek commented on code in PR #30286: URL: https://github.com/apache/beam/pull/30286#discussion_r1500616246 ## it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/pubsub/PubSubIOLT.java: ## @@ -0,0 +1,475 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.it.gcp.pubsub; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.cloud.Timestamp; +import com.google.protobuf.ByteString; +import com.google.pubsub.v1.SubscriptionName; +import com.google.pubsub.v1.TopicName; +import java.io.IOException; +import java.io.Serializable; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.text.ParseException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.Objects; +import java.util.UUID; +import org.apache.beam.it.common.PipelineLauncher; +import org.apache.beam.it.common.PipelineOperator; +import org.apache.beam.it.common.TestProperties; +import org.apache.beam.it.common.utils.ResourceManagerUtils; +import org.apache.beam.it.gcp.IOLoadTestBase; +import org.apache.beam.runners.direct.DirectOptions; +import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.Primitive; +import org.apache.beam.sdk.io.Read; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubOptions; +import org.apache.beam.sdk.io.synthetic.SyntheticSourceOptions; +import org.apache.beam.sdk.io.synthetic.SyntheticUnboundedSource; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.testing.TestPipelineOptions; +import org.apache.beam.sdk.testutils.NamedTestResult; +import org.apache.beam.sdk.testutils.metrics.IOITMetrics; +import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.KV; 
+import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +/** PubSubIO performance tests. */ +public class PubSubIOLT extends IOLoadTestBase { + + private static final int NUMBER_OF_BUNDLES_FOR_LOCAL = 10; + private static final int NUMBER_OF_BUNDLES_FOR_MEDIUM_AND_LARGE = 20; + private static final String READ_ELEMENT_METRIC_NAME = "read_count"; + private static final String MAP_RECORDS_STEP_NAME = "Map records"; + private static final String WRITE_TO_PUBSUB_STEP_NAME = "Write to PubSub"; + private static final Map TEST_CONFIGS_PRESET; + private static TopicName topicName; + private static String testConfigName; + private static Configuration configuration; + private static SubscriptionName subscription; + private static InfluxDBSettings influxDBSettings; + private static PubsubResourceManager resourceManager; + + @Rule public transient TestPipeline writePipeline = TestPipeline.create(); + @Rule public transient TestPipeline readPipeline = TestPipeline.create(); + + static { +try { + TEST_CONFIGS_PRESET = + ImmutableMap.of( + "local", + PubSubIOLT.Configuration.fromJsonString( + "{\"numRecords\":200,\"valueSizeBytes\":1000,\"pipelineTimeout\":7,\"runner\":\"DirectRunner\",\"numWorkers\":1}", + PubSubIOLT.Configuration.class), // 0.2 MB + "medium", + PubSubIOLT.Configuration.fromJsonString( +
Re: [PR] [flink] #30402 restore upgradability [beam]
je-ik commented on PR #30403: URL: https://github.com/apache/beam/pull/30403#issuecomment-1961089654 I was able to upgrade from Beam 2.54.0 to 2.55.0-SNAPSHOT with this patch.
Re: [PR] [flink] #30402 restore upgradability [beam]
je-ik commented on code in PR #30403: URL: https://github.com/apache/beam/pull/30403#discussion_r1500481060 ## sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java: ## @@ -143,6 +143,9 @@ */ public abstract class PTransform implements Serializable /* See the note above */, HasDisplayData { + + private static final long serialVersionUID = 3383862966597863311L; Review Comment: Unfortunately, Flink uses Java serialization to serialize checkpoints (via SerializableCoder) in the legacy Read transform (needed for KafkaIO), so without stabilizing this we are unable to upgrade even with the SerializablePipelineOptions moved back.
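Background on why pinning the value matters: when a `Serializable` class does not declare `serialVersionUID`, Java derives one from the class's structure, so changes between releases can make previously written bytes (here, Flink checkpoints) fail to deserialize with `InvalidClassException`. Declaring the field fixes the value the stream carries. A minimal round-trip sketch follows, using a hypothetical `Transform` class as a stand-in for `PTransform` (it reuses the constant from the diff for illustration only) and `ObjectStreamClass.lookup` to read the UID that Java serialization will write.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

public class PinnedSerialVersionDemo {

  /** Hypothetical stand-in for a class whose serialized form must survive upgrades. */
  static class Transform implements Serializable {
    // Pinned explicitly, so structural changes do not alter the class version in the stream.
    private static final long serialVersionUID = 3383862966597863311L;
    String name;

    Transform(String name) {
      this.name = name;
    }
  }

  static byte[] serialize(Object o) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(o);
    }
    return bytes.toByteArray();
  }

  static Object deserialize(byte[] data) throws IOException, ClassNotFoundException {
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
      return in.readObject();
    }
  }

  public static void main(String[] args) throws Exception {
    long uid = ObjectStreamClass.lookup(Transform.class).getSerialVersionUID();
    System.out.println(uid); // 3383862966597863311
    Transform copy = (Transform) deserialize(serialize(new Transform("Read")));
    System.out.println(copy.name); // Read
  }
}
```

If a later build of the class computed a different UID, `readObject` on the old bytes would throw `InvalidClassException` ("local class incompatible"), which is the upgrade failure the pinned field avoids.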
Re: [PR] [flink] #30402 restore upgradability [beam]
github-actions[bot] commented on PR #30403: URL: https://github.com/apache/beam/pull/30403#issuecomment-1961088490 Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control
Re: [PR] [flink] #30402 restore upgradability [beam]
je-ik commented on PR #30403: URL: https://github.com/apache/beam/pull/30403#issuecomment-1961085713 R: @JozoVilcek @robertwb
[PR] 30402 flink upgrade [beam]
je-ik opened a new pull request, #30403: URL: https://github.com/apache/beam/pull/30403 Closes #30402 Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily: - [ ] Mention the appropriate issue in your description (for example: `addresses #123`), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment `fixes #` instead. - [ ] Update `CHANGES.md` with noteworthy changes. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf). See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://github.com/apache/beam/blob/master/CONTRIBUTING.md#make-the-reviewers-job-easier). To check the build health, please visit [https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md](https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md) GitHub Actions Tests Status (on master branch) [![Build python source distribution and wheels](https://github.com/apache/beam/workflows/Build%20python%20source%20distribution%20and%20wheels/badge.svg?branch=master=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Build+python+source+distribution+and+wheels%22+branch%3Amaster+event%3Aschedule) [![Python tests](https://github.com/apache/beam/workflows/Python%20tests/badge.svg?branch=master=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Python+Tests%22+branch%3Amaster+event%3Aschedule) [![Java tests](https://github.com/apache/beam/workflows/Java%20Tests/badge.svg?branch=master=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Java+Tests%22+branch%3Amaster+event%3Aschedule) [![Go 
tests](https://github.com/apache/beam/workflows/Go%20tests/badge.svg?branch=master=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Go+tests%22+branch%3Amaster+event%3Aschedule) See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more information about GitHub Actions CI or the [workflows README](https://github.com/apache/beam/blob/master/.github/workflows/README.md) to see a list of phrases to trigger workflows.
Re: [PR] Duet AI Prompts: Apache Beam Runners [beam]
dariabezkorovaina commented on PR #30346: URL: https://github.com/apache/beam/pull/30346#issuecomment-1961077682 @damccorm thanks very much:) Implemented the changes, please take a look.
Re: [PR] Update description.md [beam]
github-actions[bot] commented on PR #30401: URL: https://github.com/apache/beam/pull/30401#issuecomment-1960998086 Assigning reviewers. If you would like to opt out of this review, comment `assign to next reviewer`: R: @lostluck added as fallback since no labels match configuration Available commands: - `stop reviewer notifications` - opt out of the automated review tooling - `remind me after tests pass` - tag the comment author after tests pass - `waiting on author` - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers) The PR bot will only process comments in the main thread (not review comments).
[PR] Update description.md [beam]
rtarway opened a new pull request, #30401: URL: https://github.com/apache/beam/pull/30401 **Please** add a meaningful description for your change here