This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch release-2.56.0
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/release-2.56.0 by this push:
     new 3c7a01360df Cherry picking (#30460) BQ clustering valueprovider (#31039)
3c7a01360df is described below

commit 3c7a01360df3635ae56b7073dfa36e75e7492f61
Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com>
AuthorDate: Thu Apr 18 11:47:41 2024 -0400

    Cherry picking (#30460) BQ clustering valueprovider (#31039)
    
    * Support clustering with value provider
    
    * remove
    
    * add some documentation
    
    * fix
    
    * address comments
    
    * update test
    
    * spotless
    
    * use serializable json clustering; fix translation
    
    * fall back on super's clustering and time partitioning when needed
    
    * fork based on version 2.56.0
    
    * fix test
---
 .../beam/sdk/io/gcp/bigquery/BigQueryHelpers.java  | 22 +++++++++++++++
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java       | 31 ++++++++++++++--------
 .../sdk/io/gcp/bigquery/BigQueryIOTranslation.java | 18 ++++++++-----
 .../gcp/bigquery/DynamicDestinationsHelpers.java   |  8 ++++++
 .../sdk/io/gcp/bigquery/BigQueryClusteringIT.java  |  5 +++-
 .../sdk/io/gcp/bigquery/BigQueryHelpersTest.java   | 11 ++++++++
 .../io/gcp/bigquery/BigQueryIOTranslationTest.java | 10 +++++--
 .../BigQueryTimePartitioningClusteringIT.java      |  1 +
 8 files changed, 85 insertions(+), 21 deletions(-)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
index c4ad09ce6ea..8c600cf780a 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
@@ -17,11 +17,13 @@
  */
 package org.apache.beam.sdk.io.gcp.bigquery;
 
+import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
 import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
 
 import com.google.api.client.util.BackOff;
 import com.google.api.client.util.BackOffUtils;
 import com.google.api.client.util.Sleeper;
+import com.google.api.services.bigquery.model.Clustering;
 import com.google.api.services.bigquery.model.Dataset;
 import com.google.api.services.bigquery.model.Job;
 import com.google.api.services.bigquery.model.JobReference;
@@ -31,6 +33,8 @@ import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableSchema;
 import com.google.api.services.bigquery.model.TimePartitioning;
 import com.google.cloud.hadoop.util.ApiErrorExtractor;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonParser;
 import java.io.IOException;
 import java.io.Serializable;
 import java.math.BigInteger;
@@ -40,6 +44,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.regex.Matcher;
+import java.util.stream.Collectors;
 import org.apache.beam.sdk.extensions.gcp.util.BackOffAdapter;
 import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.io.fs.ResolveOptions;
@@ -704,6 +709,23 @@ public class BigQueryHelpers {
     }
   }
 
+  static Clustering clusteringFromJsonFields(String jsonStringClustering) {
+    JsonElement jsonClustering = JsonParser.parseString(jsonStringClustering);
+
+    checkArgument(
+        jsonClustering.isJsonArray(),
+        "Received an invalid Clustering json string: %s. "
+            + "Please provide a serialized json array like so: [\"column1\", 
\"column2\"]",
+        jsonStringClustering);
+
+    List<String> fields =
+        jsonClustering.getAsJsonArray().asList().stream()
+            .map(JsonElement::getAsString)
+            .collect(Collectors.toList());
+
+    return new Clustering().setFields(fields);
+  }
+
   static String resolveTempLocation(
       String tempLocationDir, String bigQueryOperationName, String stepUuid) {
     return FileSystems.matchNewResource(tempLocationDir, true)
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index a646e1e6247..fce8f1c5d40 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -2350,7 +2350,7 @@ public class BigQueryIO {
 
     abstract @Nullable ValueProvider<String> getJsonTimePartitioning();
 
-    abstract @Nullable Clustering getClustering();
+    abstract @Nullable ValueProvider<String> getJsonClustering();
 
     abstract CreateDisposition getCreateDisposition();
 
@@ -2459,7 +2459,7 @@ public class BigQueryIO {
 
       abstract Builder<T> setJsonTimePartitioning(ValueProvider<String> 
jsonTimePartitioning);
 
-      abstract Builder<T> setClustering(Clustering clustering);
+      abstract Builder<T> setJsonClustering(ValueProvider<String> clustering);
 
       abstract Builder<T> setCreateDisposition(CreateDisposition 
createDisposition);
 
@@ -2826,8 +2826,18 @@ public class BigQueryIO {
      * tables, the fields here will be ignored; call {@link #withClustering()} 
instead.
      */
     public Write<T> withClustering(Clustering clustering) {
-      checkArgument(clustering != null, "clustering can not be null");
-      return toBuilder().setClustering(clustering).build();
+      checkArgument(clustering != null, "clustering cannot be null");
+      return 
withJsonClustering(StaticValueProvider.of(BigQueryHelpers.toJsonString(clustering)));
+    }
+
+    /**
+     * The same as {@link #withClustering(Clustering)}, but takes a 
JSON-serialized Clustering
+     * object in a deferred {@link ValueProvider}. For example: {@code 
{"fields": ["column1",
+     * "column2", "column3"]}}
+     */
+    public Write<T> withJsonClustering(ValueProvider<String> jsonClustering) {
+      checkArgument(jsonClustering != null, "clustering cannot be null");
+      return toBuilder().setJsonClustering(jsonClustering).build();
     }
 
     /**
@@ -2844,7 +2854,7 @@ public class BigQueryIO {
      * read state written with a previous version.
      */
     public Write<T> withClustering() {
-      return toBuilder().setClustering(new Clustering()).build();
+      return withClustering(new Clustering());
     }
 
     /** Specifies whether the table should be created if it does not exist. */
@@ -3420,10 +3430,10 @@ public class BigQueryIO {
         if (getJsonTableRef() != null) {
           dynamicDestinations =
               
DynamicDestinationsHelpers.ConstantTableDestinations.fromJsonTableRef(
-                  getJsonTableRef(), getTableDescription(), getClustering() != 
null);
+                  getJsonTableRef(), getTableDescription(), 
getJsonClustering() != null);
         } else if (getTableFunction() != null) {
           dynamicDestinations =
-              new TableFunctionDestinations<>(getTableFunction(), 
getClustering() != null);
+              new TableFunctionDestinations<>(getTableFunction(), 
getJsonClustering() != null);
         }
 
         // Wrap with a DynamicDestinations class that will provide a schema. 
There might be no
@@ -3440,13 +3450,12 @@ public class BigQueryIO {
         }
 
         // Wrap with a DynamicDestinations class that will provide the proper 
TimePartitioning.
-        if (getJsonTimePartitioning() != null
-            || 
Optional.ofNullable(getClustering()).map(Clustering::getFields).isPresent()) {
+        if (getJsonTimePartitioning() != null || (getJsonClustering() != 
null)) {
           dynamicDestinations =
               new ConstantTimePartitioningClusteringDestinations<>(
                   (DynamicDestinations<T, TableDestination>) 
dynamicDestinations,
                   getJsonTimePartitioning(),
-                  
StaticValueProvider.of(BigQueryHelpers.toJsonString(getClustering())));
+                  getJsonClustering());
         }
         if (getPrimaryKey() != null) {
           dynamicDestinations =
@@ -3699,7 +3708,7 @@ public class BigQueryIO {
                 elementCoder,
                 rowWriterFactory,
                 getKmsKey(),
-                getClustering() != null,
+                getJsonClustering() != null,
                 getUseAvroLogicalTypes(),
                 getWriteTempDataset(),
                 getBadRecordRouter(),
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java
index 9b499ea3253..fee79f5896c 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java
@@ -20,7 +20,6 @@ package org.apache.beam.sdk.io.gcp.bigquery;
 import static 
org.apache.beam.sdk.util.construction.TransformUpgrader.fromByteArray;
 import static 
org.apache.beam.sdk.util.construction.TransformUpgrader.toByteArray;
 
-import com.google.api.services.bigquery.model.Clustering;
 import com.google.api.services.bigquery.model.TableRow;
 import com.google.auto.service.AutoService;
 import 
com.google.cloud.bigquery.storage.v1.AppendRowsRequest.MissingValueInterpretation;
@@ -373,7 +372,7 @@ public class BigQueryIOTranslation {
             .addNullableByteArrayField("dynamic_destinations")
             .addNullableStringField("json_schema")
             .addNullableStringField("json_time_partitioning")
-            .addNullableByteArrayField("clustering")
+            .addNullableStringField("clustering")
             .addNullableByteArrayField("create_disposition")
             .addNullableByteArrayField("write_disposition")
             .addNullableArrayField("schema_update_options", FieldType.BYTES)
@@ -474,8 +473,8 @@ public class BigQueryIOTranslation {
         fieldValues.put(
             "json_time_partitioning", 
toByteArray(transform.getJsonTimePartitioning().get()));
       }
-      if (transform.getClustering() != null) {
-        fieldValues.put("clustering", toByteArray(transform.getClustering()));
+      if (transform.getJsonClustering() != null) {
+        fieldValues.put("clustering", transform.getJsonClustering().get());
       }
       if (transform.getCreateDisposition() != null) {
         fieldValues.put("create_disposition", 
toByteArray(transform.getCreateDisposition()));
@@ -658,9 +657,14 @@ public class BigQueryIOTranslation {
         if (jsonTimePartitioning != null) {
           builder = 
builder.setJsonTimePartitioning(StaticValueProvider.of(jsonTimePartitioning));
         }
-        byte[] clusteringBytes = configRow.getBytes("clustering");
-        if (clusteringBytes != null) {
-          builder = builder.setClustering((Clustering) 
fromByteArray(clusteringBytes));
+        // Translation with Clustering is broken before 2.56.0, where we used 
to attempt to
+        // serialize a non-serializable Clustering object to bytes.
+        // In 2.56.0 onwards, we translate using the json string 
representation instead.
+        if (TransformUpgrader.compareVersions(updateCompatibilityBeamVersion, 
"2.56.0") >= 0) {
+          String jsonClustering = configRow.getString("clustering");
+          if (jsonClustering != null) {
+            builder = 
builder.setJsonClustering(StaticValueProvider.of(jsonClustering));
+          }
         }
         byte[] createDispositionBytes = 
configRow.getBytes("create_disposition");
         if (createDispositionBytes != null) {
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java
index 1f042a81eb9..eed4314e391 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java
@@ -26,6 +26,7 @@ import com.google.api.services.bigquery.model.Table;
 import com.google.api.services.bigquery.model.TableConstraints;
 import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableSchema;
+import com.google.gson.JsonParser;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
@@ -303,7 +304,14 @@ class DynamicDestinationsHelpers {
       TableDestination destination = super.getDestination(element);
       String partitioning =
           
Optional.ofNullable(jsonTimePartitioning).map(ValueProvider::get).orElse(null);
+      if (partitioning == null
+          || JsonParser.parseString(partitioning).getAsJsonObject().isEmpty()) 
{
+        partitioning = destination.getJsonTimePartitioning();
+      }
       String clustering = 
Optional.ofNullable(jsonClustering).map(ValueProvider::get).orElse(null);
+      if (clustering == null || 
JsonParser.parseString(clustering).getAsJsonObject().isEmpty()) {
+        clustering = destination.getJsonClustering();
+      }
 
       return new TableDestination(
           destination.getTableSpec(), destination.getTableDescription(), 
partitioning, clustering);
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryClusteringIT.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryClusteringIT.java
index 3ee6931dd98..46989021ee9 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryClusteringIT.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryClusteringIT.java
@@ -33,6 +33,7 @@ import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.ExperimentalOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.TestPipelineOptions;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -171,7 +172,9 @@ public class BigQueryClusteringIT {
         .apply(
             BigQueryIO.writeTableRows()
                 .to(new ClusteredDestinations(tableName))
-                .withClustering(CLUSTERING)
+                .withJsonClustering(
+                    ValueProvider.StaticValueProvider.of(
+                        BigQueryHelpers.toJsonString(CLUSTERING.getFields())))
                 
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                 
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
                 .withMethod(BigQueryIO.Write.Method.FILE_LOADS));
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpersTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpersTest.java
index f5f4ddb9547..6779920702d 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpersTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpersTest.java
@@ -20,12 +20,14 @@ package org.apache.beam.sdk.io.gcp.bigquery;
 import static org.junit.Assert.assertEquals;
 
 import com.google.api.client.util.Data;
+import com.google.api.services.bigquery.model.Clustering;
 import com.google.api.services.bigquery.model.ErrorProto;
 import com.google.api.services.bigquery.model.Job;
 import com.google.api.services.bigquery.model.JobReference;
 import com.google.api.services.bigquery.model.JobStatus;
 import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableRow;
+import java.util.Arrays;
 import java.util.Optional;
 import java.util.Random;
 import java.util.Set;
@@ -258,4 +260,13 @@ public class BigQueryHelpersTest {
 
     assertEquals(dataset.get(), 
noDataset.setDatasetId(dataset.get()).getDatasetId());
   }
+
+  @Test
+  public void testClusteringJsonConversion() {
+    Clustering clustering =
+        new Clustering().setFields(Arrays.asList("column1", "column2", 
"column3"));
+    String jsonClusteringFields = "[\"column1\", \"column2\", \"column3\"]";
+
+    assertEquals(clustering, 
BigQueryHelpers.clusteringFromJsonFields(jsonClusteringFields));
+  }
 }
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslationTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslationTest.java
index 7f2ff894548..ce4c80adb95 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslationTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslationTest.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
+import com.google.api.services.bigquery.model.Clustering;
 import com.google.api.services.bigquery.model.TableRow;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -89,7 +90,7 @@ public class BigQueryIOTranslationTest {
     WRITE_TRANSFORM_SCHEMA_MAPPING.put("getDynamicDestinations", 
"dynamic_destinations");
     WRITE_TRANSFORM_SCHEMA_MAPPING.put("getJsonSchema", "json_schema");
     WRITE_TRANSFORM_SCHEMA_MAPPING.put("getJsonTimePartitioning", 
"json_time_partitioning");
-    WRITE_TRANSFORM_SCHEMA_MAPPING.put("getClustering", "clustering");
+    WRITE_TRANSFORM_SCHEMA_MAPPING.put("getJsonClustering", "clustering");
     WRITE_TRANSFORM_SCHEMA_MAPPING.put("getCreateDisposition", 
"create_disposition");
     WRITE_TRANSFORM_SCHEMA_MAPPING.put("getWriteDisposition", 
"write_disposition");
     WRITE_TRANSFORM_SCHEMA_MAPPING.put("getSchemaUpdateOptions", 
"schema_update_options");
@@ -237,6 +238,7 @@ public class BigQueryIOTranslationTest {
   @Test
   public void testReCreateWriteTransformFromRowTable() {
     // setting a subset of fields here.
+    Clustering testClustering = new Clustering().setFields(Arrays.asList("a", 
"b", "c"));
     BigQueryIO.Write<?> writeTransform =
         BigQueryIO.write()
             .to("dummyproject:dummydataset.dummytable")
@@ -244,6 +246,7 @@ public class BigQueryIOTranslationTest {
             .withTriggeringFrequency(org.joda.time.Duration.millis(10000))
             .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE)
             .withCreateDisposition(CreateDisposition.CREATE_NEVER)
+            .withClustering(testClustering)
             .withKmsKey("dummykmskey");
 
     BigQueryIOTranslation.BigQueryIOWriteTranslator translator =
@@ -251,7 +254,7 @@ public class BigQueryIOTranslationTest {
     Row row = translator.toConfigRow(writeTransform);
 
     PipelineOptions options = PipelineOptionsFactory.create();
-    options.as(StreamingOptions.class).setUpdateCompatibilityVersion("2.54.0");
+    options.as(StreamingOptions.class).setUpdateCompatibilityVersion("2.56.0");
     BigQueryIO.Write<?> writeTransformFromRow =
         (BigQueryIO.Write<?>) translator.fromConfigRow(row, options);
     assertNotNull(writeTransformFromRow.getTable());
@@ -261,6 +264,9 @@ public class BigQueryIOTranslationTest {
     assertEquals(WriteDisposition.WRITE_TRUNCATE, 
writeTransformFromRow.getWriteDisposition());
     assertEquals(CreateDisposition.CREATE_NEVER, 
writeTransformFromRow.getCreateDisposition());
     assertEquals("dummykmskey", writeTransformFromRow.getKmsKey());
+    assertEquals(
+        BigQueryHelpers.toJsonString(testClustering),
+        writeTransformFromRow.getJsonClustering().get());
   }
 
   @Test
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTimePartitioningClusteringIT.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTimePartitioningClusteringIT.java
index 22e4feb3c05..008cc02beee 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTimePartitioningClusteringIT.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTimePartitioningClusteringIT.java
@@ -225,6 +225,7 @@ public class BigQueryTimePartitioningClusteringIT {
     Table table = bqClient.tables().get(options.getProject(), DATASET_NAME, 
tableName).execute();
 
     Assert.assertEquals(table.getClustering(), CLUSTERING);
+    Assert.assertEquals(table.getTimePartitioning(), TIME_PARTITIONING);
   }
 
   @Test

Reply via email to