This is an automated email from the ASF dual-hosted git repository. gary pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit ba12e9af4f6af3bff1ae3298e9aaaf1edbdff744 Author: Gary Yao <g...@apache.org> AuthorDate: Wed Jul 3 20:19:16 2019 +0200 [hotfix][runtime] Make ComponentMainThreadExecutorServiceAdapter accept ScheduledExecutor --- .../ComponentMainThreadExecutorServiceAdapter.java | 60 ++++++++++++++++++---- 1 file changed, 51 insertions(+), 9 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ComponentMainThreadExecutorServiceAdapter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ComponentMainThreadExecutorServiceAdapter.java index dbd94ab..c71675a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ComponentMainThreadExecutorServiceAdapter.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ComponentMainThreadExecutorServiceAdapter.java @@ -18,31 +18,73 @@ package org.apache.flink.runtime.concurrent; -import javax.annotation.Nonnull; +import org.apache.flink.runtime.rpc.MainThreadValidatorUtil; +import java.util.concurrent.Callable; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.util.Preconditions.checkNotNull; /** - * Adapter class for a {@link ScheduledExecutorService} which shall be used as a + * Adapter class for a {@link ScheduledExecutorService} or {@link ScheduledExecutor} which shall be used as a * {@link ComponentMainThreadExecutor}. It enhances the given executor with an assert that the current thread is the * main thread of the executor. */ -public class ComponentMainThreadExecutorServiceAdapter - extends ScheduledExecutorServiceAdapter implements ComponentMainThreadExecutor { +public class ComponentMainThreadExecutorServiceAdapter implements ComponentMainThreadExecutor { + + private final ScheduledExecutor scheduledExecutor; /** A runnable that should assert that the current thread is the expected main thread. */ - @Nonnull private final Runnable mainThreadCheck; public ComponentMainThreadExecutorServiceAdapter( - @Nonnull ScheduledExecutorService scheduledExecutorService, - @Nonnull Runnable mainThreadCheck) { - super(scheduledExecutorService); - this.mainThreadCheck = mainThreadCheck; + final ScheduledExecutorService scheduledExecutorService, + final Runnable mainThreadCheck) { + this(new ScheduledExecutorServiceAdapter(scheduledExecutorService), mainThreadCheck); + } + + public ComponentMainThreadExecutorServiceAdapter( + final ScheduledExecutor scheduledExecutorService, + final Thread mainThread) { + this(scheduledExecutorService, () -> MainThreadValidatorUtil.isRunningInExpectedThread(mainThread)); + } + + private ComponentMainThreadExecutorServiceAdapter( + final ScheduledExecutor scheduledExecutor, + final Runnable mainThreadCheck) { + this.scheduledExecutor = checkNotNull(scheduledExecutor); + this.mainThreadCheck = checkNotNull(mainThreadCheck); } @Override public void assertRunningInMainThread() { mainThreadCheck.run(); } + + @Override + public ScheduledFuture<?> schedule(final Runnable command, final long delay, final TimeUnit unit) { + return scheduledExecutor.schedule(command, delay, unit); + } + + @Override + public <V> ScheduledFuture<V> schedule(final Callable<V> callable, final long delay, final TimeUnit unit) { + return scheduledExecutor.schedule(callable, delay, unit); + } + + @Override + public ScheduledFuture<?> scheduleAtFixedRate(final Runnable command, final long initialDelay, final long period, final TimeUnit unit) { + return scheduledExecutor.scheduleAtFixedRate(command, initialDelay, period, unit); + } + + @Override + public ScheduledFuture<?> scheduleWithFixedDelay(final Runnable command, final long initialDelay, final long delay, final TimeUnit unit) { + return scheduledExecutor.scheduleWithFixedDelay(command, initialDelay, delay, unit); + } + + @Override + public void execute(final Runnable command) { + scheduledExecutor.execute(command); + } }