This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 227a846dc2d Merge pull request #25804: Reuse client when constructing StreamWriter 227a846dc2d is described below commit 227a846dc2dd0b78129739d0501d3dae31b17484 Author: Reuven Lax <re...@google.com> AuthorDate: Sat Mar 11 09:55:13 2023 -0800 Merge pull request #25804: Reuse client when constructing StreamWriter --- .../apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java index 3687a163d76..4e2e7aec0ec 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java @@ -1338,7 +1338,7 @@ class BigQueryServicesImpl implements BigQueryServices { .build(); StreamWriter streamWriter = - StreamWriter.newBuilder(streamName) + StreamWriter.newBuilder(streamName, newWriteClient) .setExecutorProvider( FixedExecutorProvider.create( options.as(ExecutorOptions.class).getScheduledExecutorService())) @@ -1514,9 +1514,18 @@ class BigQueryServicesImpl implements BigQueryServices { private static BigQueryWriteClient newBigQueryWriteClient(BigQueryOptions options) { try { + TransportChannelProvider transportChannelProvider = + BigQueryWriteSettings.defaultGrpcTransportProviderBuilder() + .setKeepAliveTime(org.threeten.bp.Duration.ofMinutes(1)) + .setKeepAliveTimeout(org.threeten.bp.Duration.ofMinutes(1)) + .setKeepAliveWithoutCalls(true) + .setChannelsPerCpu(2) + .build(); + return BigQueryWriteClient.create( BigQueryWriteSettings.newBuilder() .setCredentialsProvider(() -> options.as(GcpOptions.class).getGcpCredential()) + .setTransportChannelProvider(transportChannelProvider) .setBackgroundExecutorProvider( FixedExecutorProvider.create( options.as(ExecutorOptions.class).getScheduledExecutorService()))