This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 3838528fde3 [#21368] Clean-up and use the FixedExecutorProvider 
(#24952)
3838528fde3 is described below

commit 3838528fde37262ecadf642791e3e0f3a57d7b25
Author: Luke Cwik <lc...@google.com>
AuthorDate: Mon Jan 9 15:29:46 2023 -0800

    [#21368] Clean-up and use the FixedExecutorProvider (#24952)
    
    This is a minor clean-up for https://github.com/apache/beam/pull/24950 to 
re-use existing implementation from gax-java
---
 .../sdk/io/gcp/bigquery/BigQueryServicesImpl.java  | 30 +++-------------------
 1 file changed, 4 insertions(+), 26 deletions(-)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index f56239aedd6..289087c1d46 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -29,8 +29,8 @@ import com.google.api.client.util.BackOffUtils;
 import com.google.api.client.util.ExponentialBackOff;
 import com.google.api.client.util.Sleeper;
 import com.google.api.core.ApiFuture;
-import com.google.api.gax.core.ExecutorProvider;
 import com.google.api.gax.core.FixedCredentialsProvider;
+import com.google.api.gax.core.FixedExecutorProvider;
 import com.google.api.gax.rpc.ApiException;
 import com.google.api.gax.rpc.FixedHeaderProvider;
 import com.google.api.gax.rpc.HeaderProvider;
@@ -106,7 +106,6 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
@@ -1494,36 +1493,15 @@ class BigQueryServicesImpl implements BigQueryServices {
       return BigQueryWriteClient.create(
           BigQueryWriteSettings.newBuilder()
               .setCredentialsProvider(() -> 
options.as(GcpOptions.class).getGcpCredential())
-              .setBackgroundExecutorProvider(new 
OptionsExecutionProvider(options))
+              .setBackgroundExecutorProvider(
+                  FixedExecutorProvider.create(
+                      
options.as(ExecutorOptions.class).getScheduledExecutorService()))
               .build());
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
   }
 
-  /**
-   * OptionsExecutionProvider is a utility class used to wrap the 
Pipeline-wide {@link
-   * ScheduledExecutorService} into a supplier for the {@link 
BigQueryWriteClient}.
-   */
-  private static class OptionsExecutionProvider implements ExecutorProvider {
-
-    private final BigQueryOptions options;
-
-    public OptionsExecutionProvider(BigQueryOptions options) {
-      this.options = options;
-    }
-
-    @Override
-    public boolean shouldAutoClose() {
-      return false;
-    }
-
-    @Override
-    public ScheduledExecutorService getExecutor() {
-      return options.as(ExecutorOptions.class).getScheduledExecutorService();
-    }
-  }
-
   public static CustomHttpErrors createBigQueryClientCustomErrors() {
     CustomHttpErrors.Builder builder = new CustomHttpErrors.Builder();
     // 403 errors, to list tables, matching this URL:

Reply via email to