This is an automated email from the ASF dual-hosted git repository. srichter pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 5b24aa0302074a5eaf6c1b47c2764e9883b674ab Author: Stefan Richter <s.rich...@data-artisans.com> AuthorDate: Fri Jun 21 18:07:03 2019 +0200 [FLINK-12804] Introduce mailbox-based ExecutorService --- .../flink/state/api/output/BoundedStreamTask.java | 3 +- .../flink/runtime/concurrent/FutureUtils.java | 26 ++ .../streaming/runtime/tasks/SourceStreamTask.java | 66 ++--- .../runtime/tasks/StreamIterationHead.java | 3 +- .../flink/streaming/runtime/tasks/StreamTask.java | 94 ++----- .../streaming/runtime/tasks/mailbox/Mailbox.java | 78 +++++- .../runtime/tasks/mailbox/MailboxImpl.java | 195 ++++++++++++-- .../runtime/tasks/mailbox/MailboxReceiver.java | 6 +- .../runtime/tasks/mailbox/MailboxSender.java | 6 +- .../{Mailbox.java => MailboxStateException.java} | 29 +- .../DefaultActionContext.java} | 24 +- .../MailboxDefaultAction.java} | 19 +- .../tasks/mailbox/execution/MailboxExecutor.java | 86 ++++++ .../MailboxExecutorService.java} | 19 +- .../execution/MailboxExecutorServiceImpl.java | 157 +++++++++++ .../tasks/mailbox/execution/MailboxProcessor.java | 295 +++++++++++++++++++++ .../SuspendedMailboxDefaultAction.java} | 16 +- .../tasks/StreamTaskSelectiveReadingTest.java | 3 +- .../runtime/tasks/StreamTaskTerminationTest.java | 3 +- .../streaming/runtime/tasks/StreamTaskTest.java | 13 +- .../runtime/tasks/SynchronousCheckpointITCase.java | 5 +- .../runtime/tasks/SynchronousCheckpointTest.java | 3 +- .../tasks/TaskCheckpointingBehaviourTest.java | 3 +- .../runtime/tasks/mailbox/MailboxImplTest.java | 185 ++++++++++++- .../execution/MailboxExecutorServiceImplTest.java | 178 +++++++++++++ .../mailbox/execution/MailboxProcessorTest.java | 265 ++++++++++++++++++ .../mailbox/execution/TestMailboxExecutor.java | 70 +++++ .../flink/streaming/util/MockStreamTask.java | 3 +- .../jobmaster/JobMasterStopWithSavepointIT.java | 5 +- 29 files changed, 1636 insertions(+), 222 deletions(-) diff --git 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BoundedStreamTask.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BoundedStreamTask.java index db663da..2ce9626 100644 --- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BoundedStreamTask.java +++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BoundedStreamTask.java @@ -28,6 +28,7 @@ import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.StreamTask; +import org.apache.flink.streaming.runtime.tasks.mailbox.execution.DefaultActionContext; import org.apache.flink.util.Collector; import org.apache.flink.util.OutputTag; import org.apache.flink.util.Preconditions; @@ -76,7 +77,7 @@ class BoundedStreamTask<IN, OUT, OP extends OneInputStreamOperator<IN, OUT> & Bo } @Override - protected void performDefaultAction(ActionContext context) throws Exception { + protected void performDefaultAction(DefaultActionContext context) throws Exception { if (input.hasNext()) { reuse.replace(input.next()); headOperator.setKeyContextElement1(reuse); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java index c1613c5..1db1d2a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java @@ -28,9 +28,12 @@ import org.apache.flink.util.function.SupplierWithException; import akka.dispatch.OnComplete; +import javax.annotation.Nonnull; + import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import 
java.util.List; import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; @@ -995,4 +998,27 @@ public class FutureUtils { } }); } + + /** + * Cancels all instances of {@link java.util.concurrent.Future} in the given list of runnables without interrupting. + * This method will suppress unexpected exceptions until the whole list is processed and then rethrow. + * + * @param runnables list of {@link Runnable} candidates to cancel. + */ + public static void cancelRunnableFutures(@Nonnull List<Runnable> runnables) { + RuntimeException suppressedExceptions = null; + for (Runnable runnable : runnables) { + if (runnable instanceof java.util.concurrent.Future) { + try { + ((java.util.concurrent.Future<?>) runnable).cancel(false); + } catch (RuntimeException ex) { + // safety net to ensure all candidates get cancelled before we let the exception bubble up. + suppressedExceptions = ExceptionUtils.firstOrSuppressed(ex, suppressedExceptions); + } + } + } + if (suppressedExceptions != null) { + throw suppressedExceptions; + } + } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java index 73f6bc4..50fdca1 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java @@ -25,8 +25,11 @@ import org.apache.flink.runtime.execution.Environment; import org.apache.flink.streaming.api.checkpoint.ExternallyInducedSource; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.operators.StreamSource; +import org.apache.flink.streaming.runtime.tasks.mailbox.execution.DefaultActionContext; import org.apache.flink.util.FlinkException; +import 
java.util.concurrent.CompletableFuture; + /** * {@link StreamTask} for executing a {@link StreamSource}. * @@ -45,8 +48,6 @@ import org.apache.flink.util.FlinkException; public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends StreamSource<OUT, SRC>> extends StreamTask<OUT, OP> { - private static final Runnable SOURCE_POISON_LETTER = () -> {}; - private volatile boolean externallyInducedCheckpoints; public SourceStreamTask(Environment env) { @@ -100,42 +101,21 @@ public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends S } @Override - protected void performDefaultAction(ActionContext context) throws Exception { + protected void performDefaultAction(DefaultActionContext context) throws Exception { + + context.suspendDefaultAction(); + // Against the usual contract of this method, this implementation is not step-wise but blocking instead for // compatibility reasons with the current source interface (source functions run as a loop, not in steps). - final LegacySourceFunctionThread sourceThread = new LegacySourceFunctionThread(); + final LegacySourceFunctionThread sourceThread = new LegacySourceFunctionThread(getName()); sourceThread.start(); - - // We run an alternative mailbox loop that does not involve default actions and synchronizes around actions. - try { - runAlternativeMailboxLoop(); - } catch (Exception mailboxEx) { - // We cancel the source function if some runtime exception escaped the mailbox. 
- if (!isCanceled()) { - cancelTask(); - } - throw mailboxEx; - } - - sourceThread.join(); - sourceThread.checkThrowSourceExecutionException(); - - context.allActionsCompleted(); - } - - private void runAlternativeMailboxLoop() throws InterruptedException { - - while (true) { - - Runnable letter = mailbox.takeMail(); - if (letter == SOURCE_POISON_LETTER) { - break; + sourceThread.getCompletionFuture().whenComplete((Void ignore, Throwable sourceThreadThrowable) -> { + if (sourceThreadThrowable == null) { + mailboxProcessor.allActionsCompleted(); + } else { + mailboxProcessor.reportThrowable(sourceThreadThrowable); } - - synchronized (getCheckpointLock()) { - letter.run(); - } - } + }); } @Override @@ -172,27 +152,25 @@ public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends S */ private class LegacySourceFunctionThread extends Thread { - private Throwable sourceExecutionThrowable; + private final CompletableFuture<Void> completionFuture; - LegacySourceFunctionThread() { - this.sourceExecutionThrowable = null; + LegacySourceFunctionThread(String taskDescription) { + super("Legacy Source Thread - " + taskDescription); + this.completionFuture = new CompletableFuture<>(); } @Override public void run() { try { headOperator.run(getCheckpointLock(), getStreamStatusMaintainer(), operatorChain); + completionFuture.complete(null); } catch (Throwable t) { - sourceExecutionThrowable = t; - } finally { - mailbox.clearAndPut(SOURCE_POISON_LETTER); + completionFuture.completeExceptionally(t); } } - void checkThrowSourceExecutionException() throws Exception { - if (sourceExecutionThrowable != null) { - throw new Exception(sourceExecutionThrowable); - } + CompletableFuture<Void> getCompletionFuture() { + return completionFuture; } } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java index 
d25bd23..4a031af 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java @@ -24,6 +24,7 @@ import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.io.BlockingQueueBroker; import org.apache.flink.streaming.runtime.io.RecordWriterOutput; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.mailbox.execution.DefaultActionContext; import org.apache.flink.util.FlinkRuntimeException; import org.slf4j.Logger; @@ -66,7 +67,7 @@ public class StreamIterationHead<OUT> extends OneInputStreamTask<OUT, OUT> { // ------------------------------------------------------------------------ @Override - protected void performDefaultAction(ActionContext context) throws Exception { + protected void performDefaultAction(DefaultActionContext context) throws Exception { StreamRecord<OUT> nextRecord = shouldWait ? 
dataChannel.poll(iterationWaitTime, TimeUnit.MILLISECONDS) : dataChannel.take(); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index 8ba2a44..a626bba 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -58,11 +58,11 @@ import org.apache.flink.streaming.runtime.partitioner.ConfigurableStreamPartitio import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer; -import org.apache.flink.streaming.runtime.tasks.mailbox.Mailbox; -import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxImpl; +import org.apache.flink.streaming.runtime.tasks.mailbox.execution.DefaultActionContext; +import org.apache.flink.streaming.runtime.tasks.mailbox.execution.MailboxExecutor; +import org.apache.flink.streaming.runtime.tasks.mailbox.execution.MailboxProcessor; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.Preconditions; -import org.apache.flink.util.function.ThrowingRunnable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -74,7 +74,6 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; @@ -129,13 +128,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> public static final ThreadGroup TRIGGER_THREAD_GROUP = new ThreadGroup("Triggers"); /** The logger used by the StreamTask and its subclasses. 
*/ - private static final Logger LOG = LoggerFactory.getLogger(StreamTask.class); - - /** Special value, letter that terminates the mailbox loop. */ - private static final Runnable POISON_LETTER = () -> {}; - - /** Special value, letter that "wakes up" a waiting mailbox loop. */ - private static final Runnable DEFAULT_ACTION_AVAILABLE = () -> {}; + protected static final Logger LOG = LoggerFactory.getLogger(StreamTask.class); // ------------------------------------------------------------------------ @@ -200,7 +193,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> private final SynchronousSavepointLatch syncSavepointLatch; - protected final Mailbox mailbox; + protected final MailboxProcessor mailboxProcessor; // ------------------------------------------------------------------------ @@ -247,7 +240,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> this.accumulatorMap = getEnvironment().getAccumulatorRegistry().getUserMap(); this.recordWriters = createRecordWriters(configuration, environment); this.syncSavepointLatch = new SynchronousSavepointLatch(); - this.mailbox = new MailboxImpl(); + this.mailboxProcessor = new MailboxProcessor(this::performDefaultAction); } // ------------------------------------------------------------------------ @@ -272,34 +265,13 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> * @param context context object for collaborative interaction between the action and the stream task. * @throws Exception on any problems in the action. */ - protected void performDefaultAction(ActionContext context) throws Exception { + protected void performDefaultAction(DefaultActionContext context) throws Exception { if (!inputProcessor.processInput()) { context.allActionsCompleted(); } } /** - * Runs the stream-tasks main processing loop. 
- */ - private void run() throws Exception { - final ActionContext actionContext = new ActionContext(); - while (true) { - if (mailbox.hasMail()) { - Optional<Runnable> maybeLetter; - while ((maybeLetter = mailbox.tryTakeMail()).isPresent()) { - Runnable letter = maybeLetter.get(); - if (letter == POISON_LETTER) { - return; - } - letter.run(); - } - } - - performDefaultAction(actionContext); - } - } - - /** * Emits the {@link org.apache.flink.streaming.api.watermark.Watermark#MAX_WATERMARK MAX_WATERMARK} * so that all registered timers are fired. * @@ -381,6 +353,9 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> // -------- Invoke -------- LOG.debug("Invoking {}", getName()); + // open mailbox + mailboxProcessor.open(); + // we need to make sure that any triggers scheduled in open() cannot be // executed before all operators are opened synchronized (lock) { @@ -400,7 +375,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> // let the task do its work isRunning = true; - run(); + mailboxProcessor.runMailboxLoop(); // if this left the run() method cleanly despite the fact that this was canceled, // make sure the "clean shutdown" is not attempted @@ -422,6 +397,9 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> // make sure no new timers can come timerService.quiesce(); + // let mailbox execution reject all new letters from this point + mailboxProcessor.prepareClose(); + // only set the StreamTask to not running after all operators have been closed! 
// See FLINK-7430 isRunning = false; @@ -489,12 +467,13 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> operatorChain.releaseOutputs(); } } + + mailboxProcessor.close(); } } @Override public final void cancel() throws Exception { - mailbox.clearAndPut(POISON_LETTER); isRunning = false; canceled = true; @@ -505,10 +484,15 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> cancelTask(); } finally { + mailboxProcessor.cancelMailboxExecution(); cancelables.close(); } } + public MailboxExecutor getTaskMailboxExecutor() { + return mailboxProcessor.getMailboxExecutor(); + } + public final boolean isRunning() { return isRunning; } @@ -1332,40 +1316,4 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> output.setMetricGroup(environment.getMetricGroup().getIOMetricGroup()); return output; } - - /** - * The action context is passed as parameter into the default action method and holds control methods for feedback - * of from the default action to the mailbox. - */ - public final class ActionContext { - - private final Runnable actionUnavailableLetter = ThrowingRunnable.unchecked(() -> mailbox.takeMail().run()); - - /** - * This method must be called to end the stream task when all actions for the tasks have been performed. - */ - public void allActionsCompleted() { - mailbox.clearAndPut(POISON_LETTER); - } - - /** - * Calling this method signals that the mailbox-thread should continue invoking the default action, e.g. because - * new input became available for processing. - * - * @throws InterruptedException on interruption. - */ - public void actionsAvailable() throws InterruptedException { - mailbox.putMail(DEFAULT_ACTION_AVAILABLE); - } - - /** - * Calling this method signals that the mailbox-thread should (temporarily) stop invoking the default action, - * e.g. because there is currently no input available. - * - * @throws InterruptedException on interruption. 
- */ - public void actionsUnavailable() throws InterruptedException { - mailbox.putMail(actionUnavailableLetter); - } - } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/Mailbox.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/Mailbox.java index dfa8d76..9b2d95f 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/Mailbox.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/Mailbox.java @@ -20,17 +20,85 @@ package org.apache.flink.streaming.runtime.tasks.mailbox; import javax.annotation.Nonnull; +import java.util.List; + /** * A mailbox is basically a blocking queue for inter-thread message exchange in form of {@link Runnable} objects between - * multiple producer threads and a single consumer. + * multiple producer threads and a single consumer. This has a lifecycle of closed -> open -> (quiesced) -> closed. */ public interface Mailbox extends MailboxReceiver, MailboxSender { /** - * The effect of this is that all pending letters are dropped and the given priorityAction - * is enqueued to the head of the mailbox. + * This enum represents the states of the mailbox lifecycle. + */ + enum State { + OPEN, QUIESCED, CLOSED + } + + /** + * Open the mailbox. In this state, the mailbox supports put and take operations. + */ + void open(); + + /** + * Quiesce the mailbox. In this state, the mailbox supports only take operations and all pending and future put + * operations will throw {@link MailboxStateException}. + */ + void quiesce(); + + /** + * Close the mailbox. In this state, all pending and future put operations and all pending and future take + * operations will throw {@link MailboxStateException}. Returns all letters that were still enqueued. + * + * @return list with all letters that were enqueued in the mailbox at the time of closing. 
+ */ + @Nonnull + List<Runnable> close(); + + /** + * The effect of this is that all pending letters in the mailbox are dropped and the given priorityLetter + * is enqueued to the head of the mailbox. Dropped letters are returned. This method should only be invoked + * by code that has ownership of the mailbox object and only rarely used, e.g. to submit special events like + * shutting down the mailbox loop. + * + * @param priorityLetter action to enqueue atomically after the mailbox was cleared. + * @throws MailboxStateException if the mailbox is quiesced or closed. + */ + @Nonnull + List<Runnable> clearAndPut(@Nonnull Runnable priorityLetter) throws MailboxStateException; + + /** + * Adds the given action to the head of the mailbox. This method will block if the mailbox is full and + * should therefore only be called from outside the mailbox main-thread to avoid deadlocks. + * + * @param priorityLetter action to enqueue to the head of the mailbox. + * @throws InterruptedException on interruption. + * @throws MailboxStateException if the mailbox is quiesced or closed. + */ + void putFirst(@Nonnull Runnable priorityLetter) throws InterruptedException, MailboxStateException; + + /** + * Adds the given action to the head of the mailbox if the mailbox is not full. Returns true if the letter + * was successfully added to the mailbox. + * + * @param priorityLetter action to enqueue to the head of the mailbox. + * @return true if the letter was successfully added. + * @throws MailboxStateException if the mailbox is quiesced or closed. + */ + boolean tryPutFirst(@Nonnull Runnable priorityLetter) throws MailboxStateException; + + /** + * Returns the current state of the mailbox as defined by the lifecycle enum {@link State}. + * + * @return the current state of the mailbox. + */ + @Nonnull + State getState(); + + /** + * Returns the total capacity of the mailbox. * - * @param priorityAction action to enqueue atomically after the mailbox was cleared. 
+ * @return the total capacity of the mailbox. */ - void clearAndPut(@Nonnull Runnable priorityAction); + int capacity(); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImpl.java index e9bd346..0f6bbf0 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImpl.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImpl.java @@ -24,6 +24,8 @@ import javax.annotation.Nonnull; import javax.annotation.concurrent.GuardedBy; import javax.annotation.concurrent.ThreadSafe; +import java.util.ArrayList; +import java.util.List; import java.util.Optional; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; @@ -77,6 +79,12 @@ public class MailboxImpl implements Mailbox { private volatile int count; /** + * The state of the mailbox in the lifecycle of open, quiesced, and closed. + */ + @GuardedBy("lock") + private volatile State state; + + /** * A mask to wrap around the indexes of the ring buffer. We use this to avoid ifs or modulo ops. */ private final int moduloMask; @@ -93,6 +101,7 @@ public class MailboxImpl implements Mailbox { this.lock = new ReentrantLock(); this.notEmpty = lock.newCondition(); this.notFull = lock.newCondition(); + this.state = State.CLOSED; } @Override @@ -101,11 +110,16 @@ public class MailboxImpl implements Mailbox { } @Override - public Optional<Runnable> tryTakeMail() { + public Optional<Runnable> tryTakeMail() throws MailboxStateException { final ReentrantLock lock = this.lock; lock.lock(); try { - return isEmpty() ? 
Optional.empty() : Optional.of(takeInternal()); + if (isEmpty()) { + checkTakeStateConditions(); + return Optional.empty(); + } else { + return Optional.of(takeInternal()); + } } finally { lock.unlock(); } @@ -113,11 +127,12 @@ public class MailboxImpl implements Mailbox { @Nonnull @Override - public Runnable takeMail() throws InterruptedException { + public Runnable takeMail() throws InterruptedException, MailboxStateException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (isEmpty()) { + checkTakeStateConditions(); notEmpty.await(); } return takeInternal(); @@ -129,14 +144,15 @@ public class MailboxImpl implements Mailbox { //------------------------------------------------------------------------------------------------------------------ @Override - public boolean tryPutMail(@Nonnull Runnable letter) { + public boolean tryPutMail(@Nonnull Runnable letter) throws MailboxStateException { final ReentrantLock lock = this.lock; lock.lock(); try { if (isFull()) { + checkPutStateConditions(); return false; } else { - putInternal(letter); + putTailInternal(letter); return true; } } finally { @@ -145,14 +161,15 @@ public class MailboxImpl implements Mailbox { } @Override - public void putMail(@Nonnull Runnable letter) throws InterruptedException { + public void putMail(@Nonnull Runnable letter) throws InterruptedException, MailboxStateException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (isFull()) { + checkPutStateConditions(); notFull.await(); } - putInternal(letter); + putTailInternal(letter); } finally { lock.unlock(); } @@ -160,16 +177,79 @@ public class MailboxImpl implements Mailbox { //------------------------------------------------------------------------------------------------------------------ - private void putInternal(Runnable letter) { + @Nonnull + @Override + public List<Runnable> clearAndPut(@Nonnull Runnable priorityLetter) throws MailboxStateException { + ArrayList<Runnable> 
droppedLetters = new ArrayList<>(capacity()); + + lock.lock(); + try { + // check state first to avoid losing any letters forever through exception + checkPutStateConditions(); + dropAllLetters(droppedLetters); + putTailInternal(priorityLetter); + } finally { + lock.unlock(); + } + + return droppedLetters; + } + + @Override + public void putFirst(@Nonnull Runnable priorityLetter) throws InterruptedException, MailboxStateException { + final ReentrantLock lock = this.lock; + lock.lockInterruptibly(); + try { + while (isFull()) { + checkPutStateConditions(); + notFull.await(); + } + putHeadInternal(priorityLetter); + } finally { + lock.unlock(); + } + } + + @Override + public boolean tryPutFirst(@Nonnull Runnable priorityLetter) throws MailboxStateException { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + if (isFull()) { + checkPutStateConditions(); + return false; + } else { + putHeadInternal(priorityLetter); + return true; + } + } finally { + lock.unlock(); + } + } + + //------------------------------------------------------------------------------------------------------------------ + + private void putHeadInternal(Runnable letter) throws MailboxStateException { assert lock.isHeldByCurrentThread(); + checkPutStateConditions(); + headIndex = decreaseIndexWithWrapAround(headIndex); + this.ringBuffer[headIndex] = letter; + ++count; + notEmpty.signal(); + } + + private void putTailInternal(Runnable letter) throws MailboxStateException { + assert lock.isHeldByCurrentThread(); + checkPutStateConditions(); this.ringBuffer[tailIndex] = letter; tailIndex = increaseIndexWithWrapAround(tailIndex); ++count; notEmpty.signal(); } - private Runnable takeInternal() { + private Runnable takeInternal() throws MailboxStateException { assert lock.isHeldByCurrentThread(); + checkTakeStateConditions(); final Runnable[] buffer = this.ringBuffer; Runnable letter = buffer[headIndex]; buffer[headIndex] = null; @@ -179,32 +259,111 @@ public class MailboxImpl implements 
Mailbox { return letter; } + private void dropAllLetters(List<Runnable> dropInto) { + assert lock.isHeldByCurrentThread(); + int localCount = count; + while (localCount > 0) { + dropInto.add(ringBuffer[headIndex]); + ringBuffer[headIndex] = null; + headIndex = increaseIndexWithWrapAround(headIndex); + --localCount; + notFull.signal(); + } + count = 0; + } + private int increaseIndexWithWrapAround(int old) { return (old + 1) & moduloMask; } + private int decreaseIndexWithWrapAround(int old) { + return (old - 1) & moduloMask; + } + private boolean isFull() { - return count >= ringBuffer.length; + return count >= capacity(); } private boolean isEmpty() { return count == 0; } + private boolean isPutAbleState() { + return state == State.OPEN; + } + + private boolean isTakeAbleState() { + return state != State.CLOSED; + } + + private void checkPutStateConditions() throws MailboxStateException { + final State state = this.state; + if (!isPutAbleState()) { + throw new MailboxStateException("Mailbox is in state " + state + ", but is required to be in state " + + State.OPEN + " for put operations."); + } + } + + private void checkTakeStateConditions() throws MailboxStateException { + final State state = this.state; + if (!isTakeAbleState()) { + throw new MailboxStateException("Mailbox is in state " + state + ", but is required to be in state " + + State.OPEN + " or " + State.QUIESCED + " for take operations."); + } + } + + @Override + public void open() { + lock.lock(); + try { + if (state == State.CLOSED) { + state = State.OPEN; + } + } finally { + lock.unlock(); + } + } + @Override - public void clearAndPut(@Nonnull Runnable shutdownAction) { + public void quiesce() { lock.lock(); try { - int localCount = count; - while (localCount > 0) { - ringBuffer[headIndex] = null; - headIndex = increaseIndexWithWrapAround(headIndex); - --localCount; + if (state == State.OPEN) { + state = State.QUIESCED; } - count = 0; - putInternal(shutdownAction); + notFull.signalAll(); } finally { 
lock.unlock(); } } + + @Nonnull + @Override + public List<Runnable> close() { + final ArrayList<Runnable> droppedLetters = new ArrayList<>(capacity()); + + lock.lock(); + try { + dropAllLetters(droppedLetters); + state = State.CLOSED; + // to unblock all + notFull.signalAll(); + notEmpty.signalAll(); + } finally { + lock.unlock(); + } + + return droppedLetters; + } + + @Nonnull + @Override + public State getState() { + return state; + } + + @Override + public int capacity() { + return ringBuffer.length; + } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxReceiver.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxReceiver.java index 2d2f112..fc7c754 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxReceiver.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxReceiver.java @@ -39,15 +39,17 @@ public interface MailboxReceiver { * * @return an optional with either the oldest letter from the mailbox (head of queue) if the mailbox is not empty or * an empty optional otherwise. + * @throws MailboxStateException if mailbox is already closed. */ - Optional<Runnable> tryTakeMail(); + Optional<Runnable> tryTakeMail() throws MailboxStateException; /** * This method returns the oldest letter from the mailbox (head of queue) or blocks until a letter is available. * * @return the oldest letter from the mailbox (head of queue). * @throws InterruptedException on interruption. + * @throws MailboxStateException if mailbox is already closed. 
*/ @Nonnull - Runnable takeMail() throws InterruptedException; + Runnable takeMail() throws InterruptedException, MailboxStateException; } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxSender.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxSender.java index 36d10a1..2a2274a 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxSender.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxSender.java @@ -32,14 +32,16 @@ public interface MailboxSender { * * @param letter the letter to enqueue. * @return <code>true</code> iff successful. + * @throws MailboxStateException if the mailbox is quiesced or closed. */ - boolean tryPutMail(@Nonnull Runnable letter); + boolean tryPutMail(@Nonnull Runnable letter) throws MailboxStateException; /** * Enqueues the given letter to the mailbox and blocks until there is capacity for a successful put. * * @param letter the letter to enqueue. * @throws InterruptedException on interruption. + * @throws MailboxStateException if the mailbox is quiesced or closed. 
*/ - void putMail(@Nonnull Runnable letter) throws InterruptedException; + void putMail(@Nonnull Runnable letter) throws InterruptedException, MailboxStateException; } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/Mailbox.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxStateException.java similarity index 60% copy from flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/Mailbox.java copy to flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxStateException.java index dfa8d76..a3168c7 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/Mailbox.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxStateException.java @@ -18,19 +18,24 @@ package org.apache.flink.streaming.runtime.tasks.mailbox; -import javax.annotation.Nonnull; - /** - * A mailbox is basically a blocking queue for inter-thread message exchange in form of {@link Runnable} objects between - * multiple producer threads and a single consumer. + * This exception signals that a method of the mailbox was invoked in a state that does not support the invocation, + * e.g. on the attempt to put a letter into a closed mailbox. */ -public interface Mailbox extends MailboxReceiver, MailboxSender { +public class MailboxStateException extends Exception { + + MailboxStateException() { + } + + MailboxStateException(String message) { + super(message); + } + + MailboxStateException(String message, Throwable cause) { + super(message, cause); + } - /** - * The effect of this is that all pending letters are dropped and the given priorityAction - * is enqueued to the head of the mailbox. - * - * @param priorityAction action to enqueue atomically after the mailbox was cleared. 
- */ - void clearAndPut(@Nonnull Runnable priorityAction); + MailboxStateException(Throwable cause) { + super(cause); + } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/Mailbox.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/DefaultActionContext.java similarity index 50% copy from flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/Mailbox.java copy to flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/DefaultActionContext.java index dfa8d76..ba933e9 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/Mailbox.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/DefaultActionContext.java @@ -16,21 +16,23 @@ * limitations under the License. */ -package org.apache.flink.streaming.runtime.tasks.mailbox; - -import javax.annotation.Nonnull; +package org.apache.flink.streaming.runtime.tasks.mailbox.execution; /** - * A mailbox is basically a blocking queue for inter-thread message exchange in form of {@link Runnable} objects between - * multiple producer threads and a single consumer. + * This context is a feedback interface for the default action to interact with the mailbox execution. In particular + * it offers ways to signal that the execution of the default action should be finished or temporarily suspended. */ -public interface Mailbox extends MailboxReceiver, MailboxSender { +public interface DefaultActionContext { + + /** + * This method must be called to end the stream task when all actions for the tasks have been performed. This + * method can be invoked from any thread. + */ + void allActionsCompleted(); /** - * The effect of this is that all pending letters are dropped and the given priorityAction - * is enqueued to the head of the mailbox. 
- * - * @param priorityAction action to enqueue atomically after the mailbox was cleared. + * Calling this method signals that the mailbox-thread should (temporarily) stop invoking the default action, + * e.g. because there is currently no input available. This method must be invoked from the mailbox-thread only! */ - void clearAndPut(@Nonnull Runnable priorityAction); + SuspendedMailboxDefaultAction suspendDefaultAction(); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/Mailbox.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxDefaultAction.java similarity index 56% copy from flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/Mailbox.java copy to flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxDefaultAction.java index dfa8d76..21fc191 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/Mailbox.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxDefaultAction.java @@ -16,21 +16,22 @@ * limitations under the License. */ -package org.apache.flink.streaming.runtime.tasks.mailbox; +package org.apache.flink.streaming.runtime.tasks.mailbox.execution; -import javax.annotation.Nonnull; +import org.apache.flink.annotation.Internal; /** - * A mailbox is basically a blocking queue for inter-thread message exchange in form of {@link Runnable} objects between - * multiple producer threads and a single consumer. + * Interface for the default action that is repeatedly invoked in the mailbox-loop. */ -public interface Mailbox extends MailboxReceiver, MailboxSender { +@Internal +public interface MailboxDefaultAction { /** - * The effect of this is that all pending letters are dropped and the given priorityAction - * is enqueued to the head of the mailbox. 
+ * This method implements the default action of the mailbox loop (e.g. processing one event from the input). + * Implementations should (in general) be non-blocking. * - * @param priorityAction action to enqueue atomically after the mailbox was cleared. + * @param context context object for collaborative interaction between the default action and the mailbox loop. + * @throws Exception on any problems in the action. */ - void clearAndPut(@Nonnull Runnable priorityAction); + void runDefaultAction(DefaultActionContext context) throws Exception; } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxExecutor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxExecutor.java new file mode 100644 index 0000000..1848e77 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxExecutor.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.runtime.tasks.mailbox.execution; + +import org.apache.flink.streaming.runtime.tasks.mailbox.Mailbox; + +import javax.annotation.Nonnull; + +import java.util.concurrent.Executor; +import java.util.concurrent.RejectedExecutionException; + +/** + * Interface for an {@link Executor} build around a {@link Mailbox}-based execution model. + */ +public interface MailboxExecutor extends Executor { + + /** + * Executes the given command at some time in the future in the mailbox thread. This call can block when the + * mailbox is currently full. Therefore, this method must not be called from the mailbox thread itself as this + * can cause a deadlock. Instead, if the caller is already in the mailbox thread, the command should just be + * executed directly or use the non-blocking {@link #tryExecute(Runnable)}. + * + * @param command the runnable task to add to the mailbox for execution. + * @throws RejectedExecutionException if this task cannot be accepted for execution, e.g. because the mailbox is + * quiesced or closed. + */ + @Override + void execute(@Nonnull Runnable command) throws RejectedExecutionException; + + /** + * Attempts to enqueue the given command in the mailbox for execution. On success, the method returns true. If + * the mailbox is full, this method returns immediately without adding the command and returns false. + * + * @param command the runnable task to add to the mailbox for execution. + * @return true if the command was added to the mailbox. False if the command could not be added because the mailbox + * was full. + * @throws RejectedExecutionException if this task cannot be accepted for execution, e.g. because the mailbox is + * quiesced or closed. 
+ */ + boolean tryExecute(Runnable command) throws RejectedExecutionException; + + /** + * This methods starts running the command at the head of the mailbox and is intended to be used by the mailbox + * thread to yield from a currently ongoing action to another command. The method blocks until another command to + * run is available in the mailbox and must only be called from the mailbox thread. Must only be called from the + * mailbox thread to not violate the single-threaded execution model. + * + * @throws InterruptedException on interruption. + * @throws IllegalStateException if the mailbox is closed and can no longer supply runnables for yielding. + */ + void yield() throws InterruptedException, IllegalStateException; + + /** + * This methods attempts to run the command at the head of the mailbox. This is intended to be used by the mailbox + * thread to yield from a currently ongoing action to another command. The method returns true if a command was + * found and executed or false if the mailbox was empty. Must only be called from the + * mailbox thread to not violate the single-threaded execution model. + * + * @return true on successful yielding to another command, false if there was no command to yield to. + * @throws IllegalStateException if the mailbox is closed and can no longer supply runnables for yielding. + */ + boolean tryYield() throws IllegalStateException; + + /** + * Check if the current thread is the mailbox thread. + * + * @return only true if called from the mailbox thread. 
+ */ + boolean isMailboxThread(); +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/Mailbox.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxExecutorService.java similarity index 56% copy from flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/Mailbox.java copy to flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxExecutorService.java index dfa8d76..1c267c6 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/Mailbox.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxExecutorService.java @@ -16,21 +16,14 @@ * limitations under the License. */ -package org.apache.flink.streaming.runtime.tasks.mailbox; +package org.apache.flink.streaming.runtime.tasks.mailbox.execution; -import javax.annotation.Nonnull; +import org.apache.flink.streaming.runtime.tasks.mailbox.Mailbox; + +import java.util.concurrent.ExecutorService; /** - * A mailbox is basically a blocking queue for inter-thread message exchange in form of {@link Runnable} objects between - * multiple producer threads and a single consumer. + * Interface for an {@link ExecutorService} build around a {@link Mailbox}-based execution model. */ -public interface Mailbox extends MailboxReceiver, MailboxSender { - - /** - * The effect of this is that all pending letters are dropped and the given priorityAction - * is enqueued to the head of the mailbox. - * - * @param priorityAction action to enqueue atomically after the mailbox was cleared. 
- */ - void clearAndPut(@Nonnull Runnable priorityAction); +public interface MailboxExecutorService extends MailboxExecutor, ExecutorService { } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxExecutorServiceImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxExecutorServiceImpl.java new file mode 100644 index 0000000..944b7a9 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxExecutorServiceImpl.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.tasks.mailbox.execution; + +import org.apache.flink.streaming.runtime.tasks.mailbox.Mailbox; +import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxStateException; + +import javax.annotation.Nonnull; + +import java.util.List; +import java.util.Optional; +import java.util.concurrent.AbstractExecutorService; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; + +/** + * Implementation of an executor service build around a mailbox-based execution model. 
+ */ +public class MailboxExecutorServiceImpl extends AbstractExecutorService implements MailboxExecutorService { + + /** Reference to the thread that executes the mailbox letters. */ + @Nonnull + private final Thread taskMailboxThread; + + /** The mailbox that manages the submitted runnable objects. */ + @Nonnull + private final Mailbox mailbox; + + public MailboxExecutorServiceImpl(@Nonnull Mailbox mailbox) { + this(mailbox, Thread.currentThread()); + } + + public MailboxExecutorServiceImpl(@Nonnull Mailbox mailbox, @Nonnull Thread taskMailboxThread) { + this.mailbox = mailbox; + this.taskMailboxThread = taskMailboxThread; + } + + @Override + public void execute(@Nonnull Runnable command) { + checkIsNotMailboxThread(); + try { + mailbox.putMail(command); + } catch (InterruptedException irex) { + Thread.currentThread().interrupt(); + throw new RejectedExecutionException("Sender thread was interrupted while blocking on mailbox.", irex); + } catch (MailboxStateException mbex) { + throw new RejectedExecutionException(mbex); + } + } + + @Override + public boolean tryExecute(Runnable command) { + try { + return mailbox.tryPutMail(command); + } catch (MailboxStateException e) { + throw new RejectedExecutionException(e); + } + } + + @Override + public void yield() throws InterruptedException, IllegalStateException { + checkIsMailboxThread(); + try { + Runnable runnable = mailbox.takeMail(); + runnable.run(); + } catch (MailboxStateException e) { + throw new IllegalStateException("Mailbox can no longer supply runnables for yielding.", e); + } + } + + @Override + public boolean tryYield() throws IllegalStateException { + checkIsMailboxThread(); + try { + Optional<Runnable> runnableOptional = mailbox.tryTakeMail(); + if (runnableOptional.isPresent()) { + runnableOptional.get().run(); + return true; + } else { + return false; + } + } catch (MailboxStateException e) { + throw new IllegalStateException("Mailbox can no longer supply runnables for yielding.", e); + } + } + + 
@Override + public boolean isMailboxThread() { + return Thread.currentThread() == taskMailboxThread; + } + + @Override + public void shutdown() { + mailbox.quiesce(); + } + + @Nonnull + @Override + public List<Runnable> shutdownNow() { + return mailbox.close(); + } + + @Override + public boolean isShutdown() { + return mailbox.getState() != Mailbox.State.OPEN; + } + + @Override + public boolean isTerminated() { + return mailbox.getState() == Mailbox.State.CLOSED; + } + + @Override + public boolean awaitTermination(long timeout, @Nonnull TimeUnit unit) { + throw new UnsupportedOperationException("This method is not supported by this implementation."); + } + + /** + * Returns the mailbox that manages the execution order. + * + * @return the mailbox. + */ + @Nonnull + public Mailbox getMailbox() { + return mailbox; + } + + private void checkIsMailboxThread() { + if (!isMailboxThread()) { + throw new IllegalStateException( + "Illegal thread detected. This method must be called from inside the mailbox thread!"); + } + } + + private void checkIsNotMailboxThread() { + if (isMailboxThread()) { + throw new IllegalStateException( + "Illegal thread detected. This method must NOT be called from inside the mailbox thread!"); + } + } +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxProcessor.java new file mode 100644 index 0000000..77906f7 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxProcessor.java @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.tasks.mailbox.execution; + +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.streaming.runtime.tasks.mailbox.Mailbox; +import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxImpl; +import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxStateException; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.WrappingRuntimeException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Optional; + +/** + * This class encapsulates the logic of the mailbox-based execution model. At the core of this model + * {@link #runMailboxLoop()} that continuously executes the provided {@link MailboxDefaultAction} in a loop. On each + * iteration, the method also checks if there are pending actions in the mailbox and executes such actions. This model + * ensures single-threaded execution between the default action (e.g. record processing) and mailbox actions (e.g. + * checkpoint trigger, timer firing, ...). + * + * <p>The {@link MailboxDefaultAction} interacts with this class through the {@link MailboxDefaultActionContext} to + * communicate control flow changes to the mailbox loop, e.g. that invocations of the default action are temporarily + * or permanently exhausted. 
+ * + * <p>The design of {@link #runMailboxLoop()} is centered around the idea of keeping the expected hot path + * (default action, no mail) as fast as possible, with just a single volatile read per iteration in + * {@link Mailbox#hasMail}. This means that all checking of mail and other control flags (mailboxLoopRunning, + * suspendedDefaultAction) are always connected to #hasMail indicating true. This means that control flag changes in + * the mailbox thread can be done directly, but we must ensure that there is at least one action in the mailbox so that + * the change is picked up. For control flag changes by all other threads, that must happen through mailbox actions, + * this is automatically the case. + * + * <p>This class has a open-prepareClose-close lifecycle that is connected with and maps to the lifecycle of the + * encapsulated {@link Mailbox} (which is open-quiesce-close). + */ +public class MailboxProcessor { + + private static final Logger LOG = LoggerFactory.getLogger(MailboxProcessor.class); + + /** The mailbox data-structure that manages request for special actions, like timers, checkpoints, ... */ + private final Mailbox mailbox; + + /** Executor-style facade for client code to submit actions to the mailbox. */ + private final MailboxExecutorService mailboxExecutor; + + /** Action that is repeatedly executed if no action request is in the mailbox. Typically record processing. */ + private final MailboxDefaultAction mailboxDefaultAction; + + /** Control flag to terminate the mailbox loop. Must only be accessed from mailbox thread. */ + private boolean mailboxLoopRunning; + + /** + * Remembers a currently active suspension of the default action. Serves as flag to indicate a suspended + * default action (suspended if not-null) and to reuse the object as return value in consecutive suspend attempts. + * Must only be accessed from mailbox thread. 
+ */ + private SuspendedMailboxDefaultAction suspendedDefaultAction; + + /** Special action that is used to terminate the mailbox loop. */ + private final Runnable mailboxPoisonLetter; + + public MailboxProcessor(MailboxDefaultAction mailboxDefaultAction) { + this.mailboxDefaultAction = Preconditions.checkNotNull(mailboxDefaultAction); + this.mailbox = new MailboxImpl(); + this.mailboxExecutor = new MailboxExecutorServiceImpl(mailbox); + this.mailboxPoisonLetter = () -> mailboxLoopRunning = false; + this.mailboxLoopRunning = true; + this.suspendedDefaultAction = null; + } + + /** + * Returns an executor service facade to submit actions to the mailbox. + */ + public MailboxExecutorService getMailboxExecutor() { + return mailboxExecutor; + } + + /** + * Lifecycle method to open the mailbox for action submission. + */ + public void open() { + mailbox.open(); + } + + /** + * Lifecycle method to close the mailbox for action submission. + */ + public void prepareClose() { + mailboxExecutor.shutdown(); + } + + /** + * Lifecycle method to close the mailbox for action submission/retrieval. This will cancel all instances of + * {@link java.util.concurrent.RunnableFuture} that are still contained in the mailbox. + */ + public void close() { + FutureUtils.cancelRunnableFutures(mailboxExecutor.shutdownNow()); + } + + /** + * Runs the mailbox processing loop. This is where the main work is done. + */ + public void runMailboxLoop() throws Exception { + + Preconditions.checkState( + mailboxExecutor.isMailboxThread(), + "Method must be executed by declared mailbox thread!"); + + final Mailbox localMailbox = mailbox; + + assert localMailbox.getState() == Mailbox.State.OPEN : "Mailbox must be opened!"; + + final MailboxDefaultActionContext defaultActionContext = new MailboxDefaultActionContext(this); + + while (processMail(localMailbox)) { + mailboxDefaultAction.runDefaultAction(defaultActionContext); + } + } + + /** + * Cancels the mailbox loop execution. 
All pending mailbox actions will not be executed anymore, if they are + * instance of {@link java.util.concurrent.RunnableFuture}, they will be cancelled. + */ + public void cancelMailboxExecution() { + clearMailboxAndRunPriorityAction(mailboxPoisonLetter); + } + + /** + * Reports a throwable for rethrowing from the mailbox thread. This will clear and cancel all other pending letters. + * @param throwable to report by rethrowing from the mailbox loop. + */ + public void reportThrowable(Throwable throwable) { + clearMailboxAndRunPriorityAction(() -> { + throw new WrappingRuntimeException(throwable); + }); + } + + /** + * This method must be called to end the stream task when all actions for the tasks have been performed. + */ + public void allActionsCompleted() { + try { + if (mailboxExecutor.isMailboxThread()) { + mailboxLoopRunning = false; + ensureControlFlowSignalCheck(); + } else { + mailbox.putFirst(mailboxPoisonLetter); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (MailboxStateException me) { + LOG.debug("Action context could not submit poison letter to mailbox.", me); + } + } + + /** + * This helper method handles all special actions from the mailbox. It returns true if the mailbox loop should + * continue running, false if it should stop. In the current design, this method also evaluates all control flag + * changes. This keeps the hot path in {@link #runMailboxLoop()} free from any other flag checking, at the cost + * that all flag changes must make sure that the mailbox signals mailbox#hasMail. + */ + private boolean processMail(Mailbox mailbox) throws MailboxStateException, InterruptedException { + + // Doing this check is an optimization to only have a volatile read in the expected hot path, locks are only + // acquired after this point. + if (!mailbox.hasMail()) { + // We can also directly return true because all changes to #isMailboxLoopRunning must be connected to + // mailbox.hasMail() == true. 
+ return true; + } + + // TODO consider batched draining into list and/or limit number of executed letters + // Take letters in a non-blockingly and execute them. + Optional<Runnable> maybeLetter; + while (isMailboxLoopRunning() && (maybeLetter = mailbox.tryTakeMail()).isPresent()) { + maybeLetter.get().run(); + } + + // If the default action is currently not available, we can run a blocking mailbox execution until the default + // action becomes available again. + while (isDefaultActionUnavailable() && isMailboxLoopRunning()) { + Runnable letter = mailbox.takeMail(); + letter.run(); + } + + return isMailboxLoopRunning(); + } + + /** + * Calling this method signals that the mailbox-thread should (temporarily) stop invoking the default action, + * e.g. because there is currently no input available. + */ + private SuspendedMailboxDefaultAction suspendDefaultAction() { + + Preconditions.checkState(mailboxExecutor.isMailboxThread(), "Suspending must only be called from the mailbox thread!"); + + if (suspendedDefaultAction == null) { + suspendedDefaultAction = new SuspendDefaultActionRunnable(); + ensureControlFlowSignalCheck(); + } + + return suspendedDefaultAction; + } + + private boolean isDefaultActionUnavailable() { + return suspendedDefaultAction != null; + } + + private boolean isMailboxLoopRunning() { + return mailboxLoopRunning; + } + + /** + * Helper method to make sure that the mailbox loop will check the control flow flags in the next iteration. + */ + private void ensureControlFlowSignalCheck() { + // Make sure that mailbox#hasMail is true via a dummy letter so that the flag change is noticed. 
+ if (!mailbox.hasMail()) { + try { + mailbox.tryPutMail(() -> {}); + } catch (MailboxStateException me) { + LOG.debug("Mailbox closed when trying to submit letter for control flow signal.", me); + } + } + } + + private void clearMailboxAndRunPriorityAction(Runnable priorityLetter) { + try { + List<Runnable> droppedRunnables = mailbox.clearAndPut(priorityLetter); + FutureUtils.cancelRunnableFutures(droppedRunnables); + } catch (MailboxStateException msex) { + LOG.debug("Mailbox already closed in cancel().", msex); + } + } + + /** + * Implementation of {@link DefaultActionContext} that is connected to a {@link MailboxProcessor} + * instance. + */ + private static final class MailboxDefaultActionContext implements DefaultActionContext { + + private final MailboxProcessor mailboxProcessor; + + private MailboxDefaultActionContext(MailboxProcessor mailboxProcessor) { + this.mailboxProcessor = mailboxProcessor; + } + + @Override + public void allActionsCompleted() { + mailboxProcessor.allActionsCompleted(); + } + + @Override + public SuspendedMailboxDefaultAction suspendDefaultAction() { + return mailboxProcessor.suspendDefaultAction(); + } + } + + /** + * Represents the suspended state of the default action and offers an idempotent method to resume execution. 
+ */ + private final class SuspendDefaultActionRunnable implements SuspendedMailboxDefaultAction { + + @Override + public void resume() { + Preconditions.checkState( + mailboxExecutor.isMailboxThread(), + "SuspendedMailboxDefaultAction::resume resume must only be called from the mailbox-thread!"); + + if (suspendedDefaultAction == this) { + suspendedDefaultAction = null; + } + } + } +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/Mailbox.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/SuspendedMailboxDefaultAction.java similarity index 57% copy from flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/Mailbox.java copy to flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/SuspendedMailboxDefaultAction.java index dfa8d76..ca31254 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/Mailbox.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/SuspendedMailboxDefaultAction.java @@ -16,21 +16,15 @@ * limitations under the License. */ -package org.apache.flink.streaming.runtime.tasks.mailbox; - -import javax.annotation.Nonnull; +package org.apache.flink.streaming.runtime.tasks.mailbox.execution; /** - * A mailbox is basically a blocking queue for inter-thread message exchange in form of {@link Runnable} objects between - * multiple producer threads and a single consumer. + * Represents the suspended state of a {@link MailboxDefaultAction}, ready to resume. */ -public interface Mailbox extends MailboxReceiver, MailboxSender { +public interface SuspendedMailboxDefaultAction { /** - * The effect of this is that all pending letters are dropped and the given priorityAction - * is enqueued to the head of the mailbox. - * - * @param priorityAction action to enqueue atomically after the mailbox was cleared. 
+ * Resume execution of the default action. Must only be called from the mailbox thread!. */ - void clearAndPut(@Nonnull Runnable priorityAction); + void resume(); } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskSelectiveReadingTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskSelectiveReadingTest.java index 1308796..c7c1440 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskSelectiveReadingTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskSelectiveReadingTest.java @@ -28,6 +28,7 @@ import org.apache.flink.streaming.api.operators.InputSelectable; import org.apache.flink.streaming.api.operators.InputSelection; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.mailbox.execution.DefaultActionContext; import org.apache.flink.streaming.util.TestHarnessUtil; import org.apache.flink.util.ExceptionUtils; @@ -191,7 +192,7 @@ public class StreamTaskSelectiveReadingTest { } @Override - protected void performDefaultAction(ActionContext context) throws Exception { + protected void performDefaultAction(DefaultActionContext context) throws Exception { if (!started) { synchronized (this) { this.wait(); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java index 72e8a19..4f5ba03 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java @@ -79,6 +79,7 @@ import 
org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.StreamOperator; +import org.apache.flink.streaming.runtime.tasks.mailbox.execution.DefaultActionContext; import org.apache.flink.util.FlinkException; import org.apache.flink.util.SerializedValue; import org.apache.flink.util.TestLogger; @@ -225,7 +226,7 @@ public class StreamTaskTerminationTest extends TestLogger { } @Override - protected void performDefaultAction(ActionContext context) throws Exception { + protected void performDefaultAction(DefaultActionContext context) throws Exception { RUN_LATCH.trigger(); // wait until we have started an asynchronous checkpoint CHECKPOINTING_LATCH.await(); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java index 601d72b..949a75f 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java @@ -110,6 +110,7 @@ import org.apache.flink.streaming.api.operators.StreamSource; import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer; +import org.apache.flink.streaming.runtime.tasks.mailbox.execution.DefaultActionContext; import org.apache.flink.util.CloseableIterable; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.Preconditions; @@ -828,7 +829,7 @@ public class StreamTaskTest extends TestLogger { protected void init() throws Exception {} @Override - protected void performDefaultAction(ActionContext context) 
throws Exception { + protected void performDefaultAction(DefaultActionContext context) throws Exception { context.allActionsCompleted(); } @@ -1035,7 +1036,7 @@ public class StreamTaskTest extends TestLogger { } @Override - protected void performDefaultAction(ActionContext context) { + protected void performDefaultAction(DefaultActionContext context) { if (isCanceled() || inputFinished) { context.allActionsCompleted(); } @@ -1072,7 +1073,7 @@ public class StreamTaskTest extends TestLogger { } @Override - protected void performDefaultAction(ActionContext context) throws Exception { + protected void performDefaultAction(DefaultActionContext context) throws Exception { if (fail) { throw new RuntimeException(); } @@ -1160,7 +1161,7 @@ public class StreamTaskTest extends TestLogger { protected void init() {} @Override - protected void performDefaultAction(ActionContext context) throws Exception { + protected void performDefaultAction(DefaultActionContext context) throws Exception { holder = new LockHolder(getCheckpointLock(), latch); holder.start(); latch.await(); @@ -1205,7 +1206,7 @@ public class StreamTaskTest extends TestLogger { protected void init() {} @Override - protected void performDefaultAction(ActionContext context) throws Exception { + protected void performDefaultAction(DefaultActionContext context) throws Exception { final OneShotLatch latch = new OneShotLatch(); final Object lock = new Object(); @@ -1271,7 +1272,7 @@ public class StreamTaskTest extends TestLogger { } @Override - protected void performDefaultAction(ActionContext context) throws Exception { + protected void performDefaultAction(DefaultActionContext context) throws Exception { syncLatch.await(); super.performDefaultAction(context); } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java index bac9d43..af09716 
100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java @@ -62,6 +62,7 @@ import org.apache.flink.runtime.taskmanager.CheckpointResponder; import org.apache.flink.runtime.taskmanager.Task; import org.apache.flink.runtime.taskmanager.TaskManagerActions; import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo; +import org.apache.flink.streaming.runtime.tasks.mailbox.execution.DefaultActionContext; import org.apache.flink.util.SerializedValue; import org.junit.Rule; @@ -151,7 +152,7 @@ public class SynchronousCheckpointITCase { } @Override - protected void performDefaultAction(ActionContext context) throws Exception { + protected void performDefaultAction(DefaultActionContext context) throws Exception { if (!isRunning) { isRunning = true; eventQueue.put(Event.TASK_IS_RUNNING); @@ -159,7 +160,7 @@ public class SynchronousCheckpointITCase { if (isCanceled()) { context.allActionsCompleted(); } else { - context.actionsUnavailable(); + context.suspendDefaultAction(); } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointTest.java index 8b71423..0b36ad7 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointTest.java @@ -27,6 +27,7 @@ import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.operators.testutils.DummyEnvironment; import org.apache.flink.runtime.state.CheckpointStorageLocationReference; import org.apache.flink.streaming.runtime.tasks.StreamTaskTest.NoOpStreamTask; +import 
org.apache.flink.streaming.runtime.tasks.mailbox.execution.DefaultActionContext; import org.junit.Before; import org.junit.Test; @@ -171,7 +172,7 @@ public class SynchronousCheckpointTest { } @Override - protected void performDefaultAction(ActionContext context) throws Exception { + protected void performDefaultAction(DefaultActionContext context) throws Exception { runningLatch.trigger(); execLatch.await(); super.performDefaultAction(context); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java index e40e23d..7360508 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java @@ -82,6 +82,7 @@ import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.StreamFilter; import org.apache.flink.streaming.api.operators.StreamOperator; +import org.apache.flink.streaming.runtime.tasks.mailbox.execution.DefaultActionContext; import org.apache.flink.util.SerializedValue; import org.apache.flink.util.TestLogger; @@ -474,7 +475,7 @@ public class TaskCheckpointingBehaviourTest extends TestLogger { public void init() {} @Override - protected void performDefaultAction(ActionContext context) throws Exception { + protected void performDefaultAction(DefaultActionContext context) throws Exception { triggerCheckpointOnBarrier( new CheckpointMetaData( 11L, diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImplTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImplTest.java index 9c4edbf..1a48136 100644 --- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImplTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImplTest.java @@ -18,17 +18,23 @@ package org.apache.flink.streaming.runtime.tasks.mailbox; +import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.util.function.BiConsumerWithException; import org.apache.flink.util.function.FunctionWithException; +import org.apache.flink.util.function.RunnableWithException; import org.apache.flink.util.function.ThrowingRunnable; +import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; import java.util.LinkedList; +import java.util.List; import java.util.Optional; import java.util.Queue; +import java.util.concurrent.CountDownLatch; +import java.util.function.Consumer; /** * Unit tests for {@link MailboxImpl}. @@ -36,7 +42,7 @@ import java.util.Queue; public class MailboxImplTest { private static final Runnable POISON_LETTER = () -> {}; - private static final int CAPACITY_POW_2 = 1; + private static final int CAPACITY_POW_2 = 2; private static final int CAPACITY = 1 << CAPACITY_POW_2; /** @@ -45,24 +51,77 @@ public class MailboxImplTest { private Mailbox mailbox; @Before - public void setUp() throws Exception { + public void setUp() { mailbox = new MailboxImpl(CAPACITY_POW_2); + mailbox.open(); + } + + @After + public void tearDown() { + mailbox.close(); } /** * Test for #clearAndPut should remove other pending events and enqueue directly to the head of the mailbox queue. 
*/ @Test - public void testClearAndPut() { + public void testClearAndPut() throws Exception { + + Runnable letterInstance = () -> {}; + for (int i = 0; i < CAPACITY; ++i) { - Assert.assertTrue(mailbox.tryPutMail(() -> {})); + Assert.assertTrue(mailbox.tryPutMail(letterInstance)); } - mailbox.clearAndPut(POISON_LETTER); + List<Runnable> droppedLetters = mailbox.clearAndPut(POISON_LETTER); Assert.assertTrue(mailbox.hasMail()); Assert.assertEquals(POISON_LETTER, mailbox.tryTakeMail().get()); Assert.assertFalse(mailbox.hasMail()); + Assert.assertEquals(CAPACITY, droppedLetters.size()); + } + + @Test + public void testPutAsHead() throws Exception { + + Runnable instanceA = () -> {}; + Runnable instanceB = () -> {}; + Runnable instanceC = () -> {}; + Runnable instanceD = () -> {}; + Runnable instanceE = () -> {}; + + mailbox.putMail(instanceD); + mailbox.tryPutFirst(instanceC); + mailbox.putMail(instanceE); + mailbox.putFirst(instanceA); + + OneShotLatch latch = new OneShotLatch(); + Thread blockingPut = new Thread(() -> { + // ensure we are full + try { + if (!mailbox.tryPutFirst(() -> { })) { + latch.trigger(); + + mailbox.putFirst(instanceB); + + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (MailboxStateException ignore) { + } + }); + + blockingPut.start(); + latch.await(); + + Assert.assertSame(instanceA, mailbox.takeMail()); + blockingPut.join(); + Assert.assertSame(instanceB, mailbox.takeMail()); + Assert.assertSame(instanceC, mailbox.takeMail()); + Assert.assertSame(instanceD, mailbox.takeMail()); + Assert.assertSame(instanceE, mailbox.takeMail()); + + Assert.assertFalse(mailbox.tryTakeMail().isPresent()); } @Test @@ -111,6 +170,122 @@ public class MailboxImplTest { } /** + * Test that closing the mailbox unblocks pending accesses with correct exceptions. 
+ */ + @Test + public void testCloseUnblocks() throws InterruptedException { + testAllPuttingUnblocksInternal(Mailbox::close); + setUp(); + testUnblocksInternal(() -> mailbox.takeMail(), Mailbox::close, MailboxStateException.class); + } + + /** + * Test that silencing the mailbox unblocks pending accesses with correct exceptions. + */ + @Test + public void testQuiesceUnblocks() throws Exception { + testAllPuttingUnblocksInternal(Mailbox::quiesce); + } + + @Test + public void testLifeCycleQuiesce() throws Exception { + mailbox.putMail(() -> {}); + mailbox.putMail(() -> {}); + mailbox.quiesce(); + testLifecyclePuttingInternal(); + mailbox.takeMail(); + Assert.assertTrue(mailbox.tryTakeMail().isPresent()); + Assert.assertFalse(mailbox.tryTakeMail().isPresent()); + } + + @Test + public void testLifeCycleClose() throws Exception { + mailbox.close(); + testLifecyclePuttingInternal(); + + try { + mailbox.takeMail(); + Assert.fail(); + } catch (MailboxStateException ignore) { + } + + try { + mailbox.tryTakeMail(); + Assert.fail(); + } catch (MailboxStateException ignore) { + } + } + + private void testLifecyclePuttingInternal() throws Exception { + try { + mailbox.tryPutMail(() -> {}); + Assert.fail(); + } catch (MailboxStateException ignore) { + } + try { + mailbox.tryPutFirst(() -> {}); + Assert.fail(); + } catch (MailboxStateException ignore) { + } + try { + mailbox.putMail(() -> {}); + Assert.fail(); + } catch (MailboxStateException ignore) { + } + try { + mailbox.putFirst(() -> {}); + Assert.fail(); + } catch (MailboxStateException ignore) { + } + } + + private void testAllPuttingUnblocksInternal(Consumer<Mailbox> unblockMethod) throws InterruptedException { + testUnblocksInternal(() -> mailbox.putMail(() -> {}), unblockMethod, MailboxStateException.class); + setUp(); + testUnblocksInternal(() -> mailbox.putFirst(() -> {}), unblockMethod, MailboxStateException.class); + setUp(); + testUnblocksInternal(() -> mailbox.clearAndPut(() -> {}), unblockMethod, 
MailboxStateException.class); + } + + private void testUnblocksInternal( + RunnableWithException testMethod, + Consumer<Mailbox> unblockMethod, + Class<?> expectedExceptionClass) throws InterruptedException { + final Thread[] blockedThreads = new Thread[CAPACITY * 2]; + final Exception[] exceptions = new Exception[blockedThreads.length]; + + CountDownLatch countDownLatch = new CountDownLatch(blockedThreads.length); + + for (int i = 0; i < blockedThreads.length; ++i) { + final int id = i; + Thread blocked = new Thread(() -> { + try { + countDownLatch.countDown(); + while (true) { + testMethod.run(); + } + } catch (Exception ex) { + exceptions[id] = ex; + } + }); + blockedThreads[i] = blocked; + blocked.start(); + } + + countDownLatch.await(); + unblockMethod.accept(mailbox); + + for (Thread blockedThread : blockedThreads) { + blockedThread.join(); + } + + for (Exception exception : exceptions) { + Assert.assertEquals(expectedExceptionClass, exception.getClass()); + } + + } + + /** * Test producer-consumer pattern through the mailbox in a concurrent setting (n-writer / 1-reader). */ private void testPutTake( diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxExecutorServiceImplTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxExecutorServiceImplTest.java new file mode 100644 index 0000000..92e106d --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxExecutorServiceImplTest.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.tasks.mailbox.execution; + +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxImpl; +import org.apache.flink.util.Preconditions; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Tests for {@link MailboxExecutorServiceImpl}. 
+ */ +public class MailboxExecutorServiceImplTest { + + private MailboxExecutorServiceImpl mailboxExecutorService; + private ExecutorService otherThreadExecutor; + private MailboxImpl mailbox; + + @Before + public void setUp() throws Exception { + this.mailbox = new MailboxImpl(); + this.mailbox.open(); + this.mailboxExecutorService = new MailboxExecutorServiceImpl(mailbox); + this.otherThreadExecutor = Executors.newSingleThreadScheduledExecutor(); + } + + @After + public void tearDown() { + otherThreadExecutor.shutdown(); + try { + if (!otherThreadExecutor.awaitTermination(60, TimeUnit.SECONDS)) { + otherThreadExecutor.shutdownNow(); + if (!otherThreadExecutor.awaitTermination(60, TimeUnit.SECONDS)) { + throw new IllegalStateException("Thread pool did not terminate on time!"); + } + } + } catch (InterruptedException ie) { + otherThreadExecutor.shutdownNow(); + Thread.currentThread().interrupt(); + } + } + + @Test + public void testOpsAndLifecycle() throws Exception { + Assert.assertFalse(mailboxExecutorService.isShutdown()); + Assert.assertFalse(mailboxExecutorService.isTerminated()); + final TestRunnable testRunnable = new TestRunnable(); + Assert.assertTrue(mailboxExecutorService.tryExecute(testRunnable)); + Assert.assertEquals(testRunnable, mailbox.tryTakeMail().get()); + CompletableFuture.runAsync(() -> mailboxExecutorService.execute(testRunnable), otherThreadExecutor).get(); + Assert.assertEquals(testRunnable, mailbox.takeMail()); + final TestRunnable yieldRun = new TestRunnable(); + final TestRunnable leftoverRun = new TestRunnable(); + Assert.assertTrue(mailboxExecutorService.tryExecute(yieldRun)); + Future<?> leftoverFuture = CompletableFuture.supplyAsync( + () -> mailboxExecutorService.submit(leftoverRun), otherThreadExecutor).get(); + mailboxExecutorService.shutdown(); + Assert.assertTrue(mailboxExecutorService.isShutdown()); + Assert.assertFalse(mailboxExecutorService.isTerminated()); + + try { + CompletableFuture.runAsync(() -> 
mailboxExecutorService.execute(testRunnable), otherThreadExecutor).get(); + Assert.fail("execution should not work after shutdown()."); + } catch (ExecutionException expected) { + Assert.assertTrue(expected.getCause() instanceof RejectedExecutionException); + } + + try { + CompletableFuture.runAsync(() -> mailboxExecutorService.tryExecute(testRunnable), otherThreadExecutor).get(); + Assert.fail("execution should not work after shutdown()."); + } catch (ExecutionException expected) { + Assert.assertTrue(expected.getCause() instanceof RejectedExecutionException); + } + + Assert.assertTrue(mailboxExecutorService.tryYield()); + Assert.assertEquals(Thread.currentThread(), yieldRun.wasExecutedBy()); + Assert.assertFalse(leftoverFuture.isDone()); + + List<Runnable> leftoverTasks = mailboxExecutorService.shutdownNow(); + Assert.assertEquals(1, leftoverTasks.size()); + Assert.assertFalse(leftoverFuture.isCancelled()); + FutureUtils.cancelRunnableFutures(leftoverTasks); + Assert.assertTrue(leftoverFuture.isCancelled()); + + try { + mailboxExecutorService.tryYield(); + Assert.fail("yielding should not work after shutdown()."); + } catch (IllegalStateException expected) { + } + + try { + mailboxExecutorService.yield(); + Assert.fail("yielding should not work after shutdown()."); + } catch (IllegalStateException expected) { + } + } + + @Test + public void testTryYield() throws Exception { + final TestRunnable testRunnable = new TestRunnable(); + CompletableFuture.runAsync(() -> mailboxExecutorService.execute(testRunnable), otherThreadExecutor).get(); + Assert.assertTrue(mailboxExecutorService.tryYield()); + Assert.assertFalse(mailbox.tryTakeMail().isPresent()); + Assert.assertEquals(Thread.currentThread(), testRunnable.wasExecutedBy()); + } + + @Test + public void testYield() throws Exception { + final AtomicReference<Exception> exceptionReference = new AtomicReference<>(); + final TestRunnable testRunnable = new TestRunnable(); + final Thread submitThread = new Thread(() -> { 
+ try { + mailboxExecutorService.execute(testRunnable); + } catch (Exception e) { + exceptionReference.set(e); + } + }); + + submitThread.start(); + mailboxExecutorService.yield(); + submitThread.join(); + + Assert.assertNull(exceptionReference.get()); + Assert.assertEquals(Thread.currentThread(), testRunnable.wasExecutedBy()); + } + + /** + * Test {@link Runnable} that tracks execution. + */ + static class TestRunnable implements Runnable { + + private Thread executedByThread = null; + + @Override + public void run() { + Preconditions.checkState(!isExecuted(), "Runnable was already executed before by " + executedByThread); + executedByThread = Thread.currentThread(); + } + + boolean isExecuted() { + return executedByThread != null; + } + + Thread wasExecutedBy() { + return executedByThread; + } + } +} diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxProcessorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxProcessorTest.java new file mode 100644 index 0000000..80c71dd --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxProcessorTest.java @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.tasks.mailbox.execution; + +import org.apache.flink.core.testutils.OneShotLatch; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.concurrent.FutureTask; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Unit tests for {@link MailboxProcessor}. + */ +public class MailboxProcessorTest { + + @Test + public void testRejectIfNotOpen() { + MailboxProcessor mailboxProcessor = new MailboxProcessor((ctx) -> {}); + try { + mailboxProcessor.getMailboxExecutor().tryExecute(() -> {}); + Assert.fail("Should not be able to accept runnables if not opened."); + } catch (RejectedExecutionException expected) { + } + } + + @Test + public void testShutdown() { + MailboxProcessor mailboxProcessor = new MailboxProcessor((ctx) -> {}); + FutureTask<Void> testRunnableFuture = new FutureTask<>(() -> {}, null); + mailboxProcessor.open(); + mailboxProcessor.getMailboxExecutor().tryExecute(testRunnableFuture); + mailboxProcessor.prepareClose(); + + try { + mailboxProcessor.getMailboxExecutor().tryExecute(() -> {}); + Assert.fail("Should not be able to accept runnables if not opened."); + } catch (RejectedExecutionException expected) { + } + + Assert.assertFalse(testRunnableFuture.isDone()); + + mailboxProcessor.close(); + Assert.assertTrue(testRunnableFuture.isCancelled()); + } + + @Test + public void testRunDefaultActionAndLetters() throws Exception { + AtomicBoolean stop = new AtomicBoolean(false); + MailboxThread mailboxThread = new MailboxThread() { + @Override + public void runDefaultAction(DefaultActionContext context) throws Exception { + if (stop.get()) { + context.allActionsCompleted(); + } else { + Thread.sleep(10L); + } + } + }; + + 
MailboxProcessor mailboxProcessor = start(mailboxThread); + mailboxProcessor.getMailboxExecutor().execute(() -> stop.set(true)); + stop(mailboxThread); + } + + @Test + public void testRunDefaultAction() throws Exception { + + final int expectedInvocations = 3; + final AtomicInteger counter = new AtomicInteger(0); + MailboxThread mailboxThread = new MailboxThread() { + @Override + public void runDefaultAction(DefaultActionContext context) { + if (counter.incrementAndGet() == expectedInvocations) { + context.allActionsCompleted(); + } + } + }; + + start(mailboxThread); + stop(mailboxThread); + Assert.assertEquals(expectedInvocations, counter.get()); + } + + @Test + public void testSignalUnAvailable() throws Exception { + + final AtomicInteger counter = new AtomicInteger(0); + final AtomicReference<SuspendedMailboxDefaultAction> suspendedActionRef = new AtomicReference<>(); + final OneShotLatch actionSuspendedLatch = new OneShotLatch(); + final int blockAfterInvocations = 3; + final int totalInvocations = blockAfterInvocations * 2; + + MailboxThread mailboxThread = new MailboxThread() { + @Override + public void runDefaultAction(DefaultActionContext context) { + if (counter.incrementAndGet() == blockAfterInvocations) { + suspendedActionRef.set(context.suspendDefaultAction()); + actionSuspendedLatch.trigger(); + } else if (counter.get() == totalInvocations) { + context.allActionsCompleted(); + } + } + }; + + start(mailboxThread); + actionSuspendedLatch.await(); + Assert.assertEquals(blockAfterInvocations, counter.get()); + + suspendedActionRef.get().resume(); + stop(mailboxThread); + Assert.assertEquals(totalInvocations, counter.get()); + } + + @Test + public void testSignalUnAvailablePingPong() throws Exception { + final AtomicReference<SuspendedMailboxDefaultAction> suspendedActionRef = new AtomicReference<>(); + final int totalSwitches = 10000; + final MailboxThread mailboxThread = new MailboxThread() { + int count = 0; + + @Override + public void 
runDefaultAction(DefaultActionContext context) { + + // If this is violated, it means that the default action was invoked while we assumed suspension + Assert.assertTrue(suspendedActionRef.compareAndSet(null, context.suspendDefaultAction())); + + ++count; + + if (count == totalSwitches) { + context.allActionsCompleted(); + } else if (count % 1000 == 0) { + try { + Thread.sleep(1L); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + }; + + mailboxThread.start(); + final MailboxProcessor mailboxProcessor = mailboxThread.getMailboxProcessor(); + mailboxProcessor.open(); + + final Thread asyncUnblocker = new Thread(() -> { + int count = 0; + while (!Thread.currentThread().isInterrupted()) { + + final SuspendedMailboxDefaultAction resume = + suspendedActionRef.getAndSet(null); + if (resume != null) { + resume.resume(); + } else { + try { + mailboxProcessor.getMailboxExecutor().execute(() -> { }); + } catch (RejectedExecutionException ignore) { + } + } + + ++count; + if (count % 5000 == 0) { + try { + Thread.sleep(1L); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + }); + + asyncUnblocker.start(); + mailboxThread.signalStart(); + mailboxThread.join(); + asyncUnblocker.interrupt(); + asyncUnblocker.join(); + mailboxProcessor.prepareClose(); + mailboxProcessor.close(); + mailboxThread.checkException(); + } + + private static MailboxProcessor start(MailboxThread mailboxThread) { + mailboxThread.start(); + final MailboxProcessor mailboxProcessor = mailboxThread.getMailboxProcessor(); + mailboxProcessor.open(); + mailboxThread.signalStart(); + return mailboxProcessor; + } + + private static void stop(MailboxThread mailboxThread) throws Exception { + mailboxThread.join(); + MailboxProcessor mailboxProcessor = mailboxThread.getMailboxProcessor(); + mailboxProcessor.prepareClose(); + mailboxProcessor.close(); + mailboxThread.checkException(); + } + + static class MailboxThread extends Thread 
implements MailboxDefaultAction { + + MailboxProcessor mailboxProcessor; + OneShotLatch mailboxCreatedLatch = new OneShotLatch(); + OneShotLatch canRun = new OneShotLatch(); + private Throwable caughtException; + + @Override + public final void run() { + mailboxProcessor = new MailboxProcessor(this); + mailboxCreatedLatch.trigger(); + try { + canRun.await(); + mailboxProcessor.runMailboxLoop(); + } catch (Throwable t) { + this.caughtException = t; + } + } + + @Override + public void runDefaultAction(DefaultActionContext context) throws Exception { + context.allActionsCompleted(); + } + + final MailboxProcessor getMailboxProcessor() { + try { + mailboxCreatedLatch.await(); + return mailboxProcessor; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } + + final void signalStart() { + if (mailboxCreatedLatch.isTriggered()) { + canRun.trigger(); + } + } + + void checkException() throws Exception { + if (caughtException != null) { + throw new Exception(caughtException); + } + } + } + +} diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/TestMailboxExecutor.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/TestMailboxExecutor.java new file mode 100644 index 0000000..31126a8 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/TestMailboxExecutor.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.tasks.mailbox.execution; + +import javax.annotation.Nonnull; + +import java.util.concurrent.RejectedExecutionException; + +/** + * Dummy implementation of {@link MailboxExecutor} for testing. + */ +public class TestMailboxExecutor implements MailboxExecutor { + + private final Object lock; + + public TestMailboxExecutor(Object lock) { + this.lock = lock; + } + + public TestMailboxExecutor() { + this(new Object()); + } + + @Override + public void execute(@Nonnull Runnable command) throws RejectedExecutionException { + synchronized (lock) { + command.run(); + lock.notifyAll(); + } + } + + @Override + public boolean tryExecute(Runnable command) { + execute(command); + return true; + } + + @Override + public void yield() throws InterruptedException { + synchronized (lock) { + lock.wait(1); + } + } + + @Override + public boolean tryYield() { + return false; + } + + @Override + public boolean isMailboxThread() { + return true; + } +} diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTask.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTask.java index 37e7328..00a1641 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTask.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTask.java @@ -28,6 +28,7 @@ import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer; import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer; import 
org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; import org.apache.flink.streaming.runtime.tasks.StreamTask; +import org.apache.flink.streaming.runtime.tasks.mailbox.execution.DefaultActionContext; import java.util.Map; import java.util.function.BiConsumer; @@ -81,7 +82,7 @@ public class MockStreamTask extends StreamTask { public void init() { } @Override - protected void performDefaultAction(ActionContext context) throws Exception { + protected void performDefaultAction(DefaultActionContext context) throws Exception { context.allActionsCompleted(); } diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java index d54ec1f..7dd07db 100644 --- a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java +++ b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java @@ -37,6 +37,7 @@ import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguratio import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings; import org.apache.flink.streaming.runtime.tasks.StreamTask; import org.apache.flink.streaming.runtime.tasks.StreamTaskTest.NoOpStreamTask; +import org.apache.flink.streaming.runtime.tasks.mailbox.execution.DefaultActionContext; import org.apache.flink.test.util.AbstractTestBase; import org.junit.Assume; @@ -286,7 +287,7 @@ public class JobMasterStopWithSavepointIT extends AbstractTestBase { } @Override - protected void performDefaultAction(ActionContext context) throws Exception { + protected void performDefaultAction(DefaultActionContext context) throws Exception { final long taskIndex = getEnvironment().getTaskInfo().getIndexOfThisSubtask(); if (taskIndex == 0) { numberOfRestarts.countDown(); @@ -343,7 +344,7 @@ public class JobMasterStopWithSavepointIT extends AbstractTestBase { } @Override - protected 
void performDefaultAction(ActionContext context) throws Exception { + protected void performDefaultAction(DefaultActionContext context) throws Exception { invokeLatch.countDown(); finishLatch.await(); context.allActionsCompleted();