This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 022f6cceef65859bc6f172151d09140038297f69
Author: Stefan Richter <s.rich...@data-artisans.com>
AuthorDate: Fri May 10 11:20:02 2019 +0200

    [FLINK-12478, FLINK-12480][runtime] Introduce mailbox to StreamTask 
main-loop.
    
    This closes #8409.
    This closes #8431.
    
    This also decomposes monolithic run-loops in StreamTask implementations 
into step-wise calls.
---
 .../runtime/tasks/OneInputStreamTask.java          |  12 +-
 .../streaming/runtime/tasks/SourceStreamTask.java  |   5 +-
 .../runtime/tasks/StreamIterationHead.java         | 105 ++++-----
 .../flink/streaming/runtime/tasks/StreamTask.java  |  82 ++++++-
 .../runtime/tasks/TwoInputStreamTask.java          |  13 +-
 .../streaming/runtime/tasks/mailbox/Mailbox.java   |  36 ++++
 .../runtime/tasks/mailbox/MailboxImpl.java         | 236 +++++++++++++++++++++
 .../runtime/tasks/mailbox/MailboxReceiver.java     |  59 ++++++
 .../runtime/tasks/mailbox/MailboxSender.java       |  52 +++++
 ...heckpointExceptionHandlerConfigurationTest.java |   4 +-
 .../tasks/StreamTaskCancellationBarrierTest.java   |   4 +-
 .../runtime/tasks/StreamTaskTerminationTest.java   |   3 +-
 .../streaming/runtime/tasks/StreamTaskTest.java    |  21 +-
 .../runtime/tasks/SynchronousCheckpointITCase.java |   3 +-
 .../runtime/tasks/SynchronousCheckpointTest.java   |   3 +-
 .../tasks/TaskCheckpointingBehaviourTest.java      |   4 +-
 .../runtime/tasks/mailbox/MailboxImplTest.java     | 170 +++++++++++++++
 .../flink/streaming/util/MockStreamTask.java       |   4 +-
 .../jobmaster/JobMasterStopWithSavepointIT.java    |  10 +-
 19 files changed, 726 insertions(+), 100 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
index 7498518..7b82d8f 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
@@ -39,8 +39,6 @@ public class OneInputStreamTask<IN, OUT> extends 
StreamTask<OUT, OneInputStreamO
 
        private StreamInputProcessor<IN> inputProcessor;
 
-       private volatile boolean running = true;
-
        private final WatermarkGauge inputWatermarkGauge = new WatermarkGauge();
 
        /**
@@ -98,12 +96,9 @@ public class OneInputStreamTask<IN, OUT> extends 
StreamTask<OUT, OneInputStreamO
        }
 
        @Override
-       protected void run() throws Exception {
-               // cache processor reference on the stack, to make the code 
more JIT friendly
-               final StreamInputProcessor<IN> inputProcessor = 
this.inputProcessor;
-
-               while (running && inputProcessor.processInput()) {
-                       // all the work happens in the "processInput" method
+       protected void performDefaultAction(ActionContext context) throws 
Exception {
+               if (!inputProcessor.processInput()) {
+                       context.allActionsCompleted();
                }
        }
 
@@ -116,6 +111,5 @@ public class OneInputStreamTask<IN, OUT> extends 
StreamTask<OUT, OneInputStreamO
 
        @Override
        protected void cancelTask() {
-               running = false;
        }
 }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
index 1a1c529..fd50a1a 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
@@ -98,8 +98,11 @@ public class SourceStreamTask<OUT, SRC extends 
SourceFunction<OUT>, OP extends S
        }
 
        @Override
-       protected void run() throws Exception {
+       protected void performDefaultAction(ActionContext context) throws 
Exception {
+               // Against the usual contract of this method, this 
implementation is not step-wise but blocking instead for
+               // compatibility reasons with the current source interface 
(source functions run as a loop, not in steps).
                headOperator.run(getCheckpointLock(), 
getStreamStatusMaintainer());
+               context.allActionsCompleted();
        }
 
        @Override
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
index ecef7f0..d25bd23 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
@@ -24,6 +24,7 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.io.BlockingQueueBroker;
 import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.FlinkRuntimeException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,88 +42,72 @@ public class StreamIterationHead<OUT> extends 
OneInputStreamTask<OUT, OUT> {
 
        private static final Logger LOG = 
LoggerFactory.getLogger(StreamIterationHead.class);
 
-       private volatile boolean running = true;
+       private RecordWriterOutput<OUT>[] streamOutputs;
+
+       private final BlockingQueue<StreamRecord<OUT>> dataChannel;
+       private final String brokerID;
+       private final long iterationWaitTime;
+       private final boolean shouldWait;
 
        public StreamIterationHead(Environment env) {
                super(env);
+               final String iterationId = getConfiguration().getIterationId();
+               if (iterationId == null || iterationId.length() == 0) {
+                       throw new FlinkRuntimeException("Missing iteration ID 
in the task configuration");
+               }
+
+               this.dataChannel = new ArrayBlockingQueue<>(1);
+               this.brokerID = 
createBrokerIdString(getEnvironment().getJobID(), iterationId,
+                       getEnvironment().getTaskInfo().getIndexOfThisSubtask());
+               this.iterationWaitTime = 
getConfiguration().getIterationWaitTime();
+               this.shouldWait = iterationWaitTime > 0;
        }
 
        // 
------------------------------------------------------------------------
 
        @Override
-       protected void run() throws Exception {
-
-               final String iterationId = getConfiguration().getIterationId();
-               if (iterationId == null || iterationId.length() == 0) {
-                       throw new Exception("Missing iteration ID in the task 
configuration");
+       protected void performDefaultAction(ActionContext context) throws 
Exception {
+               StreamRecord<OUT> nextRecord = shouldWait ?
+                       dataChannel.poll(iterationWaitTime, 
TimeUnit.MILLISECONDS) :
+                       dataChannel.take();
+
+               if (nextRecord != null) {
+                       synchronized (getCheckpointLock()) {
+                               for (RecordWriterOutput<OUT> output : 
streamOutputs) {
+                                       output.collect(nextRecord);
+                               }
+                       }
+               } else {
+                       context.allActionsCompleted();
                }
+       }
 
-               final String brokerID = 
createBrokerIdString(getEnvironment().getJobID(), iterationId ,
-                               
getEnvironment().getTaskInfo().getIndexOfThisSubtask());
-
-               final long iterationWaitTime = 
getConfiguration().getIterationWaitTime();
-               final boolean shouldWait = iterationWaitTime > 0;
-
-               final BlockingQueue<StreamRecord<OUT>> dataChannel = new 
ArrayBlockingQueue<StreamRecord<OUT>>(1);
+       // 
------------------------------------------------------------------------
 
+       @SuppressWarnings("unchecked")
+       @Override
+       public void init() {
                // offer the queue for the tail
                BlockingQueueBroker.INSTANCE.handIn(brokerID, dataChannel);
                LOG.info("Iteration head {} added feedback queue under {}", 
getName(), brokerID);
 
-               // do the work
-               try {
-                       @SuppressWarnings("unchecked")
-                       RecordWriterOutput<OUT>[] outputs = 
(RecordWriterOutput<OUT>[]) getStreamOutputs();
-
-                       // If timestamps are enabled we make sure to remove 
cyclic watermark dependencies
-                       if (isSerializingTimestamps()) {
-                               synchronized (getCheckpointLock()) {
-                                       for (RecordWriterOutput<OUT> output : 
outputs) {
-                                               output.emitWatermark(new 
Watermark(Long.MAX_VALUE));
-                                       }
-                               }
-                       }
+               this.streamOutputs = (RecordWriterOutput<OUT>[]) 
getStreamOutputs();
 
-                       while (running) {
-                               StreamRecord<OUT> nextRecord = shouldWait ?
-                                       dataChannel.poll(iterationWaitTime, 
TimeUnit.MILLISECONDS) :
-                                       dataChannel.take();
-
-                               if (nextRecord != null) {
-                                       synchronized (getCheckpointLock()) {
-                                               for (RecordWriterOutput<OUT> 
output : outputs) {
-                                                       
output.collect(nextRecord);
-                                               }
-                                       }
-                               }
-                               else {
-                                       // done
-                                       break;
+               // If timestamps are enabled we make sure to remove cyclic 
watermark dependencies
+               if (isSerializingTimestamps()) {
+                       synchronized (getCheckpointLock()) {
+                               for (RecordWriterOutput<OUT> output : 
streamOutputs) {
+                                       output.emitWatermark(new 
Watermark(Long.MAX_VALUE));
                                }
                        }
                }
-               finally {
-                       // make sure that we remove the queue from the broker, 
to prevent a resource leak
-                       BlockingQueueBroker.INSTANCE.remove(brokerID);
-                       LOG.info("Iteration head {} removed feedback queue 
under {}", getName(), brokerID);
-               }
-       }
-
-       @Override
-       protected void cancelTask() {
-               running = false;
-       }
-
-       // 
------------------------------------------------------------------------
-
-       @Override
-       public void init() {
-               // does not hold any resources, no initialization necessary
        }
 
        @Override
-       protected void cleanup() throws Exception {
-               // does not hold any resources, no cleanup necessary
+       protected void cleanup() {
+               // make sure that we remove the queue from the broker, to 
prevent a resource leak
+               BlockingQueueBroker.INSTANCE.remove(brokerID);
+               LOG.info("Iteration head {} removed feedback queue under {}", 
getName(), brokerID);
        }
 
        // 
------------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 8a3d006..2df565d 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -55,8 +55,11 @@ import 
org.apache.flink.streaming.runtime.partitioner.ConfigurableStreamPartitio
 import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
+import org.apache.flink.streaming.runtime.tasks.mailbox.Mailbox;
+import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxImpl;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.ThrowingRunnable;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -68,6 +71,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
@@ -124,6 +128,12 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
        /** The logger used by the StreamTask and its subclasses. */
        private static final Logger LOG = 
LoggerFactory.getLogger(StreamTask.class);
 
+       /** Special value, letter that terminates the mailbox loop. */
+       private static final Runnable POISON_LETTER = () -> {};
+
+       /** Special value, letter that "wakes up" a waiting mailbox loop. */
+       private static final Runnable DEFAULT_ACTION_AVAILABLE = () -> {};
+
        // 
------------------------------------------------------------------------
 
        /**
@@ -182,6 +192,8 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
 
        private final SynchronousSavepointLatch syncSavepointLatch;
 
+       protected final Mailbox mailbox;
+
        // 
------------------------------------------------------------------------
 
        /**
@@ -214,6 +226,7 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
                this.accumulatorMap = 
getEnvironment().getAccumulatorRegistry().getUserMap();
                this.recordWriters = createRecordWriters(configuration, 
environment);
                this.syncSavepointLatch = new SynchronousSavepointLatch();
+               this.mailbox = new MailboxImpl();
        }
 
        // 
------------------------------------------------------------------------
@@ -222,13 +235,41 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
 
        protected abstract void init() throws Exception;
 
-       protected abstract void run() throws Exception;
-
        protected abstract void cleanup() throws Exception;
 
        protected abstract void cancelTask() throws Exception;
 
        /**
+        * This method implements the default action of the task (e.g. 
processing one event from the input). Implementations
+        * should (in general) be non-blocking.
+        *
+        * @param context context object for collaborative interaction between 
the action and the stream task.
+        * @throws Exception on any problems in the action.
+        */
+       protected abstract void performDefaultAction(ActionContext context) 
throws Exception;
+
+       /**
+        * Runs the stream task's main processing loop.
+        */
+       private void run() throws Exception {
+               final ActionContext actionContext = new ActionContext();
+               while (true) {
+                       if (mailbox.hasMail()) {
+                               Optional<Runnable> maybeLetter;
+                               while ((maybeLetter = 
mailbox.tryTakeMail()).isPresent()) {
+                                       Runnable letter = maybeLetter.get();
+                                       if (letter == POISON_LETTER) {
+                                               return;
+                                       }
+                                       letter.run();
+                               }
+                       }
+
+                       performDefaultAction(actionContext);
+               }
+       }
+
+       /**
         * Emits the {@link 
org.apache.flink.streaming.api.watermark.Watermark#MAX_WATERMARK MAX_WATERMARK}
         * so that all registered timers are fired.
         *
@@ -426,6 +467,7 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
 
        @Override
        public final void cancel() throws Exception {
+               mailbox.clearAndPut(POISON_LETTER);
                isRunning = false;
                canceled = true;
 
@@ -1280,4 +1322,40 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
                
output.setMetricGroup(environment.getMetricGroup().getIOMetricGroup());
                return output;
        }
+
+       /**
+        * The action context is passed as parameter into the default action 
method and holds control methods for feedback
+        * from the default action to the mailbox.
+        */
+       public final class ActionContext {
+
+               private final Runnable actionUnavailableLetter = 
ThrowingRunnable.unchecked(mailbox::waitUntilHasMail);
+
+               /**
+                * This method must be called to end the stream task when all 
actions for the tasks have been performed.
+                */
+               public void allActionsCompleted() {
+                       mailbox.clearAndPut(POISON_LETTER);
+               }
+
+               /**
+                * Calling this method signals that the mailbox-thread should 
continue invoking the default action, e.g. because
+                * new input became available for processing.
+                *
+                * @throws InterruptedException on interruption.
+                */
+               public void actionsAvailable() throws InterruptedException {
+                       mailbox.putMail(DEFAULT_ACTION_AVAILABLE);
+               }
+
+               /**
+                * Calling this method signals that the mailbox-thread should 
(temporarily) stop invoking the default action,
+                * e.g. because there is currently no input available.
+                *
+                * @throws InterruptedException on interruption.
+                */
+               public void actionsUnavailable() throws InterruptedException {
+                       mailbox.putMail(actionUnavailableLetter);
+               }
+       }
 }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
index 546ccdb..934f2cb 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
@@ -40,8 +40,6 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends 
StreamTask<OUT, TwoInputS
 
        private StreamTwoInputProcessor<IN1, IN2> inputProcessor;
 
-       private volatile boolean running = true;
-
        private final WatermarkGauge input1WatermarkGauge;
        private final WatermarkGauge input2WatermarkGauge;
        private final MinWatermarkGauge minInputWatermarkGauge;
@@ -110,12 +108,9 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends 
StreamTask<OUT, TwoInputS
        }
 
        @Override
-       protected void run() throws Exception {
-               // cache processor reference on the stack, to make the code 
more JIT friendly
-               final StreamTwoInputProcessor<IN1, IN2> inputProcessor = 
this.inputProcessor;
-
-               while (running && inputProcessor.processInput()) {
-                       // all the work happens in the "processInput" method
+       protected void performDefaultAction(ActionContext context) throws 
Exception {
+               if (!inputProcessor.processInput()) {
+                       context.allActionsCompleted();
                }
        }
 
@@ -128,6 +123,6 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends 
StreamTask<OUT, TwoInputS
 
        @Override
        protected void cancelTask() {
-               running = false;
+
        }
 }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/Mailbox.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/Mailbox.java
new file mode 100644
index 0000000..dfa8d76
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/Mailbox.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks.mailbox;
+
+import javax.annotation.Nonnull;
+
+/**
+ * A mailbox is basically a blocking queue for inter-thread message exchange 
in the form of {@link Runnable} objects between
+ * multiple producer threads and a single consumer.
+ */
+public interface Mailbox extends MailboxReceiver, MailboxSender {
+
+       /**
+        * The effect of this is that all pending letters are dropped and the 
given priorityAction
+        * is enqueued to the head of the mailbox.
+        *
+        * @param priorityAction action to enqueue atomically after the mailbox 
was cleared.
+        */
+       void clearAndPut(@Nonnull Runnable priorityAction);
+}
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImpl.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImpl.java
new file mode 100644
index 0000000..411efd1
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImpl.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks.mailbox;
+
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.Optional;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Implementation of {@link Mailbox} inspired by {@link 
java.util.concurrent.ArrayBlockingQueue} and tailored towards
+ * our use case with multiple writers, single reader and volatile reads 
instead of lock & read on {@link #count}.
+ */
+@ThreadSafe
+public class MailboxImpl implements Mailbox {
+
+       /**
+        * The enqueued letters.
+        */
+       @GuardedBy("lock")
+       private final Runnable[] ringBuffer;
+
+       /**
+        * Lock for all concurrent ops.
+        */
+       private final ReentrantLock lock;
+
+       /**
+        * Condition that is triggered when the buffer is no longer empty.
+        */
+       @GuardedBy("lock")
+       private final Condition notEmpty;
+
+       /**
+        * Condition that is triggered when the buffer is no longer full.
+        */
+       @GuardedBy("lock")
+       private final Condition notFull;
+
+       /**
+        * Index of the ring buffer head.
+        */
+       @GuardedBy("lock")
+       private int headIndex;
+
+       /**
+        * Index of the ring buffer tail.
+        */
+       @GuardedBy("lock")
+       private int tailIndex;
+
+       /**
+        * Number of letters in the mailbox.
+        */
+       @GuardedBy("lock")
+       private volatile int count;
+
+       /**
+        * A mask to wrap around the indexes of the ring buffer. We use this to 
avoid ifs or modulo ops.
+        */
+       private final int moduloMask;
+
+       public MailboxImpl() {
+               this(6); // 2^6 = 64
+       }
+
+       public MailboxImpl(int capacityPow2) {
+               final int capacity = 1 << capacityPow2;
+               Preconditions.checkState(capacity > 0);
+               this.moduloMask = capacity - 1;
+               this.ringBuffer = new Runnable[capacity];
+               this.lock = new ReentrantLock();
+               this.notEmpty = lock.newCondition();
+               this.notFull = lock.newCondition();
+       }
+
+       @Override
+       public boolean hasMail() {
+               return !isEmpty();
+       }
+
+       @Override
+       public Optional<Runnable> tryTakeMail() {
+               final ReentrantLock lock = this.lock;
+               lock.lock();
+               try {
+                       return isEmpty() ? Optional.empty() : 
Optional.of(takeInternal());
+               } finally {
+                       lock.unlock();
+               }
+       }
+
+       @Nonnull
+       @Override
+       public Runnable takeMail() throws InterruptedException {
+               final ReentrantLock lock = this.lock;
+               lock.lockInterruptibly();
+               try {
+                       while (isEmpty()) {
+                               notEmpty.await();
+                       }
+                       return takeInternal();
+               } finally {
+                       lock.unlock();
+               }
+       }
+
+       @Override
+       public void waitUntilHasMail() throws InterruptedException {
+               final ReentrantLock lock = this.lock;
+               lock.lockInterruptibly();
+               try {
+                       while (isEmpty()) {
+                               notEmpty.await();
+                       }
+               } finally {
+                       lock.unlock();
+               }
+       }
+
+       
//------------------------------------------------------------------------------------------------------------------
+
+       @Override
+       public boolean tryPutMail(@Nonnull Runnable letter) {
+               final ReentrantLock lock = this.lock;
+               lock.lock();
+               try {
+                       if (isFull()) {
+                               return false;
+                       } else {
+                               putInternal(letter);
+                               return true;
+                       }
+               } finally {
+                       lock.unlock();
+               }
+       }
+
+       @Override
+       public void putMail(@Nonnull Runnable letter) throws 
InterruptedException {
+               final ReentrantLock lock = this.lock;
+               lock.lockInterruptibly();
+               try {
+                       while (isFull()) {
+                               notFull.await();
+                       }
+                       putInternal(letter);
+               } finally {
+                       lock.unlock();
+               }
+       }
+
+       @Override
+       public void waitUntilHasCapacity() throws InterruptedException {
+               final ReentrantLock lock = this.lock;
+               lock.lockInterruptibly();
+               try {
+                       while (isFull()) {
+                               notFull.await();
+                       }
+               } finally {
+                       lock.unlock();
+               }
+       }
+
+       
//------------------------------------------------------------------------------------------------------------------
+
+       private void putInternal(Runnable letter) {
+               assert lock.isHeldByCurrentThread();
+               this.ringBuffer[tailIndex] = letter;
+               tailIndex = increaseIndexWithWrapAround(tailIndex);
+               ++count;
+               notEmpty.signal();
+       }
+
+       private Runnable takeInternal() {
+               assert lock.isHeldByCurrentThread();
+               final Runnable[] buffer = this.ringBuffer;
+               Runnable letter = buffer[headIndex];
+               buffer[headIndex] = null;
+               headIndex = increaseIndexWithWrapAround(headIndex);
+               --count;
+               notFull.signal();
+               return letter;
+       }
+
+       private int increaseIndexWithWrapAround(int old) {
+               return (old + 1) & moduloMask;
+       }
+
+       private boolean isFull() {
+               return count >= ringBuffer.length;
+       }
+
+       private boolean isEmpty() {
+               return count == 0;
+       }
+
+       @Override
+       public void clearAndPut(@Nonnull Runnable shutdownAction) {
+               lock.lock();
+               try {
+                       int localCount = count;
+                       while (localCount > 0) {
+                               ringBuffer[headIndex] = null;
+                               headIndex = 
increaseIndexWithWrapAround(headIndex);
+                               --localCount;
+                       }
+                       count = 0;
+                       putInternal(shutdownAction);
+               } finally {
+                       lock.unlock();
+               }
+       }
+}
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxReceiver.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxReceiver.java
new file mode 100644
index 0000000..189687e
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxReceiver.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks.mailbox;
+
+import javax.annotation.Nonnull;
+
+import java.util.Optional;
+
+/**
+ * Consumer-facing side of the {@link Mailbox} interface. This is used to 
dequeue letters. The mailbox returns letters
+ * in the order in which they were enqueued. A mailbox should only be consumed 
by one thread at a time.
+ */
+public interface MailboxReceiver {
+
+       /**
+        * Returns <code>true</code> if the mailbox contains mail.
+        */
+       boolean hasMail();
+
+       /**
+        * Returns an optional with either the oldest letter from the mailbox 
(head of queue) if the mailbox is not empty or
+        * an empty optional otherwise.
+        *
+        * @return an optional with either the oldest letter from the mailbox 
(head of queue) if the mailbox is not empty or
+        * an empty optional otherwise.
+        */
+       Optional<Runnable> tryTakeMail();
+
+       /**
+        * This method returns the oldest letter from the mailbox (head of 
queue) or blocks until a letter is available.
+        *
+        * @return the oldest letter from the mailbox (head of queue).
+        * @throws InterruptedException on interruption.
+        */
+       @Nonnull
+       Runnable takeMail() throws InterruptedException;
+
+       /**
+        * This method blocks if the mailbox is empty until mail becomes 
available.
+        * @throws InterruptedException on interruption.
+        */
+       void waitUntilHasMail() throws InterruptedException;
+}
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxSender.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxSender.java
new file mode 100644
index 0000000..1829125
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxSender.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks.mailbox;
+
+import javax.annotation.Nonnull;
+
+/**
+ * Producer-facing side of the {@link Mailbox} interface. This is used to 
enqueue letters. Multiple producer threads
+ * can put to the same mailbox.
+ */
+public interface MailboxSender {
+
+       /**
+        * Enqueues the given letter to the mailbox, if capacity is available. 
Returns <code>true</code> on success
+        * and <code>false</code> if the mailbox was already full.
+        *
+        * @param letter the letter to enqueue.
+        * @return <code>true</code> iff successful.
+        */
+       boolean tryPutMail(@Nonnull Runnable letter);
+
+       /**
+        * Enqueues the given letter to the mailbox and blocks until there is 
capacity for a successful put.
+        *
+        * @param letter the letter to enqueue.
+        * @throws InterruptedException on interruption.
+        */
+       void putMail(@Nonnull Runnable letter) throws InterruptedException;
+
+       /**
+        * This method blocks until the mailbox has capacity again to enqueue 
new letters.
+        *
+        * @throws InterruptedException on interruption.
+        */
+       void waitUntilHasCapacity() throws InterruptedException;
+}
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerConfigurationTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerConfigurationTest.java
index 08cee55..17ab88f 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerConfigurationTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerConfigurationTest.java
@@ -80,7 +80,9 @@ public class CheckpointExceptionHandlerConfigurationTest 
extends TestLogger {
                        protected void init() throws Exception {}
 
                        @Override
-                       protected void run() throws Exception {}
+                       protected void performDefaultAction(ActionContext 
context) throws Exception {
+                               context.allActionsCompleted();
+                       }
 
                        @Override
                        protected void cleanup() throws Exception {}
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
index 80a38e4..d1b3697 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
@@ -188,7 +188,9 @@ public class StreamTaskCancellationBarrierTest {
                }
 
                @Override
-               protected void run() throws Exception {}
+               protected void performDefaultAction(ActionContext context) 
throws Exception {
+                       context.allActionsCompleted();
+               }
 
                @Override
                protected void cleanup() throws Exception {}
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
index 8918b0a..c079d15 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
@@ -225,10 +225,11 @@ public class StreamTaskTerminationTest extends TestLogger 
{
                }
 
                @Override
-               protected void run() throws Exception {
+               protected void performDefaultAction(ActionContext context) 
throws Exception {
                        RUN_LATCH.trigger();
                        // wait until we have started an asynchronous checkpoint
                        CHECKPOINTING_LATCH.await();
+                       context.allActionsCompleted();
                }
 
                @Override
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index e24949d..af779b6 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -844,7 +844,9 @@ public class StreamTaskTest extends TestLogger {
                protected void init() throws Exception {}
 
                @Override
-               protected void run() throws Exception {}
+               protected void performDefaultAction(ActionContext context) 
throws Exception {
+                       context.allActionsCompleted();
+               }
 
                @Override
                protected void cleanup() throws Exception {}
@@ -1031,7 +1033,9 @@ public class StreamTaskTest extends TestLogger {
                protected void init() throws Exception {}
 
                @Override
-               protected void run() throws Exception {}
+               protected void performDefaultAction(ActionContext context) 
throws Exception {
+                       context.allActionsCompleted();
+               }
 
                @Override
                protected void cleanup() throws Exception {}
@@ -1059,10 +1063,11 @@ public class StreamTaskTest extends TestLogger {
                }
 
                @Override
-               protected void run() throws Exception {
+               protected void performDefaultAction(ActionContext context) 
throws Exception {
                        if (fail) {
                                throw new RuntimeException();
                        }
+                       context.allActionsCompleted();
                }
 
                @Override
@@ -1149,7 +1154,7 @@ public class StreamTaskTest extends TestLogger {
                protected void init() {}
 
                @Override
-               protected void run() throws Exception {
+               protected void performDefaultAction(ActionContext context) 
throws Exception {
                        holder = new LockHolder(getCheckpointLock(), latch);
                        holder.start();
                        latch.await();
@@ -1164,6 +1169,7 @@ public class StreamTaskTest extends TestLogger {
                                // restore interruption state
                                Thread.currentThread().interrupt();
                        }
+                       context.allActionsCompleted();
                }
 
                @Override
@@ -1193,7 +1199,7 @@ public class StreamTaskTest extends TestLogger {
                protected void init() {}
 
                @Override
-               protected void run() throws Exception {
+               protected void performDefaultAction(ActionContext context) 
throws Exception {
                        final OneShotLatch latch = new OneShotLatch();
                        final Object lock = new Object();
 
@@ -1219,7 +1225,7 @@ public class StreamTaskTest extends TestLogger {
                        finally {
                                holder.close();
                        }
-
+                       context.allActionsCompleted();
                }
 
                @Override
@@ -1259,8 +1265,9 @@ public class StreamTaskTest extends TestLogger {
                }
 
                @Override
-               protected void run() throws Exception {
+               protected void performDefaultAction(ActionContext context) 
throws Exception {
                        syncLatch.await();
+                       context.allActionsCompleted();
                }
 
                @Override
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java
index 222133a..778a7d0 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java
@@ -156,9 +156,10 @@ public class SynchronousCheckpointITCase {
                }
 
                @Override
-               protected void run() throws Exception {
+               protected void performDefaultAction(ActionContext context) 
throws Exception {
                        executionLatch.trigger();
                        cancellationLatch.await();
+                       context.allActionsCompleted();
                }
 
                @Override
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointTest.java
index 2ad8c6f..f7e36d4 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointTest.java
@@ -174,9 +174,10 @@ public class SynchronousCheckpointTest {
                protected void init() {}
 
                @Override
-               protected void run() throws Exception {
+               protected void performDefaultAction(ActionContext context) 
throws Exception {
                        runningLatch.trigger();
                        execLatch.await();
+                       context.allActionsCompleted();
                }
 
                @Override
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
index 244c8aa..9418e14 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
@@ -506,8 +506,7 @@ public class TaskCheckpointingBehaviourTest extends 
TestLogger {
                public void init() {}
 
                @Override
-               protected void run() throws Exception {
-
+               protected void performDefaultAction(ActionContext context) 
throws Exception {
                        triggerCheckpointOnBarrier(
                                new CheckpointMetaData(
                                        11L,
@@ -518,6 +517,7 @@ public class TaskCheckpointingBehaviourTest extends 
TestLogger {
                        while (isRunning()) {
                                Thread.sleep(1L);
                        }
+                       context.allActionsCompleted();
                }
 
                @Override
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImplTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImplTest.java
new file mode 100644
index 0000000..fc7f19c
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImplTest.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks.mailbox;
+
+import org.apache.flink.util.function.BiConsumerWithException;
+import org.apache.flink.util.function.FunctionWithException;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.LinkedList;
+import java.util.Queue;
+
+/**
+ * Unit tests for {@link MailboxImpl}.
+ */
+public class MailboxImplTest {
+
+       private static final Runnable POISON_LETTER = () -> {};
+       private static final int CAPACITY_POW_2 = 1;
+       private static final int CAPACITY = 1 << CAPACITY_POW_2;
+
+       /**
+        * Object under test.
+        */
+       private Mailbox mailbox;
+
+       @Before
+       public void setUp() throws Exception {
+               mailbox = new MailboxImpl(CAPACITY_POW_2);
+       }
+
+       /**
+        * Tests that #clearAndPut removes all other pending letters and enqueues the new letter 
directly at the head of the mailbox queue.
+        */
+       @Test
+       public void testClearAndPut() {
+               for (int i = 0; i < CAPACITY; ++i) {
+                       Assert.assertTrue(mailbox.tryPutMail(() -> {}));
+               }
+
+               mailbox.clearAndPut(POISON_LETTER);
+
+               Assert.assertTrue(mailbox.hasMail());
+               Assert.assertEquals(POISON_LETTER, mailbox.tryTakeMail().get());
+               Assert.assertFalse(mailbox.hasMail());
+       }
+
+       @Test
+       public void testContracts() throws Exception {
+               final Queue<Runnable> testObjects = new LinkedList<>();
+               Assert.assertFalse(mailbox.hasMail());
+
+               for (int i = 0; i < CAPACITY; ++i) {
+                       Runnable letter = () -> {};
+                       testObjects.add(letter);
+                       Assert.assertTrue(mailbox.tryPutMail(letter));
+                       Assert.assertTrue(mailbox.hasMail());
+               }
+
+               Assert.assertFalse(mailbox.tryPutMail(() -> {}));
+
+               while (!testObjects.isEmpty()) {
+                       Assert.assertEquals(testObjects.remove(), 
mailbox.tryTakeMail().get());
+                       Assert.assertEquals(!testObjects.isEmpty(), 
mailbox.hasMail());
+                       mailbox.waitUntilHasCapacity(); // should not block 
here because the mailbox is not full
+               }
+
+               Thread waitingReader = new Thread(ThrowingRunnable.unchecked(() 
-> mailbox.waitUntilHasMail()));
+               waitingReader.start();
+               Thread.sleep(1);
+               Assert.assertTrue(waitingReader.isAlive());
+               mailbox.tryPutMail(() -> {});
+               waitingReader.join(); // should complete here
+
+               while (mailbox.tryPutMail(() -> {})) {}
+
+               Thread waitingWriter = new Thread(ThrowingRunnable.unchecked(() 
-> mailbox.waitUntilHasCapacity()));
+               waitingWriter.start();
+               Thread.sleep(1);
+               Assert.assertTrue(waitingWriter.isAlive());
+               mailbox.takeMail();
+               waitingWriter.join();
+       }
+
+       /**
+        * Test the producer-consumer pattern using the blocking methods on the 
mailbox.
+        */
+       @Test
+       public void testConcurrentPutTakeBlocking() throws Exception {
+               testPutTake(MailboxReceiver::takeMail, MailboxSender::putMail);
+       }
+
+       /**
+        * Test the producer-consumer pattern using the non-blocking methods & 
waits on the mailbox.
+        */
+       @Test
+       public void testConcurrentPutTakeNonBlockingAndWait() throws Exception {
+               testPutTake((mailbox -> {
+                               mailbox.waitUntilHasMail();
+                               return mailbox.tryTakeMail().get();
+                       }),
+                       ((mailbox, runnable) -> {
+                               while (!mailbox.tryPutMail(runnable)) {
+                                       mailbox.waitUntilHasCapacity();
+                               }
+                       }));
+       }
+
+       /**
+        * Test producer-consumer pattern through the mailbox in a concurrent 
setting (n-writer / 1-reader).
+        */
+       private void testPutTake(
+               FunctionWithException<Mailbox, Runnable, Exception> takeMethod,
+               BiConsumerWithException<Mailbox, Runnable, Exception> 
putMethod) throws Exception {
+               final int numThreads = 10;
+               final int numLettersPerThread = 1000;
+               final int[] results = new int[numThreads];
+               Thread[] writerThreads = new Thread[numThreads];
+               Thread readerThread = new Thread(ThrowingRunnable.unchecked(() 
-> {
+                       Runnable letter;
+                       while ((letter = takeMethod.apply(mailbox)) != 
POISON_LETTER) {
+                               letter.run();
+                       }
+               }));
+
+               readerThread.start();
+               for (int i = 0; i < writerThreads.length; ++i) {
+                       final int threadId = i;
+                       writerThreads[i] = new 
Thread(ThrowingRunnable.unchecked(() -> {
+                               for (int k = 0; k < numLettersPerThread; ++k) {
+                                       putMethod.accept(mailbox, () -> 
++results[threadId]);
+                               }
+                       }));
+               }
+
+               for (Thread writerThread : writerThreads) {
+                       writerThread.start();
+               }
+
+               for (Thread writerThread : writerThreads) {
+                       writerThread.join();
+               }
+
+               mailbox.putMail(POISON_LETTER);
+
+               readerThread.join();
+               for (int perThreadResult : results) {
+                       Assert.assertEquals(numLettersPerThread, 
perThreadResult);
+               }
+       }
+}
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTask.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTask.java
index 230c68a..835d924 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTask.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTask.java
@@ -81,7 +81,9 @@ public class MockStreamTask extends StreamTask {
        public void init() { }
 
        @Override
-       protected void run() { }
+       protected void performDefaultAction(ActionContext context) throws 
Exception {
+               context.allActionsCompleted();
+       }
 
        @Override
        protected void cleanup() { }
diff --git 
a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java
 
b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java
index 986e410..57a5121 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java
@@ -284,13 +284,14 @@ public class JobMasterStopWithSavepointIT extends 
AbstractTestBase {
                }
 
                @Override
-               protected void run() throws InterruptedException {
+               protected void performDefaultAction(ActionContext context) 
throws Exception {
                        final long taskIndex = 
getEnvironment().getTaskInfo().getIndexOfThisSubtask();
                        if (taskIndex == 0) {
                                numberOfRestarts.countDown();
                        }
                        invokeLatch.countDown();
                        finishLatch.await();
+                       context.allActionsCompleted();
                }
 
                @Override
@@ -340,9 +341,10 @@ public class JobMasterStopWithSavepointIT extends 
AbstractTestBase {
                }
 
                @Override
-               protected void run() throws InterruptedException {
+               protected void performDefaultAction(ActionContext context) 
throws Exception {
                        invokeLatch.countDown();
                        finishLatch.await();
+                       context.allActionsCompleted();
                }
 
                @Override
@@ -368,8 +370,8 @@ public class JobMasterStopWithSavepointIT extends 
AbstractTestBase {
                }
 
                @Override
-               protected void run() throws Exception {
-
+               protected void performDefaultAction(ActionContext context) 
throws Exception {
+                       context.allActionsCompleted();
                }
 
                @Override

Reply via email to