Re: [PR] start to refactor persistence layer to prepare for direct path [beam]

2024-02-23 Thread via GitHub


m-trieu commented on code in PR #30265:
URL: https://github.com/apache/beam/pull/30265#discussion_r1501363890


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##
@@ -462,42 +446,89 @@ public void run() {
 LOG.debug("maxWorkItemCommitBytes: {}", maxWorkItemCommitBytes);
   }
 
-  private static WindmillServerStub createWindmillServerStub(
+  public static StreamingDataflowWorker fromOptions(StreamingDataflowWorkerOptions options) {
+ConcurrentMap computationMap = new ConcurrentHashMap<>();
+long clientId = clientIdGenerator.nextLong();
+return new StreamingDataflowWorker(
+createWindmillServerStub(
+options,
+clientId,
+new WorkHeartbeatResponseProcessor(

Review Comment:
   fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Wire error handling into PubSubIO and add initial tests [beam]

2024-02-23 Thread via GitHub


Naireen commented on code in PR #30372:
URL: https://github.com/apache/beam/pull/30372#discussion_r1501342511


##
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PreparePubsubWriteDoFn.java:
##
@@ -125,18 +140,42 @@ public void process(
   @Timestamp Instant ts,
   BoundedWindow window,
   PaneInfo paneInfo,
-  OutputReceiver o) {
+  MultiOutputReceiver o)
+  throws Exception {
 ValueInSingleWindow valueInSingleWindow =
 ValueInSingleWindow.of(element, ts, window, paneInfo);
-PubsubMessage message = formatFunction.apply(valueInSingleWindow);
+PubsubMessage message;
+try {
+  message = formatFunction.apply(valueInSingleWindow);
+} catch (Exception e) {
+  badRecordRouter.route(
+  o,
+  element,
+  inputCoder,
+  e,
+  "Failed to serialize PubSub message with provided format function");
+  return;
+}
 if (topicFunction != null) {
-  message = message.withTopic(topicFunction.apply(valueInSingleWindow).asPath());
+  try {
+message = message.withTopic(topicFunction.apply(valueInSingleWindow).asPath());
+  } catch (Exception e) {
+badRecordRouter.route(
+o, element, inputCoder, e, "Failed to determine PubSub topic using topic function");

Review Comment:
   Should we just add valueInSingleWindow into the error message here so it's easier to see what went wrong?






Re: [PR] Wire error handling into PubSubIO and add initial tests [beam]

2024-02-23 Thread via GitHub


Naireen commented on code in PR #30372:
URL: https://github.com/apache/beam/pull/30372#discussion_r1501341983


##
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java:
##
@@ -1089,57 +1130,73 @@ public PCollection expand(PBegin input) {
   getNeedsMessageId(),
   getNeedsOrderingKey());
 
-  PCollection read;
   PCollection preParse = input.apply(source);
   TypeDescriptor typeDescriptor = new TypeDescriptor() {};
-  if (getDeadLetterTopicProvider() == null) {
+  PCollection read;
+  if (getDeadLetterTopicProvider() == null
+  && (getBadRecordRouter() instanceof ThrowingBadRecordRouter)) {
 read = preParse.apply(MapElements.into(typeDescriptor).via(getParseFn()));
   } else {
+// parse PubSub messages, separating out execptions

Review Comment:
   nit: exceptions






Re: [I] [Bug]: Pytest fails on a pipeline when importing from apache_beam.io.gcp.bigquery [beam]

2024-02-23 Thread via GitHub


github-actions[bot] closed issue #30392: [Bug]: Pytest fails on a pipeline when 
importing from apache_beam.io.gcp.bigquery
URL: https://github.com/apache/beam/issues/30392





Re: [I] [Bug]: Pytest fails on a pipeline when importing from apache_beam.io.gcp.bigquery [beam]

2024-02-23 Thread via GitHub


mitchej123 commented on issue #30392:
URL: https://github.com/apache/beam/issues/30392#issuecomment-1962227841

   .close-issue





Re: [PR] Fix invalid escape sequence '\#' [beam]

2024-02-23 Thread via GitHub


tvalentyn merged PR #30393:
URL: https://github.com/apache/beam/pull/30393





Re: [PR] Fix invalid escape sequence '\#' [beam]

2024-02-23 Thread via GitHub


tvalentyn closed pull request #30393: Fix invalid escape sequence '\#'
URL: https://github.com/apache/beam/pull/30393





Re: [PR] start to refactor persistence layer to prepare for direct path [beam]

2024-02-23 Thread via GitHub


m-trieu commented on code in PR #30265:
URL: https://github.com/apache/beam/pull/30265#discussion_r1501305913


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##
@@ -462,42 +446,89 @@ public void run() {
 LOG.debug("maxWorkItemCommitBytes: {}", maxWorkItemCommitBytes);
   }
 
-  private static WindmillServerStub createWindmillServerStub(
+  public static StreamingDataflowWorker fromOptions(StreamingDataflowWorkerOptions options) {
+ConcurrentMap computationMap = new ConcurrentHashMap<>();
+long clientId = clientIdGenerator.nextLong();
+return new StreamingDataflowWorker(
+createWindmillServerStub(
+options,
+clientId,
+new WorkHeartbeatResponseProcessor(

Review Comment:
   Figured it out: it looks like the work isn't being populated in the computation map. Will fix it.
   
   thanks!






Re: [PR] start to refactor persistence layer to prepare for direct path [beam]

2024-02-23 Thread via GitHub


m-trieu commented on code in PR #30265:
URL: https://github.com/apache/beam/pull/30265#discussion_r1501297099


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##
@@ -462,42 +446,89 @@ public void run() {
 LOG.debug("maxWorkItemCommitBytes: {}", maxWorkItemCommitBytes);
   }
 
-  private static WindmillServerStub createWindmillServerStub(
+  public static StreamingDataflowWorker fromOptions(StreamingDataflowWorkerOptions options) {
+ConcurrentMap computationMap = new ConcurrentHashMap<>();
+long clientId = clientIdGenerator.nextLong();
+return new StreamingDataflowWorker(
+createWindmillServerStub(
+options,
+clientId,
+new WorkHeartbeatResponseProcessor(

Review Comment:
   it gets set up here in FakeWindmillServer
   
![image](https://github.com/apache/beam/assets/29557027/6701f28a-37d6-4a95-aa3d-6632e0d05fb9)
   






Re: [PR] start to refactor persistence layer to prepare for direct path [beam]

2024-02-23 Thread via GitHub


m-trieu commented on code in PR #30265:
URL: https://github.com/apache/beam/pull/30265#discussion_r1501295248


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##
@@ -462,42 +446,89 @@ public void run() {
 LOG.debug("maxWorkItemCommitBytes: {}", maxWorkItemCommitBytes);
   }
 
-  private static WindmillServerStub createWindmillServerStub(
+  public static StreamingDataflowWorker fromOptions(StreamingDataflowWorkerOptions options) {
+ConcurrentMap computationMap = new ConcurrentHashMap<>();
+long clientId = clientIdGenerator.nextLong();
+return new StreamingDataflowWorker(
+createWindmillServerStub(
+options,
+clientId,
+new WorkHeartbeatResponseProcessor(

Review Comment:
   it gets set up here in FakeWindmillServer
   






Re: [PR] [Python] Vertex AI Feature Store enrichment handler [beam]

2024-02-23 Thread via GitHub


github-actions[bot] commented on PR #30388:
URL: https://github.com/apache/beam/pull/30388#issuecomment-1962165882

   Checks are failing. Will not request review until checks are succeeding. If 
you'd like to override that behavior, comment `assign set of reviewers`





Re: [PR] Fix invalid escape sequence '\#' [beam]

2024-02-23 Thread via GitHub


github-actions[bot] commented on PR #30393:
URL: https://github.com/apache/beam/pull/30393#issuecomment-1962089721

   Assigning reviewers. If you would like to opt out of this review, comment 
`assign to next reviewer`:
   
   R: @tvalentyn for label python.
   R: @ahmedabu98 for label io.
   
   Available commands:
   - `stop reviewer notifications` - opt out of the automated review tooling
   - `remind me after tests pass` - tag the comment author after tests pass
   - `waiting on author` - shift the attention set back to the author (any 
comment or push by the author will return the attention set to the reviewers)
   
   The PR bot will only process comments in the main thread (not review 
comments).





Re: [PR] [flink] #30402 restore upgradability [beam]

2024-02-23 Thread via GitHub


robertwb commented on code in PR #30403:
URL: https://github.com/apache/beam/pull/30403#discussion_r1501236860


##
runners/core-java/build.gradle:
##
@@ -46,6 +46,8 @@ dependencies {
   implementation library.java.joda_time
   implementation library.java.vendored_grpc_1_60_1
   implementation library.java.slf4j_api
+  implementation library.java.jackson_core
+  implementation library.java.jackson_databind

Review Comment:
   Ugh...
   
   @kennknowles would be in a better position to answer this. 
   
   We could re-introduce runners-core-construction with this as the sole class 
if needed if we can't place it in runners-core itself. 






Re: [I] [Bug]: Beam uses a version of the org.json:json that has a Category X license [beam]

2024-02-23 Thread via GitHub


Abacn closed issue #30404: [Bug]: Beam uses a version of the org.json:json that 
has a Category X license 
URL: https://github.com/apache/beam/issues/30404





Re: [I] Performance Regression or Improvement: pytorch_image_classification_benchmarks-resnet152-GPU-mean_inference_batch_latency_micro_secs:mean_inference_batch_latency_micro_secs [beam]

2024-02-23 Thread via GitHub


github-actions[bot] commented on issue #27986:
URL: https://github.com/apache/beam/issues/27986#issuecomment-1962071521

   
 Performance change found in the
 test: 
`pytorch_image_classification_benchmarks-resnet152-GPU-mean_inference_batch_latency_micro_secs`
 for the metric: `mean_inference_batch_latency_micro_secs`.
   
 For more information on how to triage the alerts, please look at
 `Triage performance alert issues` section of the 
[README](https://github.com/apache/beam/tree/master/sdks/python/apache_beam/testing/analyzers/README.md#triage-performance-alert-issues).
   
   
   `Test description:` Pytorch image classification on 50k images of size 224 x 
224 with resnet 152 with Tesla T4 GPU.
   Test link - 
https://github.com/apache/beam/blob/42d0a6e3564d8b9c5d912428a6de18fb22a13ac1/.test-infra/jenkins/job_InferenceBenchmarkTests_Python.groovy#L151).
   Test dashboard - 
http://metrics.beam.apache.org/d/ZpS8Uf44z/python-ml-runinference-benchmarks?from=now-90d=now=2
   
   
   ```
   
   timestamp: Fri Feb 23 06:53:41 2024, metric_value: 4560157.85
   timestamp: Wed Feb 21 07:12:25 2024, metric_value: 4883101.24
   timestamp: Tue Feb 20 06:53:46 2024, metric_value: 5398728.80 < Anomaly
   timestamp: Sun Feb 18 06:41:31 2024, metric_value: 3060535.64
   timestamp: Sat Feb 17 06:52:20 2024, metric_value: 2565324.57
   timestamp: Fri Feb 16 06:55:15 2024, metric_value: 3789397.17
   timestamp: Thu Feb 15 06:58:53 2024, metric_value: 4431068.78
   timestamp: Wed Feb 14 06:50:20 2024, metric_value: 3481961.73
   timestamp: Mon Feb 12 06:42:22 2024, metric_value: 3489616.96
   timestamp: Sat Feb 10 06:45:00 2024, metric_value: 3158261.25
   timestamp: Fri Feb  9 06:47:02 2024, metric_value: 2655775.48
   timestamp: Thu Feb  8 06:47:05 2024, metric_value: 2885917.85
   timestamp: Wed Feb  7 06:52:04 2024, metric_value: 3010852.66
   
   ```
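
As a rough way to read the numbers above, the following sketch compares the flagged run against the median of the older runs. It is not the change-point method the alerting tool uses; the `runs` list copies the values from the alert, and `relative_to_baseline` and `ANOMALY_INDEX` are hypothetical names introduced only for this illustration.

```python
from statistics import median

# (timestamp, metric_value) pairs copied from the alert above, newest first.
runs = [
    ("Fri Feb 23 2024", 4560157.85),
    ("Wed Feb 21 2024", 4883101.24),
    ("Tue Feb 20 2024", 5398728.80),  # flagged "< Anomaly" in the alert
    ("Sun Feb 18 2024", 3060535.64),
    ("Sat Feb 17 2024", 2565324.57),
    ("Fri Feb 16 2024", 3789397.17),
    ("Thu Feb 15 2024", 4431068.78),
    ("Wed Feb 14 2024", 3481961.73),
    ("Mon Feb 12 2024", 3489616.96),
    ("Sat Feb 10 2024", 3158261.25),
    ("Fri Feb  9 2024", 2655775.48),
    ("Thu Feb  8 2024", 2885917.85),
    ("Wed Feb  7 2024", 3010852.66),
]

ANOMALY_INDEX = 2  # position of the flagged run in the list above


def relative_to_baseline(values, flagged_index):
    """Return the flagged value divided by the median of all older values."""
    baseline = median(values[flagged_index + 1:])
    return values[flagged_index] / baseline


values = [value for _, value in runs]
ratio = relative_to_baseline(values, ANOMALY_INDEX)
print(f"flagged run is {ratio:.2f}x the median of the older runs")
```

A ratio well above 1 suggests a latency regression rather than noise, though a single comparison like this is only a first triage step.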





Re: [PR] Use BeamModulePlugin org.json version in extensions/ml [beam]

2024-02-23 Thread via GitHub


Abacn merged PR #30406:
URL: https://github.com/apache/beam/pull/30406





Re: [PR] [flink] #30402 restore upgradability [beam]

2024-02-23 Thread via GitHub


je-ik commented on code in PR #30403:
URL: https://github.com/apache/beam/pull/30403#discussion_r1501221936


##
runners/core-java/build.gradle:
##
@@ -46,6 +46,8 @@ dependencies {
   implementation library.java.joda_time
   implementation library.java.vendored_grpc_1_60_1
   implementation library.java.slf4j_api
+  implementation library.java.jackson_core
+  implementation library.java.jackson_databind

Review Comment:
   @robertwb this is unfortunate. Do we have a check for API surface that this does not leak anywhere? In old construction-java and in sdk-core this is shaded. I can shade it, but runners-core is currently "plain", so that could be an argument against placing it into runners-core. I'm not familiar enough with the details of the build system to make a decision myself.






Re: [PR] [yaml] Add Beam YAML Examples and Getting started docs [beam]

2024-02-23 Thread via GitHub


Polber commented on code in PR #30003:
URL: https://github.com/apache/beam/pull/30003#discussion_r1501217877


##
sdks/python/apache_beam/yaml/examples/wordcount_minimal.yaml:
##
@@ -0,0 +1,75 @@
+# coding=utf-8
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This example reads from a public file stored on Google Cloud. This
+# requires authenticating with Google Cloud, or setting the file in
+# `ReadFromText` to a local file.
+#
+# To set up Application Default Credentials,
+# see https://cloud.google.com/docs/authentication/external/set-up-adc for more
+# information
+#
+# This pipeline reads in a text file, maps all words to a value of "1", sums

Review Comment:
   @robertwb I refactored the example a bit to make it follow the logic more 
semantically. It also outputs `Row(word=..., count=...)` instead of 
`Row(output="word: count")`
   
   Let me know what you think
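
The difference between the two output shapes can be illustrated in plain Python. This is only a sketch of the idea, not the YAML pipeline from the PR: `WordCount` and `count_words` are hypothetical names, and a `namedtuple` stands in for a Beam `Row`.

```python
import re
from collections import Counter, namedtuple

# Stand-in for a structured row with separate fields, as in Row(word=..., count=...).
WordCount = namedtuple("WordCount", ["word", "count"])


def count_words(lines):
    """Split each line into lowercase words and count the occurrences of each."""
    counts = Counter()
    for line in lines:
        counts.update(re.findall(r"[a-z']+", line.lower()))
    return [WordCount(word, n) for word, n in sorted(counts.items())]


rows = count_words(["to be or not to be"])

# The older shape, Row(output="word: count"), can still be derived when needed.
formatted = [f"{row.word}: {row.count}" for row in rows]

for row in rows:
    print(row)
```

Keeping `word` and `count` as separate fields lets later steps filter or sort on the count without parsing a formatted string.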






Re: [PR] Update beam-2.54.0.md about v2 default. [beam]

2024-02-23 Thread via GitHub


robertwb merged PR #30411:
URL: https://github.com/apache/beam/pull/30411





[PR] Update beam-2.54.0.md about v2 default. [beam]

2024-02-23 Thread via GitHub


robertwb opened a new pull request, #30411:
URL: https://github.com/apache/beam/pull/30411

   
   
   
   Thank you for your contribution! Follow this checklist to help us 
incorporate your contribution quickly and easily:
   
- [ ] Mention the appropriate issue in your description (for example: 
`addresses #123`), if applicable. This will automatically add a link to the 
pull request in the issue. If you would like the issue to automatically close 
on merging the pull request, comment `fixes #` instead.
- [ ] Update `CHANGES.md` with noteworthy changes.
- [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more 
tips on [how to make review process 
smoother](https://github.com/apache/beam/blob/master/CONTRIBUTING.md#make-the-reviewers-job-easier).
   
   To check the build health, please visit 
[https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md](https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md)
   
   GitHub Actions Tests Status (on master branch)
   

   [![Build python source distribution and wheels](https://github.com/apache/beam/workflows/Build%20python%20source%20distribution%20and%20wheels/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Build+python+source+distribution+and+wheels%22+branch%3Amaster+event%3Aschedule)
   [![Python tests](https://github.com/apache/beam/workflows/Python%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Python+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Java tests](https://github.com/apache/beam/workflows/Java%20Tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Java+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Go tests](https://github.com/apache/beam/workflows/Go%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Go+tests%22+branch%3Amaster+event%3Aschedule)
   
   See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more 
information about GitHub Actions CI or the [workflows 
README](https://github.com/apache/beam/blob/master/.github/workflows/README.md) 
to see a list of phrases to trigger workflows.
   





Re: [PR] Ensure flatten windows match [beam]

2024-02-23 Thread via GitHub


damccorm commented on code in PR #30410:
URL: https://github.com/apache/beam/pull/30410#discussion_r1501185496


##
sdks/python/apache_beam/transforms/core.py:
##
@@ -3683,8 +3683,14 @@ def _extract_input_pvalues(self, pvalueish):
 return pvalueish, pvalueish
 
   def expand(self, pcolls):
+windowing = self.get_windowing(pcolls)
 for pcoll in pcolls:
   self._check_pcollection(pcoll)
+  if pcoll.windowing != windowing:

Review Comment:
   Yeah, I think that's a good idea. It turns out our test asserts actually do 
this already for some weird reasons, so I'm sure others do as well. That's what 
is causing CI to fail - 
https://github.com/apache/beam/blob/20675c860f46f3f4abce061a6b490166ca68df0f/sdks/python/apache_beam/testing/util.py#L286C7-L286C22






Re: [PR] Skip deleting empty file in case of no shards and skip_if_empty [beam]

2024-02-23 Thread via GitHub


riteshghorse commented on PR #30409:
URL: https://github.com/apache/beam/pull/30409#issuecomment-1962011172

   That makes sense. I think it will confuse users if the file exists even 
though PCollection was empty.





Re: [I] [Bug]: beam.io.WriteToText deletes existing file even if skip_if_empty=True [beam]

2024-02-23 Thread via GitHub


riteshghorse commented on issue #27926:
URL: https://github.com/apache/beam/issues/27926#issuecomment-1962010219

   Oh yes, that makes sense. I didn't think of that. This could confuse users. 
Added a doc comment instead.





Re: [PR] [yaml] Add Beam YAML Examples and Getting started docs [beam]

2024-02-23 Thread via GitHub


Polber commented on code in PR #30003:
URL: https://github.com/apache/beam/pull/30003#discussion_r1501174577


##
sdks/python/apache_beam/yaml/examples/README.md:
##
@@ -0,0 +1,48 @@
+# Examples Catalog
+
+
+* [Examples Catalog](#examples-catalog)
+  * [Wordcount](#wordcount)
+  * [Transforms](#transforms)
+* [Element-wise](#element-wise)
+* [Aggregation](#aggregation)
+
+
+This module contains a series of Beam YAML code samples that can be run using 
+the command:
+```
+python -m apache_beam.yaml.main --pipeline_spec_file=/path/to/example.yaml
+```
+
+## Wordcount
+A good starting place is the [Wordcount](wordcount_minimal.yaml) example under 
+the root example directory.
+This example reads in a text file, splits the text on each word, groups by 
each 
+word, and counts the occurrence of each word. This is a classic example used in
+the other SDK's and shows off many of the functionalities of Beam YAML.
+
+## Transforms
+
+Examples in this directory show off the various built-in transforms of the 
Beam 
+YAML framework.
+
+### Element-wise
+These examples leverage the built-in mapping transforms including 
`MapToFields`,
+`Filter` and `Explode`. More information can be found about mapping transforms
+[here](../docs/yaml_mapping.md).

Review Comment:
   I pointed to UDF section since that is where MapToFields lives






Re: [PR] Ensure flatten windows match [beam]

2024-02-23 Thread via GitHub


liferoad commented on code in PR #30410:
URL: https://github.com/apache/beam/pull/30410#discussion_r1501172737


##
sdks/python/apache_beam/transforms/core.py:
##
@@ -3683,8 +3683,14 @@ def _extract_input_pvalues(self, pvalueish):
 return pvalueish, pvalueish
 
   def expand(self, pcolls):
+windowing = self.get_windowing(pcolls)
 for pcoll in pcolls:
   self._check_pcollection(pcoll)
+  if pcoll.windowing != windowing:

Review Comment:
   Shall we just log the error since raising the error could break some users' 
existing jobs? 
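
The trade-off between raising and logging can be sketched without Beam. In this sketch `FakePCollection` and `check_windowing` are hypothetical stand-ins, and `strict` is an assumed flag rather than an option in the PR; the first input's windowing is used as the reference, which may differ from what `get_windowing` returns.

```python
import logging
from dataclasses import dataclass

logger = logging.getLogger("flatten_check")


@dataclass(frozen=True)
class FakePCollection:
    """Minimal stand-in for a PCollection: a name plus a windowing label."""
    name: str
    windowing: str


def check_windowing(pcolls, strict=True):
    """Return the names of inputs whose windowing differs from the first input.

    With strict=True a mismatch raises ValueError; otherwise it is only logged.
    """
    if not pcolls:
        return []
    expected = pcolls[0].windowing
    mismatched = [p.name for p in pcolls if p.windowing != expected]
    if mismatched:
        message = f"Flatten inputs with windowing other than {expected}: {mismatched}"
        if strict:
            raise ValueError(message)
        logger.warning(message)
    return mismatched


inputs = [
    FakePCollection("a", "FixedWindows(60)"),
    FakePCollection("b", "GlobalWindows"),
]

# Lenient mode keeps existing jobs running and only reports the mismatch.
lenient_result = check_windowing(inputs, strict=False)

# Strict mode surfaces the problem immediately.
try:
    check_windowing(inputs, strict=True)
    strict_raised = False
except ValueError:
    strict_raised = True
```

Logging avoids breaking jobs that already flatten mismatched inputs, at the cost of letting the mismatch go unnoticed if nobody reads the logs.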






Re: [PR] Skip deleting empty file in case of no shards and skip_if_empty [beam]

2024-02-23 Thread via GitHub


damccorm commented on PR #30409:
URL: https://github.com/apache/beam/pull/30409#issuecomment-1961994691

   I don't think we should actually change the behavior here. I'd vote we just 
document the parameter a little better. I left a couple comments in the issue 
this is responding to.





Re: [I] [Bug]: beam.io.WriteToText deletes existing file even if skip_if_empty=True [beam]

2024-02-23 Thread via GitHub


damccorm commented on issue #27926:
URL: https://github.com/apache/beam/issues/27926#issuecomment-1961994381

   I believe this is also consistent across our Java/Python implementations





Re: [I] [Bug]: beam.io.WriteToText deletes existing file even if skip_if_empty=True [beam]

2024-02-23 Thread via GitHub


damccorm commented on issue #27926:
URL: https://github.com/apache/beam/issues/27926#issuecomment-1961993269

   I'm not sure if this is actually a bug. `skip_if_empty` is a parameter 
controlling whether we write files or not (from the pydoc "Don’t write any 
shards if the PCollection is empty."). In general, this transform assumes that 
the destination is empty, or it will clear it to be empty.
   
   It's not actually clear to me that `skip_if_empty` should impact our deletion 
behavior though; I think if I'm writing an empty PCollection, I would expect 
the result in destination to be either no file or an empty file depending on 
the parameter. If the end contents are the same, it (to me) indicates that an 
identical PCollection was received and rewritten to the file. Basically, 
deleting the file is the only way we can be certain that the PCollection was 
empty.
   
   Because of this, I'm hesitant to change the meaning of this parameter (note 
that this is also mildly breaking).
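
The behavior described above can be modeled with a small file-based sketch. It is a simplified model of the semantics discussed in this comment, not Beam's `WriteToText` implementation; `write_output` is a hypothetical helper written only for this illustration.

```python
import os
import tempfile


def write_output(path, lines, skip_if_empty=False):
    """Clear the destination, then write unless the input is empty and skipped."""
    if os.path.exists(path):
        os.remove(path)  # the destination is always cleared first
    if not lines and skip_if_empty:
        return  # empty input: leave no file behind
    with open(path, "w", encoding="utf-8") as f:
        f.writelines(line + "\n" for line in lines)


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "out.txt")

    # A previous run left data at the destination.
    write_output(path, ["old data"])

    # Empty input with skip_if_empty=True: the old file is gone, nothing is written.
    write_output(path, [], skip_if_empty=True)
    exists_when_skipped = os.path.exists(path)

    # Empty input with skip_if_empty=False: an empty file is written.
    write_output(path, [], skip_if_empty=False)
    exists_when_not_skipped = os.path.exists(path)
    size_when_not_skipped = os.path.getsize(path)
```

In both cases the stale contents disappear, so a reader of the destination can tell that the latest input was empty.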





Re: [PR] [yaml] Add Beam YAML Examples and Getting started docs [beam]

2024-02-23 Thread via GitHub


robertwb commented on code in PR #30003:
URL: https://github.com/apache/beam/pull/30003#discussion_r1501143404


##
sdks/python/apache_beam/yaml/examples/README.md:
##
@@ -0,0 +1,48 @@
+# Examples Catalog
+
+
+* [Examples Catalog](#examples-catalog)
+  * [Wordcount](#wordcount)
+  * [Transforms](#transforms)
+* [Element-wise](#element-wise)
+* [Aggregation](#aggregation)
+
+
+This module contains a series of Beam YAML code samples that can be run using 
+the command:
+```
+python -m apache_beam.yaml.main --pipeline_spec_file=/path/to/example.yaml
+```
+
+## Wordcount
+A good starting place is the [Wordcount](wordcount_minimal.yaml) example under 
+the root example directory.
+This example reads in a text file, splits the text on each word, groups by each
+word, and counts the occurrence of each word. This is a classic example used in
+the other SDK's and shows off many of the functionalities of Beam YAML.
+
+## Transforms
+
+Examples in this directory show off the various built-in transforms of the Beam
+YAML framework.
+
+### Element-wise
+These examples leverage the built-in mapping transforms including `MapToFields`,
+`Filter` and `Explode`. More information can be found about mapping transforms
+[here](../docs/yaml_mapping.md).

Review Comment:
   Now that they're live, let's point to the official docs on 
https://beam.apache.org/documentation/sdks/yaml/



##
sdks/python/apache_beam/yaml/examples/wordcount_minimal.yaml:
##
@@ -0,0 +1,75 @@
+# coding=utf-8
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This examples reads from a public file stores on Google Cloud. This
+# requires authenticating with Google Cloud, or setting the file in
+#`ReadFromText` to a local file.
+#
+# To set up Application Default Credentials,
+# see https://cloud.google.com/docs/authentication/external/set-up-adc for more
+# information
+#
+# This pipeline reads in a text file, maps all words to a value of "1", sums

Review Comment:
   Perhaps intersperse these comments with the code itself? 



##
sdks/python/apache_beam/yaml/examples/README.md:
##
@@ -0,0 +1,48 @@
+# Examples Catalog
+
+
+* [Examples Catalog](#examples-catalog)
+  * [Wordcount](#wordcount)
+  * [Transforms](#transforms)
+* [Element-wise](#element-wise)
+* [Aggregation](#aggregation)
+
+
+This module contains a series of Beam YAML code samples that can be run using 
+the command:
+```
+python -m apache_beam.yaml.main --pipeline_spec_file=/path/to/example.yaml
+```
+
+## Wordcount
+A good starting place is the [Wordcount](wordcount_minimal.yaml) example under 
+the root example directory.
+This example reads in a text file, splits the text on each word, groups by each
+word, and counts the occurrence of each word. This is a classic example used in
+the other SDK's and shows off many of the functionalities of Beam YAML.
+
+## Transforms
+
+Examples in this directory show off the various built-in transforms of the Beam
+YAML framework.
+
+### Element-wise
+These examples leverage the built-in mapping transforms including `MapToFields`,
+`Filter` and `Explode`. More information can be found about mapping transforms
+[here](../docs/yaml_mapping.md).
+
+### Aggregation
+These examples leverage the built-in `Combine` transform for performing simple 
+aggregations including sum, mean, count, etc.
+
+These examples are experimental and require that 
+`yaml_experimental_features: Combine` be specified under the `options` tag, or
+by passing `--yaml_experimental_features=Combine` to the command to run the 
+pipeline. i.e.
+```
+python -m apache_beam.yaml.main \
+  --pipeline_spec_file=/path/to/example.yaml \
+  --yaml_experimental_features=Combine
+```
+More information can be found about aggregation transforms
+[here](../docs/yaml_combine.md).

Review Comment:
   https://beam.apache.org/documentation/sdks/yaml-combine/




Re: [PR] Implementing lull reporting at bundle level processing [beam]

2024-02-23 Thread via GitHub


arvindram03 commented on PR #29882:
URL: https://github.com/apache/beam/pull/29882#issuecomment-1961960410

   Seems like an unrelated failure





Re: [PR] Implement ordered list state for FnApi. [beam]

2024-02-23 Thread via GitHub


robertwb commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1501142124


##
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##
@@ -1075,6 +,42 @@ message StateClearRequest {}
 // A response to clear state.
 message StateClearResponse {}

Review Comment:
   Thanks. 
   
   IIUC, append is still correct, as there's a bag (with possibly multiple 
items) assigned to every point in the ordered space. (It's a lot like MultiMap 
with the ability to read ranges in order rather than just do point lookups, 
though +1 to not mixing the two.) 
   
   Agree on caching--there are more clever things we can do here in the future, 
but we can punt that to future work.
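   A minimal sketch of the data model described above: a bag of values at every point in the ordered space, with in-order range reads. The class and method names are hypothetical, the half-open `[start, end)` range convention is an assumption, and this is an in-memory toy rather than the FnApi protocol.

```python
import bisect
from collections import defaultdict
from typing import Any, Dict, Iterator, List, Tuple


class OrderedListStateSketch:
    """Toy in-memory model: a bag of values at each integer sort key."""

    def __init__(self) -> None:
        self._bags: Dict[int, List[Any]] = defaultdict(list)
        self._keys: List[int] = []  # sorted, distinct sort keys

    def append(self, sort_key: int, value: Any) -> None:
        # Appending adds to the bag at sort_key; it never replaces earlier values.
        if sort_key not in self._bags:
            bisect.insort(self._keys, sort_key)
        self._bags[sort_key].append(value)

    def read_range(self, start: int, end: int) -> Iterator[Tuple[int, Any]]:
        # Yield (sort_key, value) pairs with start <= sort_key < end, in key order.
        lo = bisect.bisect_left(self._keys, start)
        hi = bisect.bisect_left(self._keys, end)
        for key in self._keys[lo:hi]:
            for value in self._bags[key]:
                yield key, value

    def clear_range(self, start: int, end: int) -> None:
        # Drop every bag whose sort key falls in [start, end).
        lo = bisect.bisect_left(self._keys, start)
        hi = bisect.bisect_left(self._keys, end)
        for key in self._keys[lo:hi]:
            del self._bags[key]
        del self._keys[lo:hi]
```

   Two appends at the same sort key both survive, which is why "append" remains the right verb; a point lookup is just a range read of width one.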






Re: [PR] Skip deleting empty file in case of no shards and skip_if_empty [beam]

2024-02-23 Thread via GitHub


github-actions[bot] commented on PR #30409:
URL: https://github.com/apache/beam/pull/30409#issuecomment-1961955890

   Assigning reviewers. If you would like to opt out of this review, comment 
`assign to next reviewer`:
   
   R: @damccorm for label python.
   R: @johnjcasey for label io.
   
   Available commands:
   - `stop reviewer notifications` - opt out of the automated review tooling
   - `remind me after tests pass` - tag the comment author after tests pass
   - `waiting on author` - shift the attention set back to the author (any 
comment or push by the author will return the attention set to the reviewers)
   
   The PR bot will only process comments in the main thread (not review 
comments).





Re: [PR] Ensure flatten windows match [beam]

2024-02-23 Thread via GitHub


github-actions[bot] commented on PR #30410:
URL: https://github.com/apache/beam/pull/30410#issuecomment-1961955832

   Assigning reviewers. If you would like to opt out of this review, comment 
`assign to next reviewer`:
   
   R: @liferoad for label python.
   
   Available commands:
   - `stop reviewer notifications` - opt out of the automated review tooling
   - `remind me after tests pass` - tag the comment author after tests pass
   - `waiting on author` - shift the attention set back to the author (any 
comment or push by the author will return the attention set to the reviewers)
   
   The PR bot will only process comments in the main thread (not review 
comments).





Re: [PR] Refactor commit logic out of StreamingDataflowWorker [beam]

2024-02-23 Thread via GitHub


scwhittle commented on code in PR #30312:
URL: https://github.com/apache/beam/pull/30312#discussion_r1500963561


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/WindmillStreamPoolCloseableStreamFactory.java:
##
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client;
+
+import java.util.function.Supplier;
+import org.joda.time.Duration;
+
+/**
+ * Closeable {@link WindmillStream} factory that uses a {@link WindmillStreamPool} to create and
+ * release streams.
+ */
+public final class WindmillStreamPoolCloseableStreamFactory
+implements Supplier> {
+  private static final int NUM_COMMIT_STREAMS = 1;
+  private static final Duration COMMIT_STREAM_TIMEOUT = Duration.standardMinutes(1);
+
+  private final WindmillStreamPool streamPool;
+
+  public WindmillStreamPoolCloseableStreamFactory(Supplier streamFactory) {
+this.streamPool =
+WindmillStreamPool.create(NUM_COMMIT_STREAMS, COMMIT_STREAM_TIMEOUT, streamFactory);

Review Comment:
   Can we just change StreamPool to have a method returning CloseableStreams? 
I'm not sure we need this new class. It also hard-codes parameters that we 
might want to change for different stream pools.
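   As a rough illustration of that suggestion, here is a Python analog (not the Java classes under review): the pool itself exposes a method that hands out a stream and releases it on exit, and the pool size is a constructor argument rather than a constant. All names are hypothetical and the timeout handling is omitted.

```python
from contextlib import contextmanager
from typing import Callable, Generic, Iterator, List, TypeVar

T = TypeVar("T")


class StreamPoolSketch(Generic[T]):
    """Hypothetical pool; its size is configurable per instance."""

    def __init__(self, num_streams: int, stream_factory: Callable[[], T]) -> None:
        self._num_streams = num_streams
        self._factory = stream_factory
        self._idle: List[T] = []
        self.outstanding = 0  # streams currently handed out

    def get_stream(self) -> T:
        self.outstanding += 1
        return self._idle.pop() if self._idle else self._factory()

    def release_stream(self, stream: T) -> None:
        self.outstanding -= 1
        if len(self._idle) < self._num_streams:
            self._idle.append(stream)

    @contextmanager
    def closeable_stream(self) -> Iterator[T]:
        # The pool hands out a stream that is released automatically on exit,
        # so no separate factory class is needed.
        stream = self.get_stream()
        try:
            yield stream
        finally:
            self.release_stream(stream)
```

   A caller would write `with pool.closeable_stream() as stream: ...`, and different pools can be built with different sizes.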
   
   
   






Re: [PR] [flink] #30402 restore upgradability [beam]

2024-02-23 Thread via GitHub


je-ik commented on code in PR #30403:
URL: https://github.com/apache/beam/pull/30403#discussion_r1501138097


##
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java:
##
@@ -143,6 +143,9 @@
  */
 public abstract class PTransform
 implements Serializable /* See the note above */, HasDisplayData {
+
+  private static final long serialVersionUID = 3383862966597863311L;

Review Comment:
   The problem is that Flink needs to serialize the UnboundedSource (after 
split) into state to support legacy Read. I'm not sure what the correct way to 
fix this is: we could use Kryo, but UnboundedSource actually is 
`Serializable`. How does Dataflow serialize the source? I suppose the same 
applies to ReadViaSDF as well.






Re: [PR] [YAML] Fix MapToFields error output type inference [beam]

2024-02-23 Thread via GitHub


robertwb merged PR #30378:
URL: https://github.com/apache/beam/pull/30378





Re: [PR] [flink] #30402 restore upgradability [beam]

2024-02-23 Thread via GitHub


robertwb commented on code in PR #30403:
URL: https://github.com/apache/beam/pull/30403#discussion_r1501133515


##
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java:
##
@@ -143,6 +143,9 @@
  */
 public abstract class PTransform
 implements Serializable /* See the note above */, HasDisplayData {
+
+  private static final long serialVersionUID = 3383862966597863311L;

Review Comment:
   Ack. Hopefully we can clean this up in the future. 






Re: [PR] [flink] #30402 restore upgradability [beam]

2024-02-23 Thread via GitHub


je-ik commented on PR #30403:
URL: https://github.com/apache/beam/pull/30403#issuecomment-1961911653

   Makes sense. I removed the sdk.util version. PTAL.





[PR] Ensure flatten windows match [beam]

2024-02-23 Thread via GitHub


damccorm opened a new pull request, #30410:
URL: https://github.com/apache/beam/pull/30410

   When these windows don't match, it can create unexpected behavior 
downstream; it doesn't really make sense to flatten different windows.
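   A minimal sketch of such a check, assuming a mismatch is reported by raising an error (the PR may handle it differently). `WindowingSketch` and `check_flatten_windowing` are hypothetical stand-ins, not Beam APIs.

```python
from dataclasses import dataclass
from typing import Optional, Sequence


@dataclass(frozen=True)
class WindowingSketch:
    """Hypothetical stand-in for a PCollection's windowing strategy."""
    window_fn: str
    size_seconds: int


def check_flatten_windowing(
        inputs: Sequence[WindowingSketch]) -> Optional[WindowingSketch]:
    """Return the windowing shared by all inputs, or raise if they differ."""
    if not inputs:
        return None  # nothing to compare
    first = inputs[0]
    for other in inputs[1:]:
        if other != first:
            raise ValueError(
                f"Flatten inputs have mismatched windows: {first} vs {other}")
    return first
```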
   
   Fixes #22903
   
   
   
   Thank you for your contribution! Follow this checklist to help us 
incorporate your contribution quickly and easily:
   
- [ ] Mention the appropriate issue in your description (for example: 
`addresses #123`), if applicable. This will automatically add a link to the 
pull request in the issue. If you would like the issue to automatically close 
on merging the pull request, comment `fixes #` instead.
- [ ] Update `CHANGES.md` with noteworthy changes.
- [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more 
tips on [how to make review process 
smoother](https://github.com/apache/beam/blob/master/CONTRIBUTING.md#make-the-reviewers-job-easier).
   
   To check the build health, please visit 
[https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md](https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md)
   
   GitHub Actions Tests Status (on master branch)
   

   [![Build python source distribution and 
wheels](https://github.com/apache/beam/workflows/Build%20python%20source%20distribution%20and%20wheels/badge.svg?branch=master=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Build+python+source+distribution+and+wheels%22+branch%3Amaster+event%3Aschedule)
   [![Python 
tests](https://github.com/apache/beam/workflows/Python%20tests/badge.svg?branch=master=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Python+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Java 
tests](https://github.com/apache/beam/workflows/Java%20Tests/badge.svg?branch=master=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Java+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Go 
tests](https://github.com/apache/beam/workflows/Go%20tests/badge.svg?branch=master=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Go+tests%22+branch%3Amaster+event%3Aschedule)
   
   See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more 
information about GitHub Actions CI or the [workflows 
README](https://github.com/apache/beam/blob/master/.github/workflows/README.md) 
to see a list of phrases to trigger workflows.
   





[PR] Skip deleting empty file in case of no shards and skip_if_empty [beam]

2024-02-23 Thread via GitHub


riteshghorse opened a new pull request, #30409:
URL: https://github.com/apache/beam/pull/30409

   Fixes #27926 
   
   When `skip_if_empty` is set to true and no shards are present, don't delete 
the existing files.
   
   
   





Re: [PR] Implementing lull reporting at bundle level processing [beam]

2024-02-23 Thread via GitHub


arvindram03 commented on PR #29882:
URL: https://github.com/apache/beam/pull/29882#issuecomment-1961885912

   The tests are green and ready to be merged. 





Re: [PR] Implementing lull reporting at bundle level processing [beam]

2024-02-23 Thread via GitHub


arvindram03 closed pull request #29882: Implementing lull reporting at bundle 
level processing
URL: https://github.com/apache/beam/pull/29882





Re: [PR] Remove the CSP-blocked iframes pointing outside of apache.org [beam]

2024-02-23 Thread via GitHub


tvalentyn merged PR #30408:
URL: https://github.com/apache/beam/pull/30408





Re: [PR] Remove the CSP-blocked iframes pointing outside of apache.org [beam]

2024-02-23 Thread via GitHub


github-actions[bot] commented on PR #30408:
URL: https://github.com/apache/beam/pull/30408#issuecomment-1961792250

   Stopping reviewer notifications for this pull request: review requested by 
someone other than the bot, ceding control





Re: [PR] Remove the CSP-blocked iframes pointing outside of apache.org [beam]

2024-02-23 Thread via GitHub


tvalentyn commented on PR #30408:
URL: https://github.com/apache/beam/pull/30408#issuecomment-1961790669

   R: @svetakvsundhar 





[PR] Implementing lull reporting at bundle level processing [beam]

2024-02-23 Thread via GitHub


arvindram03 opened a new pull request, #29882:
URL: https://github.com/apache/beam/pull/29882

   This PR implements lull reporting for the Dataflow worker at the 
bundle-processing level. We dump a stack trace when the bundle processing time 
exceeds 10 minutes. As part of this, we log the step names and the time spent 
in each step to help users debug stuck jobs.
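   A rough Python sketch of the idea, for illustration only; the PR itself targets the Java Dataflow worker. `lull_report` and its parameters are hypothetical, and the stack captured here is that of the calling thread rather than a separate bundle-processing thread.

```python
import traceback
from typing import Dict, Optional

LULL_THRESHOLD_SECONDS = 10 * 60  # 10 minutes, per the description above


def lull_report(elapsed_seconds: float,
                step_seconds: Dict[str, float]) -> Optional[str]:
    """Return a report if the bundle has run past the threshold, else None."""
    if elapsed_seconds <= LULL_THRESHOLD_SECONDS:
        return None
    lines = [f"Bundle processing has taken {elapsed_seconds:.0f}s"]
    # List steps from slowest to fastest so the likely culprit comes first.
    for name, seconds in sorted(step_seconds.items(), key=lambda kv: -kv[1]):
        lines.append(f"  step {name}: {seconds:.0f}s")
    lines.append("Stack trace:")
    lines.append("".join(traceback.format_stack()))
    return "\n".join(lines)
```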
   
   
   





Re: [PR] Fix invalid escape sequence '\#' [beam]

2024-02-23 Thread via GitHub


mitchej123 commented on PR #30393:
URL: https://github.com/apache/beam/pull/30393#issuecomment-1961784671

   Run preCommitPyCoverage





Re: [PR] Implementing lull reporting at bundle level processing [beam]

2024-02-23 Thread via GitHub


arvindram03 closed pull request #29882: Implementing lull reporting at bundle 
level processing
URL: https://github.com/apache/beam/pull/29882





Re: [I] [Bug]: Frames to Beam playground no longer render on Beam website. [beam]

2024-02-23 Thread via GitHub


tvalentyn commented on issue #30394:
URL: https://github.com/apache/beam/issues/30394#issuecomment-1961763069

   Root cause of the behavior change: 
https://github.com/apache/infrastructure-p6/commit/361cb2462de24d8699c807f42a03f9f9c38c





[PR] Remove the CSP-blocked iframes pointing outside of apache.org [beam]

2024-02-23 Thread via GitHub


tvalentyn opened a new pull request, #30408:
URL: https://github.com/apache/beam/pull/30408

   Staged URL: TBD





Re: [PR] Use BeamModulePlugin org.json version in extensions/ml [beam]

2024-02-23 Thread via GitHub


github-actions[bot] commented on PR #30406:
URL: https://github.com/apache/beam/pull/30406#issuecomment-1961751506

   Stopping reviewer notifications for this pull request: review requested by 
someone other than the bot, ceding control





Re: [PR] Use BeamModulePlugin org.json version in extensions/ml [beam]

2024-02-23 Thread via GitHub


Abacn commented on PR #30406:
URL: https://github.com/apache/beam/pull/30406#issuecomment-1961749902

   R: @damondouglas @damccorm 





Re: [PR] Create PubSubIO Load test [beam]

2024-02-23 Thread via GitHub


Abacn merged PR #30286:
URL: https://github.com/apache/beam/pull/30286


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Create PubSubIO Load test [beam]

2024-02-23 Thread via GitHub


Abacn commented on code in PR #30286:
URL: https://github.com/apache/beam/pull/30286#discussion_r1500991496


##
it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/pubsub/PubSubIOLT.java:
##
@@ -0,0 +1,475 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.it.gcp.pubsub;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.cloud.Timestamp;
+import com.google.protobuf.ByteString;
+import com.google.pubsub.v1.SubscriptionName;
+import com.google.pubsub.v1.TopicName;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.text.ParseException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import java.util.UUID;
+import org.apache.beam.it.common.PipelineLauncher;
+import org.apache.beam.it.common.PipelineOperator;
+import org.apache.beam.it.common.TestProperties;
+import org.apache.beam.it.common.utils.ResourceManagerUtils;
+import org.apache.beam.it.gcp.IOLoadTestBase;
+import org.apache.beam.runners.direct.DirectOptions;
+import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.Primitive;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubOptions;
+import org.apache.beam.sdk.io.synthetic.SyntheticSourceOptions;
+import org.apache.beam.sdk.io.synthetic.SyntheticUnboundedSource;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.apache.beam.sdk.testutils.NamedTestResult;
+import org.apache.beam.sdk.testutils.metrics.IOITMetrics;
+import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+/** PubSubIO performance tests. */
+public class PubSubIOLT extends IOLoadTestBase {
+
+  private static final int NUMBER_OF_BUNDLES_FOR_LOCAL = 10;
+  private static final int NUMBER_OF_BUNDLES_FOR_MEDIUM_AND_LARGE = 20;
+  private static final String READ_ELEMENT_METRIC_NAME = "read_count";
+  private static final String MAP_RECORDS_STEP_NAME = "Map records";
+  private static final String WRITE_TO_PUBSUB_STEP_NAME = "Write to PubSub";
+  private static final Map TEST_CONFIGS_PRESET;
+  private static TopicName topicName;
+  private static String testConfigName;
+  private static Configuration configuration;
+  private static SubscriptionName subscription;
+  private static InfluxDBSettings influxDBSettings;
+  private static PubsubResourceManager resourceManager;
+
+  @Rule public transient TestPipeline writePipeline = TestPipeline.create();
+  @Rule public transient TestPipeline readPipeline = TestPipeline.create();
+
+  static {
+try {
+  TEST_CONFIGS_PRESET =
+  ImmutableMap.of(
+  "local",
+  PubSubIOLT.Configuration.fromJsonString(
+  "{\"numRecords\":200,\"valueSizeBytes\":1000,\"pipelineTimeout\":7,\"runner\":\"DirectRunner\",\"numWorkers\":1}",
+  PubSubIOLT.Configuration.class), // 0.2 MB
+  "medium",
+  PubSubIOLT.Configuration.fromJsonString(
+  

Re: [PR] Update .htaccess [beam]

2024-02-23 Thread via GitHub


tvalentyn commented on PR #30407:
URL: https://github.com/apache/beam/pull/30407#issuecomment-1961745638

   Actually, removing these frames may be a more appropriate course of action.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Update .htaccess [beam]

2024-02-23 Thread via GitHub


tvalentyn closed pull request #30407: Update .htaccess
URL: https://github.com/apache/beam/pull/30407





Re: [PR] Update .htaccess [beam]

2024-02-23 Thread via GitHub


svetakvsundhar commented on PR #30407:
URL: https://github.com/apache/beam/pull/30407#issuecomment-1961733250

   Ah, I think it's addressing the below; sounds good. 
   https://github.com/apache/beam/assets/26037657/c0052f9f-7b2d-45ff-9f43-3b7168192d56
   





Re: [PR] Update .htaccess [beam]

2024-02-23 Thread via GitHub


svetakvsundhar commented on PR #30407:
URL: https://github.com/apache/beam/pull/30407#issuecomment-1961729904

   Not sure I fully understand. Which page is this change for? Also, could you 
link the staging website? 





Re: [PR] Use BeamModulePlugin org.json version in extensions/ml [beam]

2024-02-23 Thread via GitHub


github-actions[bot] commented on PR #30406:
URL: https://github.com/apache/beam/pull/30406#issuecomment-1961728571

   Assigning reviewers. If you would like to opt out of this review, comment 
`assign to next reviewer`:
   
   R: @bvolpato for label java.
   
   Available commands:
   - `stop reviewer notifications` - opt out of the automated review tooling
   - `remind me after tests pass` - tag the comment author after tests pass
   - `waiting on author` - shift the attention set back to the author (any 
comment or push by the author will return the attention set to the reviewers)
   
   The PR bot will only process comments in the main thread (not review 
comments).





Re: [PR] Update .htaccess [beam]

2024-02-23 Thread via GitHub


github-actions[bot] commented on PR #30407:
URL: https://github.com/apache/beam/pull/30407#issuecomment-1961714536

   Stopping reviewer notifications for this pull request: review requested by 
someone other than the bot, ceding control





Re: [PR] Update .htaccess [beam]

2024-02-23 Thread via GitHub


tvalentyn commented on PR #30407:
URL: https://github.com/apache/beam/pull/30407#issuecomment-1961710980

   R: @svetakvsundhar 





[PR] Update .htaccess [beam]

2024-02-23 Thread via GitHub


tvalentyn opened a new pull request, #30407:
URL: https://github.com/apache/beam/pull/30407

   Also allowlist Google Calendar.
   
   Follow-up for #30394.





Re: [PR] Add DirectCommitWorkStream for direct path [beam]

2024-02-23 Thread via GitHub


scwhittle commented on code in PR #30255:
URL: https://github.com/apache/beam/pull/30255#discussion_r1500931469


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/WindmillStream.java:
##
@@ -77,14 +79,19 @@ interface CommitWorkStream extends WindmillStream {
  * onDone will be called with the status of the commit.
  */
 boolean commitWorkItem(
-String computation,
-Windmill.WorkItemCommitRequest request,
-Consumer onDone);
+String computation, WorkItemCommitRequest request, Consumer onDone);
 
 /** Flushes any pending work items to the wire. */
 void flush();
   }
 
+  @ThreadSafe
+  interface AsyncCommitWorkStream extends CommitWorkStream {
+void queueCommit(Commit commit);

Review Comment:
   Please add comments to the methods.
   
   In particular, is this one required not to block?
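
To make the question concrete, here is a minimal sketch of the two contracts a queueing method could document. It uses a plain `ArrayBlockingQueue`; `BoundedCommitQueue` and its method names are hypothetical and not taken from the PR, and commits are modelled as strings purely for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Hypothetical sketch; the class and method names are illustrative, not from the PR. */
final class BoundedCommitQueue {
  private final BlockingQueue<String> queue;

  BoundedCommitQueue(int capacity) {
    this.queue = new ArrayBlockingQueue<>(capacity);
  }

  /** Never blocks: returns false immediately when the queue is full. */
  boolean tryQueueCommit(String commit) {
    return queue.offer(commit);
  }

  /** May block: waits until space is available or the thread is interrupted. */
  void queueCommitBlocking(String commit) throws InterruptedException {
    queue.put(commit);
  }

  /** Number of commits currently waiting in the queue. */
  int size() {
    return queue.size();
  }
}
```

Whichever behaviour is intended, stating it in the Javadoc lets callers decide whether they can invoke the method from a thread that must not stall.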



##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectCommitWorkStream.java:
##
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc;
+
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.dataflow.worker.streaming.Commit;
+import org.apache.beam.runners.dataflow.worker.streaming.WeightedBoundedQueue;
+import org.apache.beam.runners.dataflow.worker.streaming.Work;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill.JobHeader;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill.StreamingCommitResponse;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill.StreamingCommitWorkRequest;
+import org.apache.beam.runners.dataflow.worker.windmill.client.AbstractWindmillStream;
+import org.apache.beam.runners.dataflow.worker.windmill.client.CompleteCommit;
+import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.AsyncCommitWorkStream;
+import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.observers.StreamObserverFactory;
+import org.apache.beam.runners.dataflow.worker.windmill.client.throttling.ThrottleTimer;
+import org.apache.beam.sdk.util.BackOff;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.StreamObserver;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation of {@link
+ * org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.CommitWorkStream} that
+ * manages its own commit queue, and asynchronously CommitWork RPCs to Streaming Engine as callers
+ * queue commits on the internal queue.
+ *
+ * Callers should call {@link #queueCommit(Commit)} when work is ready to be committed.
+ */
+public class GrpcDirectCommitWorkStream extends GrpcCommitWorkStream

Review Comment:
   Could we use composition instead of inheritance? It seems like functionality 
is just added on top of the existing stream and could just be a separate object 
containing a stream it delegates to.
   
   
https://en.wikipedia.org/wiki/Composition_over_inheritance#:~:text=Composition%20over%20inheritance%20(or%20composite,from%20a%20base%20or%20parent
   
   It seems like https://github.com/apache/beam/pull/30312 is ending up there, 
with a committer object that is wrapping the stream. Perhaps we should just get 
that one in first and then integrate that object for direct path.
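
As a rough illustration of the composition approach, the sketch below wraps a stream rather than extending it. All names here (`CommitStream`, `RecordingCommitStream`, `QueueingCommitter`) are hypothetical stand-ins and not the Beam classes; commits are modelled as plain strings to keep it self-contained.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical stand-in for the existing stream interface; not the real Beam API. */
interface CommitStream {
  boolean commitWorkItem(String computation, String request, Consumer<String> onDone);

  void flush();
}

/** Hypothetical in-memory stream, present only so the sketch can run. */
final class RecordingCommitStream implements CommitStream {
  final List<String> sent = new ArrayList<>();
  int flushes = 0;

  @Override
  public boolean commitWorkItem(String computation, String request, Consumer<String> onDone) {
    sent.add(computation + ":" + request);
    onDone.accept("OK");
    return true;
  }

  @Override
  public void flush() {
    flushes++;
  }
}

/** Holds a stream and delegates to it: queueing lives here, sending stays in the stream. */
final class QueueingCommitter {
  private final CommitStream delegate;
  private final List<String[]> pending = new ArrayList<>();

  QueueingCommitter(CommitStream delegate) {
    this.delegate = delegate;
  }

  void queueCommit(String computation, String request) {
    pending.add(new String[] {computation, request});
  }

  /** Sends queued commits in order, stops at the first rejection, then flushes. */
  int drain(Consumer<String> onDone) {
    int sentCount = 0;
    Iterator<String[]> it = pending.iterator();
    while (it.hasNext()) {
      String[] commit = it.next();
      if (!delegate.commitWorkItem(commit[0], commit[1], onDone)) {
        break; // leave this commit and the rest queued for a later drain
      }
      it.remove();
      sentCount++;
    }
    delegate.flush();
    return sentCount;
  }
}
```

With this shape the wrapper can be tested against any `CommitStream` implementation, and the stream class itself does not need to change.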



To unsubscribe, e-mail: 

Re: [PR] start to refactor persistence layer to prepare for direct path [beam]

2024-02-23 Thread via GitHub


scwhittle commented on code in PR #30265:
URL: https://github.com/apache/beam/pull/30265#discussion_r1500946427


##
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##
@@ -462,42 +446,89 @@ public void run() {
 LOG.debug("maxWorkItemCommitBytes: {}", maxWorkItemCommitBytes);
   }
 
-  private static WindmillServerStub createWindmillServerStub(
+  public static StreamingDataflowWorker fromOptions(StreamingDataflowWorkerOptions options) {
+ConcurrentMap computationMap = new ConcurrentHashMap<>();
+long clientId = clientIdGenerator.nextLong();
+return new StreamingDataflowWorker(
+createWindmillServerStub(
+options,
+clientId,
+new WorkHeartbeatResponseProcessor(

Review Comment:
   I don't see where this is set up if the forTesting method is used.
   
   Are you sure this isn't the cause of the test failure?






Re: [PR] Implementing lull reporting at bundle level processing [beam]

2024-02-23 Thread via GitHub


scwhittle commented on PR #29882:
URL: https://github.com/apache/beam/pull/29882#issuecomment-1961654189

   I just made the final edit myself (replacing anyOf with allOf rather than 
adding a separate allOf) instead of doing another round-trip. Will merge once 
tests pass.





[PR] Use BeamModulePlugin org.json version in extensions/ml [beam]

2024-02-23 Thread via GitHub


Abacn opened a new pull request, #30406:
URL: https://github.com/apache/beam/pull/30406

   Fixes #30404
   





Re: [PR] Update description.md [beam]

2024-02-23 Thread via GitHub


lostluck merged PR #30401:
URL: https://github.com/apache/beam/pull/30401





Re: [PR] Implement ordered list state for FnApi. [beam]

2024-02-23 Thread via GitHub


shunping commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1500882918


##
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/FakeBeamFnStateClient.java:
##
@@ -174,6 +215,124 @@ public CompletableFuture handle(StateRequest.Builder requestBuild
 response = StateResponse.newBuilder().setAppend(StateAppendResponse.getDefaultInstance());
 break;
 
+  case ORDERED_LIST_GET:
+{
+  long start = request.getOrderedListGet().getRange().getStart();
+  long end = request.getOrderedListGet().getRange().getEnd();
+
+  KvCoder coder = KvCoder.of(VarLongCoder.of(), VarIntCoder.of());
+  long sortKey = start;
+  int index = 0;
+  if (request.getOrderedListGet().getContinuationToken().size() > 0) {
+try {
+  // The continuation format here is the sort key (long) followed by an index (int)
+  KV cursor =
+  coder.decode(request.getOrderedListGet().getContinuationToken().newInput());
+  sortKey = cursor.getKey();
+  index = cursor.getValue();
+} catch (IOException e) {
+  throw new RuntimeException(e);
+}
+  }
+
+  ByteString continuationToken;
+  ByteString returnBlock = ByteString.EMPTY;
+  ;
+  try {
+if (sortKey < start || sortKey >= end) {
+  throw new IndexOutOfBoundsException("sort key out of range");
+}
+
+NavigableSet subset =
+orderedListKeys
+.getOrDefault(request.getStateKey(), new TreeSet<>())
+.subSet(sortKey, true, end, false);
+
+// get the effective sort key currently, can throw NoSuchElementException
+Long nextSortKey = subset.first();
+
+StateKey.Builder keyBuilder = request.getStateKey().toBuilder();
+keyBuilder.getOrderedListUserStateBuilder().setSortKey(nextSortKey);
+List byteStrings =
+data.getOrDefault(keyBuilder.build(), Collections.singletonList(ByteString.EMPTY));
+
+// get the block specified in continuation token, can throw IndexOutOfBoundsException
+returnBlock = byteStrings.get(index);
+
+if (byteStrings.size() > index + 1) {
+  // more blocks from this sort key
+  index += 1;
+} else {
+  // finish navigating the current sort key and need to find the next one,
+  // can throw NoSuchElementException
+  nextSortKey = subset.tailSet(nextSortKey, false).first();
+  index = 0;
+}
+
+ByteStringOutputStream outputStream = new ByteStringOutputStream();
+try {
+  KV cursor = KV.of(nextSortKey, index);
+  coder.encode(cursor, outputStream);
+} catch (IOException e) {
+  throw new RuntimeException(e);
+}
+continuationToken = outputStream.toByteString();
+  } catch (NoSuchElementException | IndexOutOfBoundsException e) {
+continuationToken = ByteString.EMPTY;
+  }
+  response =
+  StateResponse.newBuilder()
+  .setOrderedListGet(
+  OrderedListStateGetResponse.newBuilder()
+  .setData(returnBlock)
+  .setContinuationToken(continuationToken));
+}
+break;
+
+  case ORDERED_LIST_UPDATE:
+for (OrderedListRange r : request.getOrderedListUpdate().getDeletesList()) {
+  List keysToRemove =
+  new ArrayList<>(
+  orderedListKeys
+  .getOrDefault(request.getStateKey(), new TreeSet<>())
+  .subSet(r.getStart(), true, r.getEnd(), false));
+
+  for (Long l : keysToRemove) {
+StateKey.Builder keyBuilder = request.getStateKey().toBuilder();
+keyBuilder.getOrderedListUserStateBuilder().setSortKey(l);
+data.remove(keyBuilder.build());
+orderedListKeys.get(request.getStateKey()).remove(l);
+  }
+}
+
+for (OrderedListEntry e : request.getOrderedListUpdate().getInsertsList()) {
+  StateKey.Builder keyBuilder = request.getStateKey().toBuilder();
+  keyBuilder.getOrderedListUserStateBuilder().setSortKey(e.getSortKey());
+
+  ByteStringOutputStream outStream = new ByteStringOutputStream();
+
+  try {
+InstantCoder.of().encode(Instant.ofEpochMilli(e.getSortKey()), outStream);
+  } catch (IOException ex) {
+throw new RuntimeException(ex);
+  }
+  // In the response, the value encoded bytes are placed before the timestamp encoded bytes.

Review Comment:
   ACK. 
   
   I found a better way to do this. Specifically, I can reuse 

Re: [PR] Implement ordered list state for FnApi. [beam]

2024-02-23 Thread via GitHub


shunping commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1500882918


##
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/FakeBeamFnStateClient.java:
##
@@ -174,6 +215,124 @@ public CompletableFuture<StateResponse> handle(StateRequest.Builder requestBuild
         response = StateResponse.newBuilder().setAppend(StateAppendResponse.getDefaultInstance());
         break;
 
+      case ORDERED_LIST_GET:
+        {
+          long start = request.getOrderedListGet().getRange().getStart();
+          long end = request.getOrderedListGet().getRange().getEnd();
+
+          KvCoder<Long, Integer> coder = KvCoder.of(VarLongCoder.of(), VarIntCoder.of());
+          long sortKey = start;
+          int index = 0;
+          if (request.getOrderedListGet().getContinuationToken().size() > 0) {
+            try {
+              // The continuation format here is the sort key (long) followed by an index (int)
+              KV<Long, Integer> cursor =
+                  coder.decode(request.getOrderedListGet().getContinuationToken().newInput());
+              sortKey = cursor.getKey();
+              index = cursor.getValue();
+            } catch (IOException e) {
+              throw new RuntimeException(e);
+            }
+          }
+
+          ByteString continuationToken;
+          ByteString returnBlock = ByteString.EMPTY;
+          ;
+          try {
+            if (sortKey < start || sortKey >= end) {
+              throw new IndexOutOfBoundsException("sort key out of range");
+            }
+
+            NavigableSet<Long> subset =
+                orderedListKeys
+                    .getOrDefault(request.getStateKey(), new TreeSet<>())
+                    .subSet(sortKey, true, end, false);
+
+            // get the effective sort key currently, can throw NoSuchElementException
+            Long nextSortKey = subset.first();
+
+            StateKey.Builder keyBuilder = request.getStateKey().toBuilder();
+            keyBuilder.getOrderedListUserStateBuilder().setSortKey(nextSortKey);
+            List<ByteString> byteStrings =
+                data.getOrDefault(keyBuilder.build(), Collections.singletonList(ByteString.EMPTY));
+
+            // get the block specified in continuation token, can throw IndexOutOfBoundsException
+            returnBlock = byteStrings.get(index);
+
+            if (byteStrings.size() > index + 1) {
+              // more blocks from this sort key
+              index += 1;
+            } else {
+              // finish navigating the current sort key and need to find the next one,
+              // can throw NoSuchElementException
+              nextSortKey = subset.tailSet(nextSortKey, false).first();
+              index = 0;
+            }
+
+            ByteStringOutputStream outputStream = new ByteStringOutputStream();
+            try {
+              KV<Long, Integer> cursor = KV.of(nextSortKey, index);
+              coder.encode(cursor, outputStream);
+            } catch (IOException e) {
+              throw new RuntimeException(e);
+            }
+            continuationToken = outputStream.toByteString();
+          } catch (NoSuchElementException | IndexOutOfBoundsException e) {
+            continuationToken = ByteString.EMPTY;
+          }
+          response =
+              StateResponse.newBuilder()
+                  .setOrderedListGet(
+                      OrderedListStateGetResponse.newBuilder()
+                          .setData(returnBlock)
+                          .setContinuationToken(continuationToken));
+        }
+        break;
+
+      case ORDERED_LIST_UPDATE:
+        for (OrderedListRange r : request.getOrderedListUpdate().getDeletesList()) {
+          List<Long> keysToRemove =
+              new ArrayList<>(
+                  orderedListKeys
+                      .getOrDefault(request.getStateKey(), new TreeSet<>())
+                      .subSet(r.getStart(), true, r.getEnd(), false));
+
+          for (Long l : keysToRemove) {
+            StateKey.Builder keyBuilder = request.getStateKey().toBuilder();
+            keyBuilder.getOrderedListUserStateBuilder().setSortKey(l);
+            data.remove(keyBuilder.build());
+            orderedListKeys.get(request.getStateKey()).remove(l);
+          }
+        }
+
+        for (OrderedListEntry e : request.getOrderedListUpdate().getInsertsList()) {
+          StateKey.Builder keyBuilder = request.getStateKey().toBuilder();
+          keyBuilder.getOrderedListUserStateBuilder().setSortKey(e.getSortKey());
+
+          ByteStringOutputStream outStream = new ByteStringOutputStream();
+
+          try {
+            InstantCoder.of().encode(Instant.ofEpochMilli(e.getSortKey()), outStream);
+          } catch (IOException ex) {
+            throw new RuntimeException(ex);
+          }
+          // In the response, the value encoded bytes are placed before the timestamp encoded bytes.

Review Comment:
   ACK. 
   
   I found a better way to do this. Specifically, I can reuse 

Re: [I] [Task]: Use GCP-BOM to manage google cloud dependencies in sdks/java/extensions/ml [beam]

2024-02-23 Thread via GitHub


Abacn commented on issue #30405:
URL: https://github.com/apache/beam/issues/30405#issuecomment-1961617421

   There is at least one known breaking change in 
google-cloud-recommendations-ai that needs to be resolved: 
   
   
https://github.com/googleapis/google-cloud-java/commit/7c866001585563cd673b0bb33a567fd549cc483d
 changed the return type of `predictionServiceClient.predict.iterateAll` from 
PredictionResult to Entry.





Re: [PR] Implement ordered list state for FnApi. [beam]

2024-02-23 Thread via GitHub


shunping commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1500882918


##
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/FakeBeamFnStateClient.java:
##
@@ -174,6 +215,124 @@ public CompletableFuture<StateResponse> handle(StateRequest.Builder requestBuild
         response = StateResponse.newBuilder().setAppend(StateAppendResponse.getDefaultInstance());
         break;
 
+      case ORDERED_LIST_GET:
+        {
+          long start = request.getOrderedListGet().getRange().getStart();
+          long end = request.getOrderedListGet().getRange().getEnd();
+
+          KvCoder<Long, Integer> coder = KvCoder.of(VarLongCoder.of(), VarIntCoder.of());
+          long sortKey = start;
+          int index = 0;
+          if (request.getOrderedListGet().getContinuationToken().size() > 0) {
+            try {
+              // The continuation format here is the sort key (long) followed by an index (int)
+              KV<Long, Integer> cursor =
+                  coder.decode(request.getOrderedListGet().getContinuationToken().newInput());
+              sortKey = cursor.getKey();
+              index = cursor.getValue();
+            } catch (IOException e) {
+              throw new RuntimeException(e);
+            }
+          }
+
+          ByteString continuationToken;
+          ByteString returnBlock = ByteString.EMPTY;
+          ;
+          try {
+            if (sortKey < start || sortKey >= end) {
+              throw new IndexOutOfBoundsException("sort key out of range");
+            }
+
+            NavigableSet<Long> subset =
+                orderedListKeys
+                    .getOrDefault(request.getStateKey(), new TreeSet<>())
+                    .subSet(sortKey, true, end, false);
+
+            // get the effective sort key currently, can throw NoSuchElementException
+            Long nextSortKey = subset.first();
+
+            StateKey.Builder keyBuilder = request.getStateKey().toBuilder();
+            keyBuilder.getOrderedListUserStateBuilder().setSortKey(nextSortKey);
+            List<ByteString> byteStrings =
+                data.getOrDefault(keyBuilder.build(), Collections.singletonList(ByteString.EMPTY));
+
+            // get the block specified in continuation token, can throw IndexOutOfBoundsException
+            returnBlock = byteStrings.get(index);
+
+            if (byteStrings.size() > index + 1) {
+              // more blocks from this sort key
+              index += 1;
+            } else {
+              // finish navigating the current sort key and need to find the next one,
+              // can throw NoSuchElementException
+              nextSortKey = subset.tailSet(nextSortKey, false).first();
+              index = 0;
+            }
+
+            ByteStringOutputStream outputStream = new ByteStringOutputStream();
+            try {
+              KV<Long, Integer> cursor = KV.of(nextSortKey, index);
+              coder.encode(cursor, outputStream);
+            } catch (IOException e) {
+              throw new RuntimeException(e);
+            }
+            continuationToken = outputStream.toByteString();
+          } catch (NoSuchElementException | IndexOutOfBoundsException e) {
+            continuationToken = ByteString.EMPTY;
+          }
+          response =
+              StateResponse.newBuilder()
+                  .setOrderedListGet(
+                      OrderedListStateGetResponse.newBuilder()
+                          .setData(returnBlock)
+                          .setContinuationToken(continuationToken));
+        }
+        break;
+
+      case ORDERED_LIST_UPDATE:
+        for (OrderedListRange r : request.getOrderedListUpdate().getDeletesList()) {
+          List<Long> keysToRemove =
+              new ArrayList<>(
+                  orderedListKeys
+                      .getOrDefault(request.getStateKey(), new TreeSet<>())
+                      .subSet(r.getStart(), true, r.getEnd(), false));
+
+          for (Long l : keysToRemove) {
+            StateKey.Builder keyBuilder = request.getStateKey().toBuilder();
+            keyBuilder.getOrderedListUserStateBuilder().setSortKey(l);
+            data.remove(keyBuilder.build());
+            orderedListKeys.get(request.getStateKey()).remove(l);
+          }
+        }
+
+        for (OrderedListEntry e : request.getOrderedListUpdate().getInsertsList()) {
+          StateKey.Builder keyBuilder = request.getStateKey().toBuilder();
+          keyBuilder.getOrderedListUserStateBuilder().setSortKey(e.getSortKey());
+
+          ByteStringOutputStream outStream = new ByteStringOutputStream();
+
+          try {
+            InstantCoder.of().encode(Instant.ofEpochMilli(e.getSortKey()), outStream);
+          } catch (IOException ex) {
+            throw new RuntimeException(ex);
+          }
+          // In the response, the value encoded bytes are placed before the timestamp encoded bytes.

Review Comment:
   ACK. I found a better way to avoid the decoding overhead in the fake client 

Re: [PR] Implement ordered list state for FnApi. [beam]

2024-02-23 Thread via GitHub


shunping commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1500878413


##
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##
@@ -1075,6 +,42 @@ message StateClearRequest {}
 // A response to clear state.
 message StateClearResponse {}

Review Comment:
   Sounds good! Thanks a lot for the input, Kenn.
   
   I will go ahead with the changes to re-use Get/Append/Clear for ordered list 
then.






Re: [PR] Implement ordered list state for FnApi. [beam]

2024-02-23 Thread via GitHub


shunping commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1500872501


##
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##
@@ -1075,6 +,42 @@ message StateClearRequest {}
 // A response to clear state.
 message StateClearResponse {}
 
+// A message describes a sort key range [start, end).
+message OrderedListRange {
+  int64 start = 1;
+  int64 end = 2;
+}
+
+// A data entry which is tagged with a sort key.
+message OrderedListEntry {
+  int64 sort_key = 1;
+  bytes data = 2;
+}
+
+// This request will fetch an ordered list with a sort key range. If the
+// timestamp range is not specified, the runner should use [MIN_INT64,
+// MAX_INT64) by default.
+message OrderedListStateGetRequest {
+  bytes continuation_token = 1;
+  OrderedListRange range = 2;
+}
+
+// A response to the get state request for an ordered list.
+message OrderedListStateGetResponse {
+  bytes continuation_token = 1;
+  bytes data = 2;
+}
+
+// A request to update an ordered list
+message OrderedListStateUpdateRequest {
+  // when the request is processed, deletes should always happen before inserts.

Review Comment:
   Yep. I am planning to add an addendum to the original design doc to 
summarize the decisions we make here. We should have that after this round of 
review completes.
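
For readers following the proto above, here is a minimal sketch of how a half-open sort key range `[start, end)` and a continuation cursor could fit together. It uses only the JDK; `OrderedListPagingSketch`, `Page`, and the `{sortKey, index}` cursor array are hypothetical stand-ins and not Beam API, and real tokens are opaque bytes chosen by the runner.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical in-memory store used only to illustrate paging; not Beam API. */
final class OrderedListPagingSketch {
  /** One page: a single block of data plus the cursor for the next block. */
  static final class Page {
    final String data;
    final long[] next; // {sortKey, index}, or null when nothing is left

    Page(String data, long[] next) {
      this.data = data;
      this.next = next;
    }
  }

  private final NavigableMap<Long, List<String>> blocks = new TreeMap<>();

  void add(long sortKey, String block) {
    blocks.computeIfAbsent(sortKey, k -> new ArrayList<>()).add(block);
  }

  /** Returns the block addressed by the cursor within [start, end), or an empty final page. */
  Page get(long start, long end, long[] cursor) {
    long sortKey = cursor == null ? start : cursor[0];
    int index = cursor == null ? 0 : (int) cursor[1];
    if (sortKey < start || sortKey >= end) {
      return new Page("", null);
    }
    // subMap(from, true, to, false) is the half-open range [sortKey, end).
    NavigableMap<Long, List<String>> range = blocks.subMap(sortKey, true, end, false);
    Map.Entry<Long, List<String>> entry = range.firstEntry();
    if (entry == null || index >= entry.getValue().size()) {
      return new Page("", null);
    }
    String data = entry.getValue().get(index);
    if (index + 1 < entry.getValue().size()) {
      // More blocks remain under the same sort key.
      return new Page(data, new long[] {entry.getKey(), index + 1});
    }
    // Move on to the next sort key inside the range, if there is one.
    Long nextKey = range.higherKey(entry.getKey());
    return new Page(data, nextKey == null ? null : new long[] {nextKey, 0});
  }

  /** Follows cursors until the store reports that no data is left. */
  List<String> readAll(long start, long end) {
    List<String> out = new ArrayList<>();
    long[] cursor = null;
    do {
      Page page = get(start, end, cursor);
      if (!page.data.isEmpty()) {
        out.add(page.data);
      }
      cursor = page.next;
    } while (cursor != null);
    return out;
  }
}
```

A caller keeps issuing `get` with the returned cursor until it comes back null, which plays the role of the empty continuation token in the response message.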






Re: [PR] Implement ordered list state for FnApi. [beam]

2024-02-23 Thread via GitHub


shunping commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1500872501


##
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##
@@ -1075,6 +,42 @@ message StateClearRequest {}
 // A response to clear state.
 message StateClearResponse {}
 
+// A message describes a sort key range [start, end).
+message OrderedListRange {
+  int64 start = 1;
+  int64 end = 2;
+}
+
+// A data entry which is tagged with a sort key.
+message OrderedListEntry {
+  int64 sort_key = 1;
+  bytes data = 2;
+}
+
+// This request will fetch an ordered list with a sort key range. If the
+// timestamp range is not specified, the runner should use [MIN_INT64,
+// MAX_INT64) by default.
+message OrderedListStateGetRequest {
+  bytes continuation_token = 1;
+  OrderedListRange range = 2;
+}
+
+// A response to the get state request for an ordered list.
+message OrderedListStateGetResponse {
+  bytes continuation_token = 1;
+  bytes data = 2;
+}
+
+// A request to update an ordered list
+message OrderedListStateUpdateRequest {
+  // when the request is processed, deletes should always happen before inserts.

Review Comment:
   Yep. I am planning to add an addendum to the original design doc to 
summarize the decisions we make here. We should have that after this round 
of review completes.






Re: [PR] Implement ordered list state for FnApi. [beam]

2024-02-23 Thread via GitHub


shunping commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1500872501


##
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##
@@ -1075,6 +,42 @@ message StateClearRequest {}
 // A response to clear state.
 message StateClearResponse {}
 
+// A message describes a sort key range [start, end).
+message OrderedListRange {
+  int64 start = 1;
+  int64 end = 2;
+}
+
+// A data entry which is tagged with a sort key.
+message OrderedListEntry {
+  int64 sort_key = 1;
+  bytes data = 2;
+}
+
+// This request will fetch an ordered list with a sort key range. If the
+// timestamp range is not specified, the runner should use [MIN_INT64,
+// MAX_INT64) by default.
+message OrderedListStateGetRequest {
+  bytes continuation_token = 1;
+  OrderedListRange range = 2;
+}
+
+// A response to the get state request for an ordered list.
+message OrderedListStateGetResponse {
+  bytes continuation_token = 1;
+  bytes data = 2;
+}
+
+// A request to update an ordered list
+message OrderedListStateUpdateRequest {
+  // when the request is processed, deletes should always happen before inserts.

Review Comment:
   Yep. I am planning to add an addendum to the original design doc to 
summarize the decisions we make here after this round of review completes.






Re: [PR] Implement ordered list state for FnApi. [beam]

2024-02-23 Thread via GitHub


shunping commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1500872501


##
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##
@@ -1075,6 +,42 @@ message StateClearRequest {}
 // A response to clear state.
 message StateClearResponse {}
 
+// A message describes a sort key range [start, end).
+message OrderedListRange {
+  int64 start = 1;
+  int64 end = 2;
+}
+
+// A data entry which is tagged with a sort key.
+message OrderedListEntry {
+  int64 sort_key = 1;
+  bytes data = 2;
+}
+
+// This request will fetch an ordered list with a sort key range. If the
+// timestamp range is not specified, the runner should use [MIN_INT64,
+// MAX_INT64) by default.
+message OrderedListStateGetRequest {
+  bytes continuation_token = 1;
+  OrderedListRange range = 2;
+}
+
+// A response to the get state request for an ordered list.
+message OrderedListStateGetResponse {
+  bytes continuation_token = 1;
+  bytes data = 2;
+}
+
+// A request to update an ordered list
+message OrderedListStateUpdateRequest {
+  // when the request is processed, deletes should always happen before inserts.

Review Comment:
   Yep. I am planning to add an addendum to the original design to summarize 
the decision we make here after this round of review completes.






Re: [PR] Implement ordered list state for FnApi. [beam]

2024-02-23 Thread via GitHub


kennknowles commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1500851162


##
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##
@@ -1075,6 +,42 @@ message StateClearRequest {}
 // A response to clear state.
 message StateClearResponse {}

Review Comment:
   OK my responses are evolving now that I've read the whole code change and 
re-read the doc.
   
- I see specialized requests are broken out anyhow, so that's fine.
- Including everything needed for caching in the state key is good for raw 
request caching, so re-using Get is good, though there are perhaps smarter ways 
to cache that won't benefit from this.
- "append" is still a fine method for adding things to ordered list state, 
but it isn't important and the name is misleading (as it is for bags and 
multimaps, since they are not ordered, so anyhow it is the same here and might 
as well keep the incorrect naming).
- Obviously `clear` is fine.



##
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##
@@ -1075,6 +,42 @@ message StateClearRequest {}
 // A response to clear state.
 message StateClearResponse {}
 
+// A message describes a sort key range [start, end).
+message OrderedListRange {
+  int64 start = 1;
+  int64 end = 2;
+}
+
+// A data entry which is tagged with a sort key.
+message OrderedListEntry {
+  int64 sort_key = 1;
+  bytes data = 2;
+}
+
+// This request will fetch an ordered list with a sort key range. If the
+// timestamp range is not specified, the runner should use [MIN_INT64,
+// MAX_INT64) by default.
+message OrderedListStateGetRequest {
+  bytes continuation_token = 1;
+  OrderedListRange range = 2;
+}
+
+// A response to the get state request for an ordered list.
+message OrderedListStateGetResponse {
+  bytes continuation_token = 1;
+  bytes data = 2;
+}
+
+// A request to update an ordered list
+message OrderedListStateUpdateRequest {
+  // when the request is processed, deletes should always happen before inserts.

Review Comment:
   It would be helpful to outline the pro/con in the design doc of little 
decisions like this, and note which one was chosen and why.
   
   For example one benefit to splitting the requests is to avoid ordering 
issues. We would have to spec that either the inserts or deletes happen first, 
even though they are in one request together. It is a bit confusing. And then 
if you want them in the other order, you still have to make two requests but 
each one has an empty field.
   
   And note whether there is an efficiency consideration.
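
To make the ordering concern concrete, the following is a small sketch, assuming a plain `TreeMap` as a stand-in for ordered list state with one value per sort key. `UpdateOrderingSketch` and its method names are hypothetical and not part of the PR; the point is only that a combined request gives different results depending on which half is applied first.

```java
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical illustration of why a combined update request must fix an order. */
final class UpdateOrderingSketch {
  /** Deletes the range [deleteStart, deleteEnd) first, then applies the insert. */
  static NavigableMap<Long, String> deleteThenInsert(
      NavigableMap<Long, String> state, long deleteStart, long deleteEnd, long key, String value) {
    NavigableMap<Long, String> copy = new TreeMap<>(state);
    // Clearing the subMap view removes those keys from the backing map.
    copy.subMap(deleteStart, true, deleteEnd, false).clear();
    copy.put(key, value);
    return copy;
  }

  /** Applies the insert first, then deletes the range [deleteStart, deleteEnd). */
  static NavigableMap<Long, String> insertThenDelete(
      NavigableMap<Long, String> state, long deleteStart, long deleteEnd, long key, String value) {
    NavigableMap<Long, String> copy = new TreeMap<>(state);
    copy.put(key, value);
    copy.subMap(deleteStart, true, deleteEnd, false).clear();
    return copy;
  }
}
```

With an insert whose sort key falls inside the deleted range, the first order keeps the new value and the second drops it, so the spec has to pick one or the operations have to be separate requests.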






Re: [PR] Implement ordered list state for FnApi. [beam]

2024-02-23 Thread via GitHub


kennknowles commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1500844153


##
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##
@@ -1075,6 +,42 @@ message StateClearRequest {}
 // A response to clear state.
 message StateClearResponse {}

Review Comment:
   The one counterpoint is that a state key is used for caching, as long as the 
spec is that a state key is deterministic in what it returns. So it has value 
for `get`.
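
As a rough sketch of the caching point, assume a key that carries everything a `get` depends on (here a state id plus the requested range). `StateKeyCacheSketch` and `RangeKey` are hypothetical names, not Beam classes; the sketch only shows that equal keys let a raw request cache skip the backend, which is sound only if a given key always returns the same data.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;

/** Hypothetical request cache keyed by everything the read depends on. */
final class StateKeyCacheSketch {
  /** Value-type key: two keys are equal when state id and range both match. */
  static final class RangeKey {
    final String stateId;
    final long start;
    final long end;

    RangeKey(String stateId, long start, long end) {
      this.stateId = stateId;
      this.start = start;
      this.end = end;
    }

    @Override
    public boolean equals(Object o) {
      if (!(o instanceof RangeKey)) {
        return false;
      }
      RangeKey other = (RangeKey) o;
      return stateId.equals(other.stateId) && start == other.start && end == other.end;
    }

    @Override
    public int hashCode() {
      return Objects.hash(stateId, start, end);
    }
  }

  private final Map<RangeKey, List<String>> cache = new HashMap<>();
  private int backendCalls = 0;

  /** Returns the cached result for an equal key, calling the backend only on a miss. */
  List<String> get(RangeKey key, Function<RangeKey, List<String>> backend) {
    return cache.computeIfAbsent(
        key,
        k -> {
          backendCalls++;
          return backend.apply(k);
        });
  }

  int backendCalls() {
    return backendCalls;
  }
}
```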






Re: [PR] Implement ordered list state for FnApi. [beam]

2024-02-23 Thread via GitHub


kennknowles commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1500839373


##
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##
@@ -1075,6 +,42 @@ message StateClearRequest {}
 // A response to clear state.
 message StateClearResponse {}

Review Comment:
   I still agree with my comment there: "it looks like a 'do everything' 
request/response protocol and the key holds all the information." That comment 
may seem to be about style, but it is really about transparency, debuggability, 
readability.
   
   I view whatever "consistency" we have amongst the state requests as 
coincidence at best and a mistake at worst. _The_ reason that different kinds 
of state exist is because they support different methods. There is a reason 
that the only method they have in common in Java (where it originated) is 
`clear()`. Apparently the design decision was to model `methodA, methodB, 
methodC` as `oneBigMethod("actually do A"), oneBigMethod("actually do B"), 
oneBigMethod("actually do C")`. I just think that is not as good as being 
straightforward.
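
The contrast described above can be sketched in a few lines of plain Java. Everything here (`StateApiStyleSketch`, `Bag`, `Op`, `oneBigMethod`) is a hypothetical illustration of the two styles and not the FnApi itself.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical contrast between per-operation methods and one tagged entry point. */
final class StateApiStyleSketch {
  /** Style 1: each operation is its own method, so the call site says what happens. */
  static final class Bag {
    private final List<String> values = new ArrayList<>();

    void append(String value) {
      values.add(value);
    }

    List<String> get() {
      return new ArrayList<>(values);
    }

    void clear() {
      values.clear();
    }
  }

  /** Style 2: one entry point; the operation is data carried inside the request. */
  enum Op {
    APPEND,
    GET,
    CLEAR
  }

  static List<String> oneBigMethod(List<String> values, Op op, String argument) {
    switch (op) {
      case APPEND:
        values.add(argument);
        return new ArrayList<>(values);
      case GET:
        return new ArrayList<>(values);
      case CLEAR:
        values.clear();
        return new ArrayList<>(values);
      default:
        throw new IllegalArgumentException("unknown op: " + op);
    }
  }
}
```

Both styles do the same work; in the second, a reader has to inspect the request contents to learn which operation runs, which is the transparency cost the comment points at.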






Re: [PR] Duet AI Prompts: Apache Beam Runners [beam]

2024-02-23 Thread via GitHub


damccorm merged PR #30346:
URL: https://github.com/apache/beam/pull/30346





Re: [PR] Allow frame references to Beam Playground. [beam]

2024-02-23 Thread via GitHub


svetakvsundhar commented on PR #30391:
URL: https://github.com/apache/beam/pull/30391#issuecomment-1961295491

   Looks like it didn't fix the issue: 
https://beam.apache.org/documentation/transforms/java/elementwise/pardo/





Re: [PR] Create PubSubIO Load test [beam]

2024-02-23 Thread via GitHub


akashorabek commented on code in PR #30286:
URL: https://github.com/apache/beam/pull/30286#discussion_r1500616246


##
it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/pubsub/PubSubIOLT.java:
##
@@ -0,0 +1,475 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.it.gcp.pubsub;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.cloud.Timestamp;
+import com.google.protobuf.ByteString;
+import com.google.pubsub.v1.SubscriptionName;
+import com.google.pubsub.v1.TopicName;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.text.ParseException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import java.util.UUID;
+import org.apache.beam.it.common.PipelineLauncher;
+import org.apache.beam.it.common.PipelineOperator;
+import org.apache.beam.it.common.TestProperties;
+import org.apache.beam.it.common.utils.ResourceManagerUtils;
+import org.apache.beam.it.gcp.IOLoadTestBase;
+import org.apache.beam.runners.direct.DirectOptions;
+import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.Primitive;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubOptions;
+import org.apache.beam.sdk.io.synthetic.SyntheticSourceOptions;
+import org.apache.beam.sdk.io.synthetic.SyntheticUnboundedSource;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.apache.beam.sdk.testutils.NamedTestResult;
+import org.apache.beam.sdk.testutils.metrics.IOITMetrics;
+import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+/** PubSubIO performance tests. */
+public class PubSubIOLT extends IOLoadTestBase {
+
+  private static final int NUMBER_OF_BUNDLES_FOR_LOCAL = 10;
+  private static final int NUMBER_OF_BUNDLES_FOR_MEDIUM_AND_LARGE = 20;
+  private static final String READ_ELEMENT_METRIC_NAME = "read_count";
+  private static final String MAP_RECORDS_STEP_NAME = "Map records";
+  private static final String WRITE_TO_PUBSUB_STEP_NAME = "Write to PubSub";
+  private static final Map<String, Configuration> TEST_CONFIGS_PRESET;
+  private static TopicName topicName;
+  private static String testConfigName;
+  private static Configuration configuration;
+  private static SubscriptionName subscription;
+  private static InfluxDBSettings influxDBSettings;
+  private static PubsubResourceManager resourceManager;
+
+  @Rule public transient TestPipeline writePipeline = TestPipeline.create();
+  @Rule public transient TestPipeline readPipeline = TestPipeline.create();
+
+  static {
+    try {
+      TEST_CONFIGS_PRESET =
+          ImmutableMap.of(
+              "local",
+              PubSubIOLT.Configuration.fromJsonString(
+                  "{\"numRecords\":200,\"valueSizeBytes\":1000,\"pipelineTimeout\":7,\"runner\":\"DirectRunner\",\"numWorkers\":1}",
+                  PubSubIOLT.Configuration.class), // 0.2 MB
+              "medium",
+              PubSubIOLT.Configuration.fromJsonString(
+  

Re: [PR] [flink] #30402 restore upgradability [beam]

2024-02-23 Thread via GitHub


je-ik commented on PR #30403:
URL: https://github.com/apache/beam/pull/30403#issuecomment-1961089654

   I was able to upgrade from Beam 2.54.0 to 2.55.0-SNAPSHOT with this patch.





Re: [PR] [flink] #30402 restore upgradability [beam]

2024-02-23 Thread via GitHub


je-ik commented on code in PR #30403:
URL: https://github.com/apache/beam/pull/30403#discussion_r1500481060


##
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java:
##
@@ -143,6 +143,9 @@
  */
 public abstract class PTransform<InputT extends PInput, OutputT extends POutput>
 implements Serializable /* See the note above */, HasDisplayData {
+
+  private static final long serialVersionUID = 3383862966597863311L;

Review Comment:
   Unfortunately, Flink uses Java serialization to serialize checkpoints (via 
SerializableCoder) in the legacy Read transform (needed for KafkaIO), so 
without stabilizing this serialVersionUID we are unable to upgrade, even with 
SerializablePipelineOptions moved back.
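
   As a rough illustration of why pinning the field matters (a minimal sketch, 
not code from this PR): when a Serializable class declares no serialVersionUID, 
Java derives one from the class's name, modifiers, interfaces, fields and 
methods, so an unrelated change to the class can alter it and make previously 
serialized state unreadable. `PinnedBase` and `UnpinnedBase` below are 
hypothetical stand-ins for a class like PTransform; only the UID value is 
copied from the diff above.

```java
import java.io.ObjectStreamClass;
import java.io.Serializable;

public class SerialVersionUidSketch {

  // Hypothetical stand-in for an abstract serializable base class.
  // The explicit field fixes the UID regardless of later changes to the class.
  public abstract static class PinnedBase implements Serializable {
    private static final long serialVersionUID = 3383862966597863311L;
  }

  // Hypothetical counterpart without an explicit UID: Java serialization
  // computes a default from the class structure, so the value can change
  // whenever members are added, removed, or modified.
  public abstract static class UnpinnedBase implements Serializable {}

  // Returns the UID that Java serialization associates with the class.
  public static long uidOf(Class<?> cls) {
    return ObjectStreamClass.lookup(cls).getSerialVersionUID();
  }

  public static void main(String[] args) {
    System.out.println("pinned:   " + uidOf(PinnedBase.class));
    System.out.println("unpinned: " + uidOf(UnpinnedBase.class));
  }
}
```

   Deserialization fails with an InvalidClassException when the UID stored in 
the stream differs from the UID of the class being loaded, which is the 
failure mode a pinned value is meant to avoid across releases.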






Re: [PR] [flink] #30402 restore upgradability [beam]

2024-02-23 Thread via GitHub


github-actions[bot] commented on PR #30403:
URL: https://github.com/apache/beam/pull/30403#issuecomment-1961088490

   Stopping reviewer notifications for this pull request: review requested by 
someone other than the bot, ceding control





Re: [PR] [flink] #30402 restore upgradability [beam]

2024-02-23 Thread via GitHub


je-ik commented on PR #30403:
URL: https://github.com/apache/beam/pull/30403#issuecomment-1961085713

   R: @JozoVilcek @robertwb 





[PR] 30402 flink upgrade [beam]

2024-02-23 Thread via GitHub


je-ik opened a new pull request, #30403:
URL: https://github.com/apache/beam/pull/30403

   Closes #30402 
   
   
   
   Thank you for your contribution! Follow this checklist to help us 
incorporate your contribution quickly and easily:
   
- [ ] Mention the appropriate issue in your description (for example: 
`addresses #123`), if applicable. This will automatically add a link to the 
pull request in the issue. If you would like the issue to automatically close 
on merging the pull request, comment `fixes #` instead.
- [ ] Update `CHANGES.md` with noteworthy changes.
- [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more 
tips on [how to make review process 
smoother](https://github.com/apache/beam/blob/master/CONTRIBUTING.md#make-the-reviewers-job-easier).
   
   To check the build health, please visit 
[https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md](https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md)
   
   GitHub Actions Tests Status (on master branch)
   

   [![Build python source distribution and wheels](https://github.com/apache/beam/workflows/Build%20python%20source%20distribution%20and%20wheels/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Build+python+source+distribution+and+wheels%22+branch%3Amaster+event%3Aschedule)
   [![Python tests](https://github.com/apache/beam/workflows/Python%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Python+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Java tests](https://github.com/apache/beam/workflows/Java%20Tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Java+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Go tests](https://github.com/apache/beam/workflows/Go%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Go+tests%22+branch%3Amaster+event%3Aschedule)
   
   See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more 
information about GitHub Actions CI or the [workflows 
README](https://github.com/apache/beam/blob/master/.github/workflows/README.md) 
to see a list of phrases to trigger workflows.
   





Re: [PR] Duet AI Prompts: Apache Beam Runners [beam]

2024-02-23 Thread via GitHub


dariabezkorovaina commented on PR #30346:
URL: https://github.com/apache/beam/pull/30346#issuecomment-1961077682

   @damccorm thanks very much:) Implemented the changes, please take a look.





Re: [PR] Update description.md [beam]

2024-02-23 Thread via GitHub


github-actions[bot] commented on PR #30401:
URL: https://github.com/apache/beam/pull/30401#issuecomment-1960998086

   Assigning reviewers. If you would like to opt out of this review, comment 
`assign to next reviewer`:
   
   R: @lostluck added as fallback since no labels match configuration
   
   Available commands:
   - `stop reviewer notifications` - opt out of the automated review tooling
   - `remind me after tests pass` - tag the comment author after tests pass
   - `waiting on author` - shift the attention set back to the author (any 
comment or push by the author will return the attention set to the reviewers)
   
   The PR bot will only process comments in the main thread (not review 
comments).





[PR] Update description.md [beam]

2024-02-23 Thread via GitHub


rtarway opened a new pull request, #30401:
URL: https://github.com/apache/beam/pull/30401

   **Please** add a meaningful description for your change here
   
   
   
   

