Re: [PR] Add custom pubsub source/sink experiment support for runner v2. [beam]
tomstepp commented on code in PR #29395:
URL: https://github.com/apache/beam/pull/29395#discussion_r1391776862

## sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubSubPayloadTranslation.java: ##
@@ -59,6 +60,10 @@ public String getUrn() { @Override public RunnerApi.FunctionSpec translate( AppliedPTransform> transform, SdkComponents components) { + if (ExperimentalOptions.hasExperiment(

Review Comment: It looks like replaceV1Transforms is only used for v1. For v2 jobs, the portable job submission proto is created ahead of this, at line 1170. Would a suitable and better fix be to run replaceV1Transforms before creating portablePipelineProto? Or, if replaceV1Transforms contains overrides that are meant only for v1, perhaps we can split out the custom pubsub overrides so that they occur before the portable job submission proto is created, and leave the remaining overrides to be applied only for v1 pipelines.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
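The ordering concern raised in the comment above can be illustrated with a minimal, self-contained sketch. It does not use Beam's API: the pipeline is modeled as a plain list of transform names, `snapshot` stands in for building the portable pipeline proto, and `applyOverride` stands in for a transform replacement step. All class, method, and transform names here are hypothetical. The point is only that a snapshot taken before the replacement never sees it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model, not Beam code: a pipeline is an ordered list of transform names.
class OverrideOrderSketch {
    static final String ORIGINAL_READ = "OriginalRead";       // hypothetical name
    static final String REPLACEMENT_READ = "ReplacementRead"; // hypothetical name

    // Stand-in for a transform override: swaps one transform for another in place.
    static void applyOverride(List<String> pipeline) {
        pipeline.replaceAll(t -> t.equals(ORIGINAL_READ) ? REPLACEMENT_READ : t);
    }

    // Stand-in for serializing the pipeline for submission: copies the current state.
    static List<String> snapshot(List<String> pipeline) {
        return new ArrayList<>(pipeline);
    }

    static List<String> newPipeline() {
        return new ArrayList<>(Arrays.asList(ORIGINAL_READ, "ParDo"));
    }

    // Snapshot first, override second: the submitted copy still has the original transform.
    static List<String> snapshotThenOverride() {
        List<String> pipeline = newPipeline();
        List<String> submitted = snapshot(pipeline);
        applyOverride(pipeline);
        return submitted;
    }

    // Override first, snapshot second: the submitted copy contains the replacement.
    static List<String> overrideThenSnapshot() {
        List<String> pipeline = newPipeline();
        applyOverride(pipeline);
        return snapshot(pipeline);
    }

    public static void main(String[] args) {
        System.out.println(snapshotThenOverride());
        System.out.println(overrideThenSnapshot());
    }
}
```

If the real code behaves like this model, either reordering the two steps or splitting the relevant overrides out so they run first would make the submitted pipeline reflect the replacement.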
Re: [PR] Add custom pubsub source/sink experiment support for runner v2. [beam]
tomstepp commented on code in PR #29395:
URL: https://github.com/apache/beam/pull/29395#discussion_r1391754326

## sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubSubPayloadTranslation.java: ##
@@ -59,6 +60,10 @@ public String getUrn() { @Override public RunnerApi.FunctionSpec translate( AppliedPTransform> transform, SdkComponents components) { + if (ExperimentalOptions.hasExperiment(

Review Comment: Or at least not the replaceV1Transforms path.
Re: [PR] Add custom pubsub source/sink experiment support for runner v2. [beam]
tomstepp commented on code in PR #29395:
URL: https://github.com/apache/beam/pull/29395#discussion_r1391772243

## sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubSubPayloadTranslation.java: ##
@@ -59,6 +60,10 @@ public String getUrn() { @Override public RunnerApi.FunctionSpec translate( AppliedPTransform> transform, SdkComponents components) { + if (ExperimentalOptions.hasExperiment(

Review Comment: Oh, I actually looked at the code, and it seems that replaceV1Transforms may be intended to run for both versions. In that case, I'm not sure yet whether it actually runs, or whether the replacement fails on v2 for another reason.
Re: [PR] Add custom pubsub source/sink experiment support for runner v2. [beam]
tomstepp commented on code in PR #29395:
URL: https://github.com/apache/beam/pull/29395#discussion_r1391753703

## sdks/java/core/src/main/java/org/apache/beam/sdk/options/ExperimentalOptions.java: ##
@@ -35,6 +35,10 @@ public interface ExperimentalOptions extends PipelineOptions { String STATE_SAMPLING_PERIOD_MILLIS = "state_sampling_period_millis"; + String ENABLE_CUSTOM_PUBSUB_SOURCE = "enable_custom_pubsub_source";

Review Comment: I removed the constants, just using literals now.

## sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubSubPayloadTranslation.java: ##
@@ -59,6 +60,10 @@ public String getUrn() { @Override public RunnerApi.FunctionSpec translate( AppliedPTransform> transform, SdkComponents components) { + if (ExperimentalOptions.hasExperiment(

Review Comment: Since runner v2 uses portable job submission, I think that DataflowRunner path is not used, right?
Re: [PR] Add custom pubsub source/sink experiment support for runner v2. [beam]
Abacn commented on code in PR #29395:
URL: https://github.com/apache/beam/pull/29395#discussion_r1391653051

## sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubSubPayloadTranslation.java: ##
@@ -59,6 +60,10 @@ public String getUrn() { @Override public RunnerApi.FunctionSpec translate( AppliedPTransform> transform, SdkComponents components) { + if (ExperimentalOptions.hasExperiment(

Review Comment: Would you mind explaining a little why the current transform override (referred to above; the override is invoked when the experiments are not provided) is not sufficient for runner v2? From DataflowRunner.java, the stack trace for execution to this point is:

DataflowRunner.getOverrides() L552
DataflowRunner.replaceV1Transforms() L1579
DataflowRunner.run() L1193

This means both Dataflow v1 and v2 job submission had the override by default, while a non-Dataflow runner always has the custom PubsubIO (aka the Beam-provided Pubsub IO). So there are two questions here:

- Why does the current override work for Dataflow v1 [override can be enabled (default) or disabled (with flag)] but not for v2 (presumably always enabled?)?
- Is it possible to keep the change inside the Dataflow runner, following the existing pattern there?
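The "enabled by default, disabled with a flag" behavior described in the comment above can be sketched as follows. This is a simplified model under stated assumptions, not the DataflowRunner code: experiments are a plain list of strings, `hasExperiment` is a hypothetical local helper rather than Beam's `ExperimentalOptions.hasExperiment`, and `shouldApplyOverride` is a hypothetical name. Only the experiment literal is taken from the diff under review.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical model of an experiment-gated override; not Beam or Dataflow runner code.
class ExperimentGateSketch {
    // Literal as it appears in the diff under review.
    static final String ENABLE_CUSTOM_PUBSUB_SOURCE = "enable_custom_pubsub_source";

    // Hypothetical stand-in for an experiment lookup; treats a missing list as empty.
    static boolean hasExperiment(List<String> experiments, String name) {
        return experiments != null && experiments.contains(name);
    }

    // The override applies by default and is skipped only when the experiment is set.
    static boolean shouldApplyOverride(List<String> experiments) {
        return !hasExperiment(experiments, ENABLE_CUSTOM_PUBSUB_SOURCE);
    }

    public static void main(String[] args) {
        System.out.println(shouldApplyOverride(Collections.<String>emptyList()));
        System.out.println(shouldApplyOverride(Arrays.asList(ENABLE_CUSTOM_PUBSUB_SOURCE)));
    }
}
```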
Re: [PR] Add custom pubsub source/sink experiment support for runner v2. [beam]
Abacn commented on code in PR #29395:
URL: https://github.com/apache/beam/pull/29395#discussion_r1391637530

## sdks/java/core/src/main/java/org/apache/beam/sdk/options/ExperimentalOptions.java: ##
@@ -35,6 +35,10 @@ public interface ExperimentalOptions extends PipelineOptions { String STATE_SAMPLING_PERIOD_MILLIS = "state_sampling_period_millis"; + String ENABLE_CUSTOM_PUBSUB_SOURCE = "enable_custom_pubsub_source";

Review Comment: The SDK core code path does not hold information about GCP IO components (the pubsub experiment). Consider putting these literals inside PubsubIO?

## sdks/java/core/src/main/java/org/apache/beam/sdk/options/ExperimentalOptions.java: ##
@@ -35,6 +35,10 @@ public interface ExperimentalOptions extends PipelineOptions { String STATE_SAMPLING_PERIOD_MILLIS = "state_sampling_period_millis"; + String ENABLE_CUSTOM_PUBSUB_SOURCE = "enable_custom_pubsub_source";

Review Comment: Also, these need to be in sync with https://github.com/apache/beam/blob/618d7a8f2c6520bfae1ca84dced7fd39d7af45f4/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java#L552 and https://github.com/apache/beam/blob/618d7a8f2c6520bfae1ca84dced7fd39d7af45f4/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java#L558. If a constant is created, also refer to it there.
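One way to keep the experiment name in sync across call sites, as the comment above asks, is to define it once and reference that definition everywhere. The sketch below is a hypothetical illustration only: the class and method names are invented, and the two methods merely stand in for a check in the IO translation code and a check in the runner's override selection.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical illustration of a single shared experiment-name constant; not Beam code.
class SharedExperimentNameSketch {
    // Single definition; the literal comes from the diff under review.
    static final String ENABLE_CUSTOM_PUBSUB_SOURCE = "enable_custom_pubsub_source";

    // Hypothetical stand-in for a check made in the IO translation code.
    static boolean translationUsesCustomSource(List<String> experiments) {
        return experiments.contains(ENABLE_CUSTOM_PUBSUB_SOURCE);
    }

    // Hypothetical stand-in for a check made when the runner selects overrides.
    static boolean runnerAppliesOverride(List<String> experiments) {
        return !experiments.contains(ENABLE_CUSTOM_PUBSUB_SOURCE);
    }

    public static void main(String[] args) {
        List<String> withFlag = Arrays.asList(ENABLE_CUSTOM_PUBSUB_SOURCE);
        List<String> withoutFlag = Collections.emptyList();
        // Because both checks read the same constant, they cannot drift apart.
        System.out.println(translationUsesCustomSource(withFlag) != runnerAppliesOverride(withFlag));
        System.out.println(translationUsesCustomSource(withoutFlag) != runnerAppliesOverride(withoutFlag));
    }
}
```

Where such a constant should live (SDK core, PubsubIO, or the runner) is exactly the open question in the review; the sketch takes no position on that.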
Re: [PR] Add custom pubsub source/sink experiment support for runner v2. [beam]
Abacn commented on PR #29395: URL: https://github.com/apache/beam/pull/29395#issuecomment-1808586976 Run Java_IOs_Direct PreCommit
Re: [PR] Add custom pubsub source/sink experiment support for runner v2. [beam]
Abacn commented on PR #29395: URL: https://github.com/apache/beam/pull/29395#issuecomment-1808587300 Run Java PreCommit
Re: [PR] Add custom pubsub source/sink experiment support for runner v2. [beam]
Abacn commented on PR #29395: URL: https://github.com/apache/beam/pull/29395#issuecomment-1808587150 Java_GCP_IO_Direct PreCommit
Re: [PR] Add custom pubsub source/sink experiment support for runner v2. [beam]
github-actions[bot] commented on PR #29395: URL: https://github.com/apache/beam/pull/29395#issuecomment-1808469439 Assigning reviewers. If you would like to opt out of this review, comment `assign to next reviewer`:
R: @Abacn for label java.
R: @bvolpato for label io.
Available commands:
- `stop reviewer notifications` - opt out of the automated review tooling
- `remind me after tests pass` - tag the comment author after tests pass
- `waiting on author` - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)
The PR bot will only process comments in the main thread (not review comments).
Re: [PR] Add custom pubsub source/sink experiment support for runner v2. [beam]
tomstepp commented on PR #29395: URL: https://github.com/apache/beam/pull/29395#issuecomment-1808467270 assign set of reviewers
Re: [PR] Add custom pubsub source/sink experiment support for runner v2. [beam]
github-actions[bot] commented on PR #29395: URL: https://github.com/apache/beam/pull/29395#issuecomment-1806623647 Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment `assign set of reviewers`
Re: [PR] Add custom pubsub source/sink experiment support for runner v2. [beam]
tomstepp commented on PR #29395: URL: https://github.com/apache/beam/pull/29395#issuecomment-1806575679 fixes #29397
[PR] Add custom pubsub source/sink experiment support for runner v2. [beam]
tomstepp opened a new pull request, #29395: URL: https://github.com/apache/beam/pull/29395 **Please** add a meaningful description for your change here: todo