[hotfix] [tests] Fix race condition in RescalingITCase that could leave the test stuck in a blocking call until timeout
This closes #2513 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5f67b54b Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5f67b54b Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5f67b54b Branch: refs/heads/master Commit: 5f67b54b2ca4f7ea79d184e65a99ef230dbdc660 Parents: 4d4eb64 Author: Stefan Richter <s.rich...@data-artisans.com> Authored: Mon Sep 19 17:54:23 2016 +0200 Committer: Stephan Ewen <se...@apache.org> Committed: Wed Sep 21 17:53:33 2016 +0200 ---------------------------------------------------------------------- .../test/checkpointing/RescalingITCase.java | 51 ++++++++++++-------- 1 file changed, 30 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/5f67b54b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java index 7f1d7f3..263bf79 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java @@ -69,9 +69,9 @@ import static org.junit.Assert.fail; public class RescalingITCase extends TestLogger { - private static int numTaskManagers = 2; - private static int slotsPerTaskManager = 2; - private static int numSlots = numTaskManagers * slotsPerTaskManager; + private static final int numTaskManagers = 2; + private static final int slotsPerTaskManager = 2; + private static final int numSlots = numTaskManagers * slotsPerTaskManager; private static TestingCluster cluster; @@ -109,12 +109,12 @@ public class RescalingITCase extends TestLogger { */ @Test public void 
testSavepointRescalingWithPartitionedState() throws Exception { - int numberKeys = 42; - int numberElements = 1000; - int numberElements2 = 500; - int parallelism = numSlots / 2; - int parallelism2 = numSlots; - int maxParallelism = 13; + final int numberKeys = 42; + final int numberElements = 1000; + final int numberElements2 = 500; + final int parallelism = numSlots / 2; + final int parallelism2 = numSlots; + final int maxParallelism = 13; FiniteDuration timeout = new FiniteDuration(3, TimeUnit.MINUTES); Deadline deadline = timeout.fromNow(); @@ -214,9 +214,9 @@ public class RescalingITCase extends TestLogger { */ @Test public void testSavepointRescalingFailureWithNonPartitionedState() throws Exception { - int parallelism = numSlots / 2; - int parallelism2 = numSlots; - int maxParallelism = 13; + final int parallelism = numSlots / 2; + final int parallelism2 = numSlots; + final int maxParallelism = 13; FiniteDuration timeout = new FiniteDuration(3, TimeUnit.MINUTES); Deadline deadline = timeout.fromNow(); @@ -235,12 +235,14 @@ public class RescalingITCase extends TestLogger { Object savepointResponse = null; - // we might be too early for taking a savepoint if the operators have not been started yet + // wait until the operator is started + NonPartitionedStateSource.workStartedLatch.await(); + while (deadline.hasTimeLeft()) { Future<Object> savepointPathFuture = jobManager.ask(new JobManagerMessages.TriggerSavepoint(jobID), deadline.timeLeft()); - - savepointResponse = Await.result(savepointPathFuture, deadline.timeLeft()); + FiniteDuration waitingTime = new FiniteDuration(10, TimeUnit.SECONDS); + savepointResponse = Await.result(savepointPathFuture, waitingTime); if (savepointResponse instanceof JobManagerMessages.TriggerSavepointSuccess) { break; @@ -428,6 +430,8 @@ public class RescalingITCase extends TestLogger { env.enableCheckpointing(checkpointInterval); env.setRestartStrategy(RestartStrategies.noRestart()); + NonPartitionedStateSource.workStartedLatch = 
new CountDownLatch(1); + DataStream<Integer> input = env.addSource(new NonPartitionedStateSource()); input.addSink(new DiscardingSink<Integer>()); @@ -466,7 +470,7 @@ public class RescalingITCase extends TestLogger { DataStream<Tuple2<Integer, Integer>> result = input.flatMap(new SubtaskIndexFlatMapper(numberElements)); - result.addSink(new CollectionSink()); + result.addSink(new CollectionSink<Tuple2<Integer, Integer>>()); return env.getStreamGraph().getJobGraph(); } @@ -504,7 +508,7 @@ public class RescalingITCase extends TestLogger { DataStream<Tuple2<Integer, Integer>> result = input.flatMap(new SubtaskIndexFlatMapper(numberElements)); - result.addSink(new CollectionSink()); + result.addSink(new CollectionSink<Tuple2<Integer, Integer>>()); return env.getStreamGraph().getJobGraph(); } @@ -645,8 +649,10 @@ public class RescalingITCase extends TestLogger { private static final long serialVersionUID = -8108185918123186841L; - private int counter = 0; - private boolean running = true; + private static volatile CountDownLatch workStartedLatch = new CountDownLatch(1); + + private volatile int counter = 0; + private volatile boolean running = true; @Override public Integer snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { @@ -669,13 +675,16 @@ public class RescalingITCase extends TestLogger { ctx.collect(counter * getRuntimeContext().getIndexOfThisSubtask()); } - Thread.sleep(100); + Thread.sleep(2); + if(counter == 10) { + workStartedLatch.countDown(); + } } } @Override public void cancel() { - running = true; + running = false; } } }