This is an automated email from the ASF dual-hosted git repository. aljoscha pushed a commit to branch release-1.9 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.9 by this push:
     new 3d71518 [FLINK-13514] Fix instability in StreamTaskTest.testAsyncCheckpointingConcurrentCloseAfterAcknowledge
3d71518 is described below

commit 3d71518ef9c96bc8fe0add3b4c25bf141aa599db
Author: Aljoscha Krettek <aljos...@apache.org>
AuthorDate: Thu Aug 29 16:33:33 2019 +0200

    [FLINK-13514] Fix instability in StreamTaskTest.testAsyncCheckpointingConcurrentCloseAfterAcknowledge

    Before, thread pool shutdown would interrupt our waiting method.
    Production code cannot throw an InterruptedException here and would also
    not be correct if one is thrown. We now swallow interrupted exceptions
    and wait until we successfully return from await().
---
 .../flink/streaming/runtime/tasks/StreamTaskTest.java | 16 ++++++++++++++--
 1 file changed, 14 insertions(+), 2 deletions(-)

diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 2259501..d0295f1 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -482,11 +482,23 @@ public class StreamTaskTest extends TestLogger {
 		CheckpointResponder checkpointResponder = mock(CheckpointResponder.class);
 		doAnswer(new Answer() {
 			@Override
-			public Object answer(InvocationOnMock invocation) throws Throwable {
+			public Object answer(InvocationOnMock invocation) {
 				acknowledgeCheckpointLatch.trigger();
 
 				// block here so that we can issue the concurrent cancel call
-				completeAcknowledge.await();
+				while (true) {
+					try {
+						// wait until we successfully await (no pun intended)
+						completeAcknowledge.await();
+
+						// when await() returns normally, we break out of the loop
+						break;
+					} catch (InterruptedException e) {
+						// survive interruptions that arise from thread pool shutdown
+						// production code cannot actually throw InterruptedException from
+						// checkpoint acknowledgement
+					}
+				}
 
 				return null;
 			}