This is an automated email from the ASF dual-hosted git repository. lcwik pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 3838528fde3 [#21368] Clean-up and use the FixedExecutorProvider (#24952) 3838528fde3 is described below commit 3838528fde37262ecadf642791e3e0f3a57d7b25 Author: Luke Cwik <lc...@google.com> AuthorDate: Mon Jan 9 15:29:46 2023 -0800 [#21368] Clean-up and use the FixedExecutorProvider (#24952) This is a minor clean-up for https://github.com/apache/beam/pull/24950 to re-use existing implementation from gax-java --- .../sdk/io/gcp/bigquery/BigQueryServicesImpl.java | 30 +++------------------- 1 file changed, 4 insertions(+), 26 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java index f56239aedd6..289087c1d46 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java @@ -29,8 +29,8 @@ import com.google.api.client.util.BackOffUtils; import com.google.api.client.util.ExponentialBackOff; import com.google.api.client.util.Sleeper; import com.google.api.core.ApiFuture; -import com.google.api.gax.core.ExecutorProvider; import com.google.api.gax.core.FixedCredentialsProvider; +import com.google.api.gax.core.FixedExecutorProvider; import com.google.api.gax.rpc.ApiException; import com.google.api.gax.rpc.FixedHeaderProvider; import com.google.api.gax.rpc.HeaderProvider; @@ -106,7 +106,6 @@ import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.Future; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -1494,36 +1493,15 @@ class BigQueryServicesImpl implements BigQueryServices { return BigQueryWriteClient.create( BigQueryWriteSettings.newBuilder() .setCredentialsProvider(() -> options.as(GcpOptions.class).getGcpCredential()) - .setBackgroundExecutorProvider(new OptionsExecutionProvider(options)) + .setBackgroundExecutorProvider( + FixedExecutorProvider.create( + options.as(ExecutorOptions.class).getScheduledExecutorService())) .build()); } catch (Exception e) { throw new RuntimeException(e); } } - /** - * OptionsExecutionProvider is a utility class used to wrap the Pipeline-wide {@link - * ScheduledExecutorService} into a supplier for the {@link BigQueryWriteClient}. - */ - private static class OptionsExecutionProvider implements ExecutorProvider { - - private final BigQueryOptions options; - - public OptionsExecutionProvider(BigQueryOptions options) { - this.options = options; - } - - @Override - public boolean shouldAutoClose() { - return false; - } - - @Override - public ScheduledExecutorService getExecutor() { - return options.as(ExecutorOptions.class).getScheduledExecutorService(); - } - } - public static CustomHttpErrors createBigQueryClientCustomErrors() { CustomHttpErrors.Builder builder = new CustomHttpErrors.Builder(); // 403 errors, to list tables, matching this URL: