[hotfix] [tests] Stabilize SystemProcessingTimeServiceTest

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c962c45e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c962c45e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c962c45e

Branch: refs/heads/master
Commit: c962c45e634155795ca7548afb4fb24f678a6a03
Parents: 52f8b33
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Apr 19 18:01:29 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Apr 20 10:52:37 2017 +0200

----------------------------------------------------------------------
 .../tasks/SystemProcessingTimeServiceTest.java        | 14 ++++++++++++--
 1 file changed, 12 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c962c45e/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
index 50e438c..fb4f087 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
@@ -20,10 +20,11 @@ package org.apache.flink.streaming.runtime.tasks;
 
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.streaming.runtime.operators.TestProcessingTimeServiceTest.ReferenceSettingExceptionHandler;
-
 import org.apache.flink.util.TestLogger;
+
 import org.junit.Test;
 
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -178,7 +179,16 @@ public class SystemProcessingTimeServiceTest extends TestLogger {
                        // this should cancel our future
                        timer.quiesceAndAwaitPending();
 
-                       assertTrue(scheduledFuture.isCancelled());
+                       // it may be that the cancelled status is not immediately visible after the
+                       // termination (not necessarily a volatile update), so we need to "get()" the cancellation
+                       // exception to be on the safe side
+                       try {
+                               scheduledFuture.get();
+                               fail("scheduled future is not cancelled");
+                       }
+                       catch (CancellationException ignored) {
+                               // expected
+                       }
 
                        scheduledFuture = timer.scheduleAtFixedRate(new ProcessingTimeCallback() {
                                @Override

Reply via email to