[hotfix] [tests] Stabilize SystemProcessingTimeServiceTest
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c962c45e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c962c45e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c962c45e Branch: refs/heads/master Commit: c962c45e634155795ca7548afb4fb24f678a6a03 Parents: 52f8b33 Author: Stephan Ewen <se...@apache.org> Authored: Wed Apr 19 18:01:29 2017 +0200 Committer: Stephan Ewen <se...@apache.org> Committed: Thu Apr 20 10:52:37 2017 +0200 ---------------------------------------------------------------------- .../tasks/SystemProcessingTimeServiceTest.java | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/c962c45e/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java index 50e438c..fb4f087 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java @@ -20,10 +20,11 @@ package org.apache.flink.streaming.runtime.tasks; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.streaming.runtime.operators.TestProcessingTimeServiceTest.ReferenceSettingExceptionHandler; - import org.apache.flink.util.TestLogger; + import org.junit.Test; +import java.util.concurrent.CancellationException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.atomic.AtomicBoolean; @@ -178,7 +179,16 @@ public class SystemProcessingTimeServiceTest extends TestLogger { // this should cancel our future timer.quiesceAndAwaitPending(); - assertTrue(scheduledFuture.isCancelled()); + // it may be that the cancelled status is not immediately visible after the + // termination (not necessary a volatile update), so we need to "get()" the cancellation + // exception to be on the safe side + try { + scheduledFuture.get(); + fail("scheduled future is not cancelled"); + } + catch (CancellationException ignored) { + // expected + } scheduledFuture = timer.scheduleAtFixedRate(new ProcessingTimeCallback() { @Override