This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 2ee12027627 Merge pull request #25113: #25112 No longer use GetTable to implement CREATE_IF_NEEDED to avoid low quotas
2ee12027627 is described below

commit 2ee12027627dd842c52053bec833e0c56d9ebf51
Author: Reuven Lax <re...@google.com>
AuthorDate: Mon May 22 15:34:53 2023 -0700

    Merge pull request #25113: #25112 No longer use GetTable to implement CREATE_IF_NEEDED to avoid low quotas
---
 .../io/gcp/bigquery/CreateTableDestinations.java   | 128 ----------------
 .../sdk/io/gcp/bigquery/CreateTableHelpers.java    |  50 ++++++-
 .../beam/sdk/io/gcp/bigquery/CreateTables.java     |   2 +-
 .../beam/sdk/io/gcp/bigquery/StorageApiLoads.java  |  77 +++++-----
 .../StorageApiWriteRecordsInconsistent.java        |  12 +-
 .../bigquery/StorageApiWriteUnshardedRecords.java  | 165 ++++++++++++++-------
 .../bigquery/StorageApiWritesShardedRecords.java   |  47 +++++-
 .../sdk/io/gcp/testing/FakeDatasetService.java     |  42 ++++--
 .../bigquery/StorageApiSinkCreateIfNeededIT.java   | 140 +++++++++++++++++
 9 files changed, 404 insertions(+), 259 deletions(-)
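
For context: CREATE_IF_NEEDED was previously implemented by probing every
destination table with a tables.get (GetTable) call before writing, and that
API has a much lower quota than the Storage Write API itself. This change
drops the upfront probe and reacts to failures instead: the write-path
operation is attempted as-is, and only when it fails does the sink try to
create the table and retry with backoff. A minimal, dependency-free sketch of
that control flow (the exception type and backoff arithmetic here are
simplified stand-ins; the real createTableWrapper in CreateTableHelpers below
uses FluentBackoff and catches ApiException/StatusRuntimeException):

    import java.util.concurrent.Callable;

    public class ReactiveCreateSketch {
      // Stand-in for the failures the real code catches
      // (ApiException / StatusRuntimeException).
      static class TableMissingException extends RuntimeException {}

      static void createTableWrapper(Callable<Void> action, Callable<Boolean> tryCreateTable)
          throws Exception {
        Exception lastException = null;
        long backoffMillis = 500; // mirrors INITIAL_RPC_BACKOFF below
        for (int attempt = 0; attempt <= 4; attempt++) { // 4 retries, as below
          try {
            action.call();
            return; // the action succeeded; nothing more to do
          } catch (TableMissingException e) {
            lastException = e;
            if (!tryCreateTable.call()) {
              throw e; // creation not applicable; surface the original error
            }
          }
          Thread.sleep(backoffMillis);
          backoffMillis *= 2; // exponential backoff; this sketch omits jitter
        }
        throw lastException;
      }

      public static void main(String[] args) throws Exception {
        boolean[] tableExists = {false}; // holder: lambdas capture only effectively-final locals
        createTableWrapper(
            () -> {
              if (!tableExists[0]) {
                throw new TableMissingException();
              }
              System.out.println("write succeeded");
              return null;
            },
            () -> {
              tableExists[0] = true; // stands in for a CreateTable RPC
              return true;
            });
      }
    }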

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableDestinations.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableDestinations.java
deleted file mode 100644
index 0c2babdeef8..00000000000
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableDestinations.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.bigquery;
-
-import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
-
-import com.google.api.services.bigquery.model.TableSchema;
-import java.util.List;
-import java.util.Map;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
-import org.checkerframework.checker.nullness.qual.Nullable;
-
-/**
- * Creates any tables needed before performing writes to the tables. This is a side-effect {@link
- * DoFn}, and returns the original collection unchanged.
- */
-public class CreateTableDestinations<DestinationT, ElementT>
-    extends PTransform<
-        PCollection<KV<DestinationT, ElementT>>, PCollection<KV<DestinationT, ElementT>>> {
-  private final CreateDisposition createDisposition;
-  private final BigQueryServices bqServices;
-  private final DynamicDestinations<?, DestinationT> dynamicDestinations;
-  private final @Nullable String kmsKey;
-
-  public CreateTableDestinations(
-      CreateDisposition createDisposition,
-      DynamicDestinations<?, DestinationT> dynamicDestinations) {
-    this(createDisposition, new BigQueryServicesImpl(), dynamicDestinations, null);
-  }
-
-  public CreateTableDestinations(
-      CreateDisposition createDisposition,
-      BigQueryServices bqServices,
-      DynamicDestinations<?, DestinationT> dynamicDestinations,
-      @Nullable String kmsKey) {
-    this.createDisposition = createDisposition;
-    this.bqServices = bqServices;
-    this.dynamicDestinations = dynamicDestinations;
-    this.kmsKey = kmsKey;
-  }
-
-  CreateTableDestinations<DestinationT, ElementT> withKmsKey(String kmsKey) {
-    return new CreateTableDestinations<>(
-        createDisposition, bqServices, dynamicDestinations, kmsKey);
-  }
-
-  CreateTableDestinations<DestinationT, ElementT> withTestServices(BigQueryServices bqServices) {
-    return new CreateTableDestinations<>(
-        createDisposition, bqServices, dynamicDestinations, kmsKey);
-  }
-
-  @Override
-  public PCollection<KV<DestinationT, ElementT>> expand(
-      PCollection<KV<DestinationT, ElementT>> input) {
-    List<PCollectionView<?>> sideInputs = Lists.newArrayList();
-    sideInputs.addAll(dynamicDestinations.getSideInputs());
-
-    return input.apply("CreateTables", ParDo.of(new CreateTablesFn()).withSideInputs(sideInputs));
-  }
-
-  private class CreateTablesFn
-      extends DoFn<KV<DestinationT, ElementT>, KV<DestinationT, ElementT>> {
-    private Map<DestinationT, TableDestination> destinations = Maps.newHashMap();
-
-    @StartBundle
-    public void startBundle() {
-      destinations = Maps.newHashMap();
-    }
-
-    @ProcessElement
-    public void processElement(
-        ProcessContext context,
-        @Element KV<DestinationT, ElementT> element,
-        OutputReceiver<KV<DestinationT, ElementT>> o) {
-      dynamicDestinations.setSideInputAccessorFromProcessContext(context);
-      destinations.computeIfAbsent(
-          element.getKey(),
-          dest -> {
-            @Nullable TableDestination tableDestination1 = dynamicDestinations.getTable(dest);
-            checkArgument(
-                tableDestination1 != null,
-                "DynamicDestinations.getTable() may not return null, "
-                    + "but %s returned null for destination %s",
-                dynamicDestinations,
-                dest);
-            @Nullable
-            Coder<DestinationT> destinationCoder = dynamicDestinations.getDestinationCoder();
-            Supplier<@Nullable TableSchema> schemaSupplier =
-                () -> dynamicDestinations.getSchema(dest);
-            return CreateTableHelpers.possiblyCreateTable(
-                context,
-                tableDestination1,
-                schemaSupplier,
-                createDisposition,
-                destinationCoder,
-                kmsKey,
-                bqServices);
-          });
-
-      o.output(element);
-    }
-  }
-}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableHelpers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableHelpers.java
index 49ac13691da..ae7e82c0ad1 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableHelpers.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableHelpers.java
@@ -19,24 +19,31 @@ package org.apache.beam.sdk.io.gcp.bigquery;
 
 import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
 
+import com.google.api.client.util.BackOff;
+import com.google.api.client.util.BackOffUtils;
+import com.google.api.gax.rpc.ApiException;
 import com.google.api.services.bigquery.model.Clustering;
 import com.google.api.services.bigquery.model.EncryptionConfiguration;
 import com.google.api.services.bigquery.model.Table;
 import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableSchema;
 import com.google.api.services.bigquery.model.TimePartitioning;
+import io.grpc.StatusRuntimeException;
 import java.util.Collections;
 import java.util.Set;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.extensions.gcp.util.BackOffAdapter;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.util.FluentBackoff;
 import org.apache.beam.sdk.util.Preconditions;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
 import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
 
 public class CreateTableHelpers {
   /**
@@ -46,8 +53,37 @@ public class CreateTableHelpers {
    */
   private static Set<String> createdTables = Collections.newSetFromMap(new ConcurrentHashMap<>());
 
+  private static final Duration INITIAL_RPC_BACKOFF = Duration.millis(500);
+  private static final FluentBackoff DEFAULT_BACKOFF_FACTORY =
+      FluentBackoff.DEFAULT.withMaxRetries(4).withInitialBackoff(INITIAL_RPC_BACKOFF);
+
+  // When CREATE_IF_NEEDED is specified, BQ tables should be created if they do not exist. This
+  // method detects errors on table operations, and attempts to create the table if necessary.
+  static void createTableWrapper(Callable<Void> action, Callable<Boolean> tryCreateTable)
+      throws Exception {
+    BackOff backoff = BackOffAdapter.toGcpBackOff(DEFAULT_BACKOFF_FACTORY.backoff());
+    RuntimeException lastException = null;
+    do {
+      try {
+        action.call();
+        return;
+      } catch (ApiException | StatusRuntimeException e) {
+        lastException = e;
+        // TODO: Once BigQuery reliably returns a consistent error on table not found, we should
+        // only try creating the table on that error.
+        boolean created = tryCreateTable.call();
+        if (!created) {
+          throw e;
+        }
+      }
+    } while (BackOffUtils.next(com.google.api.client.util.Sleeper.DEFAULT, backoff));
+    throw Preconditions.checkStateNotNull(lastException);
+  }
+
   static TableDestination possiblyCreateTable(
-      DoFn<?, ?>.ProcessContext context,
+      BigQueryOptions bigQueryOptions,
       TableDestination tableDestination,
       Supplier<@Nullable TableSchema> schemaSupplier,
       CreateDisposition createDisposition,
@@ -72,8 +108,7 @@ public class CreateTableHelpers {
         tableDestination);
     TableReference tableReference = tableDestination.getTableReference().clone();
     if (Strings.isNullOrEmpty(tableReference.getProjectId())) {
-      tableReference.setProjectId(
-          context.getPipelineOptions().as(BigQueryOptions.class).getProject());
+      tableReference.setProjectId(bigQueryOptions.getProject());
       tableDestination = tableDestination.withTableReference(tableReference);
     }
     if (createDisposition == CreateDisposition.CREATE_NEVER) {
@@ -88,7 +123,7 @@ public class CreateTableHelpers {
       synchronized (createdTables) {
         if (!createdTables.contains(tableSpec)) {
           tryCreateTable(
-              context,
+              bigQueryOptions,
               schemaSupplier,
               tableDestination,
               createDisposition,
@@ -102,7 +137,7 @@ public class CreateTableHelpers {
   }
 
   private static void tryCreateTable(
-      DoFn<?, ?>.ProcessContext context,
+      BigQueryOptions options,
       Supplier<@Nullable TableSchema> schemaSupplier,
       TableDestination tableDestination,
       CreateDisposition createDisposition,
@@ -111,8 +146,7 @@ public class CreateTableHelpers {
       BigQueryServices bqServices) {
     TableReference tableReference = tableDestination.getTableReference().clone();
     tableReference.setTableId(BigQueryHelpers.stripPartitionDecorator(tableReference.getTableId()));
-    try (DatasetService datasetService =
-        bqServices.getDatasetService(context.getPipelineOptions().as(BigQueryOptions.class))) {
+    try (DatasetService datasetService = bqServices.getDatasetService(options)) {
       if (datasetService.getTable(
               tableReference, Collections.emptyList(), DatasetService.TableMetadataView.BASIC)
           == null) {
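
Note the signature change above: possiblyCreateTable and tryCreateTable now
take a BigQueryOptions value instead of a DoFn.ProcessContext, so the same
helpers can be invoked from retry callbacks that have no ProcessContext in
scope. A small self-contained sketch of that refactor (the types below are
hypothetical stand-ins, not Beam's):

    // Before: helper(DoFn<?, ?>.ProcessContext c) reached into
    // c.getPipelineOptions() itself; after: the caller extracts the narrow
    // options value once and passes it down.
    public class NarrowDependencySketch {
      static class Options { // stand-in for BigQueryOptions
        String getProject() {
          return "my-project";
        }
      }

      static String qualifyTable(Options options, String table) {
        // Depends only on what it reads, so it is callable from any context.
        return options.getProject() + "." + table;
      }

      public static void main(String[] args) {
        Options options = new Options(); // in Beam: context.getPipelineOptions().as(BigQueryOptions.class)
        System.out.println(qualifyTable(options, "dataset.table"));
      }
    }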
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java
index 278f49511c0..e1e1f22df63 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java
@@ -114,7 +114,7 @@ public class CreateTables<DestinationT, ElementT>
                 Supplier<@Nullable TableSchema> schemaSupplier =
                     () -> dynamicDestinations.getSchema(dest);
                 return CreateTableHelpers.possiblyCreateTable(
-                    context,
+                    context.getPipelineOptions().as(BigQueryOptions.class),
                     tableDestination1,
                     schemaSupplier,
                     createDisposition,
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java
index 30f62461812..1a49fd31b2e 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java
@@ -116,20 +116,15 @@ public class StorageApiLoads<DestinationT, ElementT>
         input.apply("rewindowIntoGlobal", Window.into(new GlobalWindows()));
 
     PCollectionTuple convertMessagesResult =
-        inputInGlobalWindow
-            .apply(
-                "CreateTables",
-                new CreateTableDestinations<>(
-                    createDisposition, bqServices, dynamicDestinations, kmsKey))
-            .apply(
-                "Convert",
-                new StorageApiConvertMessages<>(
-                    dynamicDestinations,
-                    bqServices,
-                    failedRowsTag,
-                    successfulConvertedRowsTag,
-                    BigQueryStorageApiInsertErrorCoder.of(),
-                    successCoder));
+        inputInGlobalWindow.apply(
+            "Convert",
+            new StorageApiConvertMessages<>(
+                dynamicDestinations,
+                bqServices,
+                failedRowsTag,
+                successfulConvertedRowsTag,
+                BigQueryStorageApiInsertErrorCoder.of(),
+                successCoder));
     PCollectionTuple writeRecordsResult =
         convertMessagesResult
             .get(successfulConvertedRowsTag)
@@ -143,7 +138,9 @@ public class StorageApiLoads<DestinationT, ElementT>
                     BigQueryStorageApiInsertErrorCoder.of(),
                     TableRowJsonCoder.of(),
                     autoUpdateSchema,
-                    ignoreUnknownValues));
+                    ignoreUnknownValues,
+                    createDisposition,
+                    kmsKey));
 
     PCollection<BigQueryStorageApiInsertError> insertErrors =
         PCollectionList.of(convertMessagesResult.get(failedRowsTag))
@@ -174,20 +171,15 @@ public class StorageApiLoads<DestinationT, ElementT>
     PCollection<KV<DestinationT, ElementT>> inputInGlobalWindow =
         input.apply("rewindowIntoGlobal", Window.into(new GlobalWindows()));
     PCollectionTuple convertMessagesResult =
-        inputInGlobalWindow
-            .apply(
-                "CreateTables",
-                new CreateTableDestinations<>(
-                    createDisposition, bqServices, dynamicDestinations, kmsKey))
-            .apply(
-                "Convert",
-                new StorageApiConvertMessages<>(
-                    dynamicDestinations,
-                    bqServices,
-                    failedRowsTag,
-                    successfulConvertedRowsTag,
-                    BigQueryStorageApiInsertErrorCoder.of(),
-                    successCoder));
+        inputInGlobalWindow.apply(
+            "Convert",
+            new StorageApiConvertMessages<>(
+                dynamicDestinations,
+                bqServices,
+                failedRowsTag,
+                successfulConvertedRowsTag,
+                BigQueryStorageApiInsertErrorCoder.of(),
+                successCoder));
 
     PCollection<KV<ShardedKey<DestinationT>, Iterable<StorageApiWritePayload>>> groupedRecords;
 
@@ -295,20 +287,15 @@ public class StorageApiLoads<DestinationT, ElementT>
         input.apply(
             "rewindowIntoGlobal", Window.<KV<DestinationT, ElementT>>into(new 
GlobalWindows()));
     PCollectionTuple convertMessagesResult =
-        inputInGlobalWindow
-            .apply(
-                "CreateTables",
-                new CreateTableDestinations<>(
-                    createDisposition, bqServices, dynamicDestinations, kmsKey))
-            .apply(
-                "Convert",
-                new StorageApiConvertMessages<>(
-                    dynamicDestinations,
-                    bqServices,
-                    failedRowsTag,
-                    successfulConvertedRowsTag,
-                    BigQueryStorageApiInsertErrorCoder.of(),
-                    successCoder));
+        inputInGlobalWindow.apply(
+            "Convert",
+            new StorageApiConvertMessages<>(
+                dynamicDestinations,
+                bqServices,
+                failedRowsTag,
+                successfulConvertedRowsTag,
+                BigQueryStorageApiInsertErrorCoder.of(),
+                successCoder));
 
     PCollectionTuple writeRecordsResult =
         convertMessagesResult
@@ -323,7 +310,9 @@ public class StorageApiLoads<DestinationT, ElementT>
                     BigQueryStorageApiInsertErrorCoder.of(),
                     TableRowJsonCoder.of(),
                     autoUpdateSchema,
-                    ignoreUnknownValues));
+                    ignoreUnknownValues,
+                    createDisposition,
+                    kmsKey));
 
     PCollection<BigQueryStorageApiInsertError> insertErrors =
         PCollectionList.of(convertMessagesResult.get(failedRowsTag))
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteRecordsInconsistent.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteRecordsInconsistent.java
index 7c6445fcf11..9d09a3da917 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteRecordsInconsistent.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteRecordsInconsistent.java
@@ -46,6 +46,8 @@ public class StorageApiWriteRecordsInconsistent<DestinationT, ElementT>
   private final Coder<TableRow> successfulRowsCoder;
   private final boolean autoUpdateSchema;
   private final boolean ignoreUnknownValues;
+  private final BigQueryIO.Write.CreateDisposition createDisposition;
+  private final @Nullable String kmsKey;
 
   public StorageApiWriteRecordsInconsistent(
       StorageApiDynamicDestinations<ElementT, DestinationT> dynamicDestinations,
@@ -55,7 +57,9 @@ public class StorageApiWriteRecordsInconsistent<DestinationT, ElementT>
       Coder<BigQueryStorageApiInsertError> failedRowsCoder,
       Coder<TableRow> successfulRowsCoder,
       boolean autoUpdateSchema,
-      boolean ignoreUnknownValues) {
+      boolean ignoreUnknownValues,
+      BigQueryIO.Write.CreateDisposition createDisposition,
+      @Nullable String kmsKey) {
     this.dynamicDestinations = dynamicDestinations;
     this.bqServices = bqServices;
     this.failedRowsTag = failedRowsTag;
@@ -64,6 +68,8 @@ public class StorageApiWriteRecordsInconsistent<DestinationT, ElementT>
     this.successfulRowsTag = successfulRowsTag;
     this.autoUpdateSchema = autoUpdateSchema;
     this.ignoreUnknownValues = ignoreUnknownValues;
+    this.createDisposition = createDisposition;
+    this.kmsKey = kmsKey;
   }
 
   @Override
@@ -91,7 +97,9 @@ public class StorageApiWriteRecordsInconsistent<DestinationT, ElementT>
                         failedRowsTag,
                         successfulRowsTag,
                         autoUpdateSchema,
-                        ignoreUnknownValues))
+                        ignoreUnknownValues,
+                        createDisposition,
+                        kmsKey))
                 .withOutputTags(finalizeTag, tupleTagList)
                 .withSideInputs(dynamicDestinations.getSideInputs()));
     result.get(failedRowsTag).setCoder(failedRowsCoder);
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
index 46b542a84a5..e558d7a1fd1 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
@@ -41,9 +41,11 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 import org.apache.beam.sdk.coders.Coder;
@@ -102,6 +104,8 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
   private final boolean autoUpdateSchema;
   private final boolean ignoreUnknownValues;
   private static final ExecutorService closeWriterExecutor = Executors.newCachedThreadPool();
+  private final BigQueryIO.Write.CreateDisposition createDisposition;
+  private final @Nullable String kmsKey;
 
   /**
    * The Guava cache object is thread-safe. However our protocol requires that client pin the
@@ -156,7 +160,9 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
       Coder<BigQueryStorageApiInsertError> failedRowsCoder,
       Coder<TableRow> successfulRowsCoder,
       boolean autoUpdateSchema,
-      boolean ignoreUnknownValues) {
+      boolean ignoreUnknownValues,
+      BigQueryIO.Write.CreateDisposition createDisposition,
+      @Nullable String kmsKey) {
     this.dynamicDestinations = dynamicDestinations;
     this.bqServices = bqServices;
     this.failedRowsTag = failedRowsTag;
@@ -165,6 +171,8 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
     this.successfulRowsCoder = successfulRowsCoder;
     this.autoUpdateSchema = autoUpdateSchema;
     this.ignoreUnknownValues = ignoreUnknownValues;
+    this.createDisposition = createDisposition;
+    this.kmsKey = kmsKey;
   }
 
   @Override
@@ -194,7 +202,9 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
                         failedRowsTag,
                         successfulRowsTag,
                         autoUpdateSchema,
-                        ignoreUnknownValues))
+                        ignoreUnknownValues,
+                        createDisposition,
+                        kmsKey))
                 .withOutputTags(finalizeTag, tupleTagList)
                 .withSideInputs(dynamicDestinations.getSideInputs()));
 
@@ -221,6 +231,8 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
     private final @Nullable TupleTag<TableRow> successfulRowsTag;
     private final boolean autoUpdateSchema;
     private final boolean ignoreUnknownValues;
+    private final BigQueryIO.Write.CreateDisposition createDisposition;
+    private final @Nullable String kmsKey;
 
     static class AppendRowsContext extends RetryManager.Operation.Context<AppendRowsResponse> {
       long offset;
@@ -256,6 +268,7 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
           Metrics.counter(
               StorageApiWritesShardedRecords.WriteRecordsDoFn.class,
               "rowsSentToFailedRowsCollection");
+      private final Callable<Boolean> tryCreateTable;
 
       private final boolean useDefaultStream;
       private TableSchema initialTableSchema;
@@ -271,7 +284,8 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
           boolean useDefaultStream,
           int streamAppendClientCount,
           boolean usingMultiplexing,
-          long maxRequestSize)
+          long maxRequestSize,
+          Callable<Boolean> tryCreateTable)
           throws Exception {
         this.tableUrn = tableUrn;
         this.pendingMessages = Lists.newArrayList();
@@ -282,6 +296,7 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
         this.clientNumber = new Random().nextInt(streamAppendClientCount);
         this.usingMultiplexing = usingMultiplexing;
         this.maxRequestSize = maxRequestSize;
+        this.tryCreateTable = tryCreateTable;
       }
 
       void teardown() {
@@ -306,64 +321,79 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
         return this.streamName;
       }
 
-      String getOrCreateStreamName() {
-        if (Strings.isNullOrEmpty(this.streamName)) {
-          try {
-            if (!useDefaultStream) {
-              this.streamName =
-                  Preconditions.checkStateNotNull(maybeDatasetService)
-                      .createWriteStream(tableUrn, Type.PENDING)
-                      .getName();
-              this.currentOffset = 0;
-            } else {
-              this.streamName = getDefaultStreamName();
-            }
-          } catch (Exception e) {
-            throw new RuntimeException(e);
-          }
-        }
+      String getOrCreateStreamName() throws Exception {
+        CreateTableHelpers.createTableWrapper(
+            () -> {
+              if (!useDefaultStream) {
+                this.streamName =
+                    Preconditions.checkStateNotNull(maybeDatasetService)
+                        .createWriteStream(tableUrn, Type.PENDING)
+                        .getName();
+                this.currentOffset = 0;
+              } else {
+                this.streamName = getDefaultStreamName();
+              }
+              return null;
+            },
+            tryCreateTable);
         return this.streamName;
       }
 
       AppendClientInfo generateClient(@Nullable TableSchema updatedSchema) throws Exception {
         TableSchema tableSchema =
             (updatedSchema != null) ? updatedSchema : getCurrentTableSchema(streamName);
-        AppendClientInfo appendClientInfo =
-            AppendClientInfo.of(
-                tableSchema,
-                // Make sure that the client is always closed in a different thread to avoid
-                // blocking.
-                client ->
-                    runAsyncIgnoreFailure(
-                        closeWriterExecutor,
-                        () -> {
-                          synchronized (APPEND_CLIENTS) {
-                            // Remove the pin owned by the cache.
-                            client.unpin();
-                            client.close();
-                          }
-                        }));
-        appendClientInfo =
-            appendClientInfo.withAppendClient(
-                Preconditions.checkStateNotNull(maybeDatasetService),
-                () -> streamName,
-                usingMultiplexing);
+        AtomicReference<AppendClientInfo> appendClientInfo =
+            new AtomicReference<>(
+                AppendClientInfo.of(
+                    tableSchema,
+                    // Make sure that the client is always closed in a different thread to avoid
+                    // blocking.
+                    client ->
+                        runAsyncIgnoreFailure(
+                            closeWriterExecutor,
+                            () -> {
+                              synchronized (APPEND_CLIENTS) {
+                                // Remove the pin owned by the cache.
+                                client.unpin();
+                                client.close();
+                              }
+                            })));
+
+        CreateTableHelpers.createTableWrapper(
+            () -> {
+              appendClientInfo.set(
+                  appendClientInfo
+                      .get()
+                      .withAppendClient(
+                          Preconditions.checkStateNotNull(maybeDatasetService),
+                          () -> streamName,
+                          usingMultiplexing));
+              Preconditions.checkStateNotNull(appendClientInfo.get().getStreamAppendClient());
+              return null;
+            },
+            tryCreateTable);
+
         // This pin is "owned" by the cache.
-        Preconditions.checkStateNotNull(appendClientInfo.getStreamAppendClient()).pin();
-        return appendClientInfo;
+        Preconditions.checkStateNotNull(appendClientInfo.get().getStreamAppendClient()).pin();
+        return appendClientInfo.get();
       }
 
-      TableSchema getCurrentTableSchema(String stream) {
-        TableSchema currentSchema = initialTableSchema;
-        if (autoUpdateSchema) {
-          @Nullable
-          WriteStream writeStream =
-              Preconditions.checkStateNotNull(maybeDatasetService).getWriteStream(streamName);
-          if (writeStream != null && writeStream.hasTableSchema()) {
-            currentSchema = writeStream.getTableSchema();
-          }
-        }
-        return currentSchema;
+      TableSchema getCurrentTableSchema(String stream) throws Exception {
+        AtomicReference<TableSchema> currentSchema = new AtomicReference<>(initialTableSchema);
+        CreateTableHelpers.createTableWrapper(
+            () -> {
+              if (autoUpdateSchema) {
+                @Nullable
+                WriteStream writeStream =
+                    Preconditions.checkStateNotNull(maybeDatasetService).getWriteStream(streamName);
+                if (writeStream != null && writeStream.hasTableSchema()) {
+                  currentSchema.set(writeStream.getTableSchema());
+                }
+              }
+              return null;
+            },
+            tryCreateTable);
+        return currentSchema.get();
       }
 
       AppendClientInfo getAppendClientInfo(
@@ -640,6 +670,13 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
                         + " failed with stream "
                         + "doesn't exist");
               }
+              // TODO: Only do this on explicit NOT_FOUND errors once BigQuery reliably produces
+              // them.
+              try {
+                tryCreateTable.call();
+              } catch (Exception e) {
+                throw new RuntimeException(e);
+              }
 
               invalidateWriteStream();
 
@@ -731,7 +768,9 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
         TupleTag<BigQueryStorageApiInsertError> failedRowsTag,
         @Nullable TupleTag<TableRow> successfulRowsTag,
         boolean autoUpdateSchema,
-        boolean ignoreUnknownValues) {
+        boolean ignoreUnknownValues,
+        BigQueryIO.Write.CreateDisposition createDisposition,
+        @Nullable String kmsKey) {
       this.messageConverters = new TwoLevelMessageConverterCache<>(operationName);
       this.dynamicDestinations = dynamicDestinations;
       this.bqServices = bqServices;
@@ -744,6 +783,8 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
       this.successfulRowsTag = successfulRowsTag;
       this.autoUpdateSchema = autoUpdateSchema;
       this.ignoreUnknownValues = ignoreUnknownValues;
+      this.createDisposition = createDisposition;
+      this.kmsKey = kmsKey;
     }
 
     boolean shouldFlush() {
@@ -823,6 +864,20 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
               + "but %s returned null for destination %s",
           dynamicDestinations,
           destination);
+      @Nullable Coder<DestinationT> destinationCoder = dynamicDestinations.getDestinationCoder();
+      Callable<Boolean> tryCreateTable =
+          () -> {
+            CreateTableHelpers.possiblyCreateTable(
+                c.getPipelineOptions().as(BigQueryOptions.class),
+                tableDestination1,
+                () -> dynamicDestinations.getSchema(destination),
+                createDisposition,
+                destinationCoder,
+                kmsKey,
+                bqServices);
+            return true;
+          };
+
       MessageConverter<ElementT> messageConverter;
       try {
         messageConverter = messageConverters.get(destination, dynamicDestinations, datasetService);
@@ -833,7 +888,8 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
             useDefaultStream,
             streamAppendClientCount,
             bigQueryOptions.getUseStorageApiConnectionPool(),
-            bigQueryOptions.getStorageWriteApiMaxRequestSize());
+            bigQueryOptions.getStorageWriteApiMaxRequestSize(),
+            tryCreateTable);
       } catch (Exception e) {
         throw new RuntimeException(e);
       }
@@ -850,6 +906,7 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
       DatasetService initializedDatasetService = initializeDatasetService(pipelineOptions);
       dynamicDestinations.setSideInputAccessorFromProcessContext(c);
       Preconditions.checkStateNotNull(destinations);
+
       DestinationState state =
           destinations.computeIfAbsent(
               element.getKey(),
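
One pattern worth calling out in the hunks above: getOrCreateStreamName,
generateClient, and getCurrentTableSchema now run their bodies inside a
Callable handed to createTableWrapper, and Java lambdas may only capture
effectively-final locals, so results are written into an AtomicReference
holder instead of being assigned to a local. A self-contained sketch (retry
plumbing elided; names hypothetical):

    import java.util.concurrent.Callable;
    import java.util.concurrent.atomic.AtomicReference;

    public class HolderSketch {
      static void withRetries(Callable<Void> action) throws Exception {
        action.call(); // backoff/retry elided; see createTableWrapper above
      }

      public static void main(String[] args) throws Exception {
        // Assigning a plain local inside the lambda would not compile;
        // the AtomicReference holder sidesteps that restriction.
        AtomicReference<String> stream = new AtomicReference<>();
        withRetries(
            () -> {
              stream.set("projects/p/datasets/d/tables/t/streams/s1");
              return null;
            });
        System.out.println(stream.get());
      }
    }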
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
index e0353bf9a90..c546d4f2ede 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
@@ -353,20 +353,30 @@ public class StorageApiWritesShardedRecords<DestinationT extends @NonNull Object
         ValueState<String> streamName,
         ValueState<Long> streamOffset,
         Timer streamIdleTimer,
-        DatasetService datasetService) {
+        DatasetService datasetService,
+        Callable<Boolean> tryCreateTable) {
       try {
-        String stream = streamName.read();
-        if (stream == null || "".equals(stream)) {
+        final @Nullable String streamValue = streamName.read();
+        AtomicReference<String> stream = new AtomicReference<>();
+        if (streamValue == null || "".equals(streamValue)) {
           // In a buffered stream, data is only visible up to the offset to which it was flushed.
-          stream = datasetService.createWriteStream(tableId, Type.BUFFERED).getName();
-          streamName.write(stream);
+          CreateTableHelpers.createTableWrapper(
+              () -> {
+                stream.set(datasetService.createWriteStream(tableId, Type.BUFFERED).getName());
+                return null;
+              },
+              tryCreateTable);
+
+          streamName.write(stream.get());
           streamOffset.write(0L);
           streamsCreated.inc();
+        } else {
+          stream.set(streamValue);
         }
         // Reset the idle timer.
         streamIdleTimer.offset(streamIdleTime).withNoOutputTimestamp().setRelative();
 
-        return stream;
+        return stream.get();
       } catch (Exception e) {
         throw new RuntimeException(e);
       }
@@ -427,8 +437,24 @@ public class StorageApiWritesShardedRecords<DestinationT extends @NonNull Object
       final String tableId = tableDestination.getTableUrn(bigQueryOptions);
       final DatasetService datasetService = getDatasetService(pipelineOptions);
 
+      Coder<DestinationT> destinationCoder = dynamicDestinations.getDestinationCoder();
+      Callable<Boolean> tryCreateTable =
+          () -> {
+            CreateTableHelpers.possiblyCreateTable(
+                c.getPipelineOptions().as(BigQueryOptions.class),
+                tableDestination,
+                () -> dynamicDestinations.getSchema(element.getKey().getKey()),
+                createDisposition,
+                destinationCoder,
+                kmsKey,
+                bqServices);
+            return true;
+          };
+
       Supplier<String> getOrCreateStream =
-          () -> getOrCreateStream(tableId, streamName, streamOffset, idleTimer, datasetService);
+          () ->
+              getOrCreateStream(
+                  tableId, streamName, streamOffset, idleTimer, datasetService, tryCreateTable);
       Callable<AppendClientInfo> getAppendClientInfo =
           () -> {
             @Nullable TableSchema tableSchema;
@@ -626,6 +652,13 @@ public class StorageApiWritesShardedRecords<DestinationT extends @NonNull Object
             // Invalidate the StreamWriter and force a new one to be created.
             LOG.error(
                 "Got error " + failedContext.getError() + " closing " + 
failedContext.streamName);
+
+            // TODO: Only do this on explicit NOT_FOUND errors once BigQuery reliably produces them.
+            try {
+              tryCreateTable.call();
+            } catch (Exception e) {
+              throw new RuntimeException(e);
+            }
             clearClients.accept(failedContexts);
             appendFailures.inc();
 
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java
index 266de15d4be..6828c764a36 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java
@@ -23,6 +23,8 @@ import com.google.api.client.googleapis.json.GoogleJsonResponseException;
 import com.google.api.client.http.HttpHeaders;
 import com.google.api.core.ApiFuture;
 import com.google.api.core.ApiFutures;
+import com.google.api.gax.grpc.GrpcStatusCode;
+import com.google.api.gax.rpc.ApiException;
 import com.google.api.services.bigquery.model.Dataset;
 import com.google.api.services.bigquery.model.DatasetReference;
 import com.google.api.services.bigquery.model.Table;
@@ -45,6 +47,7 @@ import com.google.protobuf.Descriptors.Descriptor;
 import com.google.protobuf.DynamicMessage;
 import com.google.protobuf.Timestamp;
 import com.google.rpc.Code;
+import io.grpc.Status;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.HashMap;
@@ -509,20 +512,24 @@ public class FakeDatasetService implements DatasetService, Serializable {
   }
 
   @Override
-  public WriteStream createWriteStream(String tableUrn, Type type)
-      throws IOException, InterruptedException {
-    TableReference tableReference =
-        BigQueryHelpers.parseTableUrn(BigQueryHelpers.stripPartitionDecorator(tableUrn));
-    synchronized (FakeDatasetService.class) {
-      TableContainer tableContainer =
-          getTableContainer(
-              tableReference.getProjectId(),
-              tableReference.getDatasetId(),
-              tableReference.getTableId());
-      String streamName = UUID.randomUUID().toString();
-      Stream stream = new Stream(streamName, tableContainer, type);
-      writeStreams.put(streamName, stream);
-      return stream.toWriteStream();
+  public WriteStream createWriteStream(String tableUrn, Type type) throws InterruptedException {
+    try {
+      TableReference tableReference =
+          BigQueryHelpers.parseTableUrn(BigQueryHelpers.stripPartitionDecorator(tableUrn));
+      synchronized (FakeDatasetService.class) {
+        TableContainer tableContainer =
+            getTableContainer(
+                tableReference.getProjectId(),
+                tableReference.getDatasetId(),
+                tableReference.getTableId());
+        String streamName = UUID.randomUUID().toString();
+        Stream stream = new Stream(streamName, tableContainer, type);
+        writeStreams.put(streamName, stream);
+        return stream.toWriteStream();
+      }
+    } catch (IOException e) {
+      // TODO(relax): Return the exact error that BigQuery returns.
+      throw new ApiException(e, GrpcStatusCode.of(Status.Code.NOT_FOUND), false);
     }
   }
 
@@ -535,7 +542,8 @@ public class FakeDatasetService implements DatasetService, Serializable {
         return stream.toWriteStream();
       }
     }
-    return null;
+    // TODO(relax): Return the exact error that BigQuery returns.
+    throw new ApiException(null, GrpcStatusCode.of(Status.Code.NOT_FOUND), false);
   }
 
   @Override
@@ -550,6 +558,10 @@ public class FakeDatasetService implements DatasetService, Serializable {
         this.protoDescriptor = descriptor;
         synchronized (FakeDatasetService.class) {
           Stream stream = writeStreams.get(streamName);
+          if (stream == null) {
+            // TODO(relax): Return the exact error that BigQuery returns.
+            throw new ApiException(null, GrpcStatusCode.of(Status.Code.NOT_FOUND), false);
+          }
           currentSchema = stream.tableContainer.getTable().getSchema();
         }
       }
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkCreateIfNeededIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkCreateIfNeededIT.java
new file mode 100644
index 00000000000..3cb44c6c7b6
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkCreateIfNeededIT.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.io.gcp.testing.BigqueryClient;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.hamcrest.Matchers;
+import org.joda.time.Duration;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Integration test for table create-if-needed handling when using the storage API. */
+@RunWith(Parameterized.class)
+public class StorageApiSinkCreateIfNeededIT {
+  @Parameterized.Parameters
+  public static Iterable<Object[]> data() {
+    return ImmutableList.of(new Object[] {true}, new Object[] {false});
+  }
+
+  @Parameterized.Parameter(0)
+  public boolean useAtLeastOnce;
+
+  private static final Logger LOG = LoggerFactory.getLogger(StorageApiSinkCreateIfNeededIT.class);
+
+  private static final BigqueryClient BQ_CLIENT = new BigqueryClient("StorageApiSinkFailedRowsIT");
+  private static final String PROJECT =
+      TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject();
+  private static final String BIG_QUERY_DATASET_ID =
+      "storage_api_sink_failed_rows" + System.nanoTime();
+  private static final List<TableFieldSchema> FIELDS =
+      ImmutableList.<TableFieldSchema>builder()
+          .add(new TableFieldSchema().setType("STRING").setName("str"))
+          .add(new TableFieldSchema().setType("INT64").setName("tablenum"))
+          .build();
+  private static final TableSchema BASE_TABLE_SCHEMA = new TableSchema().setFields(FIELDS);
+
+  private BigQueryIO.Write.Method getMethod() {
+    return useAtLeastOnce
+        ? BigQueryIO.Write.Method.STORAGE_API_AT_LEAST_ONCE
+        : BigQueryIO.Write.Method.STORAGE_WRITE_API;
+  }
+
+  @BeforeClass
+  public static void setUpTestEnvironment() throws IOException, InterruptedException {
+    // Create one BQ dataset for all test cases.
+    BQ_CLIENT.createNewDataset(PROJECT, BIG_QUERY_DATASET_ID);
+  }
+
+  @AfterClass
+  public static void cleanup() {
+    LOG.info("Start to clean up tables and datasets.");
+    BQ_CLIENT.deleteDataset(PROJECT, BIG_QUERY_DATASET_ID);
+  }
+
+  @Test
+  public void testCreateManyTables() throws IOException, InterruptedException {
+    List<TableRow> inputs =
+        LongStream.range(0, 100)
+            .mapToObj(l -> new TableRow().set("str", "foo").set("tablenum", l))
+            .collect(Collectors.toList());
+
+    String table = "table" + System.nanoTime();
+    String tableSpecBase = PROJECT + "." + BIG_QUERY_DATASET_ID + "." + table;
+    runPipeline(getMethod(), tableSpecBase, inputs);
+    assertTablesCreated(tableSpecBase, 100);
+  }
+
+  private void assertTablesCreated(String tableSpecPrefix, int expectedRows)
+      throws IOException, InterruptedException {
+    TableRow queryResponse =
+        Iterables.getOnlyElement(
+            BQ_CLIENT.queryUnflattened(
+                String.format("SELECT COUNT(*) FROM `%s`", tableSpecPrefix + "*"),
+                PROJECT,
+                true,
+                true));
+    int numRowsWritten = Integer.parseInt((String) queryResponse.get("f0_"));
+    if (useAtLeastOnce) {
+      assertThat(numRowsWritten, Matchers.greaterThanOrEqualTo(expectedRows));
+    } else {
+      assertThat(numRowsWritten, Matchers.equalTo(expectedRows));
+    }
+  }
+
+  private static void runPipeline(
+      BigQueryIO.Write.Method method, String tableSpecBase, Iterable<TableRow> tableRows) {
+    Pipeline p = Pipeline.create();
+
+    BigQueryIO.Write<TableRow> write =
+        BigQueryIO.writeTableRows()
+            .to(tr -> new TableDestination(tableSpecBase + tr.getValue().get("tablenum"), ""))
+            .withSchema(BASE_TABLE_SCHEMA)
+            .withMethod(method)
+            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED);
+    if (method == BigQueryIO.Write.Method.STORAGE_WRITE_API) {
+      write = write.withNumStorageWriteApiStreams(1);
+      write = write.withTriggeringFrequency(Duration.standardSeconds(1));
+    }
+    PCollection<TableRow> input = p.apply("Create test cases", Create.of(tableRows));
+    input = input.setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED);
+    input.apply("Write using Storage Write API", write);
+
+    p.run().waitUntilFinish();
+  }
+}

