This is an automated email from the ASF dual-hosted git repository.

gary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ba12e9af4f6af3bff1ae3298e9aaaf1edbdff744
Author: Gary Yao <g...@apache.org>
AuthorDate: Wed Jul 3 20:19:16 2019 +0200

    [hotfix][runtime] Make ComponentMainThreadExecutorServiceAdapter accept ScheduledExecutor
---
 .../ComponentMainThreadExecutorServiceAdapter.java | 60 ++++++++++++++++++----
 1 file changed, 51 insertions(+), 9 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ComponentMainThreadExecutorServiceAdapter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ComponentMainThreadExecutorServiceAdapter.java
index dbd94ab..c71675a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ComponentMainThreadExecutorServiceAdapter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ComponentMainThreadExecutorServiceAdapter.java
@@ -18,31 +18,73 @@
 
 package org.apache.flink.runtime.concurrent;
 
-import javax.annotation.Nonnull;
+import org.apache.flink.runtime.rpc.MainThreadValidatorUtil;
 
+import java.util.concurrent.Callable;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * Adapter class for a {@link ScheduledExecutorService} which shall be used as a
+ * Adapter class for a {@link ScheduledExecutorService} or {@link ScheduledExecutor} which shall be used as a
  * {@link ComponentMainThreadExecutor}. It enhances the given executor with an assert that the current thread is the
  * main thread of the executor.
  */
-public class ComponentMainThreadExecutorServiceAdapter
-       extends ScheduledExecutorServiceAdapter implements ComponentMainThreadExecutor {
+public class ComponentMainThreadExecutorServiceAdapter implements ComponentMainThreadExecutor {
+
+       private final ScheduledExecutor scheduledExecutor;
 
        /** A runnable that should assert that the current thread is the expected main thread. */
-       @Nonnull
        private final Runnable mainThreadCheck;
 
        public ComponentMainThreadExecutorServiceAdapter(
-               @Nonnull ScheduledExecutorService scheduledExecutorService,
-               @Nonnull Runnable mainThreadCheck) {
-               super(scheduledExecutorService);
-               this.mainThreadCheck = mainThreadCheck;
+                       final ScheduledExecutorService scheduledExecutorService,
+                       final Runnable mainThreadCheck) {
+               this(new ScheduledExecutorServiceAdapter(scheduledExecutorService), mainThreadCheck);
+       }
+
+       public ComponentMainThreadExecutorServiceAdapter(
+                       final ScheduledExecutor scheduledExecutorService,
+                       final Thread mainThread) {
+               this(scheduledExecutorService, () -> MainThreadValidatorUtil.isRunningInExpectedThread(mainThread));
+       }
+
+       private ComponentMainThreadExecutorServiceAdapter(
+                       final ScheduledExecutor scheduledExecutor,
+                       final Runnable mainThreadCheck) {
+               this.scheduledExecutor = checkNotNull(scheduledExecutor);
+               this.mainThreadCheck = checkNotNull(mainThreadCheck);
        }
 
        @Override
        public void assertRunningInMainThread() {
                mainThreadCheck.run();
        }
+
+       @Override
+       public ScheduledFuture<?> schedule(final Runnable command, final long delay, final TimeUnit unit) {
+               return scheduledExecutor.schedule(command, delay, unit);
+       }
+
+       @Override
+       public <V> ScheduledFuture<V> schedule(final Callable<V> callable, final long delay, final TimeUnit unit) {
+               return scheduledExecutor.schedule(callable, delay, unit);
+       }
+
+       @Override
+       public ScheduledFuture<?> scheduleAtFixedRate(final Runnable command, final long initialDelay, final long period, final TimeUnit unit) {
+               return scheduledExecutor.scheduleAtFixedRate(command, initialDelay, period, unit);
+       }
+
+       @Override
+       public ScheduledFuture<?> scheduleWithFixedDelay(final Runnable command, final long initialDelay, final long delay, final TimeUnit unit) {
+               return scheduledExecutor.scheduleWithFixedDelay(command, initialDelay, delay, unit);
+       }
+
+       @Override
+       public void execute(final Runnable command) {
+               scheduledExecutor.execute(command);
+       }
 }

Reply via email to