Re: [PR] Add custom pubsub source and sink experiment support for runner v2. [beam]

2023-11-15 Thread via GitHub


Abacn merged PR #29395:
URL: https://github.com/apache/beam/pull/29395


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Add custom pubsub source and sink experiment support for runner v2. [beam]

2023-11-15 Thread via GitHub


tomstepp commented on PR #29395:
URL: https://github.com/apache/beam/pull/29395#issuecomment-1812973473

   Run Java PreCommit





Re: [PR] Add custom pubsub source and sink experiment support for runner v2. [beam]

2023-11-14 Thread via GitHub


Abacn commented on code in PR #29395:
URL: https://github.com/apache/beam/pull/29395#discussion_r1393376086


##
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubSubPayloadTranslation.java:
##
@@ -59,6 +60,10 @@ public String getUrn() {
 @Override
 public RunnerApi.FunctionSpec translate(
 AppliedPTransform> transform, SdkComponents components) {
+  if (ExperimentalOptions.hasExperiment(

Review Comment:
   I see, thanks!






Re: [PR] Add custom pubsub source and sink experiment support for runner v2. [beam]

2023-11-14 Thread via GitHub


tomstepp commented on code in PR #29395:
URL: https://github.com/apache/beam/pull/29395#discussion_r1393372057


##
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubSubPayloadTranslation.java:
##
@@ -59,6 +60,10 @@ public String getUrn() {
 @Override
 public RunnerApi.FunctionSpec translate(
 AppliedPTransform> transform, SdkComponents components) {
+  if (ExperimentalOptions.hasExperiment(

Review Comment:
   I don't think the check is needed, since it's replacing the _unbounded_ version, which should already only be used for streaming jobs. But please let me know if there are any concerns with this, thanks!
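The argument can be illustrated with a small, self-contained Java model. It is a sketch under stated assumptions, not Beam code: `GateEquivalenceSketch`, `SourceKind`, and both gate methods are hypothetical, and the premise that the unbounded Pub/Sub source only appears in streaming jobs is taken from the comment above rather than verified. Under that premise, adding a streaming check to a branch that already only fires for the unbounded variant never changes the result.

```java
// Hypothetical model of the argument above; the names are illustrative, not Beam's API.
final class GateEquivalenceSketch {
  enum SourceKind { BOUNDED, UNBOUNDED }

  private GateEquivalenceSketch() {}

  // Assumed premise (from the review comment): the unbounded Pub/Sub
  // source is only used by streaming jobs.
  static boolean streamingFor(SourceKind kind) {
    return kind == SourceKind.UNBOUNDED;
  }

  // The translation branch only ever applies to the unbounded variant,
  // so it checks the experiment alone.
  static boolean experimentOnly(SourceKind kind, boolean experimentEnabled) {
    return kind == SourceKind.UNBOUNDED && experimentEnabled;
  }

  // The alternative raised in review: additionally require streaming mode.
  static boolean withStreamingCheck(SourceKind kind, boolean experimentEnabled) {
    return kind == SourceKind.UNBOUNDED && streamingFor(kind) && experimentEnabled;
  }
}
```

If the premise ever stopped holding (for example, an unbounded read in a batch job), the two gates would diverge and the explicit streaming check would start to matter.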






Re: [PR] Add custom pubsub source and sink experiment support for runner v2. [beam]

2023-11-14 Thread via GitHub


Abacn commented on code in PR #29395:
URL: https://github.com/apache/beam/pull/29395#discussion_r1393078349


##
sdks/java/core/src/main/java/org/apache/beam/sdk/options/ExperimentalOptions.java:
##
@@ -35,6 +35,10 @@ public interface ExperimentalOptions extends PipelineOptions {
 
   String STATE_SAMPLING_PERIOD_MILLIS = "state_sampling_period_millis";
 
+  String ENABLE_CUSTOM_PUBSUB_SOURCE = "enable_custom_pubsub_source";

Review Comment:
   Thanks. However, what I meant is that since we now use this experiment in both places, it is better to define the constant in PubsubIO. google-cloud-dataflow-java depends on sdks-io-google-cloud-platform, so it is fine to refer to PubsubIO's constant inside DataflowRunner.
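A minimal sketch of the suggested layout, using hypothetical stand-ins rather than the real classes: `PubsubIOSketch` owns the single experiment-name constant (the string is taken from the diff above), and two call sites, standing in for the payload translation and DataflowRunner, both read it. `Experiments.hasExperiment` only approximates the idea of `ExperimentalOptions.hasExperiment` as an exact match over a list of experiment strings; it is not Beam's implementation.

```java
import java.util.List;

// Hypothetical stand-in for PubsubIO: the single owner of the experiment name.
final class PubsubIOSketch {
  static final String ENABLE_CUSTOM_PUBSUB_SOURCE = "enable_custom_pubsub_source";

  private PubsubIOSketch() {}
}

// Hypothetical helper: true only if the exact experiment string is present.
final class Experiments {
  private Experiments() {}

  static boolean hasExperiment(List<String> experiments, String name) {
    return experiments != null && experiments.contains(name);
  }
}

// Stand-in for the payload translation call site.
final class PayloadTranslationSketch {
  private PayloadTranslationSketch() {}

  static boolean useCustomSource(List<String> experiments) {
    return Experiments.hasExperiment(experiments, PubsubIOSketch.ENABLE_CUSTOM_PUBSUB_SOURCE);
  }
}

// Stand-in for the DataflowRunner call site; it reads the same constant,
// which works because the runner module depends on the IO module.
final class DataflowRunnerSketch {
  private DataflowRunnerSketch() {}

  static boolean useCustomSource(List<String> experiments) {
    return Experiments.hasExperiment(experiments, PubsubIOSketch.ENABLE_CUSTOM_PUBSUB_SOURCE);
  }
}
```

Keeping one owner for the string means the two checks cannot silently drift apart if the name is ever changed.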






Re: [PR] Add custom pubsub source and sink experiment support for runner v2. [beam]

2023-11-14 Thread via GitHub


Abacn commented on code in PR #29395:
URL: https://github.com/apache/beam/pull/29395#discussion_r1393076860


##
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubSubPayloadTranslation.java:
##
@@ -59,6 +60,10 @@ public String getUrn() {
 @Override
 public RunnerApi.FunctionSpec translate(
 AppliedPTransform> transform, SdkComponents components) {
+  if (ExperimentalOptions.hasExperiment(

Review Comment:
   I see the V1 override is only effective in streaming mode:
   
   https://github.com/apache/beam/blob/90cbdd0648e34127fd27809b44e909f23fb37aa1/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java#L551
   
   Should we make the new branch streaming-only as well, or is that not necessary?






Re: [PR] Add custom pubsub source and sink experiment support for runner v2. [beam]

2023-11-14 Thread via GitHub


Abacn commented on code in PR #29395:
URL: https://github.com/apache/beam/pull/29395#discussion_r1393070692


##
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubSubPayloadTranslation.java:
##
@@ -59,6 +60,10 @@ public String getUrn() {
 @Override
 public RunnerApi.FunctionSpec translate(
 AppliedPTransform> transform, SdkComponents components) {
+  if (ExperimentalOptions.hasExperiment(

Review Comment:
   Thanks for the explanation, I see. Both the V1 and V2 pipeline protos are constructed in DataflowRunner.run():
   
   - The V2 pipeline proto is `portablePipelineProto`, used to write to stagedPipeline.
   - The V1 pipeline proto is `dataflowV1PipelineProto`, used to construct jobSpecification and essentially initiate the job (newJob).
   
   This is unusual and easy to get confused by at first glance, but here we are. I now understand why the DataflowRunner settings won't affect a runner v2 job.
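A toy Java model of the flow described above may help when reading `DataflowRunner.run()`. It is an assumption-laden sketch, not the runner's code: `DualProtoSketch`, `PipelineProto`, `Submission`, and the `NativePubsubRead` replacement are hypothetical, and it assumes a V1-only override is applied only while building the V1 proto. Under that assumption, a change made on the V1 path never shows up in the staged V2 pipeline.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model of the flow described in the comment; not DataflowRunner code.
final class DualProtoSketch {
  // Stand-in for a pipeline proto: only the transform names it contains.
  static final class PipelineProto {
    final List<String> transforms;

    PipelineProto(List<String> transforms) {
      this.transforms = List.copyOf(transforms);
    }
  }

  // What run() hands off: one proto is staged, the other drives job creation.
  static final class Submission {
    final PipelineProto stagedPipeline;   // from portablePipelineProto (V2)
    final PipelineProto jobSpecification; // from dataflowV1PipelineProto (V1)

    Submission(PipelineProto stagedPipeline, PipelineProto jobSpecification) {
      this.stagedPipeline = stagedPipeline;
      this.jobSpecification = jobSpecification;
    }
  }

  private DualProtoSketch() {}

  static Submission run(List<String> pipeline, boolean applyV1Overrides) {
    // V2 proto: translated from the pipeline without the V1-only override.
    PipelineProto portablePipelineProto = new PipelineProto(pipeline);

    // V1 proto: built from a separate copy; the assumed V1-only override
    // swaps the Pub/Sub source for a hypothetical native read.
    List<String> v1 = new ArrayList<>(pipeline);
    if (applyV1Overrides) {
      v1.replaceAll(t -> t.equals("PubsubUnboundedSource") ? "NativePubsubRead" : t);
    }
    PipelineProto dataflowV1PipelineProto = new PipelineProto(v1);

    return new Submission(portablePipelineProto, dataflowV1PipelineProto);
  }
}
```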


