[beam] branch master updated (c3a2aaca750 -> ad729f762ef)

2023-01-11 Thread pabloem
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from c3a2aaca750 Bump github.com/containerd/containerd from 1.6.8 to 1.6.12 in /sdks (#24945)
 add ad729f762ef [Playground] Add an option for Datastore namespace value for ci_cd.py script (#24818)

No new revisions were added by this update.

Summary of changes:
 .github/workflows/playground_deploy_examples.yml   | 12 ++---
 .../workflows/playground_examples_cd_reusable.yml  |  1 +
 .../workflows/playground_examples_ci_reusable.yml  |  1 +
 playground/README.md   | 54 ++
 playground/infrastructure/ci_cd.py | 18 ++--
 playground/infrastructure/config.py|  2 +-
 playground/infrastructure/datastore_client.py  |  8 +---
 playground/infrastructure/test_ci_cd.py|  3 +-
 playground/infrastructure/test_datastore_client.py | 29 
 9 files changed, 72 insertions(+), 56 deletions(-)
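
For context, the change summarized above adds a Datastore namespace option to the ci_cd.py script. A minimal, hypothetical sketch of wiring such a flag with argparse — the flag name, default value, and step choices here are assumptions for illustration, not the actual Playground code:

```python
import argparse

def parse_args(argv):
    # Hypothetical CLI wiring; the real ci_cd.py options may differ.
    parser = argparse.ArgumentParser(description="Playground examples CI/CD")
    parser.add_argument("--step", required=True, choices=["CI", "CD"])
    parser.add_argument(
        "--namespace",
        default="Playground",
        help="Datastore namespace to use for example entities",
    )
    return parser.parse_args(argv)

args = parse_args(["--step", "CD", "--namespace", "Staging"])
print(args.namespace)  # expected: Staging
```

Threading the namespace through a flag like this lets CI and CD runs target separate Datastore namespaces without code changes.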



[beam] branch master updated: Multifile examples on frontend (#24859) (#24865)

2023-01-11 Thread pabloem
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 9ec68dd3df1 Multifile examples on frontend (#24859) (#24865)
9ec68dd3df1 is described below

commit 9ec68dd3df1fb8e602bafe055031b71a39ccc41d
Author: alexeyinkin 
AuthorDate: Wed Jan 11 23:17:48 2023 +0400

Multifile examples on frontend (#24859) (#24865)

* Multifile examples on frontend (#24859)

* Make a wrapper local (#24859)

* Revert sdk.g.dart deletion, minor fixes (#24859)

* Fix after review (#24859)
---
 playground/frontend/lib/l10n/app_en.arb|   2 +-
 .../example_list/example_item_actions.dart |  50 +++---
 .../examples/components/multi_file_icon.dart}  |  30 ++--
 .../multifile_popover/multifile_popover.dart   |  68 ---
 .../multifile_popover_button.dart  |  95 --
 .../widgets/editor_textarea_wrapper.dart   |  91 --
 .../widgets/playground_page_body.dart  |   2 +-
 .../lib/playground_components.dart |   1 +
 .../lib/src/cache/example_cache.dart   |  23 +--
 .../lib/src/constants/sizes.dart   |   1 +
 .../example_loaders/content_example_loader.dart|   2 +-
 .../example_loaders/http_example_loader.dart   |   5 +-
 .../lib/src/controllers/playground_controller.dart |  50 +++---
 .../controllers/snippet_editing_controller.dart| 199 ++---
 .../snippet_file_editing_controller.dart   | 151 
 .../lib/src/models/example.dart|  11 +-
 .../content_example_loading_descriptor.dart|  18 +-
 .../lib/src/models/example_view_options.dart   |   8 +-
 .../shared_file.dart => models/snippet_file.dart}  |  31 +++-
 .../lib/src/models/snippet_file.g.dart |  20 +++
 .../repositories/code_client/grpc_code_client.dart |   7 +-
 .../example_client/grpc_example_client.dart|  34 ++--
 .../lib/src/repositories/example_repository.dart   |   5 +-
 .../get_precompiled_object_code_response.dart  |   6 +-
 .../repositories/models/get_snippet_response.dart  |   4 +-
 .../src/repositories/models/run_code_request.dart  |   9 +-
 .../repositories/models/save_snippet_request.dart  |   4 +-
 ...ponse.dart => snippet_file_grpc_extension.dart} |  29 +--
 .../lib/src/widgets/complexity.dart|   5 +-
 .../lib/src/widgets/run_or_cancel_button.dart  |   1 -
 .../lib/src/widgets/snippet_editor.dart| 153 ++--
 ...nippet_editor.dart => snippet_file_editor.dart} |  37 ++--
 .../lib/src/widgets/tab_header.dart|   4 +-
 .../lib/src/widgets/tabbed_snippet_editor.dart |  81 +
 .../widgets/{tab_header.dart => tabs/tab_bar.dart} |  27 ++-
 .../frontend/playground_components/pubspec.yaml|   3 +-
 .../test/src/cache/example_cache_test.dart |  41 +++--
 .../test/src/common/categories.dart|  14 +-
 .../test/src/common/descriptors.dart   |  12 +-
 .../test/src/common/example_cache.mocks.dart   |   5 +-
 .../test/src/common/example_repository_mock.dart   |  17 +-
 .../src/common/example_repository_mock.mocks.dart  |  27 +--
 .../test/src/common/examples.dart  |  42 +++--
 .../test/src/common/requests.dart  |   5 +-
 .../src/controllers/example_loaders/common.dart|   2 +-
 .../examples_loader_test.mocks.dart|  77 +++-
 .../example_loaders/http_example_loader_test.dart  |   2 +-
 .../http_example_loader_test.mocks.dart|   5 +-
 .../controllers/playground_controller_test.dart|  41 +++--
 .../playground_controller_test.mocks.dart  |   5 +-
 .../snippet_editing_controller_test.dart   |  93 +++---
 .../content_example_loading_descriptor_test.dart   |   9 +-
 .../src/repositories/code_repository_test.dart |   3 +-
 .../src/repositories/example_repository_test.dart  |  25 ++-
 .../playground_components_dev/pubspec.yaml |   2 +-
 playground/frontend/pubspec.lock   |   9 +-
 playground/frontend/pubspec.yaml   |   2 +-
 .../messages/models/set_content_message_test.dart  |  35 ++--
 58 files changed, 918 insertions(+), 822 deletions(-)

diff --git a/playground/frontend/lib/l10n/app_en.arb 
b/playground/frontend/lib/l10n/app_en.arb
index 538df437994..a49ba9ebc2c 100644
--- a/playground/frontend/lib/l10n/app_en.arb
+++ b/playground/frontend/lib/l10n/app_en.arb
@@ -195,7 +195,7 @@
   "@exampleDescription": {
 "description": "Description icon label"
   },
-  "exampleMultifile": "Multifile example info",
+  "exampleMultifile": "Multifile",
   "@exampleDescription": {
 "exampleMultifile": "

[beam] branch master updated (9ec68dd3df1 -> 6de4d5fa8d0)

2023-01-11 Thread pabloem
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from 9ec68dd3df1 Multifile examples on frontend (#24859) (#24865)
 add 6de4d5fa8d0 Return only Playground examples in GetCatalog() (#24816)

No new revisions were added by this update.

Summary of changes:
 .../backend/internal/db/datastore/datastore_db.go  |  2 +-
 .../internal/db/datastore/datastore_db_test.go | 30 +++---
 2 files changed, 22 insertions(+), 10 deletions(-)
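
The change above restricts GetCatalog() to Playground-owned examples; the actual implementation is in Go (datastore_db.go). As a rough sketch of the filtering idea only — the "origin" property name and the "PG_EXAMPLES" value are assumptions for illustration:

```python
# Hypothetical sketch (not the actual Go code): when building the catalog,
# keep only snippets whose origin marks them as Playground examples.
def get_catalog(snippets):
    return [s for s in snippets if s.get("origin") == "PG_EXAMPLES"]

catalog = get_catalog([
    {"name": "WordCount", "origin": "PG_EXAMPLES"},
    {"name": "UserSnippet", "origin": "PG_USER"},
])
print([s["name"] for s in catalog])  # expected: ['WordCount']
```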



[beam] branch master updated: [Website] add credit karma case-study card (#24970)

2023-01-11 Thread pabloem
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new e21b5b79a25 [Website] add credit karma case-study card (#24970)
e21b5b79a25 is described below

commit e21b5b79a25dec389a87c3d638219d09e2c9
Author: bullet03 
AuthorDate: Thu Jan 12 05:51:19 2023 +0600

[Website] add credit karma case-study card (#24970)

* [Website] add credit karma case-study card

* Case study updates

* Case study updates

* add link

Co-authored-by: Alex Kosolapov 
---
 website/www/site/assets/scss/_case_study.scss  |  13 ++
 .../site/content/en/case-studies/creditKarma.md| 256 +
 website/www/site/data/en/quotes.yaml   |   7 +-
 .../case-study/credit_karma/avneesh_pratap.jpeg| Bin 0 -> 58754 bytes
 .../case-study/credit_karma/raj_katakam.jpeg   | Bin 0 -> 55475 bytes
 .../images/case-study/credit_karma/scheme-2.png| Bin 0 -> 33824 bytes
 .../images/case-study/credit_karma/scheme-3.png| Bin 0 -> 20122 bytes
 .../images/case-study/credit_karma/scheme-4.png| Bin 0 -> 17228 bytes
 .../images/case-study/credit_karma/scheme-5.png| Bin 0 -> 38671 bytes
 .../images/logos/powered-by/credit-karma.png   | Bin 0 -> 17972 bytes
 10 files changed, 275 insertions(+), 1 deletion(-)

diff --git a/website/www/site/assets/scss/_case_study.scss 
b/website/www/site/assets/scss/_case_study.scss
index 72c7ce3d94a..b1f42e5a35c 100644
--- a/website/www/site/assets/scss/_case_study.scss
+++ b/website/www/site/assets/scss/_case_study.scss
@@ -125,6 +125,7 @@
 
   .case-study-card-img img {
 height: 50px;
+object-fit: scale-down;
 @media (min-width: $mobile) and (max-width: $tablet) {
   object-fit: contain;
 }
@@ -362,6 +363,9 @@ h2.case-study-h2 {
 }
   }
 }
+.pb-0 {
+  padding-bottom: 30px;
+}
   }
 
   .case-study-post {
@@ -386,6 +390,15 @@ h2.case-study-h2 {
 }
   }
 
+  .post-scheme--centered {
+margin-left: auto;
+margin-right: auto;
+
+img {
+  width: 70%;
+}
+  }
+
   @media screen and (max-width: $mobile) {
 .case-study-content {
   flex-direction: column;
diff --git a/website/www/site/content/en/case-studies/creditKarma.md 
b/website/www/site/content/en/case-studies/creditKarma.md
new file mode 100644
index 000..d6fb4ddc1cb
--- /dev/null
+++ b/website/www/site/content/en/case-studies/creditKarma.md
@@ -0,0 +1,256 @@
+---
+title: "Self-service Machine Learning Workflows and Scaling MLOps with Apache Beam"
+name: "Credit Karma"
+icon: "/images/logos/powered-by/credit-karma.png"
+category: "study"
+cardTitle: "Self-service Machine Learning Workflows and Scaling MLOps with Apache Beam"
+cardDescription: "Apache Beam has future-proofed Credit Karma’s data and ML platform for scalability and efficiency, enabling MLOps with unified pipelines, processing 5-10 TB daily at 5K events per second, and managing 20K+ ML features."
+authorName: "Avneesh Pratap"
+coauthorName: "Raj Katakam"
+authorPosition: "Senior Data Engineer II @ Credit Karma"
+coauthorPosition: "Senior ML Engineer II @ Credit Karma"
+authorImg: /images/case-study/credit_karma/avneesh_pratap.jpeg
+coauthorImg: /images/case-study/credit_karma/raj_katakam.jpeg
+publishDate: 2022-12-01T00:12:00+00:00
+---
+
+
+
+
+
+
+  
+“Apache Beam has been the ideal solution for us. Scaling, backfilling historical data, experimenting with new ML models and new use cases… it is all very easy to do with Beam.”
+  
+  
+
+
+
+
+
+  Avneesh Pratap
+
+
+  Senior Data Engineer II @ Credit Karma
+
+
+  
+
+
+  
+“Apache Beam enabled self-service ML for our data scientists. They can plug in pieces of code, and those transformations will be automatically attached to models without any engineering involvement. Within seconds, our data science team can move from experimentation to production.”
+  
+  
+
+
+
+
+
+  Raj Katakam
+
+
+  Senior ML Engineer II @ Credit Karma
+
+
+  
+
+
+
+
+# Self-service Machine Learning Workflows and Scaling MLOps with Apache Beam
+
+## Background
+
+[Credit Karma](https://www.creditkarma.com/) is an American multinational personal finance company [founded in 2007](https://en.wikipedia.org/wiki/Credit_Karma), now part of [Intuit](/case-studies/intuit/). With a free credit and financial management platform, Credit Karma enables financial progress for nearly 

[beam] branch master updated: enabling public construction of typed schematransforms (#25010)

2023-01-17 Thread pabloem
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new d9d322b5697 enabling public construction of typed schematransforms (#25010)
d9d322b5697 is described below

commit d9d322b569784fd9a7d6eb480b2e85368f88bdbb
Author: Pablo Estrada 
AuthorDate: Wed Jan 18 01:01:38 2023 -0500

enabling public construction of typed schematransforms (#25010)
---
 .../beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformConfiguration.java | 2 +-
 .../beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformProvider.java  | 2 +-
 .../sdk/io/gcp/pubsublite/PubsubLiteWriteSchemaTransformProvider.java   | 2 +-
 3 files changed, 3 insertions(+), 3 deletions(-)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformConfiguration.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformConfiguration.java
index eced2405eb2..ce4548eceb4 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformConfiguration.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformConfiguration.java
@@ -126,7 +126,7 @@ public abstract class PubsubWriteSchemaTransformConfiguration {
  */
 public abstract Builder setIdAttribute(String value);
 
-abstract PubsubWriteSchemaTransformConfiguration build();
+public abstract PubsubWriteSchemaTransformConfiguration build();
   }
 
   @DefaultSchema(AutoValueSchema.class)
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformProvider.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformProvider.java
index cc68cc7772d..3a6b2389b92 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformProvider.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformProvider.java
@@ -97,7 +97,7 @@ public class PubsubWriteSchemaTransformProvider
 
   /** Returns the expected {@link SchemaTransform} of the configuration. */
   @Override
-  protected SchemaTransform from(PubsubWriteSchemaTransformConfiguration configuration) {
+  public SchemaTransform from(PubsubWriteSchemaTransformConfiguration configuration) {
 return new PubsubWriteSchemaTransform(configuration);
   }
 
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteWriteSchemaTransformProvider.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteWriteSchemaTransformProvider.java
index eea8070e6a0..c9922826ace 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteWriteSchemaTransformProvider.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteWriteSchemaTransformProvider.java
@@ -59,7 +59,7 @@ public class PubsubLiteWriteSchemaTransformProvider
   }
 
   @Override
-  protected @UnknownKeyFor @NonNull @Initialized SchemaTransform from(
+  public @UnknownKeyFor @NonNull @Initialized SchemaTransform from(
   PubsubLiteWriteSchemaTransformConfiguration configuration) {
 
 if (!SUPPORTED_FORMATS.contains(configuration.getFormat())) {



[beam] branch master updated (95874a70e70 -> 187ba474fdb)

2023-01-18 Thread pabloem
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from 95874a70e70 Remove trailing whitespace from blog to fix precommit (#25055)
 add 187ba474fdb improve some errors from DoFnSignature (#25001)

No new revisions were added by this update.

Summary of changes:
 .../apache/beam/sdk/transforms/reflect/DoFnSignatures.java  | 13 +
 1 file changed, 9 insertions(+), 4 deletions(-)



[beam] branch master updated: [Website] update content of community case study page (#25023)

2023-01-19 Thread pabloem
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 6b42bda1d06 [Website] update content of community case study page (#25023)
6b42bda1d06 is described below

commit 6b42bda1d062899f6c5571aaac4df2dc26fe77cd
Author: bullet03 
AuthorDate: Fri Jan 20 04:07:11 2023 +0600

[Website] update content of community case study page (#25023)
---
 website/www/site/content/en/community/case-study.md | 5 +
 1 file changed, 1 insertion(+), 4 deletions(-)

diff --git a/website/www/site/content/en/community/case-study.md 
b/website/www/site/content/en/community/case-study.md
index 3b14932a46a..c731abf744d 100644
--- a/website/www/site/content/en/community/case-study.md
+++ b/website/www/site/content/en/community/case-study.md
@@ -25,10 +25,7 @@ Your company uses Apache Beam? There are several options to highlight it and mak
 
 ## Share your story
 
-Sharing your story and making a Case Study out of it might have a significant impact on helping users with adopting
-Apache Beam. And, of course, gaining interest for your company.
-mailto:d...@beam.apache.org?subject=Beam Website Add New Case Stud">Contact the community to get Case Study
-started!
+Sharing your story and making a Case Study out of it might have a significant impact on helping users with adopting Apache Beam. And, of course, gaining interest for your company. Please see [how to add a new case study](https://github.com/apache/beam/blob/master/website/ADD_CASE_STUDY.md) for the next steps, templates and helpful tips. If you have any questions about adding a case study, please [contact the community](mailto:d...@beam.apache.org?subject=Beam%20Website%20Add%20New%20Case%
 [...]
 
 ## Add your logo
 



[beam] branch master updated: Add GraalSystems (#23651)

2023-01-23 Thread pabloem
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 633c51242e9 Add GraalSystems (#23651)
633c51242e9 is described below

commit 633c51242e9b135059eb24da34fc896e30faa2a0
Author: Vincent Devillers 
AuthorDate: Mon Jan 23 22:11:00 2023 +0100

Add GraalSystems (#23651)

Co-authored-by: Vincent Devillers <>
---
 .../site/content/en/case-studies/graalsystems.md   |  21 +
 .../images/logos/powered-by/graalsystems.png   | Bin 0 -> 11402 bytes
 2 files changed, 21 insertions(+)

diff --git a/website/www/site/content/en/case-studies/graalsystems.md 
b/website/www/site/content/en/case-studies/graalsystems.md
new file mode 100755
index 000..97762cc08b3
--- /dev/null
+++ b/website/www/site/content/en/case-studies/graalsystems.md
@@ -0,0 +1,21 @@
+---
+title:  "GraalSystems"
+icon: /images/logos/powered-by/graalsystems.png
+hasNav: true
+cardDescription: "GraalSystems is a cloud native data platform providing support for Beam, Spark, Tensorflow, Samza and many other data processing solutions. At the heart of our architecture are a set of distributed processing and analytics modules using Beam to route over 2 billion events per day from our Apache Pulsar clusters. For our clients, we run also more than 2,000 Beam jobs per day at a very large scale in our production platform."
+---
+
+
+**[GraalSystems](https://graal.systems)** is a cloud native data platform providing support for Beam, Spark, Tensorflow, Samza and many other data processing solutions. At the heart of our architecture are a set of distributed processing and analytics modules using Beam to route over 2 billion events per day from our Apache Pulsar clusters. For our clients, we run also more than 2,000 Beam jobs per day at a very large scale in our production platform
\ No newline at end of file
diff --git a/website/www/site/static/images/logos/powered-by/graalsystems.png 
b/website/www/site/static/images/logos/powered-by/graalsystems.png
new file mode 100755
index 000..e06649a2ae3
Binary files /dev/null and 
b/website/www/site/static/images/logos/powered-by/graalsystems.png differ



[beam] branch master updated: Adding autoservice annotations to all SchemaTransformProviders (#25107)

2023-01-24 Thread pabloem
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 9aa2c529c51 Adding autoservice annotations to all SchemaTransformProviders (#25107)
9aa2c529c51 is described below

commit 9aa2c529c51670a4d7221649ccd8896ddf17be6b
Author: Pablo Estrada 
AuthorDate: Tue Jan 24 13:11:59 2023 -0500

Adding autoservice annotations to all SchemaTransformProviders (#25107)

* Adding autoservice annotations to all SchemaTransformProviders

* spotless
---
 .../apache/beam/io/debezium/DebeziumReadSchemaTransformProvider.java   | 3 +++
 .../beam/sdk/io/gcp/pubsub/PubsubSchemaTransformReadProvider.java  | 3 +++
 .../beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformProvider.java | 2 ++
 .../sdk/io/gcp/pubsublite/PubsubLiteReadSchemaTransformProvider.java   | 3 +++
 .../sdk/io/gcp/pubsublite/PubsubLiteWriteSchemaTransformProvider.java  | 3 +++
 .../apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProvider.java| 3 +++
 .../schematransform/SingleStoreSchemaTransformReadProvider.java| 3 +++
 .../schematransform/SingleStoreSchemaTransformWriteProvider.java   | 3 +++
 8 files changed, 23 insertions(+)
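
In Java, @AutoService generates the META-INF/services entry that lets java.util.ServiceLoader discover each annotated SchemaTransformProvider at runtime without explicit registration. A loose Python analogue of that discovery pattern — the registry decorator and URN value below are illustrative stand-ins, not Beam code:

```python
# Illustrative registry standing in for ServiceLoader + @AutoService:
# each provider class registers itself when defined, so callers can
# discover providers without importing them by name.
PROVIDERS = {}

def auto_service(cls):
    PROVIDERS[cls.urn] = cls
    return cls

@auto_service
class DebeziumReadProvider:
    # Placeholder URN for illustration only.
    urn = "beam:schematransform:debezium_read:v1"

print(sorted(PROVIDERS))  # expected: ['beam:schematransform:debezium_read:v1']
```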

diff --git 
a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumReadSchemaTransformProvider.java
 
b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumReadSchemaTransformProvider.java
index 1d3cb8a97dd..154cc8a16e4 100644
--- 
a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumReadSchemaTransformProvider.java
+++ 
b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumReadSchemaTransformProvider.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.io.debezium;
 
+import com.google.auto.service.AutoService;
 import com.google.auto.value.AutoValue;
 import java.util.Arrays;
 import java.util.Collection;
@@ -27,6 +28,7 @@ import java.util.stream.Collectors;
 import org.apache.beam.sdk.coders.RowCoder;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
 import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PCollectionRowTuple;
@@ -50,6 +52,7 @@ import org.slf4j.LoggerFactory;
 * This transform is tested against MySQL and Postgres, but it should work well for any
  * data source supported by Debezium.
  */
+@AutoService(SchemaTransformProvider.class)
 public class DebeziumReadSchemaTransformProvider
 extends TypedSchemaTransformProvider<
 
DebeziumReadSchemaTransformProvider.DebeziumReadSchemaTransformConfiguration> {
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaTransformReadProvider.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaTransformReadProvider.java
index c4399f1c21d..de092a5e601 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaTransformReadProvider.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaTransformReadProvider.java
@@ -21,6 +21,7 @@ import static 
org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageToRow.DLQ_TAG;
 import static org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageToRow.MAIN_TAG;
 
 import com.google.api.client.util.Clock;
+import com.google.auto.service.AutoService;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -30,6 +31,7 @@ import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializers;
 import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
 import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PCollectionRowTuple;
@@ -49,6 +51,7 @@ import org.checkerframework.checker.nullness.qual.Nullable;
 })
 @Internal
 @Experimental(Kind.SCHEMAS)
+@AutoService(SchemaTransformProvider.class)
 public class PubsubSchemaTransformReadProvider
 extends 
TypedSchemaTransformProvider {
   private static final String API = "pubsub";
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformProvider.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformProvider.java
index 3a6b2389b92..42553dfab94 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/ap

[beam] branch master updated: [BEAM-12164] Enable postgres IT for change streams (#25152)

2023-01-24 Thread pabloem
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new a96afe2c57c [BEAM-12164] Enable postgres IT for change streams (#25152)
a96afe2c57c is described below

commit a96afe2c57c45a869a622086eaa4f81305f06e72
Author: nancyxu123 
AuthorDate: Tue Jan 24 15:03:23 2023 -0800

[BEAM-12164] Enable postgres IT for change streams (#25152)

* Reenable postgres integration test

* Update SpannerChangeStreamPostgresIT.java

Co-authored-by: Nancy Xu 
---
 .../io/gcp/spanner/changestreams/it/SpannerChangeStreamPostgresIT.java  | 2 --
 1 file changed, 2 deletions(-)

diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamPostgresIT.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamPostgresIT.java
index 82e7c72079e..f5e37325bb3 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamPostgresIT.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamPostgresIT.java
@@ -49,7 +49,6 @@ import org.joda.time.Instant;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
-import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -91,7 +90,6 @@ public class SpannerChangeStreamPostgresIT {
 
pipeline.getOptions().as(ChangeStreamTestPipelineOptions.class).setBlockOnRun(false);
   }
 
-  @Ignore("BEAM-14277 Until Postgres is supported")
   @Test
   public void testReadSpannerChangeStream() {
 // Defines how many rows are going to be inserted / updated / deleted in the test



[beam] branch master updated: Initial commit of boilerplate setup of change stream pipeline for bigtable

2023-01-26 Thread pabloem
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 67b5b368353 Initial commit of boilerplate setup of change stream pipeline for bigtable
 new f565a94b9b3 Merge pull request #25153 from tonytanger/initial_cdc
67b5b368353 is described below

commit 67b5b3683537fad47feecaf12f8180eac2621473
Author: Tony Tang 
AuthorDate: Tue Jan 24 14:22:41 2023 -0500

Initial commit of boilerplate setup of change stream pipeline for bigtable
---
 .../beam/sdk/io/gcp/bigtable/BigtableConfig.java   |  38 ++-
 .../beam/sdk/io/gcp/bigtable/BigtableIO.java   | 355 +
 .../changestreams/ByteStringRangeHelper.java   |  39 +++
 .../changestreams/ChangeStreamMetrics.java |  25 ++
 .../changestreams/ChangeStreamMutation.java|  24 ++
 .../bigtable/changestreams/TimestampConverter.java |  37 +++
 .../bigtable/changestreams/UniqueIdGenerator.java  |  38 +++
 .../changestreams/action/ActionFactory.java| 121 +++
 .../changestreams/action/ChangeStreamAction.java   | 105 ++
 .../action/DetectNewPartitionsAction.java  | 118 +++
 .../action/GenerateInitialPartitionsAction.java|  58 
 .../action/ReadChangeStreamPartitionAction.java| 123 +++
 .../changestreams/action/package-info.java |  22 ++
 .../changestreams/dao/ChangeStreamDao.java |  33 ++
 .../gcp/bigtable/changestreams/dao/DaoFactory.java | 113 +++
 .../changestreams/dao/MetadataTableAdminDao.java   |  82 +
 .../changestreams/dao/MetadataTableDao.java|  77 +
 .../bigtable/changestreams/dao/package-info.java   |  22 ++
 .../dofn/DetectNewPartitionsDoFn.java  | 112 +++
 .../changestreams/dofn/InitializeDoFn.java |  59 
 .../dofn/ReadChangeStreamPartitionDoFn.java| 118 +++
 .../bigtable/changestreams/dofn/package-info.java  |  22 ++
 .../changestreams/encoder/package-info.java|  24 ++
 .../changestreams/model/PartitionRecord.java   | 135 
 .../bigtable/changestreams/model/package-info.java |  22 ++
 .../gcp/bigtable/changestreams/package-info.java   |  22 ++
 .../ReadChangeStreamPartitionProgressTracker.java  | 160 ++
 .../changestreams/restriction/StreamProgress.java  |  36 +++
 .../changestreams/restriction/package-info.java|  22 ++
 .../changestreams/TimestampConverterTest.java  |  44 +++
 30 files changed, 2194 insertions(+), 12 deletions(-)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableConfig.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableConfig.java
index db3ac3de6e9..f296f5966cc 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableConfig.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableConfig.java
@@ -37,16 +37,19 @@ import org.checkerframework.checker.nullness.qual.Nullable;
 @SuppressWarnings({
   "nullness" // TODO(https://github.com/apache/beam/issues/20497)
 })
-abstract class BigtableConfig implements Serializable {
+public abstract class BigtableConfig implements Serializable {
 
   /** Returns the project id being written to. */
-  abstract @Nullable ValueProvider getProjectId();
+  public abstract @Nullable ValueProvider getProjectId();
 
   /** Returns the instance id being written to. */
-  abstract @Nullable ValueProvider getInstanceId();
+  public abstract @Nullable ValueProvider getInstanceId();
 
   /** Returns the table being read from. */
-  abstract @Nullable ValueProvider getTableId();
+  public abstract @Nullable ValueProvider getTableId();
+
+  /** Returns the app profile being read from. */
+  public abstract @Nullable ValueProvider getAppProfileId();
 
   /**
* Returns the Google Cloud Bigtable instance being written to, and other 
parameters.
@@ -84,6 +87,8 @@ abstract class BigtableConfig implements Serializable {
 
 abstract Builder setTableId(ValueProvider tableId);
 
+abstract Builder setAppProfileId(ValueProvider appProfileId);
+
 /** @deprecated will be replaced by bigtable options configurator. */
 @Deprecated
 abstract Builder setBigtableOptions(BigtableOptions options);
@@ -100,46 +105,51 @@ abstract class BigtableConfig implements Serializable {
 abstract BigtableConfig build();
   }
 
-  BigtableConfig withProjectId(ValueProvider projectId) {
+  public BigtableConfig withProjectId(ValueProvider projectId) {
 checkArgument(projectId != null, "Project Id of BigTable can not be null");
 return toBuilder().setProjectId(projectId).build();
   }
 
-  BigtableConfig withInstanceId(ValueProvider instanceId) {
+  public BigtableConfig withInstanceId(ValueProvider instanceId) 

[beam] branch master updated: Fixing a table existence verifier for BQ

2023-01-27 Thread pabloem
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new bfb6a8ea0a8 Fixing a table existence verifier for BQ
 new 2bcec868da0 Merge pull request #25195 from pabloem/fix-verify-table-bq
bfb6a8ea0a8 is described below

commit bfb6a8ea0a806836fae2da0cd052fc911c13e5d3
Author: Pablo E 
AuthorDate: Thu Jan 26 14:10:23 2023 -0800

Fixing a table existence verifier for BQ
---
 .../org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java | 9 +++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
index 0a62245390b..825707f3cae 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
@@ -593,10 +593,15 @@ public class BigQueryHelpers {
 
   static void verifyTablePresence(DatasetService datasetService, 
TableReference table) {
 try {
-  datasetService.getTable(table);
+  Table fetchedTable = datasetService.getTable(table);
+  if (fetchedTable == null) {
+throw new IOException("Table does not exist.");
+  }
 } catch (Exception e) {
   ApiErrorExtractor errorExtractor = new ApiErrorExtractor();
-  if ((e instanceof IOException) && 
errorExtractor.itemNotFound((IOException) e)) {
+  if ((e instanceof IOException)
+  && ("Table does not exist.".equals(e.getMessage())
+  || errorExtractor.itemNotFound((IOException) e))) {
 throw new IllegalArgumentException(
 String.format(RESOURCE_NOT_FOUND_ERROR, "table", 
toTableSpec(table)), e);
   } else if (e instanceof RuntimeException) {
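The fix above normalizes a null lookup result into the same not-found path as an API-level `IOException`, so both cases surface as one `IllegalArgumentException`. A self-contained sketch of that pattern follows; every name in it is an illustrative stand-in, not Beam's `DatasetService` or `ApiErrorExtractor`.

```java
import java.io.IOException;

// Sketch of the verification pattern from the BigQueryHelpers fix above:
// a null result from the lookup is converted into the same "not found"
// error path as a thrown not-found exception. Illustrative names only.
public class TablePresenceSketch {
  static final String NOT_FOUND_MSG = "Table does not exist.";

  interface TableLookup {
    /** Returns null when the table is absent; may also throw IOException. */
    Object getTable(String tableRef) throws IOException;
  }

  static void verifyTablePresence(TableLookup lookup, String tableRef) {
    try {
      Object table = lookup.getTable(tableRef);
      if (table == null) {
        // Normalize "lookup returned null" into the not-found path.
        throw new IOException(NOT_FOUND_MSG);
      }
    } catch (Exception e) {
      boolean notFound =
          e instanceof IOException && NOT_FOUND_MSG.equals(e.getMessage());
      if (notFound) {
        throw new IllegalArgumentException("Table not found: " + tableRef, e);
      }
      throw new RuntimeException(e);
    }
  }

  public static void main(String[] args) {
    boolean threw = false;
    try {
      verifyTablePresence(ref -> null, "project:dataset.table");
    } catch (IllegalArgumentException expected) {
      threw = true;
    }
    if (!threw) {
      throw new AssertionError("expected IllegalArgumentException");
    }
    // A non-null result passes silently.
    verifyTablePresence(ref -> new Object(), "project:dataset.table");
    System.out.println("ok");
  }
}
```

Funneling both failure modes through one message keeps the caller-facing error handling in a single place.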



[beam] branch master updated (6eef233d1c9 -> ac117b97766)

2023-01-30 Thread pabloem

pabloem pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from 6eef233d1c9 Fix website.
 add ac117b97766 Fix Jdbc Write after window assigned (#25173)

No new revisions were added by this update.

Summary of changes:
 .../java/org/apache/beam/sdk/io/jdbc/JdbcIO.java| 21 -
 1 file changed, 12 insertions(+), 9 deletions(-)



[beam] branch master updated: Standardizing naming and URN for Pubsub Read Schema Transform (#25170)

2023-01-30 Thread pabloem

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 634b0453469 Standardizing naming and URN for Pubsub Read Schema 
Transform (#25170)
634b0453469 is described below

commit 634b0453469b66ee4c135aca48b02d2425916f36
Author: Pablo Estrada 
AuthorDate: Tue Jan 31 00:29:00 2023 -0500

Standardizing naming and URN for Pubsub Read Schema Transform (#25170)
---
 ...=> PubsubReadSchemaTransformConfiguration.java} | 10 ++--
 ...java => PubsubReadSchemaTransformProvider.java} | 32 +--
 .../PubsubSchemaTransformMessageToRowFactory.java  | 18 +++
 ... => PubsubReadSchemaTransformProviderTest.java} | 62 +++---
 ...bsubSchemaTransformMessageToRowFactoryTest.java | 28 +-
 5 files changed, 75 insertions(+), 75 deletions(-)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaTransformReadConfiguration.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadSchemaTransformConfiguration.java
similarity index 94%
rename from 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaTransformReadConfiguration.java
rename to 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadSchemaTransformConfiguration.java
index fc4d7db185d..185d8aea976 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaTransformReadConfiguration.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadSchemaTransformConfiguration.java
@@ -34,11 +34,11 @@ import 
org.apache.beam.sdk.schemas.annotations.DefaultSchema;
 @Experimental
 @DefaultSchema(AutoValueSchema.class)
 @AutoValue
-public abstract class PubsubSchemaTransformReadConfiguration {
+public abstract class PubsubReadSchemaTransformConfiguration {
 
-  /** Instantiates a {@link PubsubSchemaTransformReadConfiguration.Builder}. */
+  /** Instantiates a {@link PubsubReadSchemaTransformConfiguration.Builder}. */
   public static Builder builder() {
-return new AutoValue_PubsubSchemaTransformReadConfiguration.Builder();
+return new AutoValue_PubsubReadSchemaTransformConfiguration.Builder();
   }
 
   /** The expected schema of the Pub/Sub message. */
@@ -165,7 +165,7 @@ public abstract class 
PubsubSchemaTransformReadConfiguration {
  */
 public abstract Builder setTopic(String value);
 
-/** Builds a {@link PubsubSchemaTransformReadConfiguration} instance. */
-public abstract PubsubSchemaTransformReadConfiguration build();
+/** Builds a {@link PubsubReadSchemaTransformConfiguration} instance. */
+public abstract PubsubReadSchemaTransformConfiguration build();
   }
 }
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaTransformReadProvider.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadSchemaTransformProvider.java
similarity index 88%
rename from 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaTransformReadProvider.java
rename to 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadSchemaTransformProvider.java
index 096b598de17..084424758ed 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaTransformReadProvider.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadSchemaTransformProvider.java
@@ -40,7 +40,7 @@ import org.checkerframework.checker.nullness.qual.Nullable;
 
 /**
  * An implementation of {@link TypedSchemaTransformProvider} for Pub/Sub reads 
configured using
- * {@link PubsubSchemaTransformReadConfiguration}.
+ * {@link PubsubReadSchemaTransformConfiguration}.
  *
  * Internal only: This class is actively being worked on, and it 
will likely change. We
  * provide no backwards compatibility guarantees, and it should not be 
implemented outside the Beam
@@ -52,19 +52,19 @@ import org.checkerframework.checker.nullness.qual.Nullable;
 @Internal
 @Experimental(Kind.SCHEMAS)
 @AutoService(SchemaTransformProvider.class)
-public class PubsubSchemaTransformReadProvider
-    extends TypedSchemaTransformProvider<PubsubSchemaTransformReadConfiguration> {
+public class PubsubReadSchemaTransformProvider
+    extends TypedSchemaTransformProvider<PubsubReadSchemaTransformConfiguration> {
   static final String OUTPUT_TAG = "OUTPUT";
 
   /** Returns the expected class of the configuration. */
   @Override
-  protected Class<PubsubSchemaTransformReadConfiguration> configurationClass() {
-    return PubsubSchemaTransformReadConfiguration.class;
+  protected Class<PubsubReadSchemaTransformConfiguration> configurationClass() {
+    return PubsubReadSchemaTransform

[beam] branch master updated: Basic SchemaTransform implementation for SQLTransform. (#25177)

2023-02-04 Thread pabloem

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 4e6a3e41aba Basic SchemaTransform implementation for SQLTransform. 
(#25177)
4e6a3e41aba is described below

commit 4e6a3e41aba2f5b6cfae5469f2a1f530e0b8f9f0
Author: Byron Ellis 
AuthorDate: Sat Feb 4 10:45:42 2023 -0800

Basic SchemaTransform implementation for SQLTransform. (#25177)

* Make the name of the desired pcollection name public so other people 
don't have to guess.

* WIP SQLTransformSchemaTransformProvider. Should be able to provide access 
to most of the SQLTransform parameters, though probably not the UDF 
implementation or the error transform. That would require substantial 
refactoring/surgery I think.

* WIP. Using enumerations to make the configuration schema more 
discoverable (particularly for tableproviders and query planners which could 
change over time)

* Basic version of the SQLTransformSchemaTransformProvider. Has basically a 
complete configuration and I think a (hacky) approach to getting DLQ 
information out of the transform at least good enough to use for testing. 
There's no unit test coverage in this module so not clear the existing external 
transform is tested/working either.

* Remove clutter

* Add @Experimental annotation and always emit an errors PCollection even 
if there are no calculations.

* Fix up checkstyle violation
---
 .../extensions/sql/expansion-service/build.gradle  |   1 +
 .../SqlTransformSchemaTransformProvider.java   | 232 +
 .../beam/sdk/extensions/sql/SqlTransform.java  |   3 +-
 3 files changed, 235 insertions(+), 1 deletion(-)

diff --git a/sdks/java/extensions/sql/expansion-service/build.gradle 
b/sdks/java/extensions/sql/expansion-service/build.gradle
index 261f83e7853..48f31c75128 100644
--- a/sdks/java/extensions/sql/expansion-service/build.gradle
+++ b/sdks/java/extensions/sql/expansion-service/build.gradle
@@ -38,6 +38,7 @@ dependencies {
   implementation project(path: ":sdks:java:extensions:sql")
   implementation project(path: ":sdks:java:extensions:sql:zetasql")
   implementation library.java.vendored_guava_26_0_jre
+
 }
 
 task runExpansionService (type: JavaExec) {
diff --git 
a/sdks/java/extensions/sql/expansion-service/src/main/java/org/apache/beam/sdk/extensions/sql/expansion/SqlTransformSchemaTransformProvider.java
 
b/sdks/java/extensions/sql/expansion-service/src/main/java/org/apache/beam/sdk/extensions/sql/expansion/SqlTransformSchemaTransformProvider.java
new file mode 100644
index 000..0649a0978e4
--- /dev/null
+++ 
b/sdks/java/extensions/sql/expansion-service/src/main/java/org/apache/beam/sdk/extensions/sql/expansion/SqlTransformSchemaTransformProvider.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.expansion;
+
+import com.google.auto.service.AutoService;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.ServiceLoader;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.extensions.sql.SqlTransform;
+import org.apache.beam.sdk.extensions.sql.impl.QueryPlanner;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
+import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.logicaltypes.EnumerationType;
+import org.apache.beam.sdk.schemas.logicaltypes.OneOfType;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;

[beam] branch master updated: issue24170 google colab link added (#24820)

2023-02-04 Thread pabloem

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 16cb63be7e0 issue24170 google colab link added (#24820)
16cb63be7e0 is described below

commit 16cb63be7e09230b30f582844549328feaa70168
Author: Dmitry Repin 
AuthorDate: Sat Feb 4 23:40:33 2023 +0400

issue24170 google colab link added (#24820)

* issue24170 added google colab link

* fixed github icon svg

* issue24170 fixed code

* issue24170 colab new line

* issue24170 colors fixed

* issue24170 github and colab buttons at embedded playground

* issue24170 pr fix

* issue23924 dataset button (#415)

* issue23924 dataset button

* issue23924 fix pr

* issue23924 embedded shorten buttons

* minor fixes

* issue 23924 show buttons text arg fixed
---
 .../frontend/assets/{github.svg => colab.svg}  | 10 ++--
 playground/frontend/assets/github.svg  |  4 +-
 playground/frontend/assets/translations/en.yaml|  3 ++
 .../lib/components/link_button/colab_button.dart   | 44 +++
 .../lib/components/link_button/dataset_button.dart | 44 +++
 .../lib/components/link_button/github_button.dart  | 43 +++
 .../lib/components/link_button/link_button.dart| 63 ++
 playground/frontend/lib/l10n/app_en.arb|  4 --
 .../description_popover/description_popover.dart   | 30 ---
 .../examples/components/example_actions.dart   | 52 ++
 .../widgets/embedded_actions.dart  | 58 +++-
 playground/frontend/lib/src/assets/assets.gen.dart |  2 +
 .../lib/playground_components.dart |  1 +
 .../lib/src/models/example.dart|  6 ++-
 .../lib/src/models/example_base.dart   |  6 ++-
 .../example_client/grpc_example_client.dart|  5 +-
 16 files changed, 326 insertions(+), 49 deletions(-)

diff --git a/playground/frontend/assets/github.svg 
b/playground/frontend/assets/colab.svg
similarity index 53%
copy from playground/frontend/assets/github.svg
copy to playground/frontend/assets/colab.svg
index 36b445d2489..3a3fc2f6097 100644
--- a/playground/frontend/assets/github.svg
+++ b/playground/frontend/assets/colab.svg
@@ -17,8 +17,10 @@
 under the License.
 -->
 
-http://www.w3.org/2000/svg";>
-
+http://www.w3.org/2000/svg";>
+
+
+
+
+
 
diff --git a/playground/frontend/assets/github.svg 
b/playground/frontend/assets/github.svg
index 36b445d2489..5e12f3c2eac 100644
--- a/playground/frontend/assets/github.svg
+++ b/playground/frontend/assets/github.svg
@@ -17,8 +17,8 @@
 under the License.
 -->
 
-http://www.w3.org/2000/svg";>
+http://www.w3.org/2000/svg";>
 
+stroke="#A0A4AB" stroke-width="1.5" stroke-linecap="round" 
stroke-linejoin="round" transform="translate(0.5, -0.7) scale(1.2)" />
 
diff --git a/playground/frontend/assets/translations/en.yaml 
b/playground/frontend/assets/translations/en.yaml
index 023e0324c8e..f1e685d0756 100644
--- a/playground/frontend/assets/translations/en.yaml
+++ b/playground/frontend/assets/translations/en.yaml
@@ -19,4 +19,7 @@ intents:
   playground:
 clearOutput: 'Clear Output'
 newExample: 'New Example'
+openGoogleColab: 'Open in Google Colab'
+showDatasets: 'Dataset: {fileName}'
+viewOnGithub: 'View on GitHub'
 usesEmulatedData: 'This examples uses emulated data'
diff --git a/playground/frontend/lib/components/link_button/colab_button.dart 
b/playground/frontend/lib/components/link_button/colab_button.dart
new file mode 100644
index 000..4f1700cf21f
--- /dev/null
+++ b/playground/frontend/lib/components/link_button/colab_button.dart
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import 'package:easy_localization/easy_localization.dar

[beam] branch master updated: Upgrading spring-expression to latest patch version (#25348)

2023-02-06 Thread pabloem

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new d20d0b01c3c Upgrading spring-expression to latest patch version 
(#25348)
d20d0b01c3c is described below

commit d20d0b01c3c6bcde551420f36e13d794c930f1e2
Author: Pablo Estrada 
AuthorDate: Mon Feb 6 12:36:22 2023 -0800

Upgrading spring-expression to latest patch version (#25348)
---
 playground/backend/containers/java/Dockerfile | 2 +-
 sdks/java/io/kafka/build.gradle   | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/playground/backend/containers/java/Dockerfile 
b/playground/backend/containers/java/Dockerfile
index 2a1561c3b0b..759118ddb99 100644
--- a/playground/backend/containers/java/Dockerfile
+++ b/playground/backend/containers/java/Dockerfile
@@ -53,7 +53,7 @@ RUN mvn dependency:copy-dependencies
 FROM apache/beam_java8_sdk:$BEAM_VERSION
 
 ARG BEAM_VERSION
-ARG SPRING_VERSION=5.3.18
+ARG SPRING_VERSION=5.3.25
 ARG KAFKA_CLIENTS_VERSION=2.3.1
 ENV SERVER_IP=0.0.0.0
 ENV SERVER_PORT=8080
diff --git a/sdks/java/io/kafka/build.gradle b/sdks/java/io/kafka/build.gradle
index e9b3b255b9b..8d64cf0bf97 100644
--- a/sdks/java/io/kafka/build.gradle
+++ b/sdks/java/io/kafka/build.gradle
@@ -70,7 +70,7 @@ dependencies {
   implementation library.java.joda_time
   implementation library.java.jackson_annotations
   implementation library.java.jackson_databind
-  implementation "org.springframework:spring-expression:5.3.18"
+  implementation "org.springframework:spring-expression:5.3.25"
   implementation ("io.confluent:kafka-avro-serializer:5.3.2") {
 // zookeeper depends on "spotbugs-annotations:3.1.9" which clashes with 
current
 // "spotbugs-annotations:3.1.12" used in Beam. Not required.



[beam] branch pabloem-patch-2 created (now 6d971168e8c)

2023-02-06 Thread pabloem

pabloem pushed a change to branch pabloem-patch-2
in repository https://gitbox.apache.org/repos/asf/beam.git


  at 6d971168e8c fix doc on bq sxtorage

This branch includes the following new commits:

 new 6d971168e8c fix doc on bq sxtorage

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.




[beam] 01/01: fix doc on bq sxtorage

2023-02-06 Thread pabloem

pabloem pushed a commit to branch pabloem-patch-2
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 6d971168e8c908797d4366e3c706914949abe7ce
Author: Pablo Estrada 
AuthorDate: Mon Feb 6 15:41:50 2023 -0500

fix doc on bq sxtorage
---
 .../www/site/content/en/documentation/io/built-in/google-bigquery.md  | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git 
a/website/www/site/content/en/documentation/io/built-in/google-bigquery.md 
b/website/www/site/content/en/documentation/io/built-in/google-bigquery.md
index 6f12a4f6bcf..8ab3b0a9a04 100644
--- a/website/www/site/content/en/documentation/io/built-in/google-bigquery.md
+++ b/website/www/site/content/en/documentation/io/built-in/google-bigquery.md
@@ -336,8 +336,8 @@ BigQuery. SDK versions before 2.25.0 support the BigQuery 
Storage API as an
 and use the pre-GA BigQuery Storage API surface. Callers should migrate
 pipelines which use the BigQuery Storage API to use SDK version 2.25.0 or 
later.
 
-The Beam SDK for Python does not support the BigQuery Storage API. See
-[Issue 20687](https://github.com/apache/beam/issues/20687)).
+The Beam SDK for Python supports the BigQuery Storage API. Enable it
+by passing a `method` parameter to it.
 
  Updating your code
 



[beam] branch bigtable-cdc-feature-branch created (now d20d0b01c3c)

2023-02-07 Thread pabloem

pabloem pushed a change to branch bigtable-cdc-feature-branch
in repository https://gitbox.apache.org/repos/asf/beam.git


  at d20d0b01c3c Upgrading spring-expression to latest patch version 
(#25348)

No new revisions were added by this update.



[beam] branch master updated: [BEAM-12164] Enforced only positive state transitions from CREATED -> SCHEDULED -> RUNNING -> FINISHED for the change stream metadata table. (#25311)

2023-02-07 Thread pabloem

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new bf5114bb2fc [BEAM-12164] Enforced only positive state transitions from 
CREATED -> SCHEDULED -> RUNNING -> FINISHED for the change stream metadata 
table. (#25311)
bf5114bb2fc is described below

commit bf5114bb2fc128d647b9b722dd902dedc160f7ed
Author: nancyxu123 
AuthorDate: Tue Feb 7 14:17:46 2023 -0800

[BEAM-12164] Enforced only positive state transitions from CREATED -> 
SCHEDULED -> RUNNING -> FINISHED for the change stream metadata table. (#25311)

* Made sure state transitions were correct when updating partition state

* Metadata Table changes

* Comitted

* Fixed failing unittests

* Spotless apply

-

Co-authored-by: Nancy Xu 
---
 .../action/DetectNewPartitionsAction.java  |   2 +-
 .../action/QueryChangeStreamAction.java|   2 +-
 .../changestreams/dao/PartitionMetadataDao.java|  85 +++-
 .../dao/PartitionMetadataDaoTest.java  | 108 ++---
 4 files changed, 177 insertions(+), 20 deletions(-)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DetectNewPartitionsAction.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DetectNewPartitionsAction.java
index 348e23e366e..934210250f5 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DetectNewPartitionsAction.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DetectNewPartitionsAction.java
@@ -179,7 +179,7 @@ public class DetectNewPartitionsAction {
   partition.toBuilder().setScheduledAt(scheduledAt).build();
 
   LOG.info(
-  "[{}] Scheduled partition at {} with start time {} and end time {}",
+  "[{}] Outputting partition at {} with start time {} and end time {}",
   updatedPartition.getPartitionToken(),
   updatedPartition.getScheduledAt(),
   updatedPartition.getStartTimestamp(),
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java
index 5fd39a6b13f..6fb60f21882 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java
@@ -233,7 +233,7 @@ public class QueryChangeStreamAction {
   LOG.debug("[{}] Finishing partition", token);
   partitionMetadataDao.updateToFinished(token);
   metrics.decActivePartitionReadCounter();
-  LOG.info("[{}] Partition finished", token);
+  LOG.info("[{}] After attempting to finish the partition", token);
 }
 return ProcessContinuation.stop();
   }
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDao.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDao.java
index f6f32c8023f..6dc0e7a580d 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDao.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDao.java
@@ -39,7 +39,10 @@ import com.google.cloud.spanner.Struct;
 import com.google.cloud.spanner.TransactionContext;
 import com.google.cloud.spanner.TransactionRunner;
 import com.google.cloud.spanner.Value;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.function.Function;
@@ -48,6 +51,8 @@ import javax.annotation.Nullable;
 import 
org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
 import 
org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata.State;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /** Data access object for the Connector metadata tables. */
 public class PartitionMetadataDao {
@@ -349,6 +354,7 @@ public class PartitionMetadataDao {
 
   /** Represents the execution of a r
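The commit above enforces forward-only transitions CREATED -> SCHEDULED -> RUNNING -> FINISHED in the change stream metadata table. A minimal sketch of such a guard follows, reading "positive" as "strictly forward in declaration order"; the real `PartitionMetadataDao` logic runs inside Spanner transactions and is more involved, so treat this purely as an illustration.

```java
// Minimal sketch of a forward-only state guard over an ordered enum.
// Declaration order encodes the lifecycle direction. Illustrative only.
public class StateTransitionSketch {
  enum State { CREATED, SCHEDULED, RUNNING, FINISHED }

  /** True only when the transition moves forward in the lifecycle. */
  static boolean isForward(State from, State to) {
    return to.ordinal() > from.ordinal();
  }

  /** Applies a transition, rejecting backward or no-op moves. */
  static State transition(State from, State to) {
    if (!isForward(from, to)) {
      throw new IllegalStateException("Refusing transition " + from + " -> " + to);
    }
    return to;
  }

  public static void main(String[] args) {
    State s = State.CREATED;
    s = transition(s, State.SCHEDULED);
    s = transition(s, State.RUNNING);
    s = transition(s, State.FINISHED);
    boolean rejected = false;
    try {
      transition(State.FINISHED, State.CREATED); // backward: must fail
    } catch (IllegalStateException e) {
      rejected = true;
    }
    if (!rejected || s != State.FINISHED) {
      throw new AssertionError();
    }
    System.out.println("ok");
  }
}
```

Encoding the order in the enum declaration keeps the guard a one-line comparison instead of a hand-maintained transition table.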

[beam] branch master updated: Playground Frontend Test workflow (#24728) (#25254)

2023-02-08 Thread pabloem

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new e84a3e8e001 Playground Frontend Test workflow (#24728) (#25254)
e84a3e8e001 is described below

commit e84a3e8e0017f3a8648eaf365cd5a2526bea42df
Author: alexeyinkin 
AuthorDate: Wed Feb 8 21:00:51 2023 +0400

Playground Frontend Test workflow (#24728) (#25254)

* Playground Frontend Test workflow (#24728)

* Trigger the workflow (#24728)

* Test

* Test

* Test

* Test

* Test

* Test

* Test

* Test

* Test

* Test

* Test

* Test

* Test

* Fix

* Fix

* Clean up (#24728)

* Test

* Add Chrome requirement in README (#24728)

* Polish README (#24728)

* Fix a typo (#24728)
---
 .github/workflows/playground_frontend_test.yml | 91 ++
 playground/frontend/README.md  | 11 ++--
 playground/frontend/build.gradle   |  5 +-
 playground/frontend/lib/config.example.dart| 15 ++---
 4 files changed, 107 insertions(+), 15 deletions(-)

diff --git a/.github/workflows/playground_frontend_test.yml 
b/.github/workflows/playground_frontend_test.yml
new file mode 100644
index 000..129234887b1
--- /dev/null
+++ b/.github/workflows/playground_frontend_test.yml
@@ -0,0 +1,91 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+name: Playground Frontend Test
+
+on:
+  push:
+paths: ['playground/frontend/**']
+branches: ['**']
+  pull_request:
+paths: ['playground/frontend/**']
+branches: ['**']
+  workflow_dispatch:
+
+# This allows a subsequently queued workflow run to interrupt previous runs
+concurrency:
+  group: '${{ github.workflow }} @ ${{ github.event.pull_request.head.label || 
github.head_ref || github.ref }}'
+  cancel-in-progress: true
+
+jobs:
+  playground_frontend_test:
+name: Playground Frontend Test
+runs-on: ubuntu-latest
+
+env:
+  FLUTTER_VERSION: '3.3.2'
+
+steps:
+  - uses: actions/checkout@v3
+
+  - name: 'Cache Flutter Dependencies'
+uses: actions/cache@v3
+with:
+  path: /opt/hostedtoolcache/flutter
+  key: ${{ runner.OS }}-flutter-install-cache-${{ env.FLUTTER_VERSION 
}}
+  restore-keys: ${{ runner.OS }}-flutter-install-cache
+
+  - uses: subosito/flutter-action@v2
+with:
+  flutter-version: ${{ env.FLUTTER_VERSION }}
+  channel: 'stable'
+
+  - name: 'Install Dependencies'
+working-directory: playground/frontend
+run: |
+  cd playground_components && flutter pub get && cd ..
+  cd playground_components_dev && flutter pub get && cd ..
+  flutter pub get
+
+#  - name: 'Formatting'
+#run: dart format --output=none --set-exit-if-changed .
+
+#  - name: 'Analyze playground_components'
+#working-directory: playground/frontend/playground_components
+#run: dart analyze --fatal-infos
+
+#  - name: 'Analyze playground_components_dev'
+#working-directory: playground/frontend/playground_components_dev
+#run: dart analyze --fatal-infos
+
+#  - name: 'Analyze playground'
+#working-directory: playground/frontend
+#run: dart analyze --fatal-infos lib test
+
+  - name: 'Test playground_components'
+working-directory: playground/frontend/playground_components
+run: flutter test
+
+  - name: 'Test playground'
+working-directory: playground/frontend
+run: flutter test
+
+  - uses: nanasess/setup-chromedriver@v1
+
+  - name: 'Integration tests'
+run: |
+  cp playground/frontend/lib/config.example.dart 
playground/frontend/lib/config.g.dart
+  chromedriver --port= &
+  ./gradl

[beam] branch master updated: Adding support for @SchemaFieldDescription annotation that allows ann… (#25268)

2023-02-08 Thread pabloem

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 513f4b003b1 Adding support for @SchemaFieldDescription annotation that 
allows ann… (#25268)
513f4b003b1 is described below

commit 513f4b003b154893e6ad14e89616a8041b813c50
Author: Pablo Estrada 
AuthorDate: Wed Feb 8 20:57:01 2023 -0800

Adding support for @SchemaFieldDescription annotation that allows ann… 
(#25268)

* Adding support for @SchemaFieldDescription annotation that allows 
annotations on AutoSchema classes

* Fixing type annotation for method

* fixup - use equals to compare descriptions

* fixup
---
 .../sdk/schemas/FieldValueTypeInformation.java | 18 +++
 .../annotations/SchemaFieldDescription.java| 62 ++
 .../sdk/schemas/utils/StaticSchemaInference.java   | 11 ++--
 .../beam/sdk/schemas/AutoValueSchemaTest.java  | 32 +++
 .../beam/sdk/schemas/JavaBeanSchemaTest.java   | 11 
 .../beam/sdk/schemas/JavaFieldSchemaTest.java  |  9 
 .../beam/sdk/schemas/utils/TestJavaBeans.java  | 23 
 .../apache/beam/sdk/schemas/utils/TestPOJOs.java   | 39 ++
 8 files changed, 201 insertions(+), 4 deletions(-)

diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldValueTypeInformation.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldValueTypeInformation.java
index 6d207111779..b11fd8cbc52 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldValueTypeInformation.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldValueTypeInformation.java
@@ -29,6 +29,7 @@ import java.util.Collections;
 import java.util.Map;
 import java.util.stream.Stream;
 import org.apache.beam.sdk.schemas.annotations.SchemaCaseFormat;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
 import org.apache.beam.sdk.schemas.annotations.SchemaFieldName;
 import org.apache.beam.sdk.schemas.annotations.SchemaFieldNumber;
 import org.apache.beam.sdk.schemas.logicaltypes.OneOfType;
@@ -74,6 +75,9 @@ public abstract class FieldValueTypeInformation implements 
Serializable {
   /** If the field is a map type, returns the key type. */
   public abstract @Nullable FieldValueTypeInformation getMapValueType();
 
+  /** If the field has a description, returns the description for the field. */
+  public abstract @Nullable String getDescription();
+
   abstract Builder toBuilder();
 
   @AutoValue.Builder
@@ -100,6 +104,8 @@ public abstract class FieldValueTypeInformation implements 
Serializable {
 
 public abstract Builder setMapValueType(@Nullable 
FieldValueTypeInformation mapValueType);
 
+public abstract Builder setDescription(@Nullable String fieldDescription);
+
 abstract FieldValueTypeInformation build();
   }
 
@@ -132,6 +138,7 @@ public abstract class FieldValueTypeInformation implements 
Serializable {
 .setMapKeyType(getMapKeyType(field))
 .setMapValueType(getMapValueType(field))
 .setOneOfTypes(Collections.emptyMap())
+.setDescription(getFieldDescription(field))
 .build();
   }
 
@@ -167,6 +174,16 @@ public abstract class FieldValueTypeInformation implements 
Serializable {
 }
   }
 
+  public static  @Nullable String 
getFieldDescription(
+  T member) {
+@Nullable
+SchemaFieldDescription fieldDescription = 
member.getAnnotation(SchemaFieldDescription.class);
+if (fieldDescription == null) {
+  return null;
+}
+return fieldDescription.value();
+  }
+
   public static FieldValueTypeInformation forGetter(Method method, int index) {
 String name;
 if (method.getName().startsWith("get")) {
@@ -190,6 +207,7 @@ public abstract class FieldValueTypeInformation implements 
Serializable {
 .setMapKeyType(getMapKeyType(type))
 .setMapValueType(getMapValueType(type))
 .setOneOfTypes(Collections.emptyMap())
+.setDescription(getFieldDescription(method))
 .build();
   }
 
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/annotations/SchemaFieldDescription.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/annotations/SchemaFieldDescription.java
new file mode 100644
index 000..013aabd8cae
--- /dev/null
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/annotations/SchemaFieldDescription.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * wi
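The patch above reads the `@SchemaFieldDescription` annotation off a field or method via reflection and returns `null` when it is absent. A minimal, self-contained sketch of that pattern — using a stand-in `@FieldDescription` annotation and `User` class, since Beam's own `org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription` is not on the classpath here:

```java
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;

public class FieldDescriptionDemo {
  // Stand-in for Beam's @SchemaFieldDescription (hypothetical, for illustration).
  @Retention(RetentionPolicy.RUNTIME)
  @interface FieldDescription {
    String value();
  }

  public static class User {
    @FieldDescription("The user's display name.")
    public String name;

    public int age; // no description annotation
  }

  // Mirrors getFieldDescription in the patch: return the annotation's value,
  // or null when the member is not annotated.
  public static String getFieldDescription(Class<?> clazz, String fieldName) {
    try {
      FieldDescription d = clazz.getField(fieldName).getAnnotation(FieldDescription.class);
      return d == null ? null : d.value();
    } catch (NoSuchFieldException e) {
      throw new RuntimeException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(getFieldDescription(User.class, "name")); // The user's display name.
    System.out.println(getFieldDescription(User.class, "age"));  // null
  }
}
```

The `RUNTIME` retention is what makes `getAnnotation` work at all; with the default `CLASS` retention the lookup would always return `null`.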

[beam] branch master updated: Adding SpannerIO.readChangeStreams support for SchemaTransform (#24999)

2023-02-08 Thread pabloem
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new d36f34df570 Adding SpannerIO.readChangeStreams support for 
SchemaTransform (#24999)
d36f34df570 is described below

commit d36f34df570b845a5817ba38d5f16b5f12b2a825
Author: Pablo Estrada 
AuthorDate: Wed Feb 8 21:00:16 2023 -0800

Adding SpannerIO.readChangeStreams support for SchemaTransform (#24999)

* Adding SpannerIO.readChangeStreams support for SchemaTransform

* Improve test and schema shape to include changestream information

* add schema checks

* fixup

* remove unneeded file

* reducing the number of tables that we cache
---
 .../beam/sdk/io/gcp/spanner/ReadSpannerSchema.java |  37 +-
 .../beam/sdk/io/gcp/spanner/SpannerSchema.java |   8 +-
 ...erChangestreamsReadSchemaTransformProvider.java | 394 +
 .../it/SpannerChangeStreamsSchemaTransformIT.java  | 262 ++
 4 files changed, 695 insertions(+), 6 deletions(-)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/ReadSpannerSchema.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/ReadSpannerSchema.java
index 9b30747f778..9c82701e5a4 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/ReadSpannerSchema.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/ReadSpannerSchema.java
@@ -22,6 +22,8 @@ import com.google.cloud.spanner.Dialect;
 import com.google.cloud.spanner.ReadOnlyTransaction;
 import com.google.cloud.spanner.ResultSet;
 import com.google.cloud.spanner.Statement;
+import java.util.HashSet;
+import java.util.Set;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.values.PCollectionView;
 
@@ -32,17 +34,44 @@ import org.apache.beam.sdk.values.PCollectionView;
 @SuppressWarnings({
   "nullness" // TODO(https://github.com/apache/beam/issues/20497)
 })
-class ReadSpannerSchema extends DoFn {
+public class ReadSpannerSchema extends DoFn {
 
   private final SpannerConfig config;
 
   private final PCollectionView dialectView;
 
+  private final Set allowedTableNames;
+
   private transient SpannerAccessor spannerAccessor;
 
+  /**
+   * Constructor for creating an instance of the ReadSpannerSchema class. If 
no {@param
+   * allowedTableNames} is passed, every single table is allowed.
+   *
+   * @param config The SpannerConfig object that contains the configuration 
for accessing the
+   * Spanner database.
+   * @param dialectView A PCollectionView object that holds a Dialect object 
for the database
+   * dialect to use for reading the Spanner schema.
+   */
   public ReadSpannerSchema(SpannerConfig config, PCollectionView 
dialectView) {
+this(config, dialectView, new HashSet());
+  }
+
+  /**
+   * Constructor for creating an instance of the ReadSpannerSchema class.
+   *
+   * @param config The SpannerConfig object that contains the configuration 
for accessing the
+   * Spanner database.
+   * @param dialectView A PCollectionView object that holds a Dialect object 
for the database
+   * dialect to use for reading the Spanner schema.
+   * @param allowedTableNames A set of allowed table names to be used when 
reading the Spanner
+   * schema.
+   */
+  public ReadSpannerSchema(
+  SpannerConfig config, PCollectionView dialectView, Set 
allowedTableNames) {
 this.config = config;
 this.dialectView = dialectView;
+this.allowedTableNames = allowedTableNames == null ? new HashSet<>() : 
allowedTableNames;
   }
 
   @Setup
@@ -68,7 +97,11 @@ class ReadSpannerSchema extends DoFn {
 String columnName = resultSet.getString(1);
 String type = resultSet.getString(2);
 long cellsMutated = resultSet.getLong(3);
-
+if (allowedTableNames.size() > 0 && 
!allowedTableNames.contains(tableName)) {
+  // If we want to filter out table names, and the current table name 
is not part
+  // of the allowed names, we exclude it.
+  continue;
+}
 builder.addColumn(tableName, columnName, type, cellsMutated);
   }
 
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerSchema.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerSchema.java
index 78941d12a7f..09394d37a44 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerSchema.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerSchema.java
@@ -34,7 +34,7 @@ import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
 @Supp
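The `allowedTableNames` logic in the `ReadSpannerSchema` diff treats an empty set as "allow everything" and a non-empty set as an exclusive allow-list. A self-contained sketch of just that filtering step (the Spanner plumbing is omitted):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class AllowedTablesDemo {
  // Mirrors the filter in ReadSpannerSchema.processElement: an empty
  // allow-list accepts every table; a non-empty one excludes anything absent.
  public static List<String> filterTables(List<String> tableNames, Set<String> allowedTableNames) {
    List<String> kept = new ArrayList<>();
    for (String tableName : tableNames) {
      if (allowedTableNames.size() > 0 && !allowedTableNames.contains(tableName)) {
        continue; // table name not in the allow-list: skip it
      }
      kept.add(tableName);
    }
    return kept;
  }

  public static void main(String[] args) {
    List<String> all = Arrays.asList("users", "orders", "audit_log");
    System.out.println(filterTables(all, new HashSet<>()));
    System.out.println(filterTables(all, new HashSet<>(Arrays.asList("orders"))));
  }
}
```

Note the null-guard in the patch's constructor (`allowedTableNames == null ? new HashSet<>() : ...`) is what lets this filter assume a non-null set.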

[beam] branch pabloem-patch-2 updated (6d971168e8c -> 7d4bf065739)

2023-02-09 Thread pabloem
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a change to branch pabloem-patch-2
in repository https://gitbox.apache.org/repos/asf/beam.git


from 6d971168e8c fix doc on bq sxtorage
 add 7d4bf065739 commentaddress

No new revisions were added by this update.

Summary of changes:
 .../www/site/content/en/documentation/io/built-in/google-bigquery.md| 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)



[beam] branch master updated: fix doc on bq sxtorage (#25353)

2023-02-09 Thread pabloem
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new cbac3e093a4 fix doc on bq sxtorage (#25353)
cbac3e093a4 is described below

commit cbac3e093a4dde315313c930354f511fc7e9fd2a
Author: Pablo Estrada 
AuthorDate: Thu Feb 9 10:31:06 2023 -0800

fix doc on bq sxtorage (#25353)

* fix doc on bq sxtorage

* commentaddress
---
 .../www/site/content/en/documentation/io/built-in/google-bigquery.md  | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git 
a/website/www/site/content/en/documentation/io/built-in/google-bigquery.md 
b/website/www/site/content/en/documentation/io/built-in/google-bigquery.md
index 6f12a4f6bcf..6c01b1c4667 100644
--- a/website/www/site/content/en/documentation/io/built-in/google-bigquery.md
+++ b/website/www/site/content/en/documentation/io/built-in/google-bigquery.md
@@ -336,8 +336,8 @@ BigQuery. SDK versions before 2.25.0 support the BigQuery 
Storage API as an
 and use the pre-GA BigQuery Storage API surface. Callers should migrate
 pipelines which use the BigQuery Storage API to use SDK version 2.25.0 or 
later.
 
-The Beam SDK for Python does not support the BigQuery Storage API. See
-[Issue 20687](https://github.com/apache/beam/issues/20687)).
+The Beam SDK for Python supports the BigQuery Storage API. Enable it
+by passing `method=DIRECT_READ` as a parameter to `ReadFromBigQuery`.
 
  Updating your code
 



[beam] branch master updated: [Playground] Fix Test_getRunOrTestCmd on Go 1.20 (#25379)

2023-02-09 Thread pabloem
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 64e40d2c018 [Playground] Fix Test_getRunOrTestCmd on Go 1.20 (#25379)
64e40d2c018 is described below

commit 64e40d2c018f8e906f4bec32ef67f02734a95721
Author: Timur Sultanov 
AuthorDate: Thu Feb 9 22:38:08 2023 +0400

[Playground] Fix Test_getRunOrTestCmd on Go 1.20 (#25379)

* Fix Test_getRunOrTestCmd on Go 1.20

* Use go_cmp to compare test result with expected value
---
 playground/backend/go.mod   |  2 +-
 .../internal/code_processing/code_processing_test.go| 13 +++--
 2 files changed, 12 insertions(+), 3 deletions(-)

diff --git a/playground/backend/go.mod b/playground/backend/go.mod
index 0601cf51427..a2580231ff1 100644
--- a/playground/backend/go.mod
+++ b/playground/backend/go.mod
@@ -30,6 +30,7 @@ require (
github.com/rs/cors v1.8.2
github.com/spf13/viper v1.14.0
github.com/stretchr/testify v1.8.1
+   github.com/google/go-cmp v0.5.9
go.uber.org/goleak v1.2.0
google.golang.org/grpc v1.51.0
google.golang.org/protobuf v1.28.1
@@ -49,7 +50,6 @@ require (
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // 
indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/golang/snappy v0.0.4 // indirect
-   github.com/google/go-cmp v0.5.9 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.2.0 // indirect
github.com/googleapis/gax-go/v2 v2.6.0 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
diff --git 
a/playground/backend/internal/code_processing/code_processing_test.go 
b/playground/backend/internal/code_processing/code_processing_test.go
index fb3a2a1313e..f6c7fb19425 100644
--- a/playground/backend/internal/code_processing/code_processing_test.go
+++ b/playground/backend/internal/code_processing/code_processing_test.go
@@ -19,6 +19,7 @@ import (
"context"
"encoding/json"
"fmt"
+   "github.com/google/go-cmp/cmp"
"io/fs"
"os"
"os/exec"
@@ -626,10 +627,18 @@ func Test_getRunOrTestCmd(t *testing.T) {
want: wantTestExec,
},
}
+
+   execComparer := cmp.Comparer(func(a exec.Cmd, b exec.Cmd) bool {
+   return a.Path == b.Path &&
+   cmp.Equal(a.Args, b.Args) &&
+   cmp.Equal(a.Env, b.Env) &&
+   a.Dir == b.Dir
+   })
+
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
-   if got := getExecuteCmd(tt.args.isUnitTest, 
tt.args.executor, tt.args.ctxWithTimeout); !reflect.DeepEqual(got, tt.want) {
-   t.Errorf("getExecuteCmd() = %v, want %v", got, 
tt.want)
+   if got := getExecuteCmd(tt.args.isUnitTest, 
tt.args.executor, tt.args.ctxWithTimeout); !cmp.Equal(got, tt.want, 
execComparer) {
+   t.Errorf("getExecuteCmd() = '%v', want '%v', 
diff = %v", got, tt.want, cmp.Diff(got, tt.want, execComparer))
}
})
}
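The Go fix above replaces `reflect.DeepEqual` with a `cmp.Comparer` that compares only the fields the test cares about (`Path`, `Args`, `Env`, `Dir`), because deep equality breaks when a struct gains unexported internal state across Go releases. The same idea rendered in Java, with a hypothetical `Cmd` stand-in for `exec.Cmd`:

```java
import java.util.List;
import java.util.Objects;
import java.util.function.BiPredicate;

public class SelectiveEqualityDemo {
  // Hypothetical stand-in for Go's exec.Cmd: only the externally meaningful fields.
  public static final class Cmd {
    final String path;
    final List<String> args;
    final List<String> env;
    final String dir;

    public Cmd(String path, List<String> args, List<String> env, String dir) {
      this.path = path;
      this.args = args;
      this.env = env;
      this.dir = dir;
    }
  }

  // Like the cmp.Comparer in the patch: equality over an explicit field subset,
  // so unrelated internal state can never break the comparison.
  public static final BiPredicate<Cmd, Cmd> CMD_EQUAL =
      (a, b) ->
          Objects.equals(a.path, b.path)
              && Objects.equals(a.args, b.args)
              && Objects.equals(a.env, b.env)
              && Objects.equals(a.dir, b.dir);

  public static void main(String[] args) {
    Cmd got = new Cmd("/usr/bin/go", List.of("go", "test"), List.of(), "/tmp");
    Cmd want = new Cmd("/usr/bin/go", List.of("go", "test"), List.of(), "/tmp");
    System.out.println(CMD_EQUAL.test(got, want)); // true
  }
}
```

Comparing an explicit field subset is the portable choice for any type you do not own: the comparison documents exactly what "equal" means for the test.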



[beam] branch master updated: Add Two Counter Metric in BigQuery Write Schema Transform (#25155)

2023-02-09 Thread pabloem
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 78c15648b51 Add Two Counter Metric in BigQuery Write Schema Transform 
(#25155)
78c15648b51 is described below

commit 78c15648b514f5e61b83c593715114516ea639fb
Author: nickuncaged1201 <56149585+nickuncaged1...@users.noreply.github.com>
AuthorDate: Thu Feb 9 14:08:22 2023 -0800

Add Two Counter Metric in BigQuery Write Schema Transform (#25155)

* Added element counter and error counter for BQ write schema transform

* Fixied styling issues with naming

* Combined two trivial counter class for brevity. Used finishbundle 
annotation to reduce the number of calls to counter.inc() for better 
performance.

* fix formatting

-

Co-authored-by: Nick Li 
---
 ...ueryStorageWriteApiSchemaTransformProvider.java |  42 -
 ...StorageWriteApiSchemaTransformProviderTest.java | 169 ++---
 2 files changed, 189 insertions(+), 22 deletions(-)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
index 5f7851bba51..22e8abca35b 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
@@ -37,6 +37,8 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
 import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
 import 
org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryStorageWriteApiSchemaTransformProvider.BigQueryStorageWriteApiSchemaTransformConfiguration;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.schemas.AutoValueSchema;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.Schema.Field;
@@ -45,8 +47,10 @@ import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
 import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
 import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
 import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollection.IsBounded;
 import org.apache.beam.sdk.values.PCollectionRowTuple;
@@ -222,6 +226,30 @@ public class BigQueryStorageWriteApiSchemaTransformProvider
   this.testBigQueryServices = testBigQueryServices;
 }
 
+// A generic counter for PCollection of Row. Will be initialized with the 
given
+// name argument. Performs element-wise counter of the input PCollection.
+private static class ElementCounterFn extends DoFn {
+  private Counter bqGenericElementCounter;
+  private Long elementsInBundle = 0L;
+
+  ElementCounterFn(String name) {
+this.bqGenericElementCounter =
+
Metrics.counter(BigQueryStorageWriteApiPCollectionRowTupleTransform.class, 
name);
+  }
+
+  @ProcessElement
+  public void process(ProcessContext c) {
+this.elementsInBundle += 1;
+c.output(c.element());
+  }
+
+  @FinishBundle
+  public void finish(FinishBundleContext c) {
+this.bqGenericElementCounter.inc(this.elementsInBundle);
+this.elementsInBundle = 0L;
+  }
+}
+
 @Override
 public PCollectionRowTuple expand(PCollectionRowTuple input) {
   // Check that the input exists
@@ -241,7 +269,12 @@ public class BigQueryStorageWriteApiSchemaTransformProvider
 : Duration.standardSeconds(triggeringFrequency));
   }
 
-  WriteResult result = inputRows.apply(write);
+  Schema inputSchema = inputRows.getSchema();
+  WriteResult result =
+  inputRows
+  .apply("element-count", ParDo.of(new 
ElementCounterFn("element-counter")))
+  .setRowSchema(inputSchema)
+  .apply(write);
 
   Schema errorSchema =
   Schema.of(
@@ -263,7 +296,12 @@ public class BigQueryStorageWriteApiSchemaTransformProvider
   .build()))
   .setRowSchema(errorSchema);
 
-  return PCollectionRowTuple.of(OUTPUT_ERRORS_TAG, errorRows);
+   

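The commit message above notes that `@FinishBundle` is used "to reduce the number of calls to counter.inc() for better performance": the `ElementCounterFn` accumulates a local count per bundle and flushes it in one `inc(n)` call. A self-contained sketch of that batching pattern, with a plain `Counter` stand-in for Beam's `Metrics.counter(...)`:

```java
public class BundleCounterDemo {
  // Stand-in for Beam's metrics Counter (hypothetical, for illustration).
  public static final class Counter {
    long total;
    long incCalls;

    void inc(long n) {
      total += n;
      incCalls++;
    }
  }

  // Mirrors ElementCounterFn: count locally per element, flush once per bundle,
  // so the metrics backend sees one inc() per bundle instead of one per element.
  public static final class ElementCounter {
    private final Counter counter;
    private long elementsInBundle = 0L;

    public ElementCounter(Counter counter) {
      this.counter = counter;
    }

    public void processElement() { // corresponds to @ProcessElement
      elementsInBundle += 1;
    }

    public void finishBundle() { // corresponds to @FinishBundle
      counter.inc(elementsInBundle);
      elementsInBundle = 0L;
    }
  }

  public static void main(String[] args) {
    Counter backend = new Counter();
    ElementCounter fn = new ElementCounter(backend);
    for (int i = 0; i < 5; i++) {
      fn.processElement();
    }
    fn.finishBundle(); // a single backend call covers all five elements
    System.out.println(backend.total);    // 5
    System.out.println(backend.incCalls); // 1
  }
}
```

Resetting `elementsInBundle` to zero in `finishBundle` matters: a DoFn instance may be reused across bundles, and a stale count would be double-reported.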
[beam] branch master updated (983d9034b85 -> aeb0410d5e5)

2023-02-10 Thread pabloem
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from 983d9034b85 Added MetadataSpannerConfig class for generating 
SpannerConfig for accessing change stream metadata database (#25193)
 add aeb0410d5e5 Remove ValueProvider from BigtableIO ReadChangeStream 
(#25409)

No new revisions were added by this update.

Summary of changes:
 .../beam/sdk/io/gcp/bigtable/BigtableIO.java   | 63 +-
 1 file changed, 13 insertions(+), 50 deletions(-)



[beam] branch master updated (aeb0410d5e5 -> 81f55cb1d48)

2023-02-10 Thread pabloem
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from aeb0410d5e5 Remove ValueProvider from BigtableIO ReadChangeStream 
(#25409)
 add 81f55cb1d48 Annotate Cloud Bigtable implementation details as Internal 
(#25403)

No new revisions were added by this update.

Summary of changes:
 .../main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableConfig.java  | 2 ++
 .../beam/sdk/io/gcp/bigtable/changestreams/ByteStringRangeHelper.java  | 2 ++
 .../beam/sdk/io/gcp/bigtable/changestreams/ChangeStreamMetrics.java| 2 ++
 .../beam/sdk/io/gcp/bigtable/changestreams/ChangeStreamMutation.java   | 3 +++
 .../beam/sdk/io/gcp/bigtable/changestreams/TimestampConverter.java | 2 ++
 .../beam/sdk/io/gcp/bigtable/changestreams/UniqueIdGenerator.java  | 2 ++
 .../beam/sdk/io/gcp/bigtable/changestreams/action/ActionFactory.java   | 2 ++
 .../sdk/io/gcp/bigtable/changestreams/action/ChangeStreamAction.java   | 2 ++
 .../gcp/bigtable/changestreams/action/DetectNewPartitionsAction.java   | 2 ++
 .../bigtable/changestreams/action/GenerateInitialPartitionsAction.java | 2 ++
 .../bigtable/changestreams/action/ReadChangeStreamPartitionAction.java | 2 ++
 .../beam/sdk/io/gcp/bigtable/changestreams/dao/ChangeStreamDao.java| 2 ++
 .../apache/beam/sdk/io/gcp/bigtable/changestreams/dao/DaoFactory.java  | 2 ++
 .../sdk/io/gcp/bigtable/changestreams/dao/MetadataTableAdminDao.java   | 2 ++
 .../beam/sdk/io/gcp/bigtable/changestreams/dao/MetadataTableDao.java   | 2 ++
 .../io/gcp/bigtable/changestreams/dofn/DetectNewPartitionsDoFn.java| 2 ++
 .../beam/sdk/io/gcp/bigtable/changestreams/dofn/InitializeDoFn.java| 2 ++
 .../gcp/bigtable/changestreams/dofn/ReadChangeStreamPartitionDoFn.java | 2 ++
 .../beam/sdk/io/gcp/bigtable/changestreams/model/PartitionRecord.java  | 2 ++
 .../restriction/ReadChangeStreamPartitionProgressTracker.java  | 2 ++
 .../sdk/io/gcp/bigtable/changestreams/restriction/StreamProgress.java  | 2 ++
 21 files changed, 43 insertions(+)



[beam] branch bigtable-cdc-feature-branch updated: Create metadata table in initialize stage if table doesn't exist. (#25364)

2023-02-10 Thread pabloem
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch bigtable-cdc-feature-branch
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/bigtable-cdc-feature-branch by 
this push:
 new feef2e40881 Create metadata table in initialize stage if table doesn't 
exist. (#25364)
feef2e40881 is described below

commit feef2e40881e611c7eb794e5d43d8086e0f5e482
Author: Tony Tang 
AuthorDate: Fri Feb 10 21:18:40 2023 -0500

Create metadata table in initialize stage if table doesn't exist. (#25364)

* Create metadata table during initialize stage of Bigtable Change Stream 
connector

* Fix GcpApiSurfaceTest that broke because of the addition of Cloud 
Bigtable's Admin API

* Fix import

* Fix dependencies error

* Change Cloud Bigtable dependencies to depend on custom public artifact 
registry instead of local jars

* Remove unncessary excludes

* Replace placeholder class with real implementation
---
 .../org/apache/beam/gradle/BeamModulePlugin.groovy |   2 +-
 sdks/java/io/google-cloud-platform/build.gradle|  25 ++-
 .../beam/sdk/io/gcp/bigtable/BigtableIO.java   |   2 +-
 .../changestreams/ChangeStreamMutation.java|  24 ---
 .../changestreams/action/ChangeStreamAction.java   |   6 +-
 .../action/ReadChangeStreamPartitionAction.java|   2 +-
 .../dao/BigtableChangeStreamAccessor.java  | 220 +
 .../changestreams/dao/ChangeStreamDao.java |   5 +-
 .../gcp/bigtable/changestreams/dao/DaoFactory.java |  18 +-
 .../changestreams/dao/MetadataTableAdminDao.java   |  94 -
 .../changestreams/dao/MetadataTableDao.java|  21 +-
 .../changestreams/dofn/InitializeDoFn.java |  20 +-
 .../dofn/ReadChangeStreamPartitionDoFn.java|   2 +-
 .../changestreams/encoder/package-info.java|  24 ---
 .../apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java  |  12 +-
 .../changestreams/dofn/InitializeDoFnTest.java | 123 
 16 files changed, 533 insertions(+), 67 deletions(-)

diff --git 
a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy 
b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index 414b3ccdc55..db956e46a21 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -600,7 +600,7 @@ class BeamModulePlugin implements Plugin {
 google_cloud_bigquery_storage   : 
"com.google.cloud:google-cloud-bigquerystorage", // 
google_cloud_platform_libraries_bom sets version
 google_cloud_bigtable   : 
"com.google.cloud:google-cloud-bigtable", // 
google_cloud_platform_libraries_bom sets version
 google_cloud_bigtable_client_core   : 
"com.google.cloud.bigtable:bigtable-client-core:1.26.3",
-google_cloud_bigtable_emulator  : 
"com.google.cloud:google-cloud-bigtable-emulator:0.137.1",
+google_cloud_bigtable_emulator  : 
"com.google.cloud:google-cloud-bigtable-emulator:0.147.3",
 google_cloud_core   : 
"com.google.cloud:google-cloud-core", // google_cloud_platform_libraries_bom 
sets version
 google_cloud_core_grpc  : 
"com.google.cloud:google-cloud-core-grpc", // 
google_cloud_platform_libraries_bom sets version
 google_cloud_datacatalog_v1beta1: 
"com.google.cloud:google-cloud-datacatalog", // 
google_cloud_platform_libraries_bom sets version
diff --git a/sdks/java/io/google-cloud-platform/build.gradle 
b/sdks/java/io/google-cloud-platform/build.gradle
index d4a143c6173..cc7c850c490 100644
--- a/sdks/java/io/google-cloud-platform/build.gradle
+++ b/sdks/java/io/google-cloud-platform/build.gradle
@@ -18,7 +18,13 @@
 
 import groovy.json.JsonOutput
 
-plugins { id 'org.apache.beam.module' }
+plugins {
+  id 'org.apache.beam.module'
+  // For resolving Cloud Bigtable dependencies from custom registry that 
includes Change Stream API.
+  id "maven-publish"
+  id "com.google.cloud.artifactregistry.gradle-plugin" version "2.1.5"
+  // End: Temporary Cloud Bigtable dependencies.
+}
 applyJavaNature(
   automaticModuleName: 'org.apache.beam.sdk.io.gcp',
   enableSpotbugs: false,
@@ -27,6 +33,22 @@ applyJavaNature(
   ],
 )
 
+// For resolving Cloud Bigtable dependencies from custom registry that 
includes Change Stream API.
+repositories {
+  maven {
+url 
"artifactregistry://us-central1-maven.pkg.dev/cloud-bigtable-ecosystem/bigtable-change-streams-preview"
+  }
+}
+configurations.all {
+  resolutionStrategy {
+force group: 'com.google.cloud', name: 'google-cloud-bigtab

[beam] branch master updated: Adding support for DLQ for ZetaSQL (#25426)

2023-02-13 Thread pabloem
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 299be58cf99 Adding support for DLQ for ZetaSQL (#25426)
299be58cf99 is described below

commit 299be58cf997d9e1561409034692ea4f2d4ff357
Author: Pablo Estrada 
AuthorDate: Mon Feb 13 14:28:01 2023 -0800

Adding support for DLQ for ZetaSQL (#25426)

* Adding support for DLQ for ZetaSQL

* fixed issue for not-all-fields are selected

* fixup

* fix spotless

* fix test
---
 .../apache/beam/sdk/coders/RowCoderGenerator.java  |  12 ++-
 .../extensions/sql/zetasql/BeamZetaSqlCalcRel.java | 112 -
 .../sql/zetasql/BeamZetaSqlCalcRelTest.java|  70 +
 .../sql/zetasql/ZetaSqlDialectSpecTest.java|  14 ++-
 4 files changed, 177 insertions(+), 31 deletions(-)

diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoderGenerator.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoderGenerator.java
index 70ee6c1ad81..5ce7358d882 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoderGenerator.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoderGenerator.java
@@ -142,7 +142,10 @@ public abstract class RowCoderGenerator {
   }
   // There should never be duplicate encoding positions.
   Preconditions.checkState(
-  schema.getFieldCount() == 
Arrays.stream(encodingPosToRowIndex).distinct().count());
+  schema.getFieldCount() == 
Arrays.stream(encodingPosToRowIndex).distinct().count(),
+  "The input schema (%s) and map for position encoding (%s) do not 
match.",
+  schema.getFields(),
+  encodingPosToRowIndex);
 
   // Component coders are ordered by encoding position, but may encode a 
field with a different
   // row index.
@@ -311,7 +314,12 @@ public abstract class RowCoderGenerator {
 boolean hasNullableFields)
 throws IOException {
   checkState(value.getFieldCount() == value.getSchema().getFieldCount());
-  checkState(encodingPosToIndex.length == value.getFieldCount());
+  checkState(
+  encodingPosToIndex.length == value.getFieldCount(),
+  "Unable to encode row. Expected %s values, but row has %s%s",
+  encodingPosToIndex.length,
+  value.getFieldCount(),
+  value.getSchema().getFieldNames());
 
   // Encode the field count. This allows us to handle compatible schema 
changes.
   VAR_INT_CODER.encode(value.getFieldCount(), outputStream);
diff --git 
a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRel.java
 
b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRel.java
index 744fbd0bcd4..be1c4613ad0 100644
--- 
a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRel.java
+++ 
b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRel.java
@@ -37,18 +37,24 @@ import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.extensions.sql.impl.BeamSqlPipelineOptions;
 import org.apache.beam.sdk.extensions.sql.impl.QueryPlanner.QueryParameters;
 import org.apache.beam.sdk.extensions.sql.impl.rel.AbstractBeamCalcRel;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
 import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
 import 
org.apache.beam.sdk.extensions.sql.meta.provider.bigquery.BeamBigQuerySqlDialect;
 import 
org.apache.beam.sdk.extensions.sql.meta.provider.bigquery.BeamSqlUnparseContext;
 import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
 import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.utils.SelectHelpers;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
 import 
org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.plan.RelOptCluster;
 import 
org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.plan.RelTraitSet;
 import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rel.RelNode;
@@ -64,7 +70,6 @@ import 
org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.fun.SqlStdO
 import 
org.apache.beam.ve
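The `RowCoderGenerator` part of the diff above upgrades bare `checkState(condition)` calls to the message-template overload so a mismatch reports the offending schema instead of an anonymous `IllegalStateException`. A simplified sketch of that precondition style (Guava's `Preconditions` does the real work in Beam; this stand-in uses `String.format`-style templates rather than Guava's `%s`-only formatting):

```java
public class CheckStateDemo {
  // Simplified checkState: the failure message is only built when the check
  // fails, so the happy path pays no formatting cost.
  public static void checkState(boolean condition, String template, Object... args) {
    if (!condition) {
      throw new IllegalStateException(String.format(template, args));
    }
  }

  public static void main(String[] args) {
    int expected = 3;
    int actual = 3;
    // Passes silently, like the patched encodeDelegate checks.
    checkState(
        expected == actual,
        "Unable to encode row. Expected %s values, but row has %s",
        expected,
        actual);
    System.out.println("check passed");
  }
}
```

The payoff is in the failure case: `checkState(false, "Expected %s values, but row has %s", 3, 2)` throws with a message naming both counts, which is exactly the debuggability the commit adds.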

[beam] branch bigtable-cdc-feature-branch updated (feef2e40881 -> c31bda44f83)

2023-02-13 Thread pabloem
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a change to branch bigtable-cdc-feature-branch
in repository https://gitbox.apache.org/repos/asf/beam.git


from feef2e40881 Create metadata table in initialize stage if table doesn't 
exist. (#25364)
 add c31bda44f83 Add metadata table admin tests (#25414)

No new revisions were added by this update.

Summary of changes:
 .../changestreams/dao/MetadataTableAdminDao.java   |   1 -
 .../dao/MetadataTableAdminDaoTest.java | 126 +
 2 files changed, 126 insertions(+), 1 deletion(-)
 create mode 100644 
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/MetadataTableAdminDaoTest.java



[beam] branch bigtable-cdc-feature-branch updated: Implement generating initial list of partitions (#25411)

2023-02-13 Thread pabloem
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch bigtable-cdc-feature-branch
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/bigtable-cdc-feature-branch by 
this push:
 new e743ebaf818 Implement generating initial list of partitions (#25411)
e743ebaf818 is described below

commit e743ebaf818039a01d810b96858c4270be05d638
Author: Tony Tang 
AuthorDate: Mon Feb 13 21:37:43 2023 -0500

Implement generating initial list of partitions (#25411)
---
 .../changestreams/ChangeStreamMetrics.java |  25 
 .../action/DetectNewPartitionsAction.java  |  10 +-
 .../action/GenerateInitialPartitionsAction.java|  42 ++-
 .../changestreams/dao/ChangeStreamDao.java |  11 ++
 .../action/DetectNewPartitionsActionTest.java  | 127 +
 .../GenerateInitialPartitionsActionTest.java   | 119 +++
 6 files changed, 326 insertions(+), 8 deletions(-)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/ChangeStreamMetrics.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/ChangeStreamMetrics.java
index 2aaa6631ace..f8177adf873 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/ChangeStreamMetrics.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/ChangeStreamMetrics.java
@@ -18,8 +18,33 @@
 package org.apache.beam.sdk.io.gcp.bigtable.changestreams;
 
 import java.io.Serializable;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
 
 /** Class to aggregate metrics related functionality. */
 public class ChangeStreamMetrics implements Serializable {
   private static final long serialVersionUID = 7298901109362981596L;
+  // 
+  // Partition record metrics
+
+  /**
+   * Counter for the total number of partitions identified during the 
execution of the Connector.
+   */
+  public static final Counter LIST_PARTITIONS_COUNT =
+  Metrics.counter(
+  
org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics.class,
+  "list_partitions_count");
+
+  /**
+   * Increments the {@link
+   * 
org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics#LIST_PARTITIONS_COUNT}
 by
+   * 1 if the metric is enabled.
+   */
+  public void incListPartitionsCount() {
+inc(LIST_PARTITIONS_COUNT);
+  }
+
+  private void inc(Counter counter) {
+counter.inc();
+  }
 }
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/DetectNewPartitionsAction.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/DetectNewPartitionsAction.java
index 990d9b2fd01..932796d4ba8 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/DetectNewPartitionsAction.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/DetectNewPartitionsAction.java
@@ -25,7 +25,8 @@ import 
org.apache.beam.sdk.io.gcp.bigtable.changestreams.TimestampConverter;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTableDao;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.PartitionRecord;
 import org.apache.beam.sdk.io.range.OffsetRange;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.BundleFinalizer;
+import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
 import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
@@ -93,11 +94,14 @@ public class DetectNewPartitionsAction {
   @VisibleForTesting
   public ProcessContinuation run(
   RestrictionTracker tracker,
-  DoFn.OutputReceiver receiver,
+  OutputReceiver receiver,
   ManualWatermarkEstimator watermarkEstimator,
-  DoFn.BundleFinalizer bundleFinalizer,
+  BundleFinalizer bundleFinalizer,
   Timestamp startTime)
   throws Exception {
+if (tracker.currentRestriction().getFrom() == 0L) {
+  return generateInitialPartitionsAction.run(receiver, tracker, 
watermarkEstimator, startTime);
+}
 
 // Terminate if endTime <= watermark that means all partitions have read 
up to or beyond
 // watermark. We no longer need to manage splits and merges, we can 
terminate.
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/GenerateInitialPartitionsAction.java
 
b/sdks/java/io/google-cloud-platform/src/main/ja
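The ChangeStreamMetrics hunk in the commit above follows a simple pattern: each metric is a static counter, and a small inc*() instance method increments it so call sites stay one-liners and the metric names live in one place. A plain-Java sketch of that pattern, runnable without the Beam SDK (AtomicLong stands in for Beam's Counter; the class and method names mirror the diff but the implementation is illustrative only):

```java
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicLong;

// Illustrative stand-in for ChangeStreamMetrics: in the real connector the
// counter is Metrics.counter(ChangeStreamMetrics.class, "list_partitions_count")
// and incrementing it reports to the pipeline's metrics backend.
class ChangeStreamMetricsSketch implements Serializable {
  private static final long serialVersionUID = 1L;

  // One static counter per metric, named once, here.
  static final AtomicLong LIST_PARTITIONS_COUNT = new AtomicLong();

  /** Increments the list-partitions counter by 1. */
  public void incListPartitionsCount() {
    inc(LIST_PARTITIONS_COUNT);
  }

  // Single private helper so every metric increments the same way.
  private void inc(AtomicLong counter) {
    counter.incrementAndGet();
  }

  public static void main(String[] args) {
    ChangeStreamMetricsSketch metrics = new ChangeStreamMetricsSketch();
    metrics.incListPartitionsCount();
    System.out.println(LIST_PARTITIONS_COUNT.get());
  }
}
```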

[beam] branch master updated: Fixing issue with ErrorCapture transform where pipeline issues are caused by lack of proper expansion (#25465)

2023-02-14 Thread pabloem
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 67b7962f8e7 Fixing issue with ErrorCapture transform where pipeline issues are caused by lack of proper expansion (#25465)
67b7962f8e7 is described below

commit 67b7962f8e7c93e7a3e7fefc5911de45c2693644
Author: Pablo Estrada 
AuthorDate: Tue Feb 14 10:35:09 2023 -0800

    Fixing issue with ErrorCapture transform where pipeline issues are caused by lack of proper expansion (#25465)
---
 .../sql/expansion/SqlTransformSchemaTransformProvider.java   | 9 +
 1 file changed, 9 insertions(+)

diff --git 
a/sdks/java/extensions/sql/expansion-service/src/main/java/org/apache/beam/sdk/extensions/sql/expansion/SqlTransformSchemaTransformProvider.java
 
b/sdks/java/extensions/sql/expansion-service/src/main/java/org/apache/beam/sdk/extensions/sql/expansion/SqlTransformSchemaTransformProvider.java
index 0649a0978e4..7502d0881bb 100644
--- 
a/sdks/java/extensions/sql/expansion-service/src/main/java/org/apache/beam/sdk/extensions/sql/expansion/SqlTransformSchemaTransformProvider.java
+++ 
b/sdks/java/extensions/sql/expansion-service/src/main/java/org/apache/beam/sdk/extensions/sql/expansion/SqlTransformSchemaTransformProvider.java
@@ -37,11 +37,13 @@ import org.apache.beam.sdk.schemas.logicaltypes.OneOfType;
 import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
 import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionRowTuple;
 import org.apache.beam.sdk.values.PDone;
 import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptors;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
 
@@ -126,6 +128,13 @@ public class SqlTransformSchemaTransformProvider 
implements SchemaTransformProvi
 
 @Override
 public PDone expand(PCollection input) {
+  input.apply(
+  "noop_" + inputs.size(),
+  MapElements.into(TypeDescriptors.nulls())
+  .via(
+  err -> {
+return null;
+  }));
   inputs.add(input);
   return PDone.in(input.getPipeline());
 }



[beam] branch bigtable-cdc-feature-branch updated: Stream changes and handle Heartbeat responses (#25458)

2023-02-14 Thread pabloem
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch bigtable-cdc-feature-branch
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/bigtable-cdc-feature-branch by this push:
 new 37a18966f5f Stream changes and handle Heartbeat responses (#25458)
37a18966f5f is described below

commit 37a18966f5f5e052d9168f5f7d7e8075d86131ec
Author: Tony Tang 
AuthorDate: Tue Feb 14 17:16:44 2023 -0500

Stream changes and handle Heartbeat responses (#25458)
---
 .../changestreams/ChangeStreamMetrics.java | 20 +
 .../changestreams/action/ChangeStreamAction.java   | 37 
 .../action/ReadChangeStreamPartitionAction.java| 51 ++-
 .../changestreams/dao/ChangeStreamDao.java | 60 +
 .../changestreams/dao/MetadataTableDao.java| 54 
 .../encoder/MetadataTableEncoder.java  | 62 ++
 .../package-info.java} | 22 ++---
 .../changestreams/model/PartitionRecord.java   | 53 ++--
 .../changestreams/restriction/StreamProgress.java  | 50 +++
 .../action/ChangeStreamActionTest.java | 89 
 .../changestreams/dao/MetadataTableDaoTest.java| 96 +
 .../changestreams/dofn/MetadataTableDaoTest.java   | 98 ++
 ...adChangeStreamPartitionProgressTrackerTest.java | 80 ++
 13 files changed, 747 insertions(+), 25 deletions(-)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/ChangeStreamMetrics.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/ChangeStreamMetrics.java
index f8177adf873..2c534020d39 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/ChangeStreamMetrics.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/ChangeStreamMetrics.java
@@ -35,6 +35,17 @@ public class ChangeStreamMetrics implements Serializable {
   
org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics.class,
   "list_partitions_count");
 
+  // ---
+  // Read change stream metrics
+
+  /**
+   * Counter for the total number of heartbeats identified during the 
execution of the Connector.
+   */
+  public static final Counter HEARTBEAT_COUNT =
+  Metrics.counter(
+  
org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics.class,
+  "heartbeat_count");
+
   /**
* Increments the {@link
* 
org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics#LIST_PARTITIONS_COUNT}
 by
@@ -44,6 +55,15 @@ public class ChangeStreamMetrics implements Serializable {
 inc(LIST_PARTITIONS_COUNT);
   }
 
+  /**
+   * Increments the {@link
+   * 
org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics#HEARTBEAT_COUNT}
 by 1 if
+   * the metric is enabled.
+   */
+  public void incHeartbeatCount() {
+inc(HEARTBEAT_COUNT);
+  }
+
   private void inc(Counter counter) {
 counter.inc();
   }
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ChangeStreamAction.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ChangeStreamAction.java
index 012173a89cd..8a2cbd6ed84 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ChangeStreamAction.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ChangeStreamAction.java
@@ -17,11 +17,15 @@
  */
 package org.apache.beam.sdk.io.gcp.bigtable.changestreams.action;
 
+import static 
org.apache.beam.sdk.io.gcp.bigtable.changestreams.ByteStringRangeHelper.formatByteStringRange;
+
 import com.google.cloud.bigtable.data.v2.models.ChangeStreamMutation;
 import com.google.cloud.bigtable.data.v2.models.ChangeStreamRecord;
+import com.google.cloud.bigtable.data.v2.models.Heartbeat;
 import com.google.protobuf.ByteString;
 import java.util.Optional;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics;
+import org.apache.beam.sdk.io.gcp.bigtable.changestreams.TimestampConverter;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.PartitionRecord;
 import 
org.apache.beam.sdk.io.gcp.bigtable.changestreams.restriction.StreamProgress;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -100,6 +104,39 @@ public class ChangeStreamAction {
   DoFn.OutputReceiver> receiver,
   ManualWatermarkEstimator watermarkEstimator,
   boolean shouldDebug) {
+if (record instanceof Heartbeat) {
+  Heartbeat heartbeat = (Heartbeat) record;
+  Stream

[beam] branch bigtable-cdc-feature-branch updated: Not including support for end time in Bigtable Change Stream connector (#25474)

2023-02-15 Thread pabloem
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch bigtable-cdc-feature-branch
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/bigtable-cdc-feature-branch by this push:
 new 331b40117ea Not including support for end time in Bigtable Change Stream connector (#25474)
331b40117ea is described below

commit 331b40117eac3981ea319265bad18739e1dce297
Author: Tony Tang 
AuthorDate: Wed Feb 15 11:48:49 2023 -0500

    Not including support for end time in Bigtable Change Stream connector (#25474)
---
 .../beam/sdk/io/gcp/bigtable/BigtableIO.java   | 20 ++---
 .../changestreams/action/ActionFactory.java| 10 +++--
 .../action/DetectNewPartitionsAction.java  | 16 -
 .../action/GenerateInitialPartitionsAction.java|  9 ++--
 .../action/ReadChangeStreamPartitionAction.java| 26 ++
 .../changestreams/dao/ChangeStreamDao.java |  8 +--
 .../dofn/DetectNewPartitionsDoFn.java  | 12 +++---
 .../changestreams/model/PartitionRecord.java   | 25 ++---
 .../action/DetectNewPartitionsActionTest.java  |  5 +
 .../GenerateInitialPartitionsActionTest.java   |  4 +---
 10 files changed, 22 insertions(+), 113 deletions(-)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
index d2eff9cbbfa..00048b73d08 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
@@ -222,8 +222,7 @@ import org.slf4j.LoggerFactory;
  *.withInstanceId(instanceId)
  *.withTableId(tableId)
  *.withAppProfileId(appProfileId)
- *.withStartTime(startTime)
- *.withEndTime(endTime));
+ *.withStartTime(startTime));
  * }
  *
  * Permissions
@@ -281,7 +280,6 @@ public class BigtableIO {
*
* 
*   {@link BigtableIO.ReadChangeStream#withStartTime} which defaults to 
now.
-   *   {@link BigtableIO.ReadChangeStream#withEndTime} which defaults to 
empty.
*   {@link BigtableIO.ReadChangeStream#withHeartbeatDuration} with 
defaults to 1 seconds.
*   {@link BigtableIO.ReadChangeStream#withMetadataTableProjectId} 
which defaults to value
*   from {@link BigtableIO.ReadChangeStream#withProjectId}
@@ -1544,8 +1542,6 @@ public class BigtableIO {
 
 abstract @Nullable Timestamp getStartTime();
 
-abstract @Nullable Timestamp getEndTime();
-
 abstract @Nullable Duration getHeartbeatDuration();
 
 abstract @Nullable String getChangeStreamName();
@@ -1657,16 +1653,6 @@ public class BigtableIO {
   return toBuilder().setStartTime(startTime).build();
 }
 
-/**
- * Returns a new {@link BigtableIO.ReadChangeStream} that will stop 
streaming at the specified
- * end time.
- *
- * Does not modify this object.
- */
-public ReadChangeStream withEndTime(Timestamp endTime) {
-  return toBuilder().setEndTime(endTime).build();
-}
-
 /**
  * Returns a new {@link BigtableIO.ReadChangeStream} that will send 
heartbeat messages at
  * specified interval.
@@ -1800,7 +1786,7 @@ public class BigtableIO {
   InitializeDoFn initializeDoFn =
   new InitializeDoFn(daoFactory, 
metadataTableConfig.getAppProfileId().get(), startTime);
   DetectNewPartitionsDoFn detectNewPartitionsDoFn =
-  new DetectNewPartitionsDoFn(getEndTime(), actionFactory, daoFactory, 
metrics);
+  new DetectNewPartitionsDoFn(actionFactory, daoFactory, metrics);
   ReadChangeStreamPartitionDoFn readChangeStreamPartitionDoFn =
   new ReadChangeStreamPartitionDoFn(heartbeatDuration, daoFactory, 
actionFactory, metrics);
 
@@ -1821,8 +1807,6 @@ public class BigtableIO {
 
   abstract ReadChangeStream.Builder setStartTime(Timestamp startTime);
 
-  abstract ReadChangeStream.Builder setEndTime(Timestamp endTime);
-
   abstract ReadChangeStream.Builder setHeartbeatDuration(Duration 
interval);
 
   abstract ReadChangeStream.Builder setChangeStreamName(String 
changeStreamName);
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ActionFactory.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ActionFactory.java
index 18fbc5fe404..cfc5b5fd3b2 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ActionFactory.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action

[beam] branch master updated: SpannerIO: parameterizing partitionQuery timeout (#25236)

2023-02-15 Thread pabloem
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 7fa3cd387a0 SpannerIO: parameterizing partitionQuery timeout (#25236)
7fa3cd387a0 is described below

commit 7fa3cd387a09b10ab4159239825830f8683d44cc
Author: darshan-sj 
AuthorDate: Wed Feb 15 22:19:25 2023 +0530

SpannerIO: parameterizing partitionQuery timeout (#25236)

* SpannerIO: parameterizing partitionQuery timeout

* SpannerIO: parameterizing partitionRead timeout

* Removing default values of timeouts

* formatting changes
---
 .../beam/sdk/io/gcp/spanner/SpannerAccessor.java   | 22 ++
 .../beam/sdk/io/gcp/spanner/SpannerConfig.java | 28 
 .../beam/sdk/io/gcp/spanner/SpannerReadIT.java | 80 ++
 3 files changed, 130 insertions(+)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java
index 2a277722cc1..619b198bbdd 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java
@@ -30,6 +30,7 @@ import com.google.cloud.spanner.DatabaseClient;
 import com.google.cloud.spanner.DatabaseId;
 import com.google.cloud.spanner.Spanner;
 import com.google.cloud.spanner.SpannerOptions;
+import com.google.cloud.spanner.v1.stub.SpannerStubSettings;
 import com.google.spanner.v1.CommitRequest;
 import com.google.spanner.v1.CommitResponse;
 import com.google.spanner.v1.ExecuteSqlRequest;
@@ -162,6 +163,27 @@ public class SpannerAccessor implements AutoCloseable {
   .build());
 }
 
+SpannerStubSettings.Builder spannerStubSettingsBuilder =
+builder.getSpannerStubSettingsBuilder();
+ValueProvider partitionQueryTimeout = 
spannerConfig.getPartitionQueryTimeout();
+if (partitionQueryTimeout != null
+&& partitionQueryTimeout.get() != null
+&& partitionQueryTimeout.get().getMillis() > 0) {
+  spannerStubSettingsBuilder
+  .partitionQuerySettings()
+  .setSimpleTimeoutNoRetries(
+  
org.threeten.bp.Duration.ofMillis(partitionQueryTimeout.get().getMillis()));
+}
+ValueProvider partitionReadTimeout = 
spannerConfig.getPartitionReadTimeout();
+if (partitionReadTimeout != null
+&& partitionReadTimeout.get() != null
+&& partitionReadTimeout.get().getMillis() > 0) {
+  spannerStubSettingsBuilder
+  .partitionReadSettings()
+  .setSimpleTimeoutNoRetries(
+  
org.threeten.bp.Duration.ofMillis(partitionReadTimeout.get().getMillis()));
+}
+
 ValueProvider projectId = spannerConfig.getProjectId();
 if (projectId != null) {
   builder.setProjectId(projectId.get());
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java
index 8bf6cbb6143..c31f5aaf834 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java
@@ -75,6 +75,10 @@ public abstract class SpannerConfig implements Serializable {
 
   public abstract @Nullable ValueProvider getDatabaseRole();
 
+  public abstract @Nullable ValueProvider getPartitionQueryTimeout();
+
+  public abstract @Nullable ValueProvider getPartitionReadTimeout();
+
   @VisibleForTesting
   abstract @Nullable ServiceFactory 
getServiceFactory();
 
@@ -149,6 +153,10 @@ public abstract class SpannerConfig implements 
Serializable {
 
 abstract Builder setDatabaseRole(ValueProvider databaseRole);
 
+abstract Builder setPartitionQueryTimeout(ValueProvider 
partitionQueryTimeout);
+
+abstract Builder setPartitionReadTimeout(ValueProvider 
partitionReadTimeout);
+
 public abstract SpannerConfig build();
   }
 
@@ -265,4 +273,24 @@ public abstract class SpannerConfig implements 
Serializable {
   public SpannerConfig withDatabaseRole(ValueProvider databaseRole) {
 return toBuilder().setDatabaseRole(databaseRole).build();
   }
+
+  /** Specifies the PartitionQuery timeout. */
+  public SpannerConfig withPartitionQueryTimeout(Duration 
partitionQueryTimeout) {
+return 
withPartitionQueryTimeout(ValueProvider.StaticValueProvider.of(partitionQueryTimeout));
+  }
+
+  /** Specifies the PartitionQuery timeout. */
+  public SpannerConfig withPartitionQueryTimeout(ValueProvider 
partitionQueryTimeout) {
+retur
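The SpannerAccessor change in the commit above installs a partitionQuery/partitionRead timeout only when the ValueProvider is non-null, resolves to a non-null Duration, and is positive; otherwise the client's default settings are left in place. A minimal sketch of that guard, with Supplier<Long> (milliseconds) standing in for Beam's ValueProvider<Duration> so the example runs without the Beam or Spanner libraries (names here are illustrative, not Beam API):

```java
import java.util.Optional;
import java.util.function.Supplier;

// Sketch of the three-part null/positivity guard used before installing a
// partition timeout on the Spanner stub settings.
class PartitionTimeoutSketch {
  // Returns the timeout to install, or empty to keep the client defaults.
  static Optional<Long> effectiveTimeoutMillis(Supplier<Long> configured) {
    if (configured != null && configured.get() != null && configured.get() > 0) {
      return Optional.of(configured.get());
    }
    return Optional.empty();
  }

  public static void main(String[] args) {
    System.out.println(effectiveTimeoutMillis(() -> 5000L)); // Optional[5000]
    System.out.println(effectiveTimeoutMillis(() -> 0L));    // Optional.empty
    System.out.println(effectiveTimeoutMillis(null));        // Optional.empty
  }
}
```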

[beam] branch master updated (bd3511571ff -> fd2b16139fd)

2023-02-15 Thread pabloem
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from bd3511571ff Use example id as CloudPath (#25487)
 add fd2b16139fd SpannerIO: Handling pg spanner.commit_timestamp (#25479)

No new revisions were added by this update.

Summary of changes:
 .../java/org/apache/beam/sdk/io/gcp/spanner/SpannerSchema.java| 3 +++
 .../org/apache/beam/sdk/io/gcp/spanner/SpannerSchemaTest.java | 8 ++--
 2 files changed, 9 insertions(+), 2 deletions(-)



[beam] branch master updated (fd2b16139fd -> 26735eb9007)

2023-02-15 Thread pabloem
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from fd2b16139fd SpannerIO: Handling pg spanner.commit_timestamp (#25479)
 add 26735eb9007 Fix provider to be found by AutoService (#25491)

No new revisions were added by this update.

Summary of changes:
 .../sdk/io/fileschematransform/FileWriteSchemaTransformProvider.java   | 3 +++
 1 file changed, 3 insertions(+)



[beam] branch bigtable-cdc-feature-branch updated: Handle ChangeStreamMutation (#25459)

2023-02-16 Thread pabloem
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch bigtable-cdc-feature-branch
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/bigtable-cdc-feature-branch by this push:
 new d3c46939f21 Handle ChangeStreamMutation (#25459)
d3c46939f21 is described below

commit d3c46939f219956ea5fc458b6939a853e296228d
Author: Tony Tang 
AuthorDate: Thu Feb 16 13:47:37 2023 -0500

Handle ChangeStreamMutation (#25459)
---
 .../changestreams/ChangeStreamMetrics.java | 55 
 .../changestreams/action/ChangeStreamAction.java   | 42 +++
 .../action/ChangeStreamActionTest.java | 60 ++
 3 files changed, 157 insertions(+)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/ChangeStreamMetrics.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/ChangeStreamMetrics.java
index 2c534020d39..ed14eb50d1c 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/ChangeStreamMetrics.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/ChangeStreamMetrics.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.io.gcp.bigtable.changestreams;
 
 import java.io.Serializable;
 import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Distribution;
 import org.apache.beam.sdk.metrics.Metrics;
 
 /** Class to aggregate metrics related functionality. */
@@ -46,6 +47,30 @@ public class ChangeStreamMetrics implements Serializable {
   
org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics.class,
   "heartbeat_count");
 
+  /**
+   * Counter for the total number of ChangeStreamMutations that are initiated 
by users (not garbage
+   * collection) identified during the execution of the Connector.
+   */
+  public static final Counter CHANGE_STREAM_MUTATION_USER_COUNT =
+  Metrics.counter(
+  
org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics.class,
+  "change_stream_mutation_user_count");
+
+  /**
+   * Counter for the total number of ChangeStreamMutations that are initiated 
by garbage collection
+   * (not user initiated) identified during the execution of the Connector.
+   */
+  public static final Counter CHANGE_STREAM_MUTATION_GC_COUNT =
+  Metrics.counter(
+  
org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics.class,
+  "change_stream_mutation_gc_count");
+
+  /** Distribution for measuring processing delay from commit timestamp. */
+  public static final Distribution PROCESSING_DELAY_FROM_COMMIT_TIMESTAMP =
+  Metrics.distribution(
+  
org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics.class,
+  "processing_delay_from_commit_timestamp");
+
   /**
* Increments the {@link
* 
org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics#LIST_PARTITIONS_COUNT}
 by
@@ -64,7 +89,37 @@ public class ChangeStreamMetrics implements Serializable {
 inc(HEARTBEAT_COUNT);
   }
 
+  /**
+   * Increments the {@link
+   * 
org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics#CHANGE_STREAM_MUTATION_USER_COUNT}
+   * by 1 if the metric is enabled.
+   */
+  public void incChangeStreamMutationUserCounter() {
+inc(CHANGE_STREAM_MUTATION_USER_COUNT);
+  }
+
+  /**
+   * Increments the {@link
+   * 
org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics#CHANGE_STREAM_MUTATION_GC_COUNT}
+   * by 1 if the metric is enabled.
+   */
+  public void incChangeStreamMutationGcCounter() {
+inc(CHANGE_STREAM_MUTATION_GC_COUNT);
+  }
+
+  /**
+   * Adds measurement of an instance for the {@link
+   * 
org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics#PROCESSING_DELAY_FROM_COMMIT_TIMESTAMP}.
+   */
+  public void updateProcessingDelayFromCommitTimestamp(long durationInMilli) {
+update(PROCESSING_DELAY_FROM_COMMIT_TIMESTAMP, durationInMilli);
+  }
+
   private void inc(Counter counter) {
 counter.inc();
   }
+
+  private void update(Distribution distribution, long value) {
+distribution.update(value);
+  }
 }
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ChangeStreamAction.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ChangeStreamAction.java
index 8a2cbd6ed84..e64cd6fb876 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ChangeStreamAction.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action

[beam] branch master updated (5fc19fffcdf -> a82084ce87b)

2023-02-16 Thread pabloem
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from 5fc19fffcdf Move closing milestone out of PMC-only tasks (#25516)
 add a82084ce87b update GCP cloud libraries BOM to 26.8.0 (#25470)

No new revisions were added by this update.

Summary of changes:
 .../main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy | 10 +-
 sdks/java/container/license_scripts/dep_urls_java.yaml |  2 +-
 2 files changed, 6 insertions(+), 6 deletions(-)



[beam] branch master updated (c210df3d3b3 -> 545a2e63ca7)

2023-02-16 Thread pabloem
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from c210df3d3b3 Fix interface{} in iter& emit type of DoFn in Go (#25203)
 add 545a2e63ca7 Task #25064: Python SDK Data sampling implementation (#25093)

No new revisions were added by this update.

Summary of changes:
 .../apache_beam/runners/worker/bundle_processor.py |  95 +-
 .../runners/worker/bundle_processor_test.py| 132 ++
 .../apache_beam/runners/worker/data_sampler.py | 156 +
 .../runners/worker/data_sampler_test.py| 192 +
 .../apache_beam/runners/worker/sdk_worker.py   |  34 +++-
 .../apache_beam/runners/worker/sdk_worker_main.py  |   9 +-
 .../apache_beam/runners/worker/sdk_worker_test.py  |  55 +-
 sdks/python/apache_beam/transforms/environments.py |   1 +
 8 files changed, 664 insertions(+), 10 deletions(-)
 create mode 100644 sdks/python/apache_beam/runners/worker/data_sampler.py
 create mode 100644 sdks/python/apache_beam/runners/worker/data_sampler_test.py



[beam] branch bigtable-cdc-feature-branch updated: Cloud Bigtable stream changes and handle CloseStream responses (#25460)

2023-02-21 Thread pabloem
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch bigtable-cdc-feature-branch
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/bigtable-cdc-feature-branch by this push:
 new eaa17a71d93 Cloud Bigtable stream changes and handle CloseStream responses (#25460)
eaa17a71d93 is described below

commit eaa17a71d93169d3905200f6c876a69b435e4f2f
Author: Tony Tang 
AuthorDate: Tue Feb 21 14:22:05 2023 -0500

Cloud Bigtable stream changes and handle CloseStream responses (#25460)

* Handle ChangeStreamMutation

* Evaluate CloseStream split and merge messages from Change Stream API

* Fix rebase issues

-

Co-authored-by: Pablo 
---
 .../changestreams/ByteStringRangeHelper.java   | 118 +++-
 .../changestreams/ChangeStreamMetrics.java |  45 +
 .../changestreams/action/ChangeStreamAction.java   |  34 
 .../action/ReadChangeStreamPartitionAction.java|  51 ++
 .../changestreams/dao/MetadataTableDao.java|  88 +
 .../dofn/ReadChangeStreamPartitionDoFn.java|   1 +
 .../ReadChangeStreamPartitionProgressTracker.java  |   2 +-
 .../changestreams/restriction/StreamProgress.java  |  17 +-
 .../changestreams/ByteStringRangeHelperTest.java   | 186 +++
 .../action/ChangeStreamActionTest.java |  26 +++
 .../ReadChangeStreamPartitionActionTest.java   | 203 +
 .../changestreams/dao/MetadataTableDaoTest.java|  44 +
 12 files changed, 811 insertions(+), 4 deletions(-)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/ByteStringRangeHelper.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/ByteStringRangeHelper.java
index 8f307f526f0..34d3affa01c 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/ByteStringRangeHelper.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/ByteStringRangeHelper.java
@@ -18,11 +18,15 @@
 package org.apache.beam.sdk.io.gcp.bigtable.changestreams;
 
 import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange;
+import com.google.protobuf.ByteString;
 import com.google.protobuf.TextFormat;
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Collectors;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
 
 /** Helper functions to evaluate the completeness of collection of 
ByteStringRanges. */
 public class ByteStringRangeHelper {
-
   /**
* Returns formatted string of a partition for debugging.
*
@@ -36,4 +40,116 @@ public class ByteStringRangeHelper {
 + TextFormat.escapeBytes(partition.getEnd())
 + "')";
   }
+
+  /**
+   * Convert partitions to a string for debugging.
+   *
+   * @param partitions to print
+   * @return string representation of partitions
+   */
+  public static String partitionsToString(List partitions) {
+return partitions.stream()
+.map(ByteStringRangeHelper::formatByteStringRange)
+.collect(Collectors.joining(", ", "{", "}"));
+  }
+
+  @VisibleForTesting
+  static class PartitionComparator implements Comparator {
+@Override
+// if first > second, it returns positive number
+// if first < second, it returns negative number
+// if first == second, it returns 0
+// First is greater than second if either of the following are true:
+// - Its start key comes after second's start key
+// - The start keys are equal and its end key comes after second's end key
+// An end key of "" represents the final end key, so it needs to be handled as a special case
+public int compare(ByteStringRange first, ByteStringRange second) {
+  int compareStart =
+  ByteString.unsignedLexicographicalComparator()
+  .compare(first.getStart(), second.getStart());
+  if (compareStart != 0) {
+return compareStart;
+  }
+  if (first.getEnd().isEmpty() && !second.getEnd().isEmpty()) {
+return 1;
+  }
+  if (second.getEnd().isEmpty() && !first.getEnd().isEmpty()) {
+return -1;
+  }
+  return ByteString.unsignedLexicographicalComparator()
+  .compare(first.getEnd(), second.getEnd());
+}
+  }
+
+  private static boolean childStartsBeforeParent(
+  ByteString parentStartKey, ByteString childStartKey) {
+// Check if the start key of the child partition comes before the start key of the entire
+// parentPartitions
+return ByteString.unsignedLexicographicalComparator().compare(parentStartKey, childStartKey)
+> 0;
+  }
+
+  private static boolea

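The PartitionComparator in the diff above orders [start, end) key ranges by unsigned lexicographic comparison, treating an empty end key as "after everything". A minimal standalone sketch of that ordering, using plain byte arrays instead of the Bigtable client's ByteString (names and types here are illustrative, not the actual Beam class):

```java
// Sketch of the ordering PartitionComparator implements: unsigned
// lexicographic comparison of byte keys, with an empty end key treated
// as the final (greatest) end key.
public class RangeOrderDemo {
    // Unsigned lexicographic byte comparison, the semantics that
    // ByteString.unsignedLexicographicalComparator() provides.
    static int compareUnsigned(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (cmp != 0) {
                return cmp;
            }
        }
        return Integer.compare(a.length, b.length);
    }

    // Compares [start, end) ranges: by start key first, then by end key,
    // where an empty end key sorts after any non-empty end key.
    static int compareRanges(byte[] s1, byte[] e1, byte[] s2, byte[] e2) {
        int cmp = compareUnsigned(s1, s2);
        if (cmp != 0) {
            return cmp;
        }
        if (e1.length == 0 && e2.length != 0) {
            return 1;
        }
        if (e2.length == 0 && e1.length != 0) {
            return -1;
        }
        return compareUnsigned(e1, e2);
    }

    public static void main(String[] args) {
        byte[] a = {0x01}, b = {0x02}, empty = {};
        // Same start; an open (empty) end key sorts after a bounded one.
        System.out.println(compareRanges(a, empty, a, b) > 0); // prints true
    }
}
```

Note the `& 0xFF` mask: Java bytes are signed, so without it 0xFF would compare as -1 and break the unsigned ordering the metadata table relies on.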
[beam] branch master updated: Making the default BigQuery write disposition to be WRITE_APPEND (#25434)

2023-02-21 Thread pabloem
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 3a6259900e6 Making the default BigQuery write disposition to be 
WRITE_APPEND (#25434)
3a6259900e6 is described below

commit 3a6259900e6497eecec4b49d91aa5cd90c802a01
Author: nickuncaged1201 <56149585+nickuncaged1...@users.noreply.github.com>
AuthorDate: Tue Feb 21 14:35:06 2023 -0800

Making the default BigQuery write disposition to be WRITE_APPEND (#25434)

* Added element counter and error counter for BQ write schema transform

* Fixed styling issues with naming

* Combined two trivial counter class for brevity. Used finishbundle 
annotation to reduce the number of calls to counter.inc() for better 
performance.

* fix formatting

* change the default write disposition to write append instead of write 
empty

-

Co-authored-by: Nick Li 
---
 .../providers/BigQueryStorageWriteApiSchemaTransformProvider.java  | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
index 22e8abca35b..3391c7ae51b 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
@@ -316,7 +316,8 @@ public class BigQueryStorageWriteApiSchemaTransformProvider
   .to(configuration.getTable())
   .withMethod(writeMethod)
   .useBeamSchema()
-  .withFormatFunction(BigQueryUtils.toTableRow());
+  .withFormatFunction(BigQueryUtils.toTableRow())
+  .withWriteDisposition(WriteDisposition.WRITE_APPEND);
 
   if (!Strings.isNullOrEmpty(configuration.getCreateDisposition())) {
 CreateDisposition createDisposition =

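The commit above makes WRITE_APPEND the default write disposition instead of the previous default. A standalone sketch (not the Beam API) of that defaulting pattern, where a disposition string is only parsed when explicitly configured:

```java
// Sketch of the defaulting behavior introduced above: the write
// disposition falls back to WRITE_APPEND when none is configured.
// Enum values mirror BigQuery's dispositions; the helper name is ours.
public class DispositionDemo {
    enum WriteDisposition { WRITE_APPEND, WRITE_TRUNCATE, WRITE_EMPTY }

    // Returns the configured disposition, or WRITE_APPEND when unset.
    static WriteDisposition dispositionOrDefault(String configured) {
        if (configured == null || configured.isEmpty()) {
            return WriteDisposition.WRITE_APPEND;
        }
        return WriteDisposition.valueOf(configured);
    }

    public static void main(String[] args) {
        System.out.println(dispositionOrDefault(null));             // prints WRITE_APPEND
        System.out.println(dispositionOrDefault("WRITE_TRUNCATE")); // prints WRITE_TRUNCATE
    }
}
```

WRITE_APPEND is the safer default for a schema transform: it never drops existing rows and, unlike WRITE_EMPTY, does not fail on a non-empty table.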


[beam] branch master updated: changed metric name to be descriptive of the pipeline type (#25682)

2023-03-02 Thread pabloem
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 253fbc794b2 changed metric name to be descriptive of the pipeline type 
(#25682)
253fbc794b2 is described below

commit 253fbc794b2a819c7a79abc1aa08ec751e02c3a0
Author: Nick Li <56149585+nickuncaged1...@users.noreply.github.com>
AuthorDate: Thu Mar 2 17:18:52 2023 -0800

changed metric name to be descriptive of the pipeline type (#25682)

* changed metric name to be descriptive of the pipeline type

* minor fixup

* spotless

-

Co-authored-by: Nick Li 
---
 .../providers/BigQueryStorageWriteApiSchemaTransformProvider.java| 5 +++--
 .../BigQueryStorageWriteApiSchemaTransformProviderTest.java  | 5 +++--
 2 files changed, 6 insertions(+), 4 deletions(-)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
index 3391c7ae51b..ec04f9a4562 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
@@ -272,7 +272,8 @@ public class BigQueryStorageWriteApiSchemaTransformProvider
   Schema inputSchema = inputRows.getSchema();
   WriteResult result =
   inputRows
-  .apply("element-count", ParDo.of(new ElementCounterFn("element-counter")))
+  .apply(
+  "element-count", ParDo.of(new ElementCounterFn("BigQuery-write-element-counter")))
   .setRowSchema(inputSchema)
   .apply(write);
 
@@ -298,7 +299,7 @@ public class BigQueryStorageWriteApiSchemaTransformProvider
 
   PCollection<Row> errorOutput =
   errorRows
-  .apply("error-count", ParDo.of(new ElementCounterFn("error-counter")))
+  .apply("error-count", ParDo.of(new ElementCounterFn("BigQuery-write-error-counter")))
   .setRowSchema(errorSchema);
 
   return PCollectionRowTuple.of(OUTPUT_ERRORS_TAG, errorOutput);
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java
index c0a6df5f125..2c62d1b22cc 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java
@@ -167,7 +167,7 @@ public class BigQueryStorageWriteApiSchemaTransformProviderTest {
 .addNameFilter(
 MetricNameFilter.named(
 BigQueryStorageWriteApiPCollectionRowTupleTransform.class,
-"element-counter"))
+"BigQuery-write-element-counter"))
 .build());
 
 Iterable<MetricResult<Long>> counters = metricResults.getCounters();
@@ -255,7 +255,8 @@ public class BigQueryStorageWriteApiSchemaTransformProviderTest {
 MetricsFilter.builder()
 .addNameFilter(
 MetricNameFilter.named(
-BigQueryStorageWriteApiPCollectionRowTupleTransform.class, "error-counter"))
+BigQueryStorageWriteApiPCollectionRowTupleTransform.class,
+"BigQuery-write-error-counter"))
 .build());
 
 Iterable<MetricResult<Long>> counters = metricResults.getCounters();



[beam] branch master updated (ec2138ba89e -> e12ad36ba83)

2023-03-08 Thread pabloem
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from ec2138ba89e Fix SDF typo. (#25765)
 add e12ad36ba83 Document fields for Schema Transforms (#25644)

No new revisions were added by this update.

Summary of changes:
 ...ueryStorageWriteApiSchemaTransformProvider.java | 24 --
 .../PubsubLiteReadSchemaTransformProvider.java | 38 +++---
 .../PubsubLiteWriteSchemaTransformProvider.java| 15 -
 .../beam/sdk/io/gcp/pubsublite/ReadWriteIT.java|  2 +-
 .../KafkaReadSchemaTransformConfiguration.java | 37 ++---
 .../io/kafka/KafkaReadSchemaTransformProvider.java |  4 +--
 .../kafka/KafkaWriteSchemaTransformProvider.java   | 18 +-
 .../org/apache/beam/sdk/io/kafka/KafkaIOIT.java|  2 +-
 .../KafkaReadSchemaTransformProviderTest.java  | 10 +++---
 9 files changed, 127 insertions(+), 23 deletions(-)



[beam] branch master updated (e12ad36ba83 -> c0b6db91ccc)

2023-03-08 Thread pabloem
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from e12ad36ba83 Document fields for Schema Transforms (#25644)
 add c0b6db91ccc Fix DatastoreIO bug causing low throughput on certain 
writes (#25701)

No new revisions were added by this update.

Summary of changes:
 .../beam/sdk/io/gcp/datastore/DatastoreV1.java  | 21 ++---
 1 file changed, 18 insertions(+), 3 deletions(-)



[beam] branch master updated: passing consumer configs from user-passed parameters (#25766)

2023-03-08 Thread pabloem
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 36486447e4d passing consumer configs from user-passed parameters 
(#25766)
36486447e4d is described below

commit 36486447e4d07af5076830ca1e331a6b61f14986
Author: Pablo Estrada 
AuthorDate: Wed Mar 8 14:34:43 2023 -0800

passing consumer configs from user-passed parameters (#25766)

* passing consumer configs from user-passed parameters

* fixup
---
 .../io/kafka/KafkaReadSchemaTransformProvider.java | 47 ++
 1 file changed, 21 insertions(+), 26 deletions(-)

diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java
index 2bab36f465a..bf5bce46180 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java
@@ -18,7 +18,9 @@
 package org.apache.beam.sdk.io.kafka;
 
 import com.google.auto.service.AutoService;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
@@ -38,7 +40,8 @@ import org.apache.beam.sdk.values.PCollectionRowTuple;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.sdk.values.TypeDescriptors;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
@@ -106,11 +109,18 @@ public class KafkaReadSchemaTransformProvider
   final String inputSchema = configuration.getSchema();
   final Integer groupId = configuration.hashCode() % Integer.MAX_VALUE;
   final String autoOffsetReset =
-  configuration.getAutoOffsetResetConfig() == null
-  ? "latest"
-  : configuration.getAutoOffsetResetConfig();
-  if (inputSchema != null) {
-assert configuration.getConfluentSchemaRegistryUrl() == null
+  MoreObjects.firstNonNull(configuration.getAutoOffsetResetConfig(), "latest");
+
+  Map<String, Object> consumerConfigs =
+  new HashMap<>(
+  MoreObjects.firstNonNull(configuration.getConsumerConfigUpdates(), new HashMap<>()));
+  consumerConfigs.put(ConsumerConfig.GROUP_ID_CONFIG, "kafka-read-provider-" + groupId);
+  consumerConfigs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
+  consumerConfigs.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);
+  consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
+
+  if (inputSchema != null && !inputSchema.isEmpty()) {
+assert Strings.isNullOrEmpty(configuration.getConfluentSchemaRegistryUrl())
 : "To read from Kafka, a schema must be provided directly or though Confluent "
 + "Schema Registry, but not both.";
 final Schema beamSchema =
@@ -126,16 +136,7 @@ public class KafkaReadSchemaTransformProvider
   public PCollectionRowTuple expand(PCollectionRowTuple input) {
 KafkaIO.Read<byte[], byte[]> kafkaRead =
 KafkaIO.readBytes()
-.withConsumerConfigUpdates(
-ImmutableMap.of(
-ConsumerConfig.GROUP_ID_CONFIG,
-"kafka-read-provider-" + groupId,
-ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
-true,
-ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,
-100,
-ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
-autoOffsetReset))
+.withConsumerConfigUpdates(consumerConfigs)
 .withTopic(configuration.getTopic())
 .withBootstrapServers(configuration.getBootstrapServers());
 if (isTest) {
@@ -153,6 +154,9 @@ public class KafkaReadSchemaTransformProvider
   }
 };
   } else {
+assert !Strings.isNullOrEmpty(configuration.getConfluentSchemaRegistryUrl())
+: "To read from Kafka, a schema must be 

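The Kafka change above replaces a fixed ImmutableMap of consumer properties with a map that starts from the user's config updates and then overwrites the keys the transform must control. A standalone plain-Java sketch of that merge pattern (string keys stand in for the real ConsumerConfig constants, which resolve to the same Kafka property names):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the config-merge pattern from the diff above: user-supplied
// entries are copied first, then the transform's mandatory entries are
// put on top so they always win.
public class ConfigMergeDemo {
    static Map<String, Object> merge(Map<String, Object> userConfigs, int groupId) {
        // Copy user entries first; later puts override any collisions.
        Map<String, Object> configs = new HashMap<>(userConfigs);
        configs.put("group.id", "kafka-read-provider-" + groupId);
        configs.put("enable.auto.commit", true);
        configs.put("auto.commit.interval.ms", 100);
        return configs;
    }

    public static void main(String[] args) {
        Map<String, Object> user = new HashMap<>();
        user.put("fetch.min.bytes", 1024);
        user.put("enable.auto.commit", false); // overridden by merge
        Map<String, Object> merged = merge(user, 7);
        System.out.println(merged.get("enable.auto.commit")); // prints true
        System.out.println(merged.get("fetch.min.bytes"));    // prints 1024
    }
}
```

The put-order is the whole design: forced settings go in last, so a user cannot accidentally disable the offset-commit behavior the transform depends on, while every other consumer property passes through untouched.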
[beam] branch master updated: make incoming message public (#25803)

2023-03-11 Thread pabloem
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 17f7a5d1aff make incoming message public (#25803)
17f7a5d1aff is described below

commit 17f7a5d1aff227d211c634492f4396c8422678b5
Author: Nick Li <56149585+nickuncaged1...@users.noreply.github.com>
AuthorDate: Sat Mar 11 10:55:55 2023 -0800

make incoming message public (#25803)
---
 .../src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java
index e4de1680d5a..f075daf2c22 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java
@@ -396,7 +396,7 @@ public abstract class PubsubClient implements Closeable {
* serialization is never used for non-test clients.
*/
   @AutoValue
-  abstract static class IncomingMessage implements Serializable {
+  public abstract static class IncomingMessage implements Serializable {
 
 /** Underlying Message. */
 public abstract PubsubMessage message();



[beam] branch master updated (509052c0488 -> 0d515df6520)

2023-03-13 Thread pabloem
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from 509052c0488 Mark that 2.46 has been released now that the release is 
finalized (#25818)
 add 0d515df6520 Infrastructure-as-Code to provision a private GKE 
autopilot kubernetes cluster and strimzi kafka (#25686)

No new revisions were added by this update.

Summary of changes:
 .test-infra/kafka/README.md|   36 +
 .../kafka/strimzi/01-strimzi-operator/README.md|   28 +
 .../strimzi/01-strimzi-operator/kustomization.yaml |   23 +
 .../strimzi/01-strimzi-operator/namespace.yaml |   22 +
 ...10-ServiceAccount-strimzi-cluster-operator.yaml |   23 +
 ...-ClusterRole-strimzi-cluster-operator-role.yaml |  170 +
 .../020-RoleBinding-strimzi-cluster-operator.yaml  |   31 +
 ...-ClusterRole-strimzi-cluster-operator-role.yaml |   55 +
 ...lusterRoleBinding-strimzi-cluster-operator.yaml |   31 +
 ...-ClusterRole-strimzi-cluster-operator-role.yaml |   48 +
 .../022-RoleBinding-strimzi-cluster-operator.yaml  |   31 +
 ...-ClusterRole-strimzi-cluster-operator-role.yaml |   82 +
 .../023-RoleBinding-strimzi-cluster-operator.yaml  |   31 +
 .../030-ClusterRole-strimzi-kafka-broker.yaml  |   32 +
 ...i-cluster-operator-kafka-broker-delegation.yaml |   33 +
 .../031-ClusterRole-strimzi-entity-operator.yaml   |   61 +
 ...luster-operator-entity-operator-delegation.yaml |   33 +
 .../033-ClusterRole-strimzi-kafka-client.yaml  |   33 +
 ...i-cluster-operator-kafka-client-delegation.yaml |   34 +
 .../01-strimzi-operator/v0.33.2/040-Crd-kafka.yaml | 6208 
 .../v0.33.2/041-Crd-kafkaconnect.yaml  | 1952 ++
 .../v0.33.2/042-Crd-strimzipodset.yaml |  135 +
 .../v0.33.2/043-Crd-kafkatopic.yaml|  270 +
 .../v0.33.2/044-Crd-kafkauser.yaml |  702 +++
 .../v0.33.2/045-Crd-kafkamirrormaker.yaml  | 1228 
 .../v0.33.2/046-Crd-kafkabridge.yaml   | 1178 
 .../v0.33.2/047-Crd-kafkaconnector.yaml|  146 +
 .../v0.33.2/048-Crd-kafkamirrormaker2.yaml | 1992 +++
 .../v0.33.2/049-Crd-kafkarebalance.yaml|  159 +
 .../050-ConfigMap-strimzi-cluster-operator.yaml|   52 +
 .../060-Deployment-strimzi-cluster-operator.yaml   |  155 +
 .../01-strimzi-operator/v0.33.2/kustomization.yaml |   46 +
 .../kafka/strimzi/02-kafka-persistent/README.md|   28 +
 .../base/v0.33.2/kafka-persistent.yaml |   53 +
 .../base/v0.33.2/kustomization.yaml|   22 +
 .../gke-internal-load-balanced/kustomization.yaml  |   26 +
 .../gke-internal-load-balanced/listeners.yaml  |   52 +
 .test-infra/kafka/strimzi/README.md|  160 +
 .test-infra/terraform/OWNERS   |5 +
 .test-infra/terraform/README.md|   31 +
 .../terraform/google-cloud-platform/README.md  |   44 +
 .../google-kubernetes-engine/README.md |   56 +
 .../google-kubernetes-engine/main.tf   |   73 +
 .../modules/01-setup/README.md |   26 +
 .../modules/01-setup/iam.tf|   39 +
 .../modules/01-setup/output.tf |   22 +
 .../modules/01-setup/provider.tf   |   22 +
 .../modules/01-setup/services.tf   |   28 +
 .../modules/01-setup/variables.tf  |   32 +
 .../modules/02-network/README.md   |   26 +
 .../modules/02-network/nat.tf  |   43 +
 .../modules/02-network/network.tf  |   65 +
 .../modules/02-network/output.tf   |   37 +
 .../modules/02-network/provider.tf |   22 +
 .../modules/02-network/services.tf |   23 +
 .../modules/02-network/variables.tf|   44 +
 .../modules/03-cluster/README.md   |   22 +
 .../modules/03-cluster/cluster.tf  |   42 +
 .../modules/03-cluster/provider.tf |   22 +
 .../modules/03-cluster/services.tf |   23 +
 .../modules/03-cluster/variables.tf|   53 +
 .../modules/04-bastion/README.md   |   22 +
 .../modules/04-bastion/compute.tf  |   64 +
 .../modules/04-bastion/data.tf |   44 +
 .../modules/04-bastion/provider.tf |   22 +
 .../modules/04-bastion/services.tf |   23 +
 .../modules/04-bastion/tinyproxy.conf  |  368 ++
 .../modules/04-bastion/variables.tf|   67 +
 .../google-kubernetes-engine/variables.tf  |   42 +
 .../google-kubernetes-engine/versions.tf   |   32 +
 70 files changed, 16885 insertions(+)
 create mode 100644 .test-infra/kafka/README.md
 create mode 100644 .test-infra/kafka/strimzi/01-strimzi-operator/README.md
 create mode 100644 
.test-infra/kafka/strimzi/01-strimzi

[beam] branch master updated (afce68d95c0 -> 04c2de61e5d)

2023-03-13 Thread pabloem
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from afce68d95c0 Merge pull request #25826: Corrected 
GscUtil.RewriteOp.OnFailure() error message
 add 04c2de61e5d Fix OutputSampler's coder. (#25805)

No new revisions were added by this update.

Summary of changes:
 .../harness/data/PCollectionConsumerRegistry.java  | 38 -
 .../beam/fn/harness/debug/OutputSampler.java   | 39 -
 .../beam/fn/harness/debug/DataSamplerTest.java | 31 --
 .../beam/fn/harness/debug/OutputSamplerTest.java   | 49 --
 4 files changed, 110 insertions(+), 47 deletions(-)



[beam] branch master updated: Adding DLQ support for ZetaSQL (#25873)

2023-03-17 Thread pabloem
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new e8fad9ca2ba Adding DLQ support for ZetaSQL (#25873)
e8fad9ca2ba is described below

commit e8fad9ca2ba80220b4817ddba95aabc0a48067dd
Author: Pablo Estrada 
AuthorDate: Fri Mar 17 18:14:03 2023 -0400

Adding DLQ support for ZetaSQL (#25873)
---
 .../extensions/sql/zetasql/BeamZetaSqlCalcRel.java | 126 -
 1 file changed, 97 insertions(+), 29 deletions(-)

diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRel.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRel.java
index 744fbd0bcd4..dee87d370e6 100644
--- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRel.java
+++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRel.java
@@ -37,18 +37,24 @@ import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.extensions.sql.impl.BeamSqlPipelineOptions;
 import org.apache.beam.sdk.extensions.sql.impl.QueryPlanner.QueryParameters;
 import org.apache.beam.sdk.extensions.sql.impl.rel.AbstractBeamCalcRel;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
 import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
 import org.apache.beam.sdk.extensions.sql.meta.provider.bigquery.BeamBigQuerySqlDialect;
 import org.apache.beam.sdk.extensions.sql.meta.provider.bigquery.BeamSqlUnparseContext;
 import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
 import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.utils.SelectHelpers;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
 import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.plan.RelOptCluster;
 import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.plan.RelTraitSet;
 import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rel.RelNode;
@@ -64,7 +70,6 @@ import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.fun.SqlStdO
 import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.parser.SqlParserPos;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
-import org.checkerframework.checker.nullness.qual.NonNull;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
@@ -83,6 +88,9 @@ public class BeamZetaSqlCalcRel extends AbstractBeamCalcRel {
   private static final int MAX_PENDING_WINDOW = 32;
   private final BeamSqlUnparseContext context;
 
+  private static final TupleTag<Row> rows = new TupleTag<Row>("output") {};
+  private static final TupleTag<Row> errors = new TupleTag<Row>("errors") {};
+
   private static String columnName(int i) {
 return "_" + i;
   }
@@ -101,21 +109,36 @@ public class BeamZetaSqlCalcRel extends AbstractBeamCalcRel {
 
   @Override
   public PTransform<PCollectionList<Row>, PCollection<Row>> buildPTransform() {
-return new Transform();
+return buildPTransform(null);
+  }
+
+  @Override
+  public PTransform<PCollectionList<Row>, PCollection<Row>> buildPTransform(
+  @Nullable PTransform<PCollection<Row>, ? extends POutput> errorsTransformer) {
+return new Transform(errorsTransformer);
   }
 
   @AutoValue
   abstract static class TimestampedFuture {
-private static TimestampedFuture create(Instant t, Future<Value> f) {
-  return new AutoValue_BeamZetaSqlCalcRel_TimestampedFuture(t, f);
+private static TimestampedFuture create(Instant t, Future<Value> f, Row r) {
+  return new AutoValue_BeamZetaSqlCalcRel_TimestampedFuture(t, f, r);
 }
 
 abstract Instant timestamp();
 
 abstract Future<Value> future();
+
+abstract Row row();
   }
 
   private class Transform extends PTransform<PCollectionList<Row>, PCollection<Row>> {
+
+private final @Nullable PTransform<PCollection<Row>, ? extends POutput> errorsTransformer;
+
+Transform(@Nullable PTransform<PCollection<Row>, ? extends POutput> errorsTransformer) {
+  this.errorsTransformer = errorsTransformer;
+}
+
 @Override
 public PCollection<Row> expand(PCollectionList<Row> pinput) {
   Preconditions.checkArgument(
@@ -135,9 +1

[beam] branch master updated: Optimize change stream connector with more efficient batching and blind writes, and add transaction/query tags (#25718)

2023-03-17 Thread pabloem
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new ac57b8709d3 Optimize change stream connector with more efficient 
batching and blind writes, and add transaction/query tags (#25718)
ac57b8709d3 is described below

commit ac57b8709d3f2db597a5d9e6339545b0f2036a3a
Author: ChangyuLi28 <35211213+changyul...@users.noreply.github.com>
AuthorDate: Fri Mar 17 15:13:27 2023 -0700

Optimize change stream connector with more efficient batching and blind 
writes, and add transaction/query tags (#25718)

* Optimize change stream connector with more efficient batching and blind 
writes, and add transaction/query tags

* Optimize change stream connector with more efficient batching and blind 
writes, and add transaction/query tags

* Apply commit deadline only for metadata table writes

-

Co-authored-by: Changyu Li 
---
 .../MetadataSpannerConfigFactory.java  |  2 +-
 .../action/ChildPartitionsRecordAction.java|  3 +-
 .../action/DetectNewPartitionsAction.java  | 19 --
 .../changestreams/dao/PartitionMetadataDao.java| 68 +++---
 .../action/ChildPartitionsRecordActionTest.java|  4 +-
 .../dao/PartitionMetadataDaoTest.java  | 29 +++--
 6 files changed, 75 insertions(+), 50 deletions(-)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/MetadataSpannerConfigFactory.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/MetadataSpannerConfigFactory.java
index 83965b1bfaa..56c67f3194f 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/MetadataSpannerConfigFactory.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/MetadataSpannerConfigFactory.java
@@ -84,7 +84,7 @@ public class MetadataSpannerConfigFactory {
 
 ValueProvider<Duration> commitDeadline = primaryConfig.getCommitDeadline();
 if (commitDeadline != null) {
-  config = config.withCommitDeadline(StaticValueProvider.of(commitDeadline.get()));
+  config = config.withCommitDeadline(StaticValueProvider.of(Duration.standardSeconds(60)));
 }
 
 ValueProvider<Duration> maxCumulativeBackoff = primaryConfig.getMaxCumulativeBackoff();
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordAction.java
index 70286b41778..7fb69d0e7ab 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordAction.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordAction.java
@@ -155,7 +155,8 @@ public class ChildPartitionsRecordAction {
   } else {
 return false;
   }
-})
+},
+"InsertChildPartition")
 .getResult();
 if (insertedRow && isSplit) {
   metrics.incPartitionRecordSplitCount();
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DetectNewPartitionsAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DetectNewPartitionsAction.java
index 934210250f5..73967d2a2a7 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DetectNewPartitionsAction.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DetectNewPartitionsAction.java
@@ -146,15 +146,22 @@ public class DetectNewPartitionsAction {
   OutputReceiver<PartitionMetadata> receiver,
   Timestamp minWatermark,
   TreeMap<Timestamp, List<PartitionMetadata>> batches) {
+List<PartitionMetadata> batchPartitionsDifferentCreatedAt = new ArrayList<>();
+int numTimestampsHandledSofar = 0;
 for (Map.Entry<Timestamp, List<PartitionMetadata>> batch : batches.entrySet()) {
+  numTimestampsHandledSofar++;
   final Timestamp batchCreatedAt = batch.getKey();
-  final List<PartitionMetadata> batchPartitions = batch.getValue();
-
-  final Timestamp scheduledAt = updateBatchToScheduled(batchPartitions);
-  if (!tracker.tryClaim(batchCreatedAt)) {
-return ProcessContinuation.stop();
+  final List<PartitionMetadata> batchPartitionsSameCreatedAt = batch.getValue();
+  batchPartitionsDifferentCreatedAt.addAll(batchPartitionsSameCreatedAt);
+  if (batchPartit

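The DetectNewPartitionsAction change above batches partitions grouped by creation timestamp in a TreeMap and accumulates batches with different timestamps before scheduling them. A standalone sketch of the ordering guarantee that structure provides, with simplified types (long timestamps and String partition names instead of the Spanner model classes):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Sketch of the batching idea in the diff above: a TreeMap keyed by
// creation timestamp is iterated in ascending key order, so partitions
// created earlier are accumulated and scheduled first.
public class BatchingDemo {
    static List<String> flattenInOrder(TreeMap<Long, List<String>> batches) {
        List<String> accumulated = new ArrayList<>();
        for (Map.Entry<Long, List<String>> batch : batches.entrySet()) {
            // TreeMap.entrySet() yields entries in ascending timestamp
            // order regardless of insertion order.
            accumulated.addAll(batch.getValue());
        }
        return accumulated;
    }

    public static void main(String[] args) {
        TreeMap<Long, List<String>> batches = new TreeMap<>();
        batches.put(200L, List.of("p3"));         // inserted first...
        batches.put(100L, List.of("p1", "p2"));   // ...but visited second
        System.out.println(flattenInOrder(batches)); // prints [p1, p2, p3]
    }
}
```

Accumulating across timestamps before writing (rather than one transaction per timestamp, as before) is what lets the connector schedule partitions in fewer, larger metadata-table transactions.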
[beam] branch master updated (315b53d6611 -> 4ceb475ebe4)

2023-03-19 Thread pabloem
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from 315b53d6611 [Go SDK]: Align filesystem List behaviors (#25811)
 add 4ceb475ebe4 Google Cloud Bigtable Change Stream Connector (#25797)

No new revisions were added by this update.

Summary of changes:
 .../org/apache/beam/gradle/BeamModulePlugin.groovy |   2 +-
 sdks/java/io/google-cloud-platform/build.gradle|   1 +
 .../beam/sdk/io/gcp/bigtable/BigtableIO.java   |  76 +---
 .../changestreams/ByteStringRangeHelper.java   | 118 +++-
 .../changestreams/ChangeStreamMetrics.java | 145 +++
 .../changestreams/ChangeStreamMutation.java|  27 ---
 .../bigtable/changestreams/TimestampConverter.java |  14 +-
 .../changestreams/action/ActionFactory.java|  18 +-
 .../changestreams/action/ChangeStreamAction.java   | 116 +++-
 .../action/DetectNewPartitionsAction.java  |  27 +--
 .../action/GenerateInitialPartitionsAction.java|  46 -
 .../action/ReadChangeStreamPartitionAction.java| 126 +++--
 .../dao/BigtableChangeStreamAccessor.java  | 194 
 .../changestreams/dao/ChangeStreamDao.java |  71 +++-
 .../gcp/bigtable/changestreams/dao/DaoFactory.java |  18 +-
 .../changestreams/dao/MetadataTableAdminDao.java   |  95 +-
 .../changestreams/dao/MetadataTableDao.java| 165 -
 .../dofn/DetectNewPartitionsDoFn.java  |  21 +--
 .../changestreams/dofn/InitializeDoFn.java |  33 +++-
 .../dofn/ReadChangeStreamPartitionDoFn.java|  18 +-
 .../encoder/MetadataTableEncoder.java  |  62 +++
 .../changestreams/encoder/package-info.java|   2 +
 .../changestreams/model/PartitionRecord.java   |  66 ---
 .../ReadChangeStreamPartitionProgressTracker.java  |   2 +-
 .../changestreams/restriction/StreamProgress.java  |  64 ++-
 .../apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java  |  12 +-
 .../changestreams/ByteStringRangeHelperTest.java   | 186 +++
 .../changestreams/TimestampConverterTest.java  |  22 +--
 .../action/ChangeStreamActionTest.java | 179 ++
 .../action/DetectNewPartitionsActionTest.java  | 122 +
 .../GenerateInitialPartitionsActionTest.java   | 115 
 .../ReadChangeStreamPartitionActionTest.java   | 202 +
 .../dao/MetadataTableAdminDaoTest.java | 126 +
 .../changestreams/dao/MetadataTableDaoTest.java| 140 ++
 .../changestreams/dofn/InitializeDoFnTest.java | 123 +
 ...adChangeStreamPartitionProgressTrackerTest.java |  84 +
 36 files changed, 2622 insertions(+), 216 deletions(-)
 delete mode 100644 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/ChangeStreamMutation.java
 create mode 100644 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/BigtableChangeStreamAccessor.java
 create mode 100644 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/encoder/MetadataTableEncoder.java
 create mode 100644 
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/ByteStringRangeHelperTest.java
 create mode 100644 
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ChangeStreamActionTest.java
 create mode 100644 
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/DetectNewPartitionsActionTest.java
 create mode 100644 
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/GenerateInitialPartitionsActionTest.java
 create mode 100644 
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ReadChangeStreamPartitionActionTest.java
 create mode 100644 
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/MetadataTableAdminDaoTest.java
 create mode 100644 
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/MetadataTableDaoTest.java
 create mode 100644 
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/InitializeDoFnTest.java
 create mode 100644 
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/restriction/ReadChangeStreamPartitionProgressTrackerTest.java



[beam] branch master updated (6cb7b8e5f82 -> 5f9bf8b74f3)

2023-03-20 Thread pabloem
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from 6cb7b8e5f82 Ensure truncate element is wrapped in *FullValue (#25908)
 add 5f9bf8b74f3 Pubsub test client fixup (#25907)

No new revisions were added by this update.

Summary of changes:
 .../java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClient.java| 6 +-
 1 file changed, 5 insertions(+), 1 deletion(-)



[beam] branch master updated (8c90c64836c -> 2c075b3603f)

2023-03-21 Thread pabloem
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from 8c90c64836c Merge pull request #25802 Use the cord ctype for proto 
generation, where available.
 add 2c075b3603f Advance DetectNewPartition's watermark by aggregating the 
(#25906)

No new revisions were added by this update.

Summary of changes:
 .../changestreams/ByteStringRangeHelper.java   | 158 +
 .../action/DetectNewPartitionsAction.java  |  92 
 .../changestreams/dao/MetadataTableDao.java|  40 +-
 .../changestreams/ByteStringRangeHelperTest.java   | 105 ++
 .../action/DetectNewPartitionsActionTest.java  |  61 
 .../changestreams/dao/MetadataTableDaoTest.java|  71 +
 6 files changed, 467 insertions(+), 60 deletions(-)
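The (truncated) subject above suggests DetectNewPartitions advances its watermark by aggregating the watermarks of the individual partitions it tracks. A minimal sketch of that aggregation idea, with hypothetical names and plain `Instant` values rather than Beam's watermark machinery: the aggregate watermark can only advance to the minimum across all partitions, since no partition may emit records earlier than its own watermark.

```java
import java.time.Instant;
import java.util.List;

public class WatermarkAggregation {
    // Illustrative only: the low watermark across partitions. An aggregating
    // DoFn may only advance its output watermark to the minimum of all
    // partition watermarks it is tracking.
    public static Instant lowWatermark(List<Instant> partitionWatermarks) {
        Instant min = Instant.MAX;
        for (Instant w : partitionWatermarks) {
            if (w.isBefore(min)) {
                min = w;
            }
        }
        return min;
    }

    public static void main(String[] args) {
        Instant low = lowWatermark(List.of(
            Instant.parse("2023-03-21T10:00:05Z"),
            Instant.parse("2023-03-21T10:00:01Z"),
            Instant.parse("2023-03-21T10:00:09Z")));
        System.out.println(low); // prints 2023-03-21T10:00:01Z
    }
}
```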



[beam] branch master updated (ca0787642a6 -> b2bd2a8cbe2)

2023-03-22 Thread pabloem
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from ca0787642a6 Merge pull request #25723: #25722 Add option to propagate 
successful storage-api writes
 add b2bd2a8cbe2 Refactoring: Moved hard coded environment variables' names 
into constants. (#25365)

No new revisions were added by this update.

Summary of changes:
 playground/infrastructure/checker.py   |  9 +
 playground/infrastructure/ci_cd.py |  5 +++--
 playground/infrastructure/config.py|  5 +++--
 .../infrastructure/constants.py| 14 ++
 playground/infrastructure/grpc_client.py   |  5 +++--
 playground/infrastructure/helper.py|  3 ++-
 6 files changed, 22 insertions(+), 19 deletions(-)
 copy .test-infra/kafka/strimzi/01-strimzi-operator/kustomization.yaml => 
playground/infrastructure/constants.py (73%)



[beam] branch master updated: validated the input schema matches with the bigquery output schema (#25915)

2023-03-27 Thread pabloem
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 0e02b8b1b6d validated  the input schema matches with the bigquery 
output schema (#25915)
0e02b8b1b6d is described below

commit 0e02b8b1b6d0b7a92805fa13e0d65a595149ad1d
Author: xianhualiu <122747878+xianhua...@users.noreply.github.com>
AuthorDate: Mon Mar 27 13:02:36 2023 -0400

validated  the input schema matches with the bigquery output schema (#25915)

* validated schema to make sure the input schema matches with the bigquery 
output schema

* added unit test cases for schema validation in bigquery streamio write
---
 .../beam/sdk/io/gcp/bigquery/BigQueryHelpers.java  | 11 
 ...ueryStorageWriteApiSchemaTransformProvider.java | 52 +
 ...StorageWriteApiSchemaTransformProviderTest.java | 68 ++
 3 files changed, 131 insertions(+)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
index 825707f3cae..362afb008ea 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
@@ -574,6 +574,17 @@ public class BigQueryHelpers {
 }
   }
 
+  public static @Nullable Table getTable(BigQueryOptions options, 
TableReference tableRef)
+  throws InterruptedException, IOException {
+try (DatasetService datasetService = new 
BigQueryServicesImpl().getDatasetService(options)) {
+  return datasetService.getTable(tableRef);
+} catch (IOException | InterruptedException e) {
+  throw e;
+} catch (Exception e) {
+  throw new RuntimeException(e);
+}
+  }
+
   static String getDatasetLocation(
   DatasetService datasetService, String projectId, String datasetId) {
 Dataset dataset;
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
index 605228e54a7..8cc1bddc10c 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
@@ -20,6 +20,8 @@ package org.apache.beam.sdk.io.gcp.bigquery.providers;
 import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
 import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
 
+import com.google.api.services.bigquery.model.Table;
+import com.google.api.services.bigquery.model.TableReference;
 import com.google.auto.service.AutoService;
 import com.google.auto.value.AutoValue;
 import java.util.Collections;
@@ -33,12 +35,15 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.Method;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
 import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
 import 
org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryStorageWriteApiSchemaTransformProvider.BigQueryStorageWriteApiSchemaTransformConfiguration;
 import org.apache.beam.sdk.metrics.Counter;
 import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.schemas.AutoValueSchema;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.Schema.Field;
@@ -61,6 +66,8 @@ import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
 import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * An implementation of {@link TypedSchemaTransformProvider} for BigQuery 
Storage Write API jobs
@@ -77,6 +84,8 @@ import org.joda.time.Duration;
 @AutoService(SchemaTransformProvider.class)
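The commit above adds a check that the input row schema matches the destination BigQuery table's schema before writing. A hedged sketch of the idea, comparing field names only (illustrative names; the real validation lives in BigQueryStorageWriteApiSchemaTransformProvider and also covers field types):

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

public class SchemaCheck {
    // Illustrative only: validate that the input fields and the destination
    // table fields are the same set, failing fast with a clear message.
    public static void validateFieldNames(List<String> inputFields, List<String> tableFields) {
        Set<String> in = new TreeSet<>(inputFields);
        Set<String> out = new TreeSet<>(tableFields);
        if (!in.equals(out)) {
            throw new IllegalArgumentException(
                "Input schema " + in + " does not match table schema " + out);
        }
    }

    public static void main(String[] args) {
        // Order-insensitive: this passes.
        validateFieldNames(List.of("id", "name"), List.of("name", "id"));
        try {
            validateFieldNames(List.of("id"), List.of("id", "name"));
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Failing before the write starts turns a late, opaque Storage Write API error into an immediate, actionable one.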
 

[beam] branch master updated: Issue25731 beam shortcut key sequence (#25876)

2023-03-27 Thread pabloem
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new cff22144756 Issue25731 beam shortcut key sequence (#25876)
cff22144756 is described below

commit cff221447564614abd6b0439459f60af4633e4b8
Author: Dmitry Repin 
AuthorDate: Mon Mar 27 21:11:46 2023 +0400

Issue25731 beam shortcut key sequence (#25876)

* Integration test to load the default example of the default SDK and 
change the example (#24730) (#24729)

* Fix formatting and README (#24730)

* Support collection v1.17.0 (#24730)

* LoadingIndicator on chaning examples, remove duplicating licenses (#24730)

* Add a missing license header (#24730)

* Integration test for changing SDK and running code (#24779) (#382)

* Integration test for changing SDK and running code (#24779)

* Rename an integration test (#24779)

* Use enum to switch SDK in integration test (#24779)

* Find SDK in a dropdown by key (#24779)

* Add a TODO (#24779)

* Fix exports (#24779)

* Issue24779 integration changing sdk from 24370 (#387)

* Integration test for changing SDK and running code (#24779)

* Rename an integration test (#24779)

* Use enum to switch SDK in integration test (#24779)

* Find SDK in a dropdown by key (#24779)

* Add a TODO (#24779)

* Fix exports (#24779)

* Integration tests miscellaneous UI (#383)

* miscellaneous ui integration tests

* reverted pubspec.lock

* gradle tasks ordered alhpabetically

* integration tests refactoring

* clean code

* integration tests miscellaneous ui fix pr

* rename method

* added layout adaptivity

* A minor cleanup (#24779)

Co-authored-by: Dmitry Repin 

* integration tests run and editing

* example selector test

* minor fixes

* rat

* fix pr

* minor

* minor

* rat

* integration test finder written

* integration test minor fixes

* minor fixes

* removed comment

* minor fixes

* playground integration tests minor fixes

* integration test pumpAnSettleNoException

* integration test shortcut refactor

* integration test another changing shortcuts running

* upgrade to flutter 3.7.1

* workaround comment

* playground frontend updated major versions

* issues 25329 25331 25336

* 25329 extract connectivity extension to separate file

* Upgrade Flutter to 3.7.3 in integration tests (#24730)

* Fix integration test (#24730)

* fix cors issue and added mouse scroll to tags

* Upgrade Flutter in Dockerfile (#24720)

* sorting moved to model

* sorting moved to model

* sorting moved to model

* bugs fix

* issue 25278

* fix pr

* quites fix in en.yaml

* Fix not loading default example (#25528)

* fix compile error

* Refactor output tabs, test embedded playground (#25136) (#439)

* Refactor output tabs, test embedded playground (#25136)

* Clean up (#25136)

* Change example paths to IDs in integration tests

* issue25731 shortcut keys sequence

* Update 
playground/frontend/playground_components/test/src/models/shortcut_test.dart

Co-authored-by: alexeyinkin 

* issue25731 fix shortcut modal width, fix shortcut meta key text

* issue25731 fix shortcut modal window

* issue25731 pr fix

* issue25731 fix pr

* issue25731 remane file

* fix test

-

Co-authored-by: Alexey Inkin 
Co-authored-by: alexeyinkin 
---
 .../integration_test/common/common_finders.dart|  4 +-
 .../miscellaneous_ui/shortcuts_modal_test.dart |  2 +-
 .../integration_test/standalone_editing_test.dart  |  4 +-
 .../standalone_run_shortcuts_test.dart |  6 +-
 .../shortcuts/components/shortcuts_dialog.dart | 77 +
 .../shortcuts/components/shortcuts_modal.dart  | 97 --
 .../shortcuts/constants/global_shortcuts.dart  |  8 +-
 .../widgets/more_actions.dart  |  8 +-
 .../assets/translations/en.yaml|  3 +
 .../lib/playground_components.dart |  2 +
 .../lib/src/controllers/playground_controller.dart | 12 +--
 .../lib/src/models/shortcut.dart   | 18 ++--
 .../lib/src/util/iterable.dart}| 21 ++---
 .../src/widgets/{dialog.dart => close_button.dart} | 42 --
 .../lib/src/widgets/dialog.dart| 27 --
 .../lib/src/widg

[beam] branch master updated: Lock ReadChangeStreamPartition so only one DoFn can work on one partition (#25938)

2023-03-27 Thread pabloem
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 02d3084cabb Lock ReadChangeStreamPartition so only one DoFn can work 
on one partition (#25938)
02d3084cabb is described below

commit 02d3084cabb0f0912be06784ce1034cef11f746c
Author: Jack Dingilian 
AuthorDate: Mon Mar 27 13:38:23 2023 -0400

Lock ReadChangeStreamPartition so only one DoFn can work on one partition 
(#25938)
---
 .../action/ReadChangeStreamPartitionAction.java| 18 +
 .../changestreams/dao/MetadataTableDao.java| 50 
 .../ReadChangeStreamPartitionProgressTracker.java  |  3 +-
 .../changestreams/restriction/StreamProgress.java  | 30 +---
 .../ReadChangeStreamPartitionActionTest.java   | 49 ++--
 .../changestreams/dao/MetadataTableDaoTest.java| 88 ++
 ...adChangeStreamPartitionProgressTrackerTest.java | 19 +
 7 files changed, 241 insertions(+), 16 deletions(-)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ReadChangeStreamPartitionAction.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ReadChangeStreamPartitionAction.java
index 3daa44a80e9..4309c94a9e5 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ReadChangeStreamPartitionAction.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ReadChangeStreamPartitionAction.java
@@ -138,6 +138,24 @@ public class ReadChangeStreamPartitionAction {
   + tracker.currentRestriction());
 }
 
+// Lock the partition
+if (!metadataTableDao.lockPartition(
+partitionRecord.getPartition(), partitionRecord.getUuid())) {
+  LOG.info(
+  "RCSP: Could not acquire lock for partition: {}, with uid: {}, 
because this is a "
+  + "duplicate and another worker is working on this partition 
already.",
+  formatByteStringRange(partitionRecord.getPartition()),
+  partitionRecord.getUuid());
+  StreamProgress streamProgress = new StreamProgress();
+  streamProgress.setFailToLock(true);
+  metrics.decPartitionStreamCount();
+  if (!tracker.tryClaim(streamProgress)) {
+LOG.debug("RCSP: Failed to claim tracker after failing to lock 
partition.");
+return ProcessContinuation.stop();
+  }
+  return ProcessContinuation.stop();
+}
+
 // Process CloseStream if it exists
 CloseStream closeStream = tracker.currentRestriction().getCloseStream();
 if (closeStream != null) {
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/MetadataTableDao.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/MetadataTableDao.java
index eeeb3682ccf..706939df31d 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/MetadataTableDao.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/MetadataTableDao.java
@@ -25,6 +25,9 @@ import static 
org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTabl
 import com.google.api.gax.rpc.ServerStream;
 import com.google.cloud.bigtable.data.v2.BigtableDataClient;
 import com.google.cloud.bigtable.data.v2.models.ChangeStreamContinuationToken;
+import com.google.cloud.bigtable.data.v2.models.ConditionalRowMutation;
+import com.google.cloud.bigtable.data.v2.models.Filters.Filter;
+import com.google.cloud.bigtable.data.v2.models.Mutation;
 import com.google.cloud.bigtable.data.v2.models.Query;
 import com.google.cloud.bigtable.data.v2.models.Range;
 import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange;
@@ -262,6 +265,53 @@ public class MetadataTableDao {
 dataClient.mutateRow(rowMutation);
   }
 
+  /**
+   * Lock the partition in the metadata table for the DoFn streaming it. Only 
one DoFn is allowed to
+   * stream a specific partition at any time. Each DoFn has an uuid and will 
try to lock the
+   * partition at the very start of the stream. If another DoFn has already 
locked the partition
+   * (i.e. the uuid in the cell for the partition belongs to the DoFn), any 
future DoFn trying to
+   * lock the same partition will fail and terminate.
+   *
+   * @param partition form the row key in the metadata table to lock
+   * @param uuid id of the DoFn
+   * @return true if uuid holds the lock, otherwise false.
+   */
+  public boolean lockPartition(ByteStringRange partition, String uuid) {
+LOG.debug("Locking partition before processing
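The lockPartition contract documented above is a check-and-set: write the DoFn's uuid into the partition's lock cell only if it is empty (or already holds that same uuid), and report whether the caller owns the lock. A minimal in-memory sketch of the same contract, assuming nothing about the metadata table layout — in the real code Bigtable's ConditionalRowMutation performs the check-and-set atomically on the server:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class PartitionLock {
    // Stand-in for the lock cell per partition row in the metadata table.
    private final Map<String, String> lockCells = new ConcurrentHashMap<>();

    // Returns true iff `uuid` holds the lock for `partition` after the call:
    // either it claimed the empty cell, or its uuid was already written there.
    public boolean lockPartition(String partition, String uuid) {
        String holder = lockCells.putIfAbsent(partition, uuid);
        return holder == null || holder.equals(uuid);
    }

    public static void main(String[] args) {
        PartitionLock locks = new PartitionLock();
        System.out.println(locks.lockPartition("A-B", "worker-1")); // true: acquired
        System.out.println(locks.lockPartition("A-B", "worker-1")); // true: same uuid, still held
        System.out.println(locks.lockPartition("A-B", "worker-2")); // false: held by worker-1
    }
}
```

A duplicate DoFn that fails to acquire the lock simply stops, which is the `ProcessContinuation.stop()` path in the diff above.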

[beam] branch master updated (14a77825928 -> 6484a47c3d5)

2023-03-28 Thread pabloem
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from 14a77825928 Add explicit language_level=3 arg to Cythonize (#26015)
 add 6484a47c3d5 Fix TestPipeline.runWithAdditionalOptionArgs ignore 
additionalArgs (#25937)

No new revisions were added by this update.

Summary of changes:
 .../org/apache/beam/sdk/testing/TestPipeline.java  | 23 +--
 .../apache/beam/sdk/testing/TestPipelineTest.java  | 26 ++
 2 files changed, 37 insertions(+), 12 deletions(-)



[beam] branch master updated (40382c84130 -> 9c52e0594d6)

2023-03-30 Thread pabloem
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from 40382c84130 BigQueryHelpers: log statuses using PrettyString (#26037)
 add 9c52e0594d6 Make DetectNewPartitions handle Splits and Merges (#25997)

No new revisions were added by this update.

Summary of changes:
 .../changestreams/ByteStringRangeHelper.java   |  99 +
 .../ChangeStreamContinuationTokenHelper.java   |  47 ++
 .../changestreams/ChangeStreamMetrics.java |  36 +
 .../action/DetectNewPartitionsAction.java  | 158 -
 .../action/ReadChangeStreamPartitionAction.java|  35 +++--
 .../changestreams/dao/MetadataTableDao.java|  40 +-
 .../changestreams/ByteStringRangeHelperTest.java   |  93 
 .../ChangeStreamContinuationTokenHelperTest.java   |  68 +
 .../action/DetectNewPartitionsActionTest.java  | 157 
 .../ReadChangeStreamPartitionActionTest.java   |  89 +++-
 .../changestreams/dao/MetadataTableDaoTest.java|   4 +-
 11 files changed, 778 insertions(+), 48 deletions(-)
 create mode 100644 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/ChangeStreamContinuationTokenHelper.java
 create mode 100644 
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/ChangeStreamContinuationTokenHelperTest.java
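Handling splits and merges hinges on a helper question that ByteStringRangeHelper answers over row-key ranges: do a set of child partitions exactly cover their parent, with no gaps and no overlaps? A hedged sketch of that check over half-open integer ranges instead of ByteStringRanges (names are illustrative):

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class RangeCover {
    // A half-open range [start, end), standing in for a row-key ByteStringRange.
    public record Range(long start, long end) {}

    // True iff the child ranges together cover [parentStart, parentEnd)
    // exactly: sorted by start, each child must begin where the previous
    // one ended, and the last must end at the parent's end.
    public static boolean coversExactly(long parentStart, long parentEnd, List<Range> children) {
        List<Range> sorted = new ArrayList<>(children);
        sorted.sort(Comparator.comparingLong(Range::start));
        long cursor = parentStart;
        for (Range r : sorted) {
            if (r.start() != cursor) {
                return false; // gap (r.start > cursor) or overlap (r.start < cursor)
            }
            cursor = r.end();
        }
        return cursor == parentEnd;
    }

    public static void main(String[] args) {
        System.out.println(coversExactly(0, 100,
            List.of(new Range(0, 40), new Range(40, 100)))); // true
        System.out.println(coversExactly(0, 100,
            List.of(new Range(0, 40), new Range(50, 100)))); // false: gap at [40, 50)
    }
}
```

A detected gap means some partition is not being streamed; an overlap means two workers would stream the same rows — both cases DetectNewPartitions has to resolve.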



[beam] branch master updated: Adding support for GCS-stored files for consumer config overrides (#25773)

2023-03-30 Thread pabloem
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 6b006531b07 Adding support for GCS-stored files for consumer config 
overrides (#25773)
6b006531b07 is described below

commit 6b006531b07cfdcb4c1e8eec02d2ff9c672a135f
Author: Pablo Estrada 
AuthorDate: Thu Mar 30 20:03:42 2023 -0700

Adding support for GCS-stored files for consumer config overrides (#25773)

* Adding support for GCS-stored files for consumer config overrides

* fixup
---
 .../io/kafka/KafkaReadSchemaTransformProvider.java | 76 ++
 1 file changed, 76 insertions(+)

diff --git 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java
 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java
index bf5bce46180..053cd3ff76f 100644
--- 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java
+++ 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java
@@ -18,13 +18,23 @@
 package org.apache.beam.sdk.io.kafka;
 
 import com.google.auto.service.AutoService;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.stream.Collectors;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
 import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.transforms.Convert;
 import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
@@ -43,14 +53,20 @@ import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @AutoService(SchemaTransformProvider.class)
 public class KafkaReadSchemaTransformProvider
     extends TypedSchemaTransformProvider<KafkaReadSchemaTransformConfiguration> {
 
+  private static final Logger LOG = 
LoggerFactory.getLogger(KafkaReadSchemaTransformProvider.class);
+
   final Boolean isTest;
   final Integer testTimeoutSecs;
 
@@ -137,6 +153,7 @@ public class KafkaReadSchemaTransformProvider
 KafkaIO.Read<byte[], byte[]> kafkaRead =
 KafkaIO.readBytes()
 .withConsumerConfigUpdates(consumerConfigs)
+.withConsumerFactoryFn(new 
ConsumerFactoryWithGcsTrustStores())
 .withTopic(configuration.getTopic())
 .withBootstrapServers(configuration.getBootstrapServers());
 if (isTest) {
@@ -171,6 +188,7 @@ public class KafkaReadSchemaTransformProvider
 KafkaIO.Read<byte[], GenericRecord> kafkaRead =
 KafkaIO.read()
 .withTopic(configuration.getTopic())
+.withConsumerFactoryFn(new 
ConsumerFactoryWithGcsTrustStores())
 .withBootstrapServers(configuration.getBootstrapServers())
 .withConsumerConfigUpdates(consumerConfigs)
 .withKeyDeserializer(ByteArrayDeserializer.class)
@@ -193,4 +211,62 @@ public class KafkaReadSchemaTransformProvider
   }
 }
   };
+
+  private static class ConsumerFactoryWithGcsTrustStores
+  implements SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> {
+
+@Override
+public Consumer<byte[], byte[]> apply(Map<String, Object> input) {
+  return KafkaIOUtils.KAFKA_CONSUMER_FACTORY_FN.apply(
+  input.entrySet().stream()
+  .map(
+  entry ->
+  Maps.immutableEntry(
+  entry.getKey(), 
identityOrGcsToLocalFile(entry.getValue(
+  .collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue)));
+}
+
+private static Object identityOrGcsToLocalFile(Object configValue) {
+  if (configValue instanceof String) {
+String configStr = (String) configValue;
+if (configStr.startsWith("gs://")) {
+  try {
+
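The factory above walks the consumer config map and, for string values that point at gs:// objects (e.g. SSL truststore paths), swaps in a local copy that the Kafka client can open as a file. A hedged sketch of that rewrite with the download step abstracted into a function parameter (the real code uses Beam's FileSystems to copy the object to a temp file; the downloader below is a stand-in):

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

public class GcsConfigRewrite {
    // Rewrite any "gs://..." string value using `download`, which maps a GCS
    // URI to a local file path; every other value passes through unchanged.
    public static Map<String, Object> localize(
            Map<String, Object> config, Function<String, String> download) {
        Map<String, Object> out = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : config.entrySet()) {
            Object v = e.getValue();
            if (v instanceof String s && s.startsWith("gs://")) {
                v = download.apply(s);
            }
            out.put(e.getKey(), v);
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> cfg = Map.of(
            "ssl.truststore.location", "gs://bucket/truststore.jks",
            "bootstrap.servers", "broker:9092");
        // Stand-in downloader: a real one copies the object and returns the temp path.
        Map<String, Object> local = localize(cfg,
            uri -> "/tmp/" + uri.substring("gs://".length()).replace('/', '_'));
        System.out.println(local.get("ssl.truststore.location")); // /tmp/bucket_truststore.jks
        System.out.println(local.get("bootstrap.servers"));       // broker:9092
    }
}
```

This keeps the rewrite transparent to Kafka: the consumer only ever sees local paths, regardless of where the operator stored the truststore.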

[beam] branch master updated: Add partition metadata in error logs in the restriction tracker for change streams and delete unused files (#26040)

2023-03-30 Thread pabloem
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new dc7a7a02c7a Add partition metadata in error logs in the restriction 
tracker for change streams and delete unused files (#26040)
dc7a7a02c7a is described below

commit dc7a7a02c7ac64887b7ada426c8385a221859af9
Author: ChangyuLi28 <35211213+changyul...@users.noreply.github.com>
AuthorDate: Thu Mar 30 20:38:33 2023 -0700

Add partition metadata in error logs in the restriction tracker for change 
streams and delete unused files (#26040)
---
 .../changestreams/restriction/PartitionMode.java   |  35 -
 .../restriction/PartitionPosition.java |  90 ---
 .../restriction/PartitionRestriction.java  | 141 
 .../restriction/PartitionRestrictionClaimer.java   | 136 
 .../restriction/PartitionRestrictionMetadata.java  | 106 ---
 .../PartitionRestrictionProgressChecker.java   |  96 ---
 .../restriction/PartitionRestrictionSplitter.java  | 128 ---
 .../restriction/PartitionRestrictionTracker.java   | 159 
 .../ReadChangeStreamPartitionRangeTracker.java |   2 +-
 .../restriction/TimestampRangeTracker.java |  24 +
 .../restriction/PartitionPositionTest.java |  70 --
 .../PartitionRestrictionClaimerTest.java   | 272 ---
 .../PartitionRestrictionProgressCheckerTest.java   | 230 --
 .../PartitionRestrictionSplitterTest.java  | 168 
 .../restriction/PartitionRestrictionTest.java  |  78 --
 .../PartitionRestrictionTrackerTest.java   | 859 -
 .../util/PartitionPositionGenerator.java   |  54 --
 17 files changed, 25 insertions(+), 2623 deletions(-)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/PartitionMode.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/PartitionMode.java
deleted file mode 100644
index 341dc7aa38f..000
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/PartitionMode.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction;
-
-/** This enum contains the states that PartitionRestrictionTracker will go 
through. */
-public enum PartitionMode {
-  // In this state, the restriction tracker will update the state of the input 
partition token
-  // from SCHEDULED to RUNNING.
-  UPDATE_STATE,
-  // In this state, the restriction tracker will execute a change stream query.
-  QUERY_CHANGE_STREAM,
-  // In this state, the restriction tracker will wait for the child partition 
SDFs to start
-  // running before terminating the SDF.
-  WAIT_FOR_CHILD_PARTITIONS,
-  // In this state, the restriction tracker will terminate the SDF.
-  DONE,
-
-  // Occurs when Dataflow checkpoints the current restriction.
-  STOP
-}
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/PartitionPosition.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/PartitionPosition.java
deleted file mode 100644
index 1011fadc74a..000
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/PartitionPosition.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable l

[beam] branch master updated (dc7a7a02c7a -> 5e368b4d7ce)

2023-03-30 Thread pabloem
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from dc7a7a02c7a Add partition metadata in error logs in the restriction 
tracker for change streams and delete unused files (#26040)
 add 5e368b4d7ce Add partition reconciler to handle known cases where 
partitions can get stuck (#26045)

No new revisions were added by this update.

Summary of changes:
 .../changestreams/ByteStringRangeHelper.java   |  13 ++
 .../action/DetectNewPartitionsAction.java  |  80 +++--
 .../changestreams/dao/MetadataTableDao.java|  72 +++-
 .../reconciler/PartitionReconciler.java| 153 
 .../{encoder => reconciler}/package-info.java  |   6 +-
 .../changestreams/restriction/StreamProgress.java  |   2 +-
 .../changestreams/ByteStringRangeHelperTest.java   |  15 ++
 .../action/DetectNewPartitionsActionTest.java  |  84 +++--
 .../changestreams/dao/MetadataTableDaoTest.java|  41 +
 .../reconciler/PartitionReconcilerTest.java| 195 +
 10 files changed, 621 insertions(+), 40 deletions(-)
 create mode 100644 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/reconciler/PartitionReconciler.java
 copy 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/{encoder
 => reconciler}/package-info.java (84%)
 create mode 100644 
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/reconciler/PartitionReconcilerTest.java



[beam] branch master updated (48d66ada923 -> c6f8e0be1a5)

2023-04-04 Thread pabloem
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from 48d66ada923 Remove status disclaimer from pydoc root. (#26106)
 add c6f8e0be1a5 Implement GetSize for ReadChangeStream to support 
autoscaling  (#26052)

No new revisions were added by this update.

Summary of changes:
 .../beam/sdk/io/gcp/bigtable/BigtableIO.java   |  23 +++-
 .../bigtable/changestreams/TimestampConverter.java |   4 +
 .../changestreams/action/ActionFactory.java|  11 +-
 .../changestreams/action/ChangeStreamAction.java   |   9 +-
 .../dofn/ReadChangeStreamPartitionDoFn.java|  57 +++-
 .../estimator/BytesThroughputEstimator.java|  74 +++
 .../changestreams/estimator/EncodingException.java |   5 +-
 .../estimator/NullThroughputEstimator.java |  12 +-
 .../changestreams/estimator/SizeEstimator.java |   6 +-
 .../estimator/ThroughputEstimator.java |  12 +-
 .../{encoder => estimator}/package-info.java   |   7 +-
 .../changestreams/TimestampConverterTest.java  |   6 +
 .../action/ChangeStreamActionTest.java |   8 +-
 .../dofn/ReadChangeStreamPartitionDoFnTest.java| 143 +
 .../estimator/BytesThroughputEstimatorTest.java|  83 ++--
 .../estimator/NullThroughputEstimatorTest.java |  12 +-
 16 files changed, 370 insertions(+), 102 deletions(-)
 copy 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/{spanner
 => bigtable}/changestreams/estimator/BytesThroughputEstimator.java (62%)
 copy 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/{spanner
 => bigtable}/changestreams/estimator/EncodingException.java (88%)
 copy 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/{spanner
 => bigtable}/changestreams/estimator/NullThroughputEstimator.java (84%)
 copy 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/{spanner
 => bigtable}/changestreams/estimator/SizeEstimator.java (91%)
 copy 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/{spanner
 => bigtable}/changestreams/estimator/ThroughputEstimator.java (83%)
 copy 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/{encoder
 => estimator}/package-info.java (74%)
 create mode 100644 
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java
 copy 
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/{spanner
 => bigtable}/changestreams/estimator/BytesThroughputEstimatorTest.java (60%)
 copy 
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/{spanner
 => bigtable}/changestreams/estimator/NullThroughputEstimatorTest.java (73%)



[beam] branch master updated: Fix Jackson being imported from both org.codehaus.jackson (old) and com.fasterxml.jackson (new)

2020-10-20 Thread pabloem
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 73aee9e  Fix Jackson being imported from both org.codehaus.jackson 
(old) and com.fasterxml.jackson (new)
 new e5b63a1  Merge pull request #13152 from sjvanrossum/fix-jackson-import 
- Fix imports from old and new Jackson packages
73aee9e is described below

commit 73aee9ef86f3db8c86c0cf72f3ff34bc804ec79f
Author: Steven van Rossum 
AuthorDate: Tue Oct 20 20:32:03 2020 +0200

Fix Jackson being imported from both org.codehaus.jackson (old) and 
com.fasterxml.jackson (new)
---
 .../src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java
index 86dcc18..7f10437 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.io.gcp.healthcare;
 
 import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.api.services.healthcare.v1beta1.model.DeidentifyConfig;
 import com.google.api.services.healthcare.v1beta1.model.HttpBody;
@@ -84,7 +85,6 @@ import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Throwables;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
 import org.checkerframework.checker.nullness.qual.Nullable;
-import org.codehaus.jackson.JsonProcessingException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 



[beam] branch master updated (07856e3 -> f87a671)

2020-10-21 Thread pabloem
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.


from 07856e3  Merge pull request #13113: Upgrade checkerframework and 
gradle plugin
 add f87a671  Merge pull request #13153 from [BEAM-11093] Adding BQ 
snippets for Dynamic Destinations and Time Partitioning

No new revisions were added by this update.

Summary of changes:
 .../apache_beam/examples/snippets/snippets.py  | 29 ++
 .../documentation/io/built-in/google-bigquery.md   | 17 ++---
 2 files changed, 31 insertions(+), 15 deletions(-)



[beam] branch master updated: [BEAM-7746] Fix typing in runners

2020-10-21 Thread pabloem
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 95e2374  [BEAM-7746] Fix typing in runners
 new 5156634  Merge pull request #13060 from [BEAM-7746] Fix typing in 
runners
95e2374 is described below

commit 95e2374c659ba9e7fbe42ac217c0b367120b6ee1
Author: Chad Dombrova 
AuthorDate: Mon Oct 5 16:51:34 2020 -0700

[BEAM-7746] Fix typing in runners
---
 .../runners/portability/artifact_service.py|  18 +--
 .../runners/portability/fn_api_runner/execution.py |  82 ++--
 .../runners/portability/fn_api_runner/fn_runner.py | 121 --
 .../portability/fn_api_runner/translations.py  |  25 ++--
 .../portability/fn_api_runner/worker_handlers.py   | 137 ++---
 .../runners/portability/portable_runner.py |  17 ++-
 .../apache_beam/runners/portability/stager.py  |   1 +
 sdks/python/apache_beam/utils/profiler.py  |  24 ++--
 sdks/python/mypy.ini   |  14 ---
 9 files changed, 302 insertions(+), 137 deletions(-)

diff --git a/sdks/python/apache_beam/runners/portability/artifact_service.py 
b/sdks/python/apache_beam/runners/portability/artifact_service.py
index 1f3ec1c..18537f4 100644
--- a/sdks/python/apache_beam/runners/portability/artifact_service.py
+++ b/sdks/python/apache_beam/runners/portability/artifact_service.py
@@ -33,9 +33,15 @@ import queue
 import sys
 import tempfile
 import threading
-import typing
 from io import BytesIO
+from typing import Any
+from typing import BinaryIO  # pylint: disable=unused-import
 from typing import Callable
+from typing import Dict
+from typing import List
+from typing import MutableMapping
+from typing import Optional
+from typing import Tuple
 
 import grpc
 from future.moves.urllib.request import urlopen
@@ -48,11 +54,6 @@ from apache_beam.portability.api import 
beam_artifact_api_pb2_grpc
 from apache_beam.portability.api import beam_runner_api_pb2
 from apache_beam.utils import proto_utils
 
-if typing.TYPE_CHECKING:
-  from typing import BinaryIO  # pylint: disable=ungrouped-imports
-  from typing import Iterable
-  from typing import MutableMapping
-
 
 class ArtifactRetrievalService(
 beam_artifact_api_pb2_grpc.ArtifactRetrievalServiceServicer):
@@ -61,7 +62,7 @@ class ArtifactRetrievalService(
 
   def __init__(
   self,
-  file_reader,  # type: Callable[[str], BinaryIO],
+  file_reader,  # type: Callable[[str], BinaryIO]
   chunk_size=None,
   ):
 self._file_reader = file_reader
@@ -105,7 +106,8 @@ class ArtifactStagingService(
   file_writer,  # type: Callable[[str, Optional[str]], Tuple[BinaryIO, 
str]]
 ):
 self._lock = threading.Lock()
-self._jobs_to_stage = {}
+self._jobs_to_stage = {
+}  # type: Dict[str, Tuple[Dict[Any, 
List[beam_runner_api_pb2.ArtifactInformation]], threading.Event]]
 self._file_writer = file_writer
 
   def register_job(
diff --git 
a/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py 
b/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py
index 5b8e91c..bc69123 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py
@@ -17,6 +17,8 @@
 
 """Set of utilities for execution of a pipeline by the FnApiRunner."""
 
+# mypy: disallow-untyped-defs
+
 from __future__ import absolute_import
 
 import collections
@@ -26,10 +28,12 @@ from typing import TYPE_CHECKING
 from typing import Any
 from typing import DefaultDict
 from typing import Dict
+from typing import Iterable
 from typing import Iterator
 from typing import List
 from typing import MutableMapping
 from typing import Optional
+from typing import Set
 from typing import Tuple
 
 from typing_extensions import Protocol
@@ -59,9 +63,13 @@ from apache_beam.utils import proto_utils
 from apache_beam.utils import windowed_value
 
 if TYPE_CHECKING:
-  from apache_beam.coders.coder_impl import CoderImpl
+  from apache_beam.coders.coder_impl import CoderImpl, WindowedValueCoderImpl
+  from apache_beam.portability.api import endpoints_pb2
   from apache_beam.runners.portability.fn_api_runner import worker_handlers
+  from apache_beam.runners.portability.fn_api_runner.fn_runner import 
DataOutput
+  from apache_beam.runners.portability.fn_api_runner.fn_runner import 
OutputTimers
   from apache_beam.runners.portability.fn_api_runner.translations import 
DataSideInput
+  from apache_beam.transforms import core
   from apache_beam.transforms.window import BoundedWindow
 
 ENCODED_IMPULSE_VALUE = WindowedValueCoder(
@@ -87,13 +95,27 @@ class PartitionableBuffer(Buffer, Protocol):
 # type: (int) -> List[List[bytes]]
 pass
 
+  @property
+  def cleared(self):
+# type: ()

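The diff above replaces the blanket `import typing` with explicit `from typing import ...` imports and relies on PEP 484 *type comments* (`# type: ...`) so the annotations stay valid under Python 2 while still being checked by mypy. A minimal self-contained sketch of that style — the class and names here are illustrative stand-ins, not Beam's actual `ArtifactStagingService`:

```python
from typing import Callable, Dict, List, Tuple


class ArtifactStore(object):
    """Illustrative class using PEP 484 type comments instead of annotations."""

    def __init__(
        self,
        file_reader,  # type: Callable[[str], bytes]
    ):
        # type: (...) -> None
        self._file_reader = file_reader
        # Container types that cannot be expressed in code go in a comment:
        self._jobs = {}  # type: Dict[str, List[Tuple[str, bytes]]]

    def stage(self, job_id, name):
        # type: (str, str) -> None
        # setdefault keeps the per-job list creation in one expression.
        self._jobs.setdefault(job_id, []).append((name, self._file_reader(name)))


store = ArtifactStore(lambda name: name.encode("utf-8"))
store.stage("job-1", "artifact.bin")
print(store._jobs["job-1"])  # [('artifact.bin', b'artifact.bin')]
```

mypy reads the `# type:` comments exactly as it would real annotations, which is why the diff can tighten checking (`# mypy: disallow-untyped-defs`) without dropping Python 2 support.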
[beam] branch master updated (e9dcd54 -> b35d4cc)

2020-10-27 Thread pabloem
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.


from e9dcd54  Merge pull request #12643 from robinyqiu/math
 add b35d4cc  Merge pull request #13180 from [BEAM-9094] Configure S3 
client for IO to s3 compatible object store

No new revisions were added by this update.

Summary of changes:
 CHANGES.md |  1 +
 .../apache_beam/io/aws/clients/s3/boto3_client.py  | 34 --
 .../apache_beam/io/aws/clients/s3/client_test.py   |  3 +-
 sdks/python/apache_beam/io/aws/s3filesystem.py | 29 ++-
 .../python/apache_beam/io/aws/s3filesystem_test.py | 22 ++--
 sdks/python/apache_beam/io/aws/s3io.py |  6 ++--
 sdks/python/apache_beam/io/aws/s3io_test.py|  3 +-
 .../python/apache_beam/options/pipeline_options.py | 42 ++
 8 files changed, 114 insertions(+), 26 deletions(-)



[beam] branch master updated: Update release guide instructions.

2020-10-27 Thread pabloem
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 65e66e6  Update release guide instructions.
 new 5076255  Merge pull request #13012 from ibzib/docker-push-release - 
Update release guide instructions.
65e66e6 is described below

commit 65e66e631be0c002ef388f48fc0dc98e5ca9e414
Author: Kyle Weaver 
AuthorDate: Mon Oct 5 12:41:40 2020 -0700

Update release guide instructions.

Pushing a Docker release now requires INFRA to grant permission, not Beam 
PMC.
---
 website/www/site/content/en/contribute/release-guide.md | 14 +-
 1 file changed, 1 insertion(+), 13 deletions(-)

diff --git a/website/www/site/content/en/contribute/release-guide.md 
b/website/www/site/content/en/contribute/release-guide.md
index d23b5d2..3de11ea 100644
--- a/website/www/site/content/en/contribute/release-guide.md
+++ b/website/www/site/content/en/contribute/release-guide.md
@@ -219,20 +219,8 @@ After successful login, authorization info will be stored 
at ~/.docker/config.js
"auth": "xx"
 }
 ```
-Release managers should have push permission; please ask for help at dev@.
-```
-From: Release Manager
-To: d...@beam.apache.org
-Subject: DockerHub Push Permission
-
-Hi DockerHub Admins
+Release managers should have push permission; request membership in the 
[`beammaintainers` 
team](https://hub.docker.com/orgs/apache/teams/beammaintainers) by filing a 
JIRA with the Apache Infrastructure team, like 
[INFRA-20900](https://issues.apache.org/jira/browse/INFRA-20900).
 
-I need push permission to proceed with release, can you please add me to 
maintainer team?
-My docker hub ID is: xxx
-
-Thanks,
-Release Manager
-```
 ### Create a new version in JIRA
 
 When contributors resolve an issue in JIRA, they are tagging it with a release 
that will contain their changes. With the release currently underway, new 
issues should be resolved against a subsequent future release. Therefore, you 
should create a release item for this subsequent release, as follows:



[beam] branch master updated (5b38285 -> 5e45db3)

2020-10-29 Thread pabloem
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.


from 5b38285  [BEAM-9547] Implementation for drop, explode (#13139)
 add 5e45db3  Merge pull request #13210 from [BEAM-10994] Update hot key 
detection log message

No new revisions were added by this update.

Summary of changes:
 .../java/org/apache/beam/runners/dataflow/worker/HotKeyLogger.java  | 6 +-
 .../org/apache/beam/runners/dataflow/worker/HotKeyLoggerTest.java   | 6 +-
 2 files changed, 10 insertions(+), 2 deletions(-)



svn commit: r42240 - /release/beam/KEYS

2020-11-02 Thread pabloem
Author: pabloem
Date: Mon Nov  2 20:00:03 2020
New Revision: 42240

Log:
Adding Beam 2.26.0 release manager to KEYS.

Modified:
release/beam/KEYS

Modified: release/beam/KEYS
==
--- release/beam/KEYS (original)
+++ release/beam/KEYS Mon Nov  2 20:00:03 2020
@@ -2089,3 +2089,62 @@ mBk/pEiXK8ArDbFBeYSeh3DVV4y6nMh3bLksaf/X
 mXedxOnyjc6tcItjU+a+2P5Dwdph7N+oBeq9xA==
 =wDql
 -END PGP PUBLIC KEY BLOCK-
+pub   rsa4096 2020-11-02 [SC]  
+  A52F5C83BAE26160120EC25F3D56ACFBFB2975E1
+uid   [ultimate] Robert Burke (Beam Release Manager for 2.26) 

+sig 3 3D56ACFBFB2975E1 2020-11-02  Robert Burke (Beam Release Manager 
for 2.26) 
+sub   rsa4096 2020-11-02 [E]
+sig  3D56ACFBFB2975E1 2020-11-02  Robert Burke (Beam Release Manager 
for 2.26) 
+
+-BEGIN PGP PUBLIC KEY BLOCK-
+
+mQINBF+gVh8BEACujN550q+2D8pSUld+QadjfFiTJnjCyAcCx8TSgBRgbN4bbg5v
+1il7NSVwbR4yhG1hFtQYD1h+JWXMZWScc3TpWeAl9fSzQk+vlRay0yVciwk/SCL5
+8Z/kzvQpJnCmgetDTqam+7ltVCPdQ/HqGp3ZlZ6EmUpo4LYBTXpwIoMSsHtNBvjX
+glRXjJAjZdfqU/CzKU3cNuunYU5KpNtECiEzKwJNtC1hUnjgHt+pRNwEcvuouOof
+Uc+YdKPj8G/H0TaqC3WP9TXvj/DCE6G51s2dc5Abz3p7yV/kY5qOCKgohS3nXqDr
+WkbzWvrOZPKjehJaszVRPY+y/lf7pMT43Px/WvqK71MNiEyrekHUJdBBWe3TBKYh
+uIVU4ZKx9pbI9hdu+CbBgsbcgiskV/h0xFf9TAQjuMIbKY8LMVEjLWGb28tEs7Qu
+sdO1pQTVclWD3RrjKyS4GjZXuramyVlA4553Yj62l2X8lOXycuSyQdsRkoHTPUHz
+EcOcqYWosUHCytlVk+58QJKmkpytVwFuu1KP4vMNIAEl8JBhg/WN4AHaG5Z8sNSs
+D3HLIIIzQN9cHKAveX0RdHmcSPlAFR9nYIDg7Jd06QJsg76T7QIosrtZx+9AV9PT
+/eWnOliJsb5iyJgp9wCmzRLrK+4cIdWxYMtN/PbByPXpkYOcdnWTif0zyQARAQAB
+tEJSb2JlcnQgQnVya2UgKEJlYW0gUmVsZWFzZSBNYW5hZ2VyIGZvciAyLjI2KSA8
+bG9zdGx1Y2tAYXBhY2hlLm9yZz6JAk4EEwEKADgWIQSlL1yDuuJhYBIOwl89Vqz7
++yl14QUCX6BWHwIbAwULCQgHAgYVCgkICwIEFgIDAQIeAQIXgAAKCRA9Vqz7+yl1
+4ZOxEACLkGIlWFaM/gMtzkFJcPeShJdLuh7P6acVcmnFmKlXocnJC+vGbK5rvP4g
+zC6kM1Q2ImLw0odKsFo7I0DXIxfQ716ieFlIgDSol0F39YAGdtPFCSYXxuJvW9mk
+xPK+kzncmHMXRrn15aGqP0TtA97qtvCUgWxSAlyqOfCIaEmopf5HVWVmctgaPtdr
+RFTBz3ojN3yLoGl/hR8E8Pt1NjkMFVi+Uh1DEuOCjBdctqeEvvo47QpP90Y3qADO
+Cji5GXncKNDFQ81E2YRyy10trrJ19jZ4gG4RmFQIAYMQiL3a7Szd7t80OWyG7+yP
+34CxMBkl+2PeoTOFVWrnNDTXkHsJctE9wZyHdv68Qd70YVwsnFjnlZRrxZdDzvOj
+zT2YwcWbZR5nlOO9T0J3itdK3G9q1+vVZAGM3PfKCEKeEhlKQe++7H3KLfq2LwNx
+GAxpdz2TSp4PqBqzQ08hL4zTrbEhaNh+NmkfWUsoKh5Ac66vyF/fPNXZzcJ50lJt
+kxrGz0Zi4wFZHMOIBmquwU+rK72aEodF8juUiYvensv5WvKbqLaGSq5tPfQ73Yfz
+M9eGVafSsJprXqsz45m/j+O8+qRM2fxfB67QwEVuJnUHHcrCCfPExQlQgpwkSmy+
+tyThFY7fkwGMeAtpMP/GP5ZfYeZGhknjG+MevM0ErCD/DVjJT7kCDQRfoFYfARAA
+0+19D4rS4iGhTv/qkhBCoRZ2OWJR67MamkROcp69TF90rtfAjkjCGYPAwyJN9V49
+WQUZWqHucuPafCLRW7/rnLdYjQFTr6m3+PTAZe0jR/mCh4DzIerZTM0RyOLRI/gU
+Dmm8MYd+jdXQAPH1rT/3Sbbr38rsduP3IKiU0Afa7Me58kHqBQRo3OEn8U4BqYj+
+hPWIjV+4VDecmVac0nouHSy9YlO79DzQ27AHXOWDQcHBDcufBcalMLgFs7/bKaU3
+KlT0SS53/9vY4FodFIokzrTqijkdV5UUDbKiv/RyRT4kyI6fMZ3w1HJJKecObSaT
+82P96CTPcB7o9+S7XQkSHlKCaGcO5xxoGo9L32QROF/+8DQ1v9X1aFx1gZik74Ke
+KtmGwULDczNbhn9Yv8a3d7+vRgh5gYX492PqwmiYcHDmQbyKICXxokfAsJL9w7CW
+UpUbKgi5YzAnZk3hR8KqD5pbyvV4d2eu6VAY1zq0UN/QSAArzgyLznKcRKYqxNb1
+z/sWJoNz/CR+4yeRrgzl2g0ZtinGqCbDWQ5gR7FBCtGv74jkTSUHDgwuZIHrbhF2
+jHOi6ipUGlGymr1GHFBbMxph49/WgzQm7YvN2GKKGtxDczHGn4vZ7CGKebdseSD2
+5ooCMh9Z0Hm8KHYujd3lbFno4+UnpAhJfjQDaVDF0y0AEQEAAYkCNgQYAQoAIBYh
+BKUvXIO64mFgEg7CXz1WrPv7KXXhBQJfoFYfAhsMAAoJED1WrPv7KXXhPNYQAJwX
+VMqhWGtttKbVK+YZpQQ5zMZFNYA9sOXlvDZkqwTbpjo0tFKfJaG4QuXbPA8yIDSv
+uqqLlOJjMkWvDU7YdGqOKo/dnrl8WhCIDzptw0bcT+9rLWINr8e8SM5K6/4xnD3u
+DlXxn6m+Ioc/y0lf3likTtdwr5W5FTp3NbSBdZGdmM7dr9eg0eGb7GNPtlx/793n
+M5fy11HnwdPetrwQdX5715Vy6I6zX+cGAjHw9iu1wqoabx/1nzLNd/vrDi4UzSYk
+iai5govNS5ewavZPtKRY9xQL+43N0rl7CuIJL0kaiSl6zFn5BUHOVU5KXW16azoC
++MB2t702FmRQL0JdBmXblb2hK5Id443KHEgrLaJuzV9+I7M2aB/BH8I1qMo8r21l
+nuQVVY/MGNJV5cu9Db8KCQVnlrBxdlaHU6zIgR6hFV5R/YhCXyyNYPo6Nc9dBZFi
+iy/t/shEoHqMnYz2hqUWzqJS7d2q4RkJAf10Pao+P9ta8jA8GSvPlSx9OU7bte95
+xR0SwUbFgxg6/CLNp30V38kHtWcCqC8AKp8hubRLcBvgRVEzYGMXNvpzow1qesWy
+FJQwDThJvqlbNt6jXwiUKOP3Qdz+nfzW7hyKyNuPbgfsg7bIB/CYT3DWd9EB3si2
+aPUJvCHi+FxoZQUjh498B/JiK0D59rVNQZJOEjTy
+=5xjt
+-END PGP PUBLIC KEY BLOCK-




[beam] branch master updated (aa28f51 -> c3cf904)

2020-11-03 Thread pabloem
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.


from aa28f51  Merge pull request #13251: Add Dataflow Runner V2 Java 
ValidatesRunner streaming test configuration.
 add c3cf904  Implementing Python Bounded Source Reader DoFn (#13154)

No new revisions were added by this update.

Summary of changes:
 sdks/python/apache_beam/io/gcp/bigquery.py |   9 +-
 sdks/python/apache_beam/io/iobase.py   | 341 ++---
 sdks/python/apache_beam/io/iobase_test.py  |  58 ++---
 sdks/python/apache_beam/io/textio_test.py  |  14 +-
 4 files changed, 203 insertions(+), 219 deletions(-)



[beam] branch master updated: Adding display data to BQ File Loads transform

2020-11-04 Thread pabloem
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new eb58694  Adding display data to BQ File Loads transform
 new 7dcbdf1  Merge pull request #13249 from Adding display data to BQ File 
Loads transform
eb58694 is described below

commit eb586949dca35b984ccbefdb581fa4428f87fd45
Author: Pablo Estrada 
AuthorDate: Tue Nov 3 12:29:48 2020 -0800

Adding display data to BQ File Loads transform
---
 sdks/python/apache_beam/io/gcp/bigquery_file_loads.py | 7 ---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py 
b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
index 09eb124..0342b4c 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
@@ -447,10 +447,11 @@ class TriggerLoadJobs(beam.DoFn):
 result = {
 'create_disposition': str(self.create_disposition),
 'write_disposition': str(self.write_disposition),
+'additional_bq_params': str(self.additional_bq_parameters),
+'schema': str(self.schema),
+'launchesBigQueryJobs': DisplayDataItem(
+True, label="This Dataflow job launches bigquery jobs.")
 }
-result['schema'] = str(self.schema)
-result['launchesBigQueryJobs'] = DisplayDataItem(
-True, label="This Dataflow job launches bigquery jobs.")
 return result
 
   def start_bundle(self):
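The display-data change above folds incrementally assigned keys into a single dict literal while adding two new entries. A plain-Python sketch of why the two shapes are equivalent — no Beam dependency; `DisplayDataItem` is a minimal stub, not Beam's class:

```python
class DisplayDataItem(object):
    # Minimal stand-in for Beam's DisplayDataItem, for illustration only.
    def __init__(self, value, label=None):
        self.value = value
        self.label = label

    def __eq__(self, other):
        return (self.value, self.label) == (other.value, other.label)


def display_data_before(schema):
    # Old shape: start with a literal, then assign remaining keys one by one.
    result = {'schema': str(schema)}
    result['launchesBigQueryJobs'] = DisplayDataItem(
        True, label="This Dataflow job launches bigquery jobs.")
    return result


def display_data_after(schema):
    # New shape: one literal holds every key, so added entries stay in one place.
    return {
        'schema': str(schema),
        'launchesBigQueryJobs': DisplayDataItem(
            True, label="This Dataflow job launches bigquery jobs."),
    }


assert display_data_before('my_schema') == display_data_after('my_schema')
```

Both build the same mapping; the literal form simply makes the full set of display-data keys visible at a glance, which matters as the commit adds more of them.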



[beam] branch master updated: Updating BigQuery client for Python

2020-11-05 Thread pabloem
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new e663017  Updating BigQuery client for Python
 new 7a3d723  Merge pull request #13201 from pabloem/bqupd - Updating 
BigQuery client for Python
e663017 is described below

commit e66301742eb7fdac35cc0ebc90a903cf57c597f5
Author: pabloem 
AuthorDate: Mon Oct 26 16:27:53 2020 -0700

Updating BigQuery client for Python
---
 .../clients/bigquery/bigquery_v2_client.py |  658 +
 .../clients/bigquery/bigquery_v2_messages.py   | 1483 +---
 2 files changed, 1664 insertions(+), 477 deletions(-)

diff --git 
a/sdks/python/apache_beam/io/gcp/internal/clients/bigquery/bigquery_v2_client.py
 
b/sdks/python/apache_beam/io/gcp/internal/clients/bigquery/bigquery_v2_client.py
index cde001a..74d71f1 100644
--- 
a/sdks/python/apache_beam/io/gcp/internal/clients/bigquery/bigquery_v2_client.py
+++ 
b/sdks/python/apache_beam/io/gcp/internal/clients/bigquery/bigquery_v2_client.py
@@ -30,17 +30,17 @@ class BigqueryV2(base_api.BaseApiClient):
   """Generated client library for service bigquery version v2."""
 
   MESSAGES_MODULE = messages
-  BASE_URL = u'https://bigquery.googleapis.com/bigquery/v2/'
-  MTLS_BASE_URL = u''
+  BASE_URL = 'https://bigquery.googleapis.com/bigquery/v2/'
+  MTLS_BASE_URL = 'https://www.mtls.googleapis.com/bigquery/v2/'
 
-  _PACKAGE = u'bigquery'
-  _SCOPES = [u'https://www.googleapis.com/auth/bigquery', 
u'https://www.googleapis.com/auth/bigquery.insertdata', 
u'https://www.googleapis.com/auth/bigquery.readonly', 
u'https://www.googleapis.com/auth/cloud-platform', 
u'https://www.googleapis.com/auth/cloud-platform.read-only', 
u'https://www.googleapis.com/auth/devstorage.full_control', 
u'https://www.googleapis.com/auth/devstorage.read_only', 
u'https://www.googleapis.com/auth/devstorage.read_write']
-  _VERSION = u'v2'
+  _PACKAGE = 'bigquery'
+  _SCOPES = ['https://www.googleapis.com/auth/bigquery', 
'https://www.googleapis.com/auth/bigquery.insertdata', 
'https://www.googleapis.com/auth/bigquery.readonly', 
'https://www.googleapis.com/auth/cloud-platform', 
'https://www.googleapis.com/auth/cloud-platform.read-only', 
'https://www.googleapis.com/auth/devstorage.full_control', 
'https://www.googleapis.com/auth/devstorage.read_only', 
'https://www.googleapis.com/auth/devstorage.read_write']
+  _VERSION = 'v2'
   _CLIENT_ID = '1042881264118.apps.googleusercontent.com'
   _CLIENT_SECRET = 'x_Tw5K8nnjoRAqULM9PFAC2b'
   _USER_AGENT = 'x_Tw5K8nnjoRAqULM9PFAC2b'
-  _CLIENT_CLASS_NAME = u'BigqueryV2'
-  _URL_VERSION = u'v2'
+  _CLIENT_CLASS_NAME = 'BigqueryV2'
+  _URL_VERSION = 'v2'
   _API_KEY = None
 
   def __init__(self, url='', credentials=None,
@@ -63,13 +63,14 @@ class BigqueryV2(base_api.BaseApiClient):
 self.models = self.ModelsService(self)
 self.projects = self.ProjectsService(self)
 self.routines = self.RoutinesService(self)
+self.rowAccessPolicies = self.RowAccessPoliciesService(self)
 self.tabledata = self.TabledataService(self)
 self.tables = self.TablesService(self)
 
   class DatasetsService(base_api.BaseApiService):
 """Service class for the datasets resource."""
 
-_NAME = u'datasets'
+_NAME = 'datasets'
 
 def __init__(self, client):
   super(BigqueryV2.DatasetsService, self).__init__(client)
@@ -90,15 +91,15 @@ class BigqueryV2(base_api.BaseApiClient):
   config, request, global_params=global_params)
 
 Delete.method_config = lambda: base_api.ApiMethodInfo(
-http_method=u'DELETE',
-method_id=u'bigquery.datasets.delete',
-ordered_params=[u'projectId', u'datasetId'],
-path_params=[u'datasetId', u'projectId'],
-query_params=[u'deleteContents'],
-relative_path=u'projects/{projectId}/datasets/{datasetId}',
+http_method='DELETE',
+method_id='bigquery.datasets.delete',
+ordered_params=['projectId', 'datasetId'],
+path_params=['datasetId', 'projectId'],
+query_params=['deleteContents'],
+relative_path='projects/{projectId}/datasets/{datasetId}',
 request_field='',
-request_type_name=u'BigqueryDatasetsDeleteRequest',
-response_type_name=u'BigqueryDatasetsDe

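The regenerated client above mostly drops `u'...'` prefixes, which are redundant on Python 3 where every `str` literal is already Unicode. A quick plain-Python sanity check of that equivalence (unrelated to the generated client itself):

```python
# On Python 3 the u-prefix is a no-op: both literals produce the same str value.
old_style = u'bigquery'
new_style = 'bigquery'

assert old_style == new_style
assert type(old_style) is type(new_style) is str

# The same holds inside containers, so regenerating the client strings
# changes no runtime behavior -- only the source text.
old_scopes = [u'https://www.googleapis.com/auth/bigquery']
new_scopes = ['https://www.googleapis.com/auth/bigquery']
assert old_scopes == new_scopes
print('u-prefix literals are equivalent on Python 3')
```

The prefix only mattered under Python 2, where `u''` selected `unicode` over byte `str`; with Python 2 support gone, removing it is pure cleanup.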
[beam] branch master updated (7a3d723 -> dfaf234)

2020-11-05 Thread pabloem
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.


from 7a3d723  Merge pull request #13201 from pabloem/bqupd - Updating 
BigQuery client for Python
 add dfaf234  Merge pull request #13164 from Refactoring BigQuery Read 
utilities into internal file

No new revisions were added by this update.

Summary of changes:
 sdks/python/apache_beam/io/gcp/bigquery.py |  88 --
 .../apache_beam/io/gcp/bigquery_read_internal.py   | 102 +
 sdks/python/apache_beam/io/gcp/bigquery_test.py|  19 ++--
 3 files changed, 127 insertions(+), 82 deletions(-)
 create mode 100644 sdks/python/apache_beam/io/gcp/bigquery_read_internal.py



[beam] branch master updated: [BEAM-9804] Allow user configuration of BigQuery temporary dataset

2020-11-10 Thread pabloem
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 50bd126  [BEAM-9804] Allow user configuration of BigQuery temporary 
dataset
 new 7038af2  Merge pull request #12960 from [BEAM-9804] Allow user 
configuration of BigQuery temporary dataset
50bd126 is described below

commit 50bd1260475191321af56182787435a9c617066f
Author: Frank Zhao 
AuthorDate: Mon Sep 28 00:00:36 2020 +1000

[BEAM-9804] Allow user configuration of BigQuery temporary dataset

Allow ReadFromBigQuery to use a user pre-configured dataset for the 
temporary dataset.
Using a DatasetReference will also allow for cross project temporary 
dataset configuration.
---
 sdks/python/apache_beam/io/gcp/bigquery.py   | 18 --
 sdks/python/apache_beam/io/gcp/bigquery_test.py  | 45 
 sdks/python/apache_beam/io/gcp/bigquery_tools.py | 23 +++-
 3 files changed, 75 insertions(+), 11 deletions(-)

diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py 
b/sdks/python/apache_beam/io/gcp/bigquery.py
index 694114e..e0866cf 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -474,7 +474,8 @@ class _BigQuerySource(dataflow_io.NativeSource):
   coder=None,
   use_standard_sql=False,
   flatten_results=True,
-  kms_key=None):
+  kms_key=None,
+  temp_dataset=None):
 """Initialize a :class:`BigQuerySource`.
 
 Args:
@@ -513,6 +514,10 @@ class _BigQuerySource(dataflow_io.NativeSource):
 query results. The default value is :data:`True`.
   kms_key (str): Optional Cloud KMS key name for use when creating new
 tables.
+  temp_dataset (``google.cloud.bigquery.dataset.DatasetReference``):
+The dataset in which to create temporary tables when performing file
+loads. By default, a new dataset is created in the execution project 
for
+temporary tables.
 
 Raises:
   ValueError: if any of the following is true:
@@ -552,6 +557,7 @@ class _BigQuerySource(dataflow_io.NativeSource):
 self.flatten_results = flatten_results
 self.coder = coder or bigquery_tools.RowAsDictJsonCoder()
 self.kms_key = kms_key
+self.temp_dataset = temp_dataset
 
   def display_data(self):
 if self.query is not None:
@@ -681,7 +687,8 @@ class _CustomBigQuerySource(BoundedSource):
   use_json_exports=False,
   job_name=None,
   step_name=None,
-  unique_id=None):
+  unique_id=None,
+  temp_dataset=None):
 if table is not None and query is not None:
   raise ValueError(
   'Both a BigQuery table and a query were specified.'
@@ -712,6 +719,7 @@ class _CustomBigQuerySource(BoundedSource):
 self.bq_io_metadata = None  # Populate in setup, as it may make an RPC
 self.bigquery_job_labels = bigquery_job_labels or {}
 self.use_json_exports = use_json_exports
+self.temp_dataset = temp_dataset
 self._job_name = job_name or 'AUTOMATIC_JOB_NAME'
 self._step_name = step_name
 self._source_uuid = unique_id
@@ -781,6 +789,8 @@ class _CustomBigQuerySource(BoundedSource):
 project = self.options.view_as(GoogleCloudOptions).project
 if isinstance(project, vp.ValueProvider):
   project = project.get()
+if self.temp_dataset:
+  return self.temp_dataset.projectId
 if not project:
   project = self.project
 return project
@@ -798,7 +808,9 @@ class _CustomBigQuerySource(BoundedSource):
 
   def split(self, desired_bundle_size, start_position=None, 
stop_position=None):
 if self.split_result is None:
-  bq = bigquery_tools.BigQueryWrapper()
+  bq = bigquery_tools.BigQueryWrapper(
+  temp_dataset_id=(
+  self.temp_dataset.datasetId if self.temp_dataset else None))
 
   if self.query is not None:
 self._setup_temporary_dataset(bq)
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py 
b/sdks/python/apache_beam/io/gcp/bigquery_test.py
index 114f200..da3f34f 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py
@@ -51,6 +51,7 @@ from apache_beam.io.gcp.bigquery import _StreamToBigQuery
 from apache_beam.io.gcp.bigquery_file_loads_test import _ELEMENTS
 from apache_beam.io.gcp.bigquery_read_internal import 
bigquery_export_destination_uri
 from apache_beam.io.gcp.bigquery_tools import JSON_COMPLIANCE_ERROR
+from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper
 from apache_beam.io.gcp.bigquery_tools import RetryStrategy
 from apache_beam.io.gcp.internal.clients import bigquery
 from apache_beam.io.gcp.pubsub import ReadFromPubSub
@@ -436,6 +437,50 @@ class TestReadFromBigQuery(unittest.TestCase):
 'empty, using temp_location instead'
  

[beam] branch master updated (06fc5b03 -> 816017e)

2020-11-12 Thread pabloem
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.


from 06fc5b03 [BEAM-10114] Pub/Sub Lite getSplitBacklog implementation 
(#12867)
 add d48b0e3  Add 2.27.0 section
 add 816017e  Merge pull request #13320 from apache/lostluck-patch-1 - Add 
2.27.0 section to CHANGES.md

No new revisions were added by this update.

Summary of changes:
 CHANGES.md | 32 ++--
 1 file changed, 30 insertions(+), 2 deletions(-)



[beam] branch master updated: [BEAM-11255] Adding upper bound on urllib3 dependency.

2020-11-12 Thread pabloem
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 92b7025  [BEAM-11255] Adding upper bound on urllib3 dependency.
 new a041611  Merge pull request #13321 from [BEAM-11255] Adding upper 
bound on urllib3 dependency
92b7025 is described below

commit 92b7025386382c835863774ff036e171f94bb9c4
Author: Pablo Estrada 
AuthorDate: Thu Nov 12 11:29:12 2020 -0800

[BEAM-11255] Adding upper bound on urllib3 dependency.
---
 sdks/python/setup.py | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index b2ffb96..ac79b86 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -153,6 +153,8 @@ REQUIRED_PACKAGES = [
 'pytz>=2018.3',
 'requests>=2.24.0,<3.0.0',
 'typing-extensions>=3.7.0,<3.8.0',
+# TODO(BEAM-11255): urllib3 upper bound added to fix incompatibility.
+'urllib3<1.26',
 ]
 
 # [BEAM-8181] pyarrow cannot be installed on 32-bit Windows platforms.



[beam] branch master updated (b2f550c -> 382cf37)

2020-11-12 Thread pabloem
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.


from b2f550c  [BEAM-11256] Get writer method from dataframe at execution 
time (#13322)
 add 382cf37  Add display data to BoundedSourceDoFn (#13310)

No new revisions were added by this update.

Summary of changes:
 sdks/python/apache_beam/io/iobase.py | 16 +---
 1 file changed, 9 insertions(+), 7 deletions(-)



[beam] branch master updated (17beb66 -> b0dd257)

2020-11-13 Thread pabloem
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.


from 17beb66  Merge pull request #13282: [BEAM-11172] Enable KafkaIO 
performance test for Dataflow runner v2 with SDF.
 add b0dd257  Squash, merge PR #13277 - [BEAM-10921] Add the 
UserPipelineTracker to track user pipelines for derived pipelines

No new revisions were added by this update.

Summary of changes:
 .../runners/interactive/user_pipeline_tracker.py   | 135 +++
 .../interactive/user_pipeline_tracker_test.py  | 183 +
 2 files changed, 318 insertions(+)
 create mode 100644 
sdks/python/apache_beam/runners/interactive/user_pipeline_tracker.py
 create mode 100644 
sdks/python/apache_beam/runners/interactive/user_pipeline_tracker_test.py



[beam] branch master updated: [BEAM-11092] Add bigquery io request count metric, implementing HarnessMonitoringInfos and process_wide metrics

2020-11-18 Thread pabloem
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 53a26ae  [BEAM-11092] Add bigquery io request count metric, 
implementing HarnessMonitoringInfos and process_wide metrics
 new 7c28db2  Merge pull request #13217 from [BEAM-11092] Add bigquery io 
request count metric, implementing HarnessMonitoringInfos and process_wide 
metrics
53a26ae is described below

commit 53a26aee3f35b6dabb62661a7186217fc5c4442c
Author: Alex Amato 
AuthorDate: Tue Nov 17 11:00:16 2020 -0800

[BEAM-11092] Add bigquery io request count metric, implementing 
HarnessMonitoringInfos and process_wide metrics
---
 sdks/python/apache_beam/internal/metrics/metric.py | 90 ++
 .../apache_beam/internal/metrics/metric_test.py| 41 ++
 sdks/python/apache_beam/io/gcp/bigquery_tools.py   | 36 -
 .../gcp/resource_identifiers.py}   | 34 +++-
 sdks/python/apache_beam/metrics/cells.py   | 22 --
 sdks/python/apache_beam/metrics/execution.pxd  |  4 +-
 sdks/python/apache_beam/metrics/execution.py   | 71 +++--
 sdks/python/apache_beam/metrics/metric.py  | 18 +++--
 sdks/python/apache_beam/metrics/metricbase.py  | 42 +++---
 .../python/apache_beam/metrics/monitoring_infos.py | 21 -
 .../apache_beam/metrics/monitoring_infos_test.py   | 35 +
 .../apache_beam/runners/worker/sdk_worker.py   | 38 ++---
 .../apache_beam/runners/worker/sdk_worker_test.py  | 60 +++
 13 files changed, 426 insertions(+), 86 deletions(-)

diff --git a/sdks/python/apache_beam/internal/metrics/metric.py 
b/sdks/python/apache_beam/internal/metrics/metric.py
index 2fbb963..069919e 100644
--- a/sdks/python/apache_beam/internal/metrics/metric.py
+++ b/sdks/python/apache_beam/internal/metrics/metric.py
@@ -39,6 +39,7 @@ from typing import Type
 from typing import Union
 
 from apache_beam.internal.metrics.cells import HistogramCellFactory
+from apache_beam.metrics import monitoring_infos
 from apache_beam.metrics.execution import MetricUpdater
 from apache_beam.metrics.metric import Metrics as UserMetrics
 from apache_beam.metrics.metricbase import Histogram
@@ -49,6 +50,13 @@ if TYPE_CHECKING:
   from apache_beam.metrics.cells import MetricCellFactory
   from apache_beam.utils.histogram import BucketType
 
+# Protect against environments where bigquery library is not available.
+# pylint: disable=wrong-import-order, wrong-import-position
+try:
+  from apitools.base.py.exceptions import HttpError
+except ImportError:
+  pass
+
 __all__ = ['Metrics']
 
 _LOGGER = logging.getLogger(__name__)
@@ -56,6 +64,27 @@ _LOGGER = logging.getLogger(__name__)
 
 class Metrics(object):
   @staticmethod
+  def counter(urn, labels=None, process_wide=False):
+# type: (str, Optional[Dict[str, str]], bool) -> 
UserMetrics.DelegatingCounter
+
+"""Obtains or creates a Counter metric.
+
+Args:
+  namespace: A class or string that gives the namespace to a metric
+  name: A string that gives a unique name to a metric
+  urn: URN to populate on a MonitoringInfo, when sending to RunnerHarness.
+  labels: Labels to populate on a MonitoringInfo
+  process_wide: Whether or not the metric is specific to the current bundle
+  or should be calculated for the entire process.
+
+Returns:
+  A Counter object.
+"""
+return UserMetrics.DelegatingCounter(
+MetricName(namespace=None, name=None, urn=urn, labels=labels),
+process_wide=process_wide)
+
+  @staticmethod
   def histogram(namespace, name, bucket_type, logger=None):
 # type: (Union[Type, str], str, BucketType, Optional[MetricLogger]) -> 
Metrics.DelegatingHistogram
 
@@ -136,3 +165,64 @@ class MetricLogger(object):
   self._last_logging_millis = current_millis
   finally:
 self._lock.release()
+
+
+class ServiceCallMetric(object):
+  """Metric class which records Service API call metrics.
+
+  This class will capture a request count metric for the specified
+  request_count_urn and base_labels.
+
+  When call() is invoked the status must be provided, which will
+  be converted to a canonical GCP status code, if possible.
+
+  TODO(ajamato): Add Request latency metric.
+  """
+  def __init__(self, request_count_urn, base_labels=None):
+# type: (str, Optional[Dict[str, str]]) -> None
+self.base_labels = base_labels if base_labels else {}
+self.request_count_urn = request_count_urn
+
+  def call(self, status):
+# type: (Union[int, str, HttpError]) -> None
+
+"""Record the status of the call into appropriate metrics."""
+canonical_status = self.convert_to_canonical_status_string(status
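The diff above is cut off mid-method in the archive. As a rough, hypothetical sketch of the pattern the commit message describes — a per-status request-count metric keyed by a URN and base labels — the names below are invented for illustration and are not the actual Beam implementation:

```python
from collections import Counter

class RequestCountMetric:
    """Toy version of the ServiceCallMetric idea: count service API calls,
    labelled by a (normalized) status, under a fixed URN and base labels."""
    def __init__(self, request_count_urn, base_labels=None):
        self.urn = request_count_urn
        self.base_labels = dict(base_labels or {})
        self.counts = Counter()

    def call(self, status):
        # Beam converts int/str/HttpError statuses to a canonical GCP
        # status string; here we only normalize to lower-case text.
        canonical = str(status).lower()
        labels = dict(self.base_labels, status=canonical)
        # Key the counter by the full label set so each status gets
        # its own time series, as the MonitoringInfo labels would.
        self.counts[tuple(sorted(labels.items()))] += 1
```

Each distinct label combination accumulates its own count, which is what lets a backend aggregate request counts per service and per status.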

[beam] branch master updated: Restore original str() behaviour to MetricName, for user counter(namespace+name) style MetricNames

2020-11-20 Thread pabloem
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 60e0221  Restore original str() behaviour to MetricName, for user 
counter(namespace+name) style MetricNames
 new 6851109  Merge pull request #13400 from Restore original str() 
behaviour to MetricName, for user counter(namespace+name) style MetricNames
60e0221 is described below

commit 60e02218b41510686ccd828f09db779d52041c48
Author: Alex Amato 
AuthorDate: Fri Nov 20 14:41:45 2020 -0800

Restore original str() behaviour to MetricName, for user 
counter(namespace+name) style MetricNames
---
 sdks/python/apache_beam/metrics/metric_test.py | 16 
 sdks/python/apache_beam/metrics/metricbase.py  |  8 ++--
 2 files changed, 22 insertions(+), 2 deletions(-)

diff --git a/sdks/python/apache_beam/metrics/metric_test.py 
b/sdks/python/apache_beam/metrics/metric_test.py
index 09f1902..a0f75ed 100644
--- a/sdks/python/apache_beam/metrics/metric_test.py
+++ b/sdks/python/apache_beam/metrics/metric_test.py
@@ -137,6 +137,22 @@ class MetricsTest(unittest.TestCase):
 with self.assertRaises(ValueError):
   Metrics.distribution("", "names")
 
+  # Do not change the behaviour of str(), do not update/delete this test case
+  # if the behaviour of str() is changed. Doing so will
+  # break end user beam code which depends on the str() behaviour.
+  def test_user_metric_name_str(self):
+mn = MetricName("my_namespace", "my_name")
+expected_str = 'MetricName(namespace=my_namespace, name=my_name)'
+self.assertEqual(str(mn), expected_str)
+
+  def test_general_urn_metric_name_str(self):
+mn = MetricName(
+"my_namespace", "my_name", urn='my_urn', labels={'key': 'value'})
+expected_str = (
+"MetricName(namespace=my_namespace, name=my_name, "
+"urn=my_urn, labels={'key': 'value'})")
+self.assertEqual(str(mn), expected_str)
+
   @attr('ValidatesRunner')
   def test_user_counter_using_pardo(self):
 class SomeDoFn(beam.DoFn):
diff --git a/sdks/python/apache_beam/metrics/metricbase.py 
b/sdks/python/apache_beam/metrics/metricbase.py
index f29641e..b89b4bb 100644
--- a/sdks/python/apache_beam/metrics/metricbase.py
+++ b/sdks/python/apache_beam/metrics/metricbase.py
@@ -87,8 +87,12 @@ class MetricName(object):
 return not self == other
 
   def __str__(self):
-return 'MetricName(namespace={}, name={}, urn={}, labels={})'.format(
-self.namespace, self.name, self.urn, self.labels)
+if self.urn:
+  return 'MetricName(namespace={}, name={}, urn={}, labels={})'.format(
+  self.namespace, self.name, self.urn, self.labels)
+else:  # User counter case.
+  return 'MetricName(namespace={}, name={})'.format(
+  self.namespace, self.name)
 
   def __hash__(self):
 return hash((self.namespace, self.name, self.urn) +



[beam] branch master updated: Warn if temp dataset cleanup permission is denied

2020-11-30 Thread pabloem
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 3d23f47  Warn if temp dataset cleanup permission is denied
 new bee6602  Merge pull request #13433 from Warn if temp dataset cleanup 
permission is denied
3d23f47 is described below

commit 3d23f471c2d98ad06e720eb385abef532b7d4be7
Author: Frank Zhao 
AuthorDate: Thu Nov 26 14:01:00 2020 +1100

Warn if temp dataset cleanup permission is denied

This will mean users no longer require BigQuery dataset delete
permission for the execution user.
---
 sdks/python/apache_beam/io/gcp/bigquery_tools.py | 12 +++-
 1 file changed, 11 insertions(+), 1 deletion(-)

diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py 
b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
index da1b7fd..1544588 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
@@ -783,7 +783,17 @@ class BigQueryWrapper(object):
 return
   else:
 raise
-self._delete_dataset(temp_table.projectId, temp_table.datasetId, True)
+try:
+  self._delete_dataset(temp_table.projectId, temp_table.datasetId, True)
+except HttpError as exn:
+  if exn.status_code == 403:
+_LOGGER.warning(
+'Permission denied to delete temporary dataset %s:%s for clean up',
+temp_table.projectId,
+temp_table.datasetId)
+return
+  else:
+raise
 
   @retry.with_exponential_backoff(
   num_retries=MAX_RETRIES,
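The try/except shape in the hunk above — tolerate a 403 during temp-dataset cleanup, re-raise everything else — can be sketched in isolation. `FakeHttpError` and `delete_dataset_best_effort` below are invented stand-ins for apitools' `HttpError` and the wrapper's delete call, not Beam code:

```python
import logging

_LOGGER = logging.getLogger(__name__)

class FakeHttpError(Exception):
    """Stand-in for apitools.base.py.exceptions.HttpError."""
    def __init__(self, status_code):
        super().__init__(status_code)
        self.status_code = status_code

def delete_dataset_best_effort(delete_fn, project, dataset):
    """Mirror the pattern in the diff: log and continue on 403
    (permission denied) during cleanup, re-raise anything else."""
    try:
        delete_fn(project, dataset)
        return True
    except FakeHttpError as exn:
        if exn.status_code == 403:
            _LOGGER.warning(
                'Permission denied to delete temporary dataset %s:%s '
                'for clean up', project, dataset)
            return False
        raise
```

The payoff, per the commit message, is that the execution identity no longer needs BigQuery dataset-delete permission just to finish a read.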



[beam] branch master updated (bee6602 -> a1fac1d)

2020-11-30 Thread pabloem
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.


from bee6602  Merge pull request #13433 from Warn if temp dataset cleanup 
permission is denied
 add a1fac1d  Merge pull request #13170 from [BEAM-9650] Adding support for 
ReadAll from BigQuery transform

No new revisions were added by this update.

Summary of changes:
 CHANGES.md |   1 +
 sdks/python/apache_beam/io/gcp/bigquery.py | 268 +--
 .../apache_beam/io/gcp/bigquery_read_internal.py   | 289 +
 .../apache_beam/io/gcp/bigquery_read_it_test.py| 103 
 sdks/python/apache_beam/io/gcp/bigquery_test.py|   2 +-
 sdks/python/apache_beam/io/iobase.py   |  13 +-
 6 files changed, 588 insertions(+), 88 deletions(-)



[beam] branch master updated (a1fac1d -> 978b812)

2020-11-30 Thread pabloem
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.


from a1fac1d  Merge pull request #13170 from [BEAM-9650] Adding support for 
ReadAll from BigQuery transform
 add 978b812  Merge pull request #13137 from [BEAM-11073] Dicom IO 
Connector for Java

No new revisions were added by this update.

Summary of changes:
 .../apache/beam/sdk/io/gcp/healthcare/DicomIO.java | 199 +
 .../sdk/io/gcp/healthcare/HealthcareApiClient.java |  12 ++
 .../io/gcp/healthcare/HttpHealthcareApiClient.java |  79 
 .../beam/sdk/io/gcp/healthcare/WebPathParser.java  |  63 +++
 .../beam/sdk/io/gcp/healthcare/DicomIOReadIT.java  |  91 ++
 .../beam/sdk/io/gcp/healthcare/DicomIOTest.java}   |  44 +++--
 .../sdk/io/gcp/healthcare/WebPathParserTest.java   |  41 ++---
 .../src/test/resources/DICOM/testDicomFile.dcm | Bin 0 -> 10366 bytes
 8 files changed, 494 insertions(+), 35 deletions(-)
 create mode 100644 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/DicomIO.java
 create mode 100644 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/WebPathParser.java
 create mode 100644 
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/DicomIOReadIT.java
 copy 
sdks/java/{core/src/test/java/org/apache/beam/sdk/transforms/DoFnTest.java => 
io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/DicomIOTest.java}
 (52%)
 copy 
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/apiary/ApiaryTest.java
 => 
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/WebPathParserTest.java
 (50%)
 create mode 100644 
sdks/java/io/google-cloud-platform/src/test/resources/DICOM/testDicomFile.dcm



[beam] branch master updated (978b812 -> 40f517f)

2020-11-30 Thread pabloem
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.


from 978b812  Merge pull request #13137 from [BEAM-11073] Dicom IO 
Connector for Java
 add 40f517f  Fixes silent fail in bigtableio.py by logging ImportError 
(#13332)

No new revisions were added by this update.

Summary of changes:
 sdks/python/apache_beam/io/gcp/bigtableio.py | 7 ++-
 1 file changed, 6 insertions(+), 1 deletion(-)
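The fix in `bigtableio.py` replaces a silent `except ImportError: pass` around an optional dependency with a logged warning. A hedged sketch of that pattern (the module name below is a deliberately nonexistent placeholder, not the real Bigtable client import):

```python
import logging

_LOGGER = logging.getLogger(__name__)

# Pattern from the commit: instead of swallowing a failed optional
# import silently, log it so users learn why the connector is missing.
try:
    import nonexistent_optional_dependency  # placeholder module name
    CLIENT_AVAILABLE = True
except ImportError as e:
    _LOGGER.warning(
        'ImportError: %s. Optional client not available; install the '
        'relevant extra package to use this IO connector.', e)
    CLIENT_AVAILABLE = False
```

A flag like `CLIENT_AVAILABLE` lets the rest of the module fail with a clear message at use time rather than at import time.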



[beam] branch master updated (02da3ae -> 87f3138)

2020-12-01 Thread pabloem
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.


from 02da3ae  Update python versions in pre-requisites (#13451)
 add 87f3138  Merge pull request #12779 from [BEAM-10856] Support for 
NestedValueProvider for Python SDK

No new revisions were added by this update.

Summary of changes:
 CHANGES.md |  1 +
 sdks/python/apache_beam/options/value_provider.py  | 71 +-
 .../apache_beam/options/value_provider_test.py | 40 
 3 files changed, 109 insertions(+), 3 deletions(-)
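The idea behind `NestedValueProvider` — wrap another value provider and apply a translator function when `get()` is called — can be sketched with plain classes. These are simplified stand-ins, not the actual `apache_beam.options.value_provider` implementations:

```python
class StaticProvider:
    """Stand-in for Beam's StaticValueProvider: get() returns a fixed value."""
    def __init__(self, value):
        self._value = value

    def get(self):
        return self._value

class NestedProvider:
    """Sketch of the NestedValueProvider idea added for the Python SDK:
    defer to an inner provider and run a translator over its value,
    so derived values stay lazy until pipeline execution."""
    def __init__(self, value_provider, translator):
        self._vp = value_provider
        self._translator = translator

    def get(self):
        return self._translator(self._vp.get())
```

This lets a template parameter (the inner provider) be transformed — e.g. appending a glob suffix to a path — without resolving it at pipeline-construction time.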



[beam] branch master updated: Add start_times to MonitoringInfos and populate them in the python SDK

2020-12-02 Thread pabloem
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 6c25bc0  Add start_times to MonitoringInfos and populate them in the 
python SDK
 new eca935b  Merge pull request #13429 from Add start_times to 
MonitoringInfos and populate them in the python SDK
6c25bc0 is described below

commit 6c25bc082e809246989ca8a459c36c5b4d247f20
Author: Alex Amato 
AuthorDate: Wed Nov 25 12:23:45 2020 -0800

Add start_times to MonitoringInfos and populate them in the python SDK
---
 model/pipeline/src/main/proto/metrics.proto| 13 +++
 sdks/python/apache_beam/metrics/cells.pxd  |  2 ++
 sdks/python/apache_beam/metrics/cells.py   | 15 ++---
 sdks/python/apache_beam/metrics/cells_test.py  | 25 ++
 .../apache_beam/runners/worker/sdk_worker_test.py  |  2 ++
 5 files changed, 54 insertions(+), 3 deletions(-)

diff --git a/model/pipeline/src/main/proto/metrics.proto b/model/pipeline/src/main/proto/metrics.proto
index 86114a8..39ef551 100644
--- a/model/pipeline/src/main/proto/metrics.proto
+++ b/model/pipeline/src/main/proto/metrics.proto
@@ -31,6 +31,7 @@ option java_outer_classname = "MetricsApi";
 
 import "beam_runner_api.proto";
 import "google/protobuf/descriptor.proto";
+import "google/protobuf/timestamp.proto";
 
 // A specification for describing a well known MonitoringInfo.
 //
@@ -401,6 +402,18 @@ message MonitoringInfo {
   // as Stackdriver will be able to aggregate the metrics using a subset of the
   // provided labels
   map<string, string> labels = 4;
+
+  // This indicates the start of the time range over which this value was
+  // measured.
+  // This is needed by some external metric aggregation services
+  // to indicate when the reporter of the metric first began collecting the
+  // cumulative value for the timeseries.
+  // If the SDK Harness restarts, it should reset the start_time, and reset
+  // the collection of cumulative metrics (i.e. start to count again from 0).
+  // HarnessMonitoringInfos should set this start_time once, when the
+  // MonitoringInfo is first reported.
+  // ProcessBundle MonitoringInfos should set a start_time for each bundle.
+  google.protobuf.Timestamp start_time = 5;
 }
 
 // A set of well known URNs that specify the encoding and aggregation method.
diff --git a/sdks/python/apache_beam/metrics/cells.pxd b/sdks/python/apache_beam/metrics/cells.pxd
index 0204da8..0eaa890 100644
--- a/sdks/python/apache_beam/metrics/cells.pxd
+++ b/sdks/python/apache_beam/metrics/cells.pxd
@@ -17,11 +17,13 @@
 
 cimport cython
 cimport libc.stdint
+from cpython.datetime cimport datetime
 
 
 cdef class MetricCell(object):
   cdef object _lock
   cpdef bint update(self, value) except -1
+  cdef datetime _start_time
 
 
 cdef class CounterCell(MetricCell):
diff --git a/sdks/python/apache_beam/metrics/cells.py b/sdks/python/apache_beam/metrics/cells.py
index a7b7938..34ce2a4 100644
--- a/sdks/python/apache_beam/metrics/cells.py
+++ b/sdks/python/apache_beam/metrics/cells.py
@@ -29,6 +29,7 @@ from __future__ import division
 import threading
 import time
 from builtins import object
+from datetime import datetime
 from typing import Any
 from typing import Optional
 from typing import SupportsInt
@@ -63,6 +64,7 @@ class MetricCell(object):
   """
   def __init__(self):
     self._lock = threading.Lock()
+    self._start_time = None
 
   def update(self, value):
     raise NotImplementedError
@@ -71,6 +73,13 @@ class MetricCell(object):
     raise NotImplementedError
 
   def to_runner_api_monitoring_info(self, name, transform_id):
+    if not self._start_time:
+      self._start_time = datetime.utcnow()
+    mi = self.to_runner_api_monitoring_info_impl(name, transform_id)
+    mi.start_time.FromDatetime(self._start_time)
+    return mi
+
+  def to_runner_api_monitoring_info_impl(self, name, transform_id):
     raise NotImplementedError
 
   def reset(self):
@@ -136,7 +145,7 @@ class CounterCell(MetricCell):
     with self._lock:
       return self.value
 
-  def to_runner_api_monitoring_info(self, name, transform_id):
+  def to_runner_api_monitoring_info_impl(self, name, transform_id):
     from apache_beam.metrics import monitoring_infos
     if not name.urn:
       # User counter case.
@@ -201,7 +210,7 @@ class DistributionCell(MetricCell):
     with self._lock:
       return self.data.get_cumulative()
 
-  def to_runner_api_monitoring_info(self, name, transform_id):
+  def to_runner_api_monitoring_info_impl(self, name, transform_id):
     from apache_beam.metrics import monitoring_infos
     return monitoring_infos.int64_user_distribution(
         name.namespace,
@@ -251,7 +260,7 @@ class GaugeCell(MetricCell):
     with self._lock:
       return self.data.get_cumulative()
 
-  def to_runne
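The diff above threads a lazily-stamped `start_time` through a public wrapper that delegates to a per-cell `_impl` method, so each concrete cell only produces its payload while the base class owns the timestamp. A standalone sketch of that template-method pattern (class names and the dict payload are illustrative, not Beam's protobuf types):

```python
from datetime import datetime, timezone


class MetricCellSketch:
    """Base class: stamps start_time once, then delegates to *_impl."""

    def __init__(self):
        self._start_time = None

    def to_monitoring_info(self):
        if self._start_time is None:
            # Stamped only on the first report, then reused thereafter.
            self._start_time = datetime.now(timezone.utc)
        info = self.to_monitoring_info_impl()
        info["start_time"] = self._start_time
        return info

    def to_monitoring_info_impl(self):
        raise NotImplementedError


class CounterCellSketch(MetricCellSketch):
    """Concrete cell: only responsible for its own payload."""

    def __init__(self):
        super().__init__()
        self.value = 0

    def to_monitoring_info_impl(self):
        return {"type": "counter", "value": self.value}
```

Repeated calls to `to_monitoring_info()` return the same `start_time`, matching the proto comment's requirement that the timestamp marks when cumulative collection began, not when each report was sent.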

[beam] branch master updated (ecced96 -> cb9212c)

2020-12-04 Thread pabloem
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.


from ecced96  Merge pull request #13383 from mattcasters/master
 new fa0ddc6  [BEAM-11386] Fail BQ STREAMING_INSERTS with SchemaUpdateOptions
 new dd0cbec  Fix condition and test
 new cb9212c  Merge pull request #13468 from [BEAM-11386] Fail BQ STREAMING_INSERTS with SchemaUpdateOptions

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java | 5 +
 .../org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java | 9 +
 2 files changed, 14 insertions(+)
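BEAM-11386 makes BigQueryIO fail fast when `SchemaUpdateOptions` are combined with streaming inserts, since those options are only honored by BigQuery load jobs. A hedged sketch of that validation (function and constant names are illustrative, not BigQueryIO's actual signature):

```python
def validate_schema_update_options(method, schema_update_options):
    """Reject schema update options for streaming inserts, which cannot
    apply them; they are only honored by BigQuery load jobs.
    (Sketch with illustrative names, not Beam's actual code.)"""
    if schema_update_options and method == "STREAMING_INSERTS":
        raise ValueError(
            "SchemaUpdateOptions are not supported when writing via "
            "STREAMING_INSERTS; use FILE_LOADS instead.")


# A FILE_LOADS write may carry schema update options; streaming may not.
validate_schema_update_options("FILE_LOADS", ["ALLOW_FIELD_ADDITION"])
```

Failing at pipeline construction is preferable to the options being silently ignored at run time.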



[beam] branch master updated (2462fe9 -> 553553d)

2020-12-07 Thread pabloem
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.


from 2462fe9  Merge pull request #13199 from [BEAM-11132] Remove Experimental annotation from SDF related APIs.
 add 553553d  Avoiding queries that check whether a BQ table is empty for streaming inserts (#13490)

No new revisions were added by this update.

Summary of changes:
 sdks/python/apache_beam/io/gcp/bigquery.py   |  5 +
 sdks/python/apache_beam/io/gcp/bigquery_tools.py | 14 +++---
 2 files changed, 12 insertions(+), 7 deletions(-)
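This commit avoids issuing an extra query just to learn whether the destination table is empty when streaming inserts are used, where that check is unnecessary overhead. A minimal sketch of the decision logic (disposition and method names are illustrative):

```python
def should_check_table_empty(write_disposition, method):
    """Only a WRITE_EMPTY disposition needs an emptiness check, and
    streaming inserts skip the extra query entirely.
    (Sketch; not Beam's actual code.)"""
    return write_disposition == "WRITE_EMPTY" and method != "STREAMING_INSERTS"
```

Gating the check this way saves one BigQuery round trip per write when it cannot affect the outcome.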



[beam] branch master updated (bbaaeb0 -> 5caeb5d)

2020-12-08 Thread pabloem
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.


from bbaaeb0  Decode data channel elements after, rather than before, placing them in the queue. (#13387)
 add 5caeb5d  Merge pull request #13395 from [BEAM-11426] Add FHIR Search to io/gcp/healthcare/FhirIO

No new revisions were added by this update.

Summary of changes:
 .../apache/beam/sdk/io/gcp/healthcare/FhirIO.java  | 232 +
 .../sdk/io/gcp/healthcare/HealthcareApiClient.java |  17 ++
 .../io/gcp/healthcare/HttpHealthcareApiClient.java | 174 
 .../apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java  |   3 +-
 .../beam/sdk/io/gcp/healthcare/FhirIOSearchIT.java | 155 ++
 .../beam/sdk/io/gcp/healthcare/FhirIOTest.java |  20 ++
 6 files changed, 600 insertions(+), 1 deletion(-)
 create mode 100644 sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOSearchIT.java



svn commit: r44937 - in /release/beam: ./ 2.26.0/ 2.26.0/python/

2020-12-11 Thread pabloem
Author: pabloem
Date: Fri Dec 11 21:56:55 2020
New Revision: 44937

Log:
Release of Apache Beam 2.26.0 


Added:
release/beam/2.26.0/
release/beam/2.26.0/apache-beam-2.26.0-source-release.zip   (with props)
release/beam/2.26.0/apache-beam-2.26.0-source-release.zip.asc
release/beam/2.26.0/apache-beam-2.26.0-source-release.zip.sha512
release/beam/2.26.0/python/
release/beam/2.26.0/python/apache-beam-2.26.0.zip   (with props)
release/beam/2.26.0/python/apache-beam-2.26.0.zip.asc
release/beam/2.26.0/python/apache-beam-2.26.0.zip.sha512

release/beam/2.26.0/python/apache_beam-2.26.0-cp36-cp36m-macosx_10_9_x86_64.whl   (with props)
release/beam/2.26.0/python/apache_beam-2.26.0-cp36-cp36m-macosx_10_9_x86_64.whl.asc
release/beam/2.26.0/python/apache_beam-2.26.0-cp36-cp36m-macosx_10_9_x86_64.whl.sha512
release/beam/2.26.0/python/apache_beam-2.26.0-cp36-cp36m-manylinux1_i686.whl   (with props)
release/beam/2.26.0/python/apache_beam-2.26.0-cp36-cp36m-manylinux1_i686.whl.asc
release/beam/2.26.0/python/apache_beam-2.26.0-cp36-cp36m-manylinux1_i686.whl.sha512
release/beam/2.26.0/python/apache_beam-2.26.0-cp36-cp36m-manylinux1_x86_64.whl   (with props)
release/beam/2.26.0/python/apache_beam-2.26.0-cp36-cp36m-manylinux1_x86_64.whl.asc
release/beam/2.26.0/python/apache_beam-2.26.0-cp36-cp36m-manylinux1_x86_64.whl.sha512
release/beam/2.26.0/python/apache_beam-2.26.0-cp36-cp36m-manylinux2010_i686.whl   (with props)
release/beam/2.26.0/python/apache_beam-2.26.0-cp36-cp36m-manylinux2010_i686.whl.asc
release/beam/2.26.0/python/apache_beam-2.26.0-cp36-cp36m-manylinux2010_i686.whl.sha512
release/beam/2.26.0/python/apache_beam-2.26.0-cp36-cp36m-manylinux2010_x86_64.whl   (with props)
release/beam/2.26.0/python/apache_beam-2.26.0-cp36-cp36m-manylinux2010_x86_64.whl.asc
release/beam/2.26.0/python/apache_beam-2.26.0-cp36-cp36m-manylinux2010_x86_64.whl.sha512
release/beam/2.26.0/python/apache_beam-2.26.0-cp36-cp36m-win32.whl   (with props)
release/beam/2.26.0/python/apache_beam-2.26.0-cp36-cp36m-win32.whl.asc
release/beam/2.26.0/python/apache_beam-2.26.0-cp36-cp36m-win32.whl.sha512
release/beam/2.26.0/python/apache_beam-2.26.0-cp36-cp36m-win_amd64.whl   (with props)
release/beam/2.26.0/python/apache_beam-2.26.0-cp36-cp36m-win_amd64.whl.asc
release/beam/2.26.0/python/apache_beam-2.26.0-cp36-cp36m-win_amd64.whl.sha512
release/beam/2.26.0/python/apache_beam-2.26.0-cp37-cp37m-macosx_10_9_x86_64.whl   (with props)
release/beam/2.26.0/python/apache_beam-2.26.0-cp37-cp37m-macosx_10_9_x86_64.whl.asc
release/beam/2.26.0/python/apache_beam-2.26.0-cp37-cp37m-macosx_10_9_x86_64.whl.sha512
release/beam/2.26.0/python/apache_beam-2.26.0-cp37-cp37m-manylinux1_i686.whl   (with props)
release/beam/2.26.0/python/apache_beam-2.26.0-cp37-cp37m-manylinux1_i686.whl.asc
release/beam/2.26.0/python/apache_beam-2.26.0-cp37-cp37m-manylinux1_i686.whl.sha512
release/beam/2.26.0/python/apache_beam-2.26.0-cp37-cp37m-manylinux1_x86_64.whl   (with props)
release/beam/2.26.0/python/apache_beam-2.26.0-cp37-cp37m-manylinux1_x86_64.whl.asc
release/beam/2.26.0/python/apache_beam-2.26.0-cp37-cp37m-manylinux1_x86_64.whl.sha512
release/beam/2.26.0/python/apache_beam-2.26.0-cp37-cp37m-manylinux2010_i686.whl   (with props)
release/beam/2.26.0/python/apache_beam-2.26.0-cp37-cp37m-manylinux2010_i686.whl.asc
release/beam/2.26.0/python/apache_beam-2.26.0-cp37-cp37m-manylinux2010_i686.whl.sha512
release/beam/2.26.0/python/apache_beam-2.26.0-cp37-cp37m-manylinux2010_x86_64.whl   (with props)
release/beam/2.26.0/python/apache_beam-2.26.0-cp37-cp37m-manylinux2010_x86_64.whl.asc
release/beam/2.26.0/python/apache_beam-2.26.0-cp37-cp37m-manylinux2010_x86_64.whl.sha512
release/beam/2.26.0/python/apache_beam-2.26.0-cp37-cp37m-win32.whl   (with props)
release/beam/2.26.0/python/apache_beam-2.26.0-cp37-cp37m-win32.whl.asc
release/beam/2.26.0/python/apache_beam-2.26.0-cp37-cp37m-win32.whl.sha512
release/beam/2.26.0/python/apache_beam-2.26.0-cp37-cp37m-win_amd64.whl   (with props)
release/beam/2.26.0/python/apache_beam-2.26.0-cp37-cp37m-win_amd64.whl.asc
release/beam/2.26.0/python/apache_beam-2.26.0-cp37-cp37m-win_amd64.whl.sha512
release/beam/2.26.0/python/apache_beam-2.26.0-cp38-cp38-macosx_10_9_x86_64.whl   (with props)
release/beam/2.26.0/python/apache_beam-2.26.0-cp38-cp38-macosx_10_9_x86_64.whl.asc
release/beam/2.26.0/python/apache_beam-2.26.0-cp38-cp38-macosx_10_9_x86_64.whl.sha512
release/beam/2.26.0/python/apache_beam-2.26.0-cp38-cp38-manylinux1_i686.whl   (with props)
release/beam/2.26.0/python/apache_beam-2.26.0-cp38-cp38-manylinux1_i686.whl.asc
release/beam/2.26.0/python/apache_beam-2.26.0-cp38-cp38-manylinux1_i686.whl.sha512
release/beam/2.26.0/python/apache_beam-2.26.0-cp38-cp38-manylinux1_x86_64.whl
