This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 2ee12027627 Merge pull request #25113: #25112 No longer use GetTable to implement CREATE_IF_NEEDED to avoid low quotas 2ee12027627 is described below commit 2ee12027627dd842c52053bec833e0c56d9ebf51 Author: Reuven Lax <re...@google.com> AuthorDate: Mon May 22 15:34:53 2023 -0700 Merge pull request #25113: #25112 No longer use GetTable to implement CREATE_IF_NEEDED to avoid low quotas --- .../io/gcp/bigquery/CreateTableDestinations.java | 128 ---------------- .../sdk/io/gcp/bigquery/CreateTableHelpers.java | 50 ++++++- .../beam/sdk/io/gcp/bigquery/CreateTables.java | 2 +- .../beam/sdk/io/gcp/bigquery/StorageApiLoads.java | 77 +++++----- .../StorageApiWriteRecordsInconsistent.java | 12 +- .../bigquery/StorageApiWriteUnshardedRecords.java | 165 ++++++++++++++------- .../bigquery/StorageApiWritesShardedRecords.java | 47 +++++- .../sdk/io/gcp/testing/FakeDatasetService.java | 42 ++++-- .../bigquery/StorageApiSinkCreateIfNeededIT.java | 140 +++++++++++++++++ 9 files changed, 404 insertions(+), 259 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableDestinations.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableDestinations.java deleted file mode 100644 index 0c2babdeef8..00000000000 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableDestinations.java +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.gcp.bigquery; - -import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; - -import com.google.api.services.bigquery.model.TableSchema; -import java.util.List; -import java.util.Map; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps; -import org.checkerframework.checker.nullness.qual.Nullable; - -/** - * Creates any tables needed before performing writes to the tables. This is a side-effect {@link - * DoFn}, and returns the original collection unchanged. - */ -public class CreateTableDestinations<DestinationT, ElementT> - extends PTransform< - PCollection<KV<DestinationT, ElementT>>, PCollection<KV<DestinationT, ElementT>>> { - private final CreateDisposition createDisposition; - private final BigQueryServices bqServices; - private final DynamicDestinations<?, DestinationT> dynamicDestinations; - private final @Nullable String kmsKey; - - public CreateTableDestinations( - CreateDisposition createDisposition, - DynamicDestinations<?, DestinationT> dynamicDestinations) { - this(createDisposition, new BigQueryServicesImpl(), dynamicDestinations, null); - } - - public CreateTableDestinations( - CreateDisposition createDisposition, - BigQueryServices bqServices, - DynamicDestinations<?, DestinationT> dynamicDestinations, - @Nullable String kmsKey) { - this.createDisposition = createDisposition; - this.bqServices = bqServices; - this.dynamicDestinations = dynamicDestinations; - this.kmsKey = kmsKey; - } - - CreateTableDestinations<DestinationT, ElementT> withKmsKey(String kmsKey) { - return new CreateTableDestinations<>( - createDisposition, bqServices, dynamicDestinations, kmsKey); - } - - CreateTableDestinations<DestinationT, ElementT> withTestServices(BigQueryServices bqServices) { - return new CreateTableDestinations<>( - createDisposition, bqServices, dynamicDestinations, kmsKey); - } - - @Override - public PCollection<KV<DestinationT, ElementT>> expand( - PCollection<KV<DestinationT, ElementT>> input) { - List<PCollectionView<?>> sideInputs = Lists.newArrayList(); - sideInputs.addAll(dynamicDestinations.getSideInputs()); - - return input.apply("CreateTables", ParDo.of(new CreateTablesFn()).withSideInputs(sideInputs)); - } - - private class CreateTablesFn - extends DoFn<KV<DestinationT, ElementT>, KV<DestinationT, ElementT>> { - private Map<DestinationT, TableDestination> destinations = Maps.newHashMap(); - - @StartBundle - public void startBundle() { - destinations = Maps.newHashMap(); - } - - @ProcessElement - public void processElement( - ProcessContext context, - @Element KV<DestinationT, ElementT> element, - OutputReceiver<KV<DestinationT, ElementT>> o) { - dynamicDestinations.setSideInputAccessorFromProcessContext(context); - destinations.computeIfAbsent( - element.getKey(), - dest -> { - @Nullable TableDestination tableDestination1 = dynamicDestinations.getTable(dest); - checkArgument( - tableDestination1 != null, - "DynamicDestinations.getTable() may not return null, " - + "but %s returned null for destination %s", - dynamicDestinations, - dest); - @Nullable - Coder<DestinationT> destinationCoder = dynamicDestinations.getDestinationCoder(); - Supplier<@Nullable TableSchema> schemaSupplier = - () -> dynamicDestinations.getSchema(dest); - return CreateTableHelpers.possiblyCreateTable( - context, - tableDestination1, - schemaSupplier, - createDisposition, - destinationCoder, - kmsKey, - bqServices); - }); - - o.output(element); - } - } -} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableHelpers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableHelpers.java index 49ac13691da..ae7e82c0ad1 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableHelpers.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableHelpers.java @@ -19,24 +19,31 @@ package org.apache.beam.sdk.io.gcp.bigquery; import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; +import com.google.api.client.util.BackOff; +import com.google.api.client.util.BackOffUtils; +import com.google.api.gax.rpc.ApiException; import com.google.api.services.bigquery.model.Clustering; import com.google.api.services.bigquery.model.EncryptionConfiguration; import com.google.api.services.bigquery.model.Table; import com.google.api.services.bigquery.model.TableReference; import com.google.api.services.bigquery.model.TableSchema; import com.google.api.services.bigquery.model.TimePartitioning; +import io.grpc.StatusRuntimeException; import java.util.Collections; import java.util.Set; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.extensions.gcp.util.BackOffAdapter; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.util.FluentBackoff; import org.apache.beam.sdk.util.Preconditions; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier; import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.Duration; public class CreateTableHelpers { /** @@ -46,8 +53,37 @@ public class CreateTableHelpers { */ private static Set<String> createdTables = Collections.newSetFromMap(new ConcurrentHashMap<>()); + private static final Duration INITIAL_RPC_BACKOFF = Duration.millis(500); + private static final FluentBackoff DEFAULT_BACKOFF_FACTORY = + FluentBackoff.DEFAULT.withMaxRetries(4).withInitialBackoff(INITIAL_RPC_BACKOFF); + + // When CREATE_IF_NEEDED is specified, BQ tables should be created if they do not exist. This + // method detects + // errors on table operations, and attempts to create the table if necessary. + static void createTableWrapper(Callable<Void> action, Callable<Boolean> tryCreateTable) + throws Exception { + BackOff backoff = BackOffAdapter.toGcpBackOff(DEFAULT_BACKOFF_FACTORY.backoff()); + RuntimeException lastException = null; + do { + try { + action.call(); + return; + } catch (ApiException | StatusRuntimeException e) { + lastException = e; + // TODO: Once BigQuery reliably returns a consistent error on table not found, we should + // only try creating + // the table on that error. + boolean created = tryCreateTable.call(); + if (!created) { + throw e; + } + } + } while (BackOffUtils.next(com.google.api.client.util.Sleeper.DEFAULT, backoff)); + throw Preconditions.checkStateNotNull(lastException); + } + static TableDestination possiblyCreateTable( - DoFn<?, ?>.ProcessContext context, + BigQueryOptions bigQueryOptions, TableDestination tableDestination, Supplier<@Nullable TableSchema> schemaSupplier, CreateDisposition createDisposition, @@ -72,8 +108,7 @@ public class CreateTableHelpers { tableDestination); TableReference tableReference = tableDestination.getTableReference().clone(); if (Strings.isNullOrEmpty(tableReference.getProjectId())) { - tableReference.setProjectId( - context.getPipelineOptions().as(BigQueryOptions.class).getProject()); + tableReference.setProjectId(bigQueryOptions.getProject()); tableDestination = tableDestination.withTableReference(tableReference); } if (createDisposition == CreateDisposition.CREATE_NEVER) { @@ -88,7 +123,7 @@ public class CreateTableHelpers { synchronized (createdTables) { if (!createdTables.contains(tableSpec)) { tryCreateTable( - context, + bigQueryOptions, schemaSupplier, tableDestination, createDisposition, @@ -102,7 +137,7 @@ public class CreateTableHelpers { } private static void tryCreateTable( - DoFn<?, ?>.ProcessContext context, + BigQueryOptions options, Supplier<@Nullable TableSchema> schemaSupplier, TableDestination tableDestination, CreateDisposition createDisposition, @@ -111,8 +146,7 @@ public class CreateTableHelpers { BigQueryServices bqServices) { TableReference tableReference = tableDestination.getTableReference().clone(); tableReference.setTableId(BigQueryHelpers.stripPartitionDecorator(tableReference.getTableId())); - try (DatasetService datasetService = - bqServices.getDatasetService(context.getPipelineOptions().as(BigQueryOptions.class))) { + try (DatasetService datasetService = bqServices.getDatasetService(options)) { if (datasetService.getTable( tableReference, Collections.emptyList(), DatasetService.TableMetadataView.BASIC) == null) { diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java index 278f49511c0..e1e1f22df63 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java @@ -114,7 +114,7 @@ public class CreateTables<DestinationT, ElementT> Supplier<@Nullable TableSchema> schemaSupplier = () -> dynamicDestinations.getSchema(dest); return CreateTableHelpers.possiblyCreateTable( - context, + context.getPipelineOptions().as(BigQueryOptions.class), tableDestination1, schemaSupplier, createDisposition, diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java index 30f62461812..1a49fd31b2e 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java @@ -116,20 +116,15 @@ public class StorageApiLoads<DestinationT, ElementT> input.apply("rewindowIntoGlobal", Window.into(new GlobalWindows())); PCollectionTuple convertMessagesResult = - inputInGlobalWindow - .apply( - "CreateTables", - new CreateTableDestinations<>( - createDisposition, bqServices, dynamicDestinations, kmsKey)) - .apply( - "Convert", - new StorageApiConvertMessages<>( - dynamicDestinations, - bqServices, - failedRowsTag, - successfulConvertedRowsTag, - BigQueryStorageApiInsertErrorCoder.of(), - successCoder)); + inputInGlobalWindow.apply( + "Convert", + new StorageApiConvertMessages<>( + dynamicDestinations, + bqServices, + failedRowsTag, + successfulConvertedRowsTag, + BigQueryStorageApiInsertErrorCoder.of(), + successCoder)); PCollectionTuple writeRecordsResult = convertMessagesResult .get(successfulConvertedRowsTag) @@ -143,7 +138,9 @@ public class StorageApiLoads<DestinationT, ElementT> BigQueryStorageApiInsertErrorCoder.of(), TableRowJsonCoder.of(), autoUpdateSchema, - ignoreUnknownValues)); + ignoreUnknownValues, + createDisposition, + kmsKey)); PCollection<BigQueryStorageApiInsertError> insertErrors = PCollectionList.of(convertMessagesResult.get(failedRowsTag)) @@ -174,20 +171,15 @@ public class StorageApiLoads<DestinationT, ElementT> PCollection<KV<DestinationT, ElementT>> inputInGlobalWindow = input.apply("rewindowIntoGlobal", Window.into(new GlobalWindows())); PCollectionTuple convertMessagesResult = - inputInGlobalWindow - .apply( - "CreateTables", - new CreateTableDestinations<>( - createDisposition, bqServices, dynamicDestinations, kmsKey)) - .apply( - "Convert", - new StorageApiConvertMessages<>( - dynamicDestinations, - bqServices, - failedRowsTag, - successfulConvertedRowsTag, - BigQueryStorageApiInsertErrorCoder.of(), - successCoder)); + inputInGlobalWindow.apply( + "Convert", + new StorageApiConvertMessages<>( + dynamicDestinations, + bqServices, + failedRowsTag, + successfulConvertedRowsTag, + BigQueryStorageApiInsertErrorCoder.of(), + successCoder)); PCollection<KV<ShardedKey<DestinationT>, Iterable<StorageApiWritePayload>>> groupedRecords; @@ -295,20 +287,15 @@ public class StorageApiLoads<DestinationT, ElementT> input.apply( "rewindowIntoGlobal", Window.<KV<DestinationT, ElementT>>into(new GlobalWindows())); PCollectionTuple convertMessagesResult = - inputInGlobalWindow - .apply( - "CreateTables", - new CreateTableDestinations<>( - createDisposition, bqServices, dynamicDestinations, kmsKey)) - .apply( - "Convert", - new StorageApiConvertMessages<>( - dynamicDestinations, - bqServices, - failedRowsTag, - successfulConvertedRowsTag, - BigQueryStorageApiInsertErrorCoder.of(), - successCoder)); + inputInGlobalWindow.apply( + "Convert", + new StorageApiConvertMessages<>( + dynamicDestinations, + bqServices, + failedRowsTag, + successfulConvertedRowsTag, + BigQueryStorageApiInsertErrorCoder.of(), + successCoder)); PCollectionTuple writeRecordsResult = convertMessagesResult @@ -323,7 +310,9 @@ public class StorageApiLoads<DestinationT, ElementT> BigQueryStorageApiInsertErrorCoder.of(), TableRowJsonCoder.of(), autoUpdateSchema, - ignoreUnknownValues)); + ignoreUnknownValues, + createDisposition, + kmsKey)); PCollection<BigQueryStorageApiInsertError> insertErrors = PCollectionList.of(convertMessagesResult.get(failedRowsTag)) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteRecordsInconsistent.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteRecordsInconsistent.java index 7c6445fcf11..9d09a3da917 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteRecordsInconsistent.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteRecordsInconsistent.java @@ -46,6 +46,8 @@ public class StorageApiWriteRecordsInconsistent<DestinationT, ElementT> private final Coder<TableRow> successfulRowsCoder; private final boolean autoUpdateSchema; private final boolean ignoreUnknownValues; + private final BigQueryIO.Write.CreateDisposition createDisposition; + private final @Nullable String kmsKey; public StorageApiWriteRecordsInconsistent( StorageApiDynamicDestinations<ElementT, DestinationT> dynamicDestinations, @@ -55,7 +57,9 @@ public class StorageApiWriteRecordsInconsistent<DestinationT, ElementT> Coder<BigQueryStorageApiInsertError> failedRowsCoder, Coder<TableRow> successfulRowsCoder, boolean autoUpdateSchema, - boolean ignoreUnknownValues) { + boolean ignoreUnknownValues, + BigQueryIO.Write.CreateDisposition createDisposition, + @Nullable String kmsKey) { this.dynamicDestinations = dynamicDestinations; this.bqServices = bqServices; this.failedRowsTag = failedRowsTag; @@ -64,6 +68,8 @@ public class StorageApiWriteRecordsInconsistent<DestinationT, ElementT> this.successfulRowsTag = successfulRowsTag; this.autoUpdateSchema = autoUpdateSchema; this.ignoreUnknownValues = ignoreUnknownValues; + this.createDisposition = createDisposition; + this.kmsKey = kmsKey; } @Override @@ -91,7 +97,9 @@ public class StorageApiWriteRecordsInconsistent<DestinationT, ElementT> failedRowsTag, successfulRowsTag, autoUpdateSchema, - ignoreUnknownValues)) + ignoreUnknownValues, + createDisposition, + kmsKey)) .withOutputTags(finalizeTag, tupleTagList) .withSideInputs(dynamicDestinations.getSideInputs())); result.get(failedRowsTag).setCoder(failedRowsCoder); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java index 46b542a84a5..e558d7a1fd1 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java @@ -41,9 +41,11 @@ import java.util.Objects; import java.util.Optional; import java.util.Random; import java.util.Set; +import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import java.util.stream.StreamSupport; import org.apache.beam.sdk.coders.Coder; @@ -102,6 +104,8 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT> private final boolean autoUpdateSchema; private final boolean ignoreUnknownValues; private static final ExecutorService closeWriterExecutor = Executors.newCachedThreadPool(); + private final BigQueryIO.Write.CreateDisposition createDisposition; + private final @Nullable String kmsKey; /** * The Guava cache object is thread-safe. However our protocol requires that client pin the @@ -156,7 +160,9 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT> Coder<BigQueryStorageApiInsertError> failedRowsCoder, Coder<TableRow> successfulRowsCoder, boolean autoUpdateSchema, - boolean ignoreUnknownValues) { + boolean ignoreUnknownValues, + BigQueryIO.Write.CreateDisposition createDisposition, + @Nullable String kmsKey) { this.dynamicDestinations = dynamicDestinations; this.bqServices = bqServices; this.failedRowsTag = failedRowsTag; @@ -165,6 +171,8 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT> this.successfulRowsCoder = successfulRowsCoder; this.autoUpdateSchema = autoUpdateSchema; this.ignoreUnknownValues = ignoreUnknownValues; + this.createDisposition = createDisposition; + this.kmsKey = kmsKey; } @Override @@ -194,7 +202,9 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT> failedRowsTag, successfulRowsTag, autoUpdateSchema, - ignoreUnknownValues)) + ignoreUnknownValues, + createDisposition, + kmsKey)) .withOutputTags(finalizeTag, tupleTagList) .withSideInputs(dynamicDestinations.getSideInputs())); @@ -221,6 +231,8 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT> private final @Nullable TupleTag<TableRow> successfulRowsTag; private final boolean autoUpdateSchema; private final boolean ignoreUnknownValues; + private final BigQueryIO.Write.CreateDisposition createDisposition; + private final @Nullable String kmsKey; static class AppendRowsContext extends RetryManager.Operation.Context<AppendRowsResponse> { long offset; @@ -256,6 +268,7 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT> Metrics.counter( StorageApiWritesShardedRecords.WriteRecordsDoFn.class, "rowsSentToFailedRowsCollection"); + private final Callable<Boolean> tryCreateTable; private final boolean useDefaultStream; private TableSchema initialTableSchema; @@ -271,7 +284,8 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT> boolean useDefaultStream, int streamAppendClientCount, boolean usingMultiplexing, - long maxRequestSize) + long maxRequestSize, + Callable<Boolean> tryCreateTable) throws Exception { this.tableUrn = tableUrn; this.pendingMessages = Lists.newArrayList(); @@ -282,6 +296,7 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT> this.clientNumber = new Random().nextInt(streamAppendClientCount); this.usingMultiplexing = usingMultiplexing; this.maxRequestSize = maxRequestSize; + this.tryCreateTable = tryCreateTable; } void teardown() { @@ -306,64 +321,79 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT> return this.streamName; } - String getOrCreateStreamName() { - if (Strings.isNullOrEmpty(this.streamName)) { - try { - if (!useDefaultStream) { - this.streamName = - Preconditions.checkStateNotNull(maybeDatasetService) - .createWriteStream(tableUrn, Type.PENDING) - .getName(); - this.currentOffset = 0; - } else { - this.streamName = getDefaultStreamName(); - } - } catch (Exception e) { - throw new RuntimeException(e); - } - } + String getOrCreateStreamName() throws Exception { + CreateTableHelpers.createTableWrapper( + () -> { + if (!useDefaultStream) { + this.streamName = + Preconditions.checkStateNotNull(maybeDatasetService) + .createWriteStream(tableUrn, Type.PENDING) + .getName(); + this.currentOffset = 0; + } else { + this.streamName = getDefaultStreamName(); + } + return null; + }, + tryCreateTable); return this.streamName; } AppendClientInfo generateClient(@Nullable TableSchema updatedSchema) throws Exception { TableSchema tableSchema = (updatedSchema != null) ? updatedSchema : getCurrentTableSchema(streamName); - AppendClientInfo appendClientInfo = - AppendClientInfo.of( - tableSchema, - // Make sure that the client is always closed in a different thread to avoid - // blocking. - client -> - runAsyncIgnoreFailure( - closeWriterExecutor, - () -> { - synchronized (APPEND_CLIENTS) { - // Remove the pin owned by the cache. - client.unpin(); - client.close(); - } - })); - appendClientInfo = - appendClientInfo.withAppendClient( - Preconditions.checkStateNotNull(maybeDatasetService), - () -> streamName, - usingMultiplexing); + AtomicReference<AppendClientInfo> appendClientInfo = + new AtomicReference<>( + AppendClientInfo.of( + tableSchema, + // Make sure that the client is always closed in a different thread to avoid + // blocking. + client -> + runAsyncIgnoreFailure( + closeWriterExecutor, + () -> { + synchronized (APPEND_CLIENTS) { + // Remove the pin owned by the cache. + client.unpin(); + client.close(); + } + }))); + + CreateTableHelpers.createTableWrapper( + () -> { + appendClientInfo.set( + appendClientInfo + .get() + .withAppendClient( + Preconditions.checkStateNotNull(maybeDatasetService), + () -> streamName, + usingMultiplexing)); + Preconditions.checkStateNotNull(appendClientInfo.get().getStreamAppendClient()); + return null; + }, + tryCreateTable); + // This pin is "owned" by the cache. - Preconditions.checkStateNotNull(appendClientInfo.getStreamAppendClient()).pin(); - return appendClientInfo; + Preconditions.checkStateNotNull(appendClientInfo.get().getStreamAppendClient()).pin(); + return appendClientInfo.get(); } - TableSchema getCurrentTableSchema(String stream) { - TableSchema currentSchema = initialTableSchema; - if (autoUpdateSchema) { - @Nullable - WriteStream writeStream = - Preconditions.checkStateNotNull(maybeDatasetService).getWriteStream(streamName); - if (writeStream != null && writeStream.hasTableSchema()) { - currentSchema = writeStream.getTableSchema(); - } - } - return currentSchema; + TableSchema getCurrentTableSchema(String stream) throws Exception { + AtomicReference<TableSchema> currentSchema = new AtomicReference<>(initialTableSchema); + CreateTableHelpers.createTableWrapper( + () -> { + if (autoUpdateSchema) { + @Nullable + WriteStream writeStream = + Preconditions.checkStateNotNull(maybeDatasetService).getWriteStream(streamName); + if (writeStream != null && writeStream.hasTableSchema()) { + currentSchema.set(writeStream.getTableSchema()); + } + } + return null; + }, + tryCreateTable); + return currentSchema.get(); } AppendClientInfo getAppendClientInfo( @@ -640,6 +670,13 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT> + " failed with stream " + "doesn't exist"); } + // TODO: Only do this on explicit NOT_FOUND errors once BigQuery reliably produces + // them. + try { + tryCreateTable.call(); + } catch (Exception e) { + throw new RuntimeException(e); + } invalidateWriteStream(); @@ -731,7 +768,9 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT> TupleTag<BigQueryStorageApiInsertError> failedRowsTag, @Nullable TupleTag<TableRow> successfulRowsTag, boolean autoUpdateSchema, - boolean ignoreUnknownValues) { + boolean ignoreUnknownValues, + BigQueryIO.Write.CreateDisposition createDisposition, + @Nullable String kmsKey) { this.messageConverters = new TwoLevelMessageConverterCache<>(operationName); this.dynamicDestinations = dynamicDestinations; this.bqServices = bqServices; @@ -744,6 +783,8 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT> this.successfulRowsTag = successfulRowsTag; this.autoUpdateSchema = autoUpdateSchema; this.ignoreUnknownValues = ignoreUnknownValues; + this.createDisposition = createDisposition; + this.kmsKey = kmsKey; } boolean shouldFlush() { @@ -823,6 +864,20 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT> + "but %s returned null for destination %s", dynamicDestinations, destination); + @Nullable Coder<DestinationT> destinationCoder = dynamicDestinations.getDestinationCoder(); + Callable<Boolean> tryCreateTable = + () -> { + CreateTableHelpers.possiblyCreateTable( + c.getPipelineOptions().as(BigQueryOptions.class), + tableDestination1, + () -> dynamicDestinations.getSchema(destination), + createDisposition, + destinationCoder, + kmsKey, + bqServices); + return true; + }; + MessageConverter<ElementT> messageConverter; try { messageConverter = messageConverters.get(destination, dynamicDestinations, datasetService); @@ -833,7 +888,8 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT> useDefaultStream, streamAppendClientCount, bigQueryOptions.getUseStorageApiConnectionPool(), - bigQueryOptions.getStorageWriteApiMaxRequestSize()); + bigQueryOptions.getStorageWriteApiMaxRequestSize(), + tryCreateTable); } catch (Exception e) { throw new RuntimeException(e); } @@ -850,6 +906,7 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT> DatasetService initializedDatasetService = initializeDatasetService(pipelineOptions); dynamicDestinations.setSideInputAccessorFromProcessContext(c); Preconditions.checkStateNotNull(destinations); + DestinationState state = destinations.computeIfAbsent( element.getKey(), diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java index e0353bf9a90..c546d4f2ede 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java @@ -353,20 +353,30 @@ public class StorageApiWritesShardedRecords<DestinationT extends @NonNull Object ValueState<String> streamName, ValueState<Long> streamOffset, Timer streamIdleTimer, - DatasetService datasetService) { + DatasetService datasetService, + Callable<Boolean> tryCreateTable) { try { - String stream = streamName.read(); - if (stream == null || "".equals(stream)) { + final @Nullable String streamValue = streamName.read(); + AtomicReference<String> stream = new AtomicReference<>(); + if (streamValue == null || "".equals(streamValue)) { // In a buffered stream, data is only visible up to the offset to which it was flushed. - stream = datasetService.createWriteStream(tableId, Type.BUFFERED).getName(); - streamName.write(stream); + CreateTableHelpers.createTableWrapper( + () -> { + stream.set(datasetService.createWriteStream(tableId, Type.BUFFERED).getName()); + return null; + }, + tryCreateTable); + + streamName.write(stream.get()); streamOffset.write(0L); streamsCreated.inc(); + } else { + stream.set(streamValue); } // Reset the idle timer. streamIdleTimer.offset(streamIdleTime).withNoOutputTimestamp().setRelative(); - return stream; + return stream.get(); } catch (Exception e) { throw new RuntimeException(e); } @@ -427,8 +437,24 @@ public class StorageApiWritesShardedRecords<DestinationT extends @NonNull Object final String tableId = tableDestination.getTableUrn(bigQueryOptions); final DatasetService datasetService = getDatasetService(pipelineOptions); + Coder<DestinationT> destinationCoder = dynamicDestinations.getDestinationCoder(); + Callable<Boolean> tryCreateTable = + () -> { + CreateTableHelpers.possiblyCreateTable( + c.getPipelineOptions().as(BigQueryOptions.class), + tableDestination, + () -> dynamicDestinations.getSchema(element.getKey().getKey()), + createDisposition, + destinationCoder, + kmsKey, + bqServices); + return true; + }; + Supplier<String> getOrCreateStream = - () -> getOrCreateStream(tableId, streamName, streamOffset, idleTimer, datasetService); + () -> + getOrCreateStream( + tableId, streamName, streamOffset, idleTimer, datasetService, tryCreateTable); Callable<AppendClientInfo> getAppendClientInfo = () -> { @Nullable TableSchema tableSchema; @@ -626,6 +652,13 @@ public class StorageApiWritesShardedRecords<DestinationT extends @NonNull Object // Invalidate the StreamWriter and force a new one to be created. LOG.error( "Got error " + failedContext.getError() + " closing " + failedContext.streamName); + + // TODO: Only do this on explicit NOT_FOUND errors once BigQuery reliably produces them. + try { + tryCreateTable.call(); + } catch (Exception e) { + throw new RuntimeException(e); + } clearClients.accept(failedContexts); appendFailures.inc(); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java index 266de15d4be..6828c764a36 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java @@ -23,6 +23,8 @@ import com.google.api.client.googleapis.json.GoogleJsonResponseException; import com.google.api.client.http.HttpHeaders; import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutures; +import com.google.api.gax.grpc.GrpcStatusCode; +import com.google.api.gax.rpc.ApiException; import com.google.api.services.bigquery.model.Dataset; import com.google.api.services.bigquery.model.DatasetReference; import com.google.api.services.bigquery.model.Table; @@ -45,6 +47,7 @@ import com.google.protobuf.Descriptors.Descriptor; import com.google.protobuf.DynamicMessage; import com.google.protobuf.Timestamp; import com.google.rpc.Code; +import io.grpc.Status; import java.io.IOException; import java.io.Serializable; import java.util.HashMap; @@ -509,20 +512,24 @@ public class FakeDatasetService implements DatasetService, Serializable { } @Override - public WriteStream createWriteStream(String tableUrn, Type type) - throws IOException, InterruptedException { - TableReference tableReference = - BigQueryHelpers.parseTableUrn(BigQueryHelpers.stripPartitionDecorator(tableUrn)); - synchronized (FakeDatasetService.class) { - TableContainer tableContainer = - getTableContainer( - tableReference.getProjectId(), - tableReference.getDatasetId(), - tableReference.getTableId()); - String streamName = UUID.randomUUID().toString(); - Stream stream = new Stream(streamName, tableContainer, type); - writeStreams.put(streamName, stream); - return stream.toWriteStream(); + public WriteStream createWriteStream(String tableUrn, Type type) throws InterruptedException { + try { + TableReference tableReference = + BigQueryHelpers.parseTableUrn(BigQueryHelpers.stripPartitionDecorator(tableUrn)); + synchronized (FakeDatasetService.class) { + TableContainer tableContainer = + getTableContainer( + tableReference.getProjectId(), + tableReference.getDatasetId(), + tableReference.getTableId()); + String streamName = UUID.randomUUID().toString(); + Stream stream = new Stream(streamName, tableContainer, type); + writeStreams.put(streamName, stream); + return stream.toWriteStream(); + } + } catch (IOException e) { + // TODO(relax): Return the exact error that BigQuery returns. + throw new ApiException(e, GrpcStatusCode.of(Status.Code.NOT_FOUND), false); } } @@ -535,7 +542,8 @@ public class FakeDatasetService implements DatasetService, Serializable { return stream.toWriteStream(); } } - return null; + // TODO(relax): Return the exact error that BigQuery returns. + throw new ApiException(null, GrpcStatusCode.of(Status.Code.NOT_FOUND), false); } @Override @@ -550,6 +558,10 @@ public class FakeDatasetService implements DatasetService, Serializable { this.protoDescriptor = descriptor; synchronized (FakeDatasetService.class) { Stream stream = writeStreams.get(streamName); + if (stream == null) { + // TODO(relax): Return the exact error that BigQuery returns. + throw new ApiException(null, GrpcStatusCode.of(Status.Code.NOT_FOUND), false); + } currentSchema = stream.tableContainer.getTable().getSchema(); } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkCreateIfNeededIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkCreateIfNeededIT.java new file mode 100644 index 00000000000..3cb44c6c7b6 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkCreateIfNeededIT.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.bigquery; + +import static org.hamcrest.MatcherAssert.assertThat; + +import com.google.api.services.bigquery.model.TableFieldSchema; +import com.google.api.services.bigquery.model.TableRow; +import com.google.api.services.bigquery.model.TableSchema; +import java.io.IOException; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.LongStream; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; +import org.apache.beam.sdk.io.gcp.testing.BigqueryClient; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables; +import org.hamcrest.Matchers; +import org.joda.time.Duration; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Integration test for table create-if-needed handling when using the storage API. */ +@RunWith(Parameterized.class) +public class StorageApiSinkCreateIfNeededIT { + @Parameterized.Parameters + public static Iterable<Object[]> data() { + return ImmutableList.of(new Object[] {true}, new Object[] {false}); + } + + @Parameterized.Parameter(0) + public boolean useAtLeastOnce; + + private static final Logger LOG = LoggerFactory.getLogger(StorageApiSinkCreateIfNeededIT.class); + + private static final BigqueryClient BQ_CLIENT = new BigqueryClient("StorageApiSinkFailedRowsIT"); + private static final String PROJECT = + TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject(); + private static final String BIG_QUERY_DATASET_ID = + "storage_api_sink_failed_rows" + System.nanoTime(); + private static final List<TableFieldSchema> FIELDS = + ImmutableList.<TableFieldSchema>builder() + .add(new TableFieldSchema().setType("STRING").setName("str")) + .add(new TableFieldSchema().setType("INT64").setName("tablenum")) + .build(); + private static final TableSchema BASE_TABLE_SCHEMA = new TableSchema().setFields(FIELDS); + + private BigQueryIO.Write.Method getMethod() { + return useAtLeastOnce + ? BigQueryIO.Write.Method.STORAGE_API_AT_LEAST_ONCE + : BigQueryIO.Write.Method.STORAGE_WRITE_API; + } + + @BeforeClass + public static void setUpTestEnvironment() throws IOException, InterruptedException { + // Create one BQ dataset for all test cases. + BQ_CLIENT.createNewDataset(PROJECT, BIG_QUERY_DATASET_ID); + } + + @AfterClass + public static void cleanup() { + LOG.info("Start to clean up tables and datasets."); + BQ_CLIENT.deleteDataset(PROJECT, BIG_QUERY_DATASET_ID); + } + + @Test + public void testCreateManyTables() throws IOException, InterruptedException { + List<TableRow> inputs = + LongStream.range(0, 100) + .mapToObj(l -> new TableRow().set("str", "foo").set("tablenum", l)) + .collect(Collectors.toList()); + + String table = "table" + System.nanoTime(); + String tableSpecBase = PROJECT + "." + BIG_QUERY_DATASET_ID + "." + table; + runPipeline(getMethod(), tableSpecBase, inputs); + assertTablesCreated(tableSpecBase, 100); + } + + private void assertTablesCreated(String tableSpecPrefix, int expectedRows) + throws IOException, InterruptedException { + TableRow queryResponse = + Iterables.getOnlyElement( + BQ_CLIENT.queryUnflattened( + String.format("SELECT COUNT(*) FROM `%s`", tableSpecPrefix + "*"), + PROJECT, + true, + true)); + int numRowsWritten = Integer.parseInt((String) queryResponse.get("f0_")); + if (useAtLeastOnce) { + assertThat(numRowsWritten, Matchers.greaterThanOrEqualTo(expectedRows)); + } else { + assertThat(numRowsWritten, Matchers.equalTo(expectedRows)); + } + } + + private static void runPipeline( + BigQueryIO.Write.Method method, String tableSpecBase, Iterable<TableRow> tableRows) { + Pipeline p = Pipeline.create(); + + BigQueryIO.Write<TableRow> write = + BigQueryIO.writeTableRows() + .to(tr -> new TableDestination(tableSpecBase + tr.getValue().get("tablenum"), "")) + .withSchema(BASE_TABLE_SCHEMA) + .withMethod(method) + .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED); + if (method == BigQueryIO.Write.Method.STORAGE_WRITE_API) { + write = write.withNumStorageWriteApiStreams(1); + write = write.withTriggeringFrequency(Duration.standardSeconds(1)); + } + PCollection<TableRow> input = p.apply("Create test cases", Create.of(tableRows)); + input = input.setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED); + input.apply("Write using Storage Write API", write); + + p.run().waitUntilFinish(); + } +}