[FLINK-3048] [tests] Increase stability of DataSinkTaskTest
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/93622001 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/93622001 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/93622001 Branch: refs/heads/master Commit: 93622001e499fa04bb5c4a63b1b3ed09b270f5b9 Parents: ff52d28 Author: Stephan Ewen <se...@apache.org> Authored: Thu Nov 19 15:50:46 2015 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Thu Nov 19 15:50:46 2015 +0100 ---------------------------------------------------------------------- .../flink/runtime/operators/DataSinkTask.java | 2 +- .../runtime/operators/DataSinkTaskTest.java | 38 ++++++++++++-------- 2 files changed, 25 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/93622001/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java index d20bb89..addceea 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java @@ -257,7 +257,7 @@ public class DataSinkTask<IT> extends AbstractInvokable { } } - BatchTask.clearReaders(new MutableReader[]{inputReader}); + BatchTask.clearReaders(new MutableReader<?>[]{inputReader}); } if (!this.taskCanceled) { http://git-wip-us.apache.org/repos/asf/flink/blob/93622001/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java index b741b64..6221706 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java @@ -31,12 +31,14 @@ import org.apache.flink.runtime.operators.util.LocalStrategy; import org.apache.flink.runtime.taskmanager.Task; import org.apache.flink.types.IntValue; import org.apache.flink.types.Record; + import org.junit.After; import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,10 +51,13 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Set; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertFalse; + @RunWith(PowerMockRunner.class) @PrepareForTest({Task.class, ResultPartitionWriter.class}) -public class DataSinkTaskTest extends TaskTestBase -{ +public class DataSinkTaskTest extends TaskTestBase { + private static final Logger LOG = LoggerFactory.getLogger(DataSinkTaskTest.class); private static final int MEMORY_MANAGER_SIZE = 3 * 1024 * 1024; @@ -358,8 +363,7 @@ public class DataSinkTaskTest extends TaskTestBase } @Test - public void testCancelDataSinkTask() { - + public void testCancelDataSinkTask() throws Exception { super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE); super.addInput(new InfiniteInputIterator(), 0); @@ -382,19 +386,25 @@ public class DataSinkTaskTest extends TaskTestBase }; taskRunner.start(); - TaskCancelThread tct = new TaskCancelThread(1, taskRunner, testTask); - tct.start(); - - try { - tct.join(); - taskRunner.join(); - } catch(InterruptedException ie) { - Assert.fail("Joining threads failed"); + File tempTestFile = new File(this.tempTestPath); + + // wait until the task created the file + long deadline = System.currentTimeMillis() + 60000; + while (!tempTestFile.exists() && System.currentTimeMillis() < deadline) { + Thread.sleep(10); } + assertTrue("Task did not create file within 60 seconds", tempTestFile.exists()); + + // cancel the task + Thread.sleep(500); + testTask.cancel(); + taskRunner.interrupt(); + + // wait for the canceling to complete + taskRunner.join(); // assert that temp file was created - File tempTestFile = new File(this.tempTestPath); - Assert.assertFalse("Temp output file has not been removed", tempTestFile.exists()); + assertFalse("Temp output file has not been removed", tempTestFile.exists()); } @Test