This is an automated email from the ASF dual-hosted git repository. ahmedabualsaud pushed a commit to branch release-2.56.0 in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/release-2.56.0 by this push: new 3c7a01360df Cherry picking (#30460) BQ clustering valueprovider (#31039) 3c7a01360df is described below commit 3c7a01360df3635ae56b7073dfa36e75e7492f61 Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com> AuthorDate: Thu Apr 18 11:47:41 2024 -0400 Cherry picking (#30460) BQ clustering valueprovider (#31039) * Support clustering with value provider * remove * add some documentation * fix * address comments * update test * spotless * use serializable json clustering; fix translation * fall back on super's clustering and time partitioning when needed * fork based on version 2.56.0 * fix test --- .../beam/sdk/io/gcp/bigquery/BigQueryHelpers.java | 22 +++++++++++++++ .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 31 ++++++++++++++-------- .../sdk/io/gcp/bigquery/BigQueryIOTranslation.java | 18 ++++++++----- .../gcp/bigquery/DynamicDestinationsHelpers.java | 8 ++++++ .../sdk/io/gcp/bigquery/BigQueryClusteringIT.java | 5 +++- .../sdk/io/gcp/bigquery/BigQueryHelpersTest.java | 11 ++++++++ .../io/gcp/bigquery/BigQueryIOTranslationTest.java | 10 +++++-- .../BigQueryTimePartitioningClusteringIT.java | 1 + 8 files changed, 85 insertions(+), 21 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java index c4ad09ce6ea..8c600cf780a 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java @@ -17,11 +17,13 @@ */ package org.apache.beam.sdk.io.gcp.bigquery; +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; import com.google.api.client.util.BackOff; import com.google.api.client.util.BackOffUtils; import com.google.api.client.util.Sleeper; +import com.google.api.services.bigquery.model.Clustering; import com.google.api.services.bigquery.model.Dataset; import com.google.api.services.bigquery.model.Job; import com.google.api.services.bigquery.model.JobReference; @@ -31,6 +33,8 @@ import com.google.api.services.bigquery.model.TableReference; import com.google.api.services.bigquery.model.TableSchema; import com.google.api.services.bigquery.model.TimePartitioning; import com.google.cloud.hadoop.util.ApiErrorExtractor; +import com.google.gson.JsonElement; +import com.google.gson.JsonParser; import java.io.IOException; import java.io.Serializable; import java.math.BigInteger; @@ -40,6 +44,7 @@ import java.util.List; import java.util.Map; import java.util.UUID; import java.util.regex.Matcher; +import java.util.stream.Collectors; import org.apache.beam.sdk.extensions.gcp.util.BackOffAdapter; import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.io.fs.ResolveOptions; @@ -704,6 +709,23 @@ public class BigQueryHelpers { } } + static Clustering clusteringFromJsonFields(String jsonStringClustering) { + JsonElement jsonClustering = JsonParser.parseString(jsonStringClustering); + + checkArgument( + jsonClustering.isJsonArray(), + "Received an invalid Clustering json string: %s." 
+ + "Please provide a serialized json array like so: [\"column1\", \"column2\"]", + jsonStringClustering); + + List<String> fields = + jsonClustering.getAsJsonArray().asList().stream() + .map(JsonElement::getAsString) + .collect(Collectors.toList()); + + return new Clustering().setFields(fields); + } + static String resolveTempLocation( String tempLocationDir, String bigQueryOperationName, String stepUuid) { return FileSystems.matchNewResource(tempLocationDir, true) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index a646e1e6247..fce8f1c5d40 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -2350,7 +2350,7 @@ public class BigQueryIO { abstract @Nullable ValueProvider<String> getJsonTimePartitioning(); - abstract @Nullable Clustering getClustering(); + abstract @Nullable ValueProvider<String> getJsonClustering(); abstract CreateDisposition getCreateDisposition(); @@ -2459,7 +2459,7 @@ public class BigQueryIO { abstract Builder<T> setJsonTimePartitioning(ValueProvider<String> jsonTimePartitioning); - abstract Builder<T> setClustering(Clustering clustering); + abstract Builder<T> setJsonClustering(ValueProvider<String> clustering); abstract Builder<T> setCreateDisposition(CreateDisposition createDisposition); @@ -2826,8 +2826,18 @@ public class BigQueryIO { * tables, the fields here will be ignored; call {@link #withClustering()} instead. 
*/ public Write<T> withClustering(Clustering clustering) { - checkArgument(clustering != null, "clustering can not be null"); - return toBuilder().setClustering(clustering).build(); + checkArgument(clustering != null, "clustering cannot be null"); + return withJsonClustering(StaticValueProvider.of(BigQueryHelpers.toJsonString(clustering))); + } + + /** + * The same as {@link #withClustering(Clustering)}, but takes a JSON-serialized Clustering + * object in a deferred {@link ValueProvider}. For example: {@code {"fields": ["column1", + * "column2", "column3"]}} + */ + public Write<T> withJsonClustering(ValueProvider<String> jsonClustering) { + checkArgument(jsonClustering != null, "clustering cannot be null"); + return toBuilder().setJsonClustering(jsonClustering).build(); } /** @@ -2844,7 +2854,7 @@ public class BigQueryIO { * read state written with a previous version. */ public Write<T> withClustering() { - return toBuilder().setClustering(new Clustering()).build(); + return withClustering(new Clustering()); } /** Specifies whether the table should be created if it does not exist. */ @@ -3420,10 +3430,10 @@ public class BigQueryIO { if (getJsonTableRef() != null) { dynamicDestinations = DynamicDestinationsHelpers.ConstantTableDestinations.fromJsonTableRef( - getJsonTableRef(), getTableDescription(), getClustering() != null); + getJsonTableRef(), getTableDescription(), getJsonClustering() != null); } else if (getTableFunction() != null) { dynamicDestinations = - new TableFunctionDestinations<>(getTableFunction(), getClustering() != null); + new TableFunctionDestinations<>(getTableFunction(), getJsonClustering() != null); } // Wrap with a DynamicDestinations class that will provide a schema. There might be no @@ -3440,13 +3450,12 @@ public class BigQueryIO { } // Wrap with a DynamicDestinations class that will provide the proper TimePartitioning.
- if (getJsonTimePartitioning() != null - || Optional.ofNullable(getClustering()).map(Clustering::getFields).isPresent()) { + if (getJsonTimePartitioning() != null || (getJsonClustering() != null)) { dynamicDestinations = new ConstantTimePartitioningClusteringDestinations<>( (DynamicDestinations<T, TableDestination>) dynamicDestinations, getJsonTimePartitioning(), - StaticValueProvider.of(BigQueryHelpers.toJsonString(getClustering()))); + getJsonClustering()); } if (getPrimaryKey() != null) { dynamicDestinations = @@ -3699,7 +3708,7 @@ public class BigQueryIO { elementCoder, rowWriterFactory, getKmsKey(), - getClustering() != null, + getJsonClustering() != null, getUseAvroLogicalTypes(), getWriteTempDataset(), getBadRecordRouter(), diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java index 9b499ea3253..fee79f5896c 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java @@ -20,7 +20,6 @@ package org.apache.beam.sdk.io.gcp.bigquery; import static org.apache.beam.sdk.util.construction.TransformUpgrader.fromByteArray; import static org.apache.beam.sdk.util.construction.TransformUpgrader.toByteArray; -import com.google.api.services.bigquery.model.Clustering; import com.google.api.services.bigquery.model.TableRow; import com.google.auto.service.AutoService; import com.google.cloud.bigquery.storage.v1.AppendRowsRequest.MissingValueInterpretation; @@ -373,7 +372,7 @@ public class BigQueryIOTranslation { .addNullableByteArrayField("dynamic_destinations") .addNullableStringField("json_schema") .addNullableStringField("json_time_partitioning") - .addNullableByteArrayField("clustering") + 
.addNullableStringField("clustering") .addNullableByteArrayField("create_disposition") .addNullableByteArrayField("write_disposition") .addNullableArrayField("schema_update_options", FieldType.BYTES) @@ -474,8 +473,8 @@ public class BigQueryIOTranslation { fieldValues.put( "json_time_partitioning", toByteArray(transform.getJsonTimePartitioning().get())); } - if (transform.getClustering() != null) { - fieldValues.put("clustering", toByteArray(transform.getClustering())); + if (transform.getJsonClustering() != null) { + fieldValues.put("clustering", transform.getJsonClustering().get()); } if (transform.getCreateDisposition() != null) { fieldValues.put("create_disposition", toByteArray(transform.getCreateDisposition())); @@ -658,9 +657,14 @@ public class BigQueryIOTranslation { if (jsonTimePartitioning != null) { builder = builder.setJsonTimePartitioning(StaticValueProvider.of(jsonTimePartitioning)); } - byte[] clusteringBytes = configRow.getBytes("clustering"); - if (clusteringBytes != null) { - builder = builder.setClustering((Clustering) fromByteArray(clusteringBytes)); + // Translation with Clustering is broken before 2.56.0, when we attempted to + // serialize a non-serializable Clustering object to bytes. + // From 2.56.0 onwards, we translate using the JSON string representation instead.
+ if (TransformUpgrader.compareVersions(updateCompatibilityBeamVersion, "2.56.0") >= 0) { + String jsonClustering = configRow.getString("clustering"); + if (jsonClustering != null) { + builder = builder.setJsonClustering(StaticValueProvider.of(jsonClustering)); + } } byte[] createDispositionBytes = configRow.getBytes("create_disposition"); if (createDispositionBytes != null) { diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java index 1f042a81eb9..eed4314e391 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java @@ -26,6 +26,7 @@ import com.google.api.services.bigquery.model.Table; import com.google.api.services.bigquery.model.TableConstraints; import com.google.api.services.bigquery.model.TableReference; import com.google.api.services.bigquery.model.TableSchema; +import com.google.gson.JsonParser; import java.io.IOException; import java.util.List; import java.util.Map; @@ -303,7 +304,14 @@ class DynamicDestinationsHelpers { TableDestination destination = super.getDestination(element); String partitioning = Optional.ofNullable(jsonTimePartitioning).map(ValueProvider::get).orElse(null); + if (partitioning == null + || JsonParser.parseString(partitioning).getAsJsonObject().isEmpty()) { + partitioning = destination.getJsonTimePartitioning(); + } String clustering = Optional.ofNullable(jsonClustering).map(ValueProvider::get).orElse(null); + if (clustering == null || JsonParser.parseString(clustering).getAsJsonObject().isEmpty()) { + clustering = destination.getJsonClustering(); + } return new TableDestination( destination.getTableSpec(), destination.getTableDescription(), 
partitioning, clustering); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryClusteringIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryClusteringIT.java index 3ee6931dd98..46989021ee9 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryClusteringIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryClusteringIT.java @@ -33,6 +33,7 @@ import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.ExperimentalOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.testing.TestPipelineOptions; import org.apache.beam.sdk.transforms.DoFn; @@ -171,7 +172,9 @@ public class BigQueryClusteringIT { .apply( BigQueryIO.writeTableRows() .to(new ClusteredDestinations(tableName)) - .withClustering(CLUSTERING) + .withJsonClustering( + ValueProvider.StaticValueProvider.of( + BigQueryHelpers.toJsonString(CLUSTERING.getFields()))) .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE) .withMethod(BigQueryIO.Write.Method.FILE_LOADS)); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpersTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpersTest.java index f5f4ddb9547..6779920702d 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpersTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpersTest.java @@ -20,12 +20,14 @@ package 
org.apache.beam.sdk.io.gcp.bigquery; import static org.junit.Assert.assertEquals; import com.google.api.client.util.Data; +import com.google.api.services.bigquery.model.Clustering; import com.google.api.services.bigquery.model.ErrorProto; import com.google.api.services.bigquery.model.Job; import com.google.api.services.bigquery.model.JobReference; import com.google.api.services.bigquery.model.JobStatus; import com.google.api.services.bigquery.model.TableReference; import com.google.api.services.bigquery.model.TableRow; +import java.util.Arrays; import java.util.Optional; import java.util.Random; import java.util.Set; @@ -258,4 +260,13 @@ public class BigQueryHelpersTest { assertEquals(dataset.get(), noDataset.setDatasetId(dataset.get()).getDatasetId()); } + + @Test + public void testClusteringJsonConversion() { + Clustering clustering = + new Clustering().setFields(Arrays.asList("column1", "column2", "column3")); + String jsonClusteringFields = "[\"column1\", \"column2\", \"column3\"]"; + + assertEquals(clustering, BigQueryHelpers.clusteringFromJsonFields(jsonClusteringFields)); + } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslationTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslationTest.java index 7f2ff894548..ce4c80adb95 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslationTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslationTest.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import com.google.api.services.bigquery.model.Clustering; import com.google.api.services.bigquery.model.TableRow; import java.util.Arrays; import java.util.HashMap; @@ -89,7 +90,7 @@ public class 
BigQueryIOTranslationTest { WRITE_TRANSFORM_SCHEMA_MAPPING.put("getDynamicDestinations", "dynamic_destinations"); WRITE_TRANSFORM_SCHEMA_MAPPING.put("getJsonSchema", "json_schema"); WRITE_TRANSFORM_SCHEMA_MAPPING.put("getJsonTimePartitioning", "json_time_partitioning"); - WRITE_TRANSFORM_SCHEMA_MAPPING.put("getClustering", "clustering"); + WRITE_TRANSFORM_SCHEMA_MAPPING.put("getJsonClustering", "clustering"); WRITE_TRANSFORM_SCHEMA_MAPPING.put("getCreateDisposition", "create_disposition"); WRITE_TRANSFORM_SCHEMA_MAPPING.put("getWriteDisposition", "write_disposition"); WRITE_TRANSFORM_SCHEMA_MAPPING.put("getSchemaUpdateOptions", "schema_update_options"); @@ -237,6 +238,7 @@ public class BigQueryIOTranslationTest { @Test public void testReCreateWriteTransformFromRowTable() { // setting a subset of fields here. + Clustering testClustering = new Clustering().setFields(Arrays.asList("a", "b", "c")); BigQueryIO.Write<?> writeTransform = BigQueryIO.write() .to("dummyproject:dummydataset.dummytable") @@ -244,6 +246,7 @@ public class BigQueryIOTranslationTest { .withTriggeringFrequency(org.joda.time.Duration.millis(10000)) .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE) .withCreateDisposition(CreateDisposition.CREATE_NEVER) + .withClustering(testClustering) .withKmsKey("dummykmskey"); BigQueryIOTranslation.BigQueryIOWriteTranslator translator = @@ -251,7 +254,7 @@ public class BigQueryIOTranslationTest { Row row = translator.toConfigRow(writeTransform); PipelineOptions options = PipelineOptionsFactory.create(); - options.as(StreamingOptions.class).setUpdateCompatibilityVersion("2.54.0"); + options.as(StreamingOptions.class).setUpdateCompatibilityVersion("2.56.0"); BigQueryIO.Write<?> writeTransformFromRow = (BigQueryIO.Write<?>) translator.fromConfigRow(row, options); assertNotNull(writeTransformFromRow.getTable()); @@ -261,6 +264,9 @@ public class BigQueryIOTranslationTest { assertEquals(WriteDisposition.WRITE_TRUNCATE, writeTransformFromRow.getWriteDisposition()); 
assertEquals(CreateDisposition.CREATE_NEVER, writeTransformFromRow.getCreateDisposition()); assertEquals("dummykmskey", writeTransformFromRow.getKmsKey()); + assertEquals( + BigQueryHelpers.toJsonString(testClustering), + writeTransformFromRow.getJsonClustering().get()); } @Test diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTimePartitioningClusteringIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTimePartitioningClusteringIT.java index 22e4feb3c05..008cc02beee 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTimePartitioningClusteringIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTimePartitioningClusteringIT.java @@ -225,6 +225,7 @@ public class BigQueryTimePartitioningClusteringIT { Table table = bqClient.tables().get(options.getProject(), DATASET_NAME, tableName).execute(); Assert.assertEquals(table.getClustering(), CLUSTERING); + Assert.assertEquals(table.getTimePartitioning(), TIME_PARTITIONING); } @Test