This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new 3d71518  [FLINK-13514] Fix instability in 
StreamTaskTest.testAsyncCheckpointingConcurrentCloseAfterAcknowledge
3d71518 is described below

commit 3d71518ef9c96bc8fe0add3b4c25bf141aa599db
Author: Aljoscha Krettek <aljos...@apache.org>
AuthorDate: Thu Aug 29 16:33:33 2019 +0200

    [FLINK-13514] Fix instability in 
StreamTaskTest.testAsyncCheckpointingConcurrentCloseAfterAcknowledge
    
    Before, thread pool shutdown would interrupt our waiting method.
    Production code cannot throw an InterruptedException here and would also
    not be correct if one is thrown.
    
    We now swallow interrupted exceptions and wait until we successfully
    return from await().
---
 .../flink/streaming/runtime/tasks/StreamTaskTest.java    | 16 ++++++++++++++--
 1 file changed, 14 insertions(+), 2 deletions(-)

diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 2259501..d0295f1 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -482,11 +482,23 @@ public class StreamTaskTest extends TestLogger {
                CheckpointResponder checkpointResponder = 
mock(CheckpointResponder.class);
                doAnswer(new Answer() {
                        @Override
-                       public Object answer(InvocationOnMock invocation) 
throws Throwable {
+                       public Object answer(InvocationOnMock invocation) {
                                acknowledgeCheckpointLatch.trigger();
 
                                // block here so that we can issue the 
concurrent cancel call
-                               completeAcknowledge.await();
+                               while (true) {
+                                       try {
+                                               // wait until we successfully 
await (no pun intended)
+                                               completeAcknowledge.await();
+
+                                               // when await() returns 
normally, we break out of the loop
+                                               break;
+                                       } catch (InterruptedException e) {
+                                               // survive interruptions that 
arise from thread pool shutdown
+                                               // production code cannot 
actually throw InterruptedException from
+                                               // checkpoint acknowledgement
+                                       }
+                               }
 
                                return null;
                        }

Reply via email to