[hotfix][tests] Reduce mockito usage in StreamTaskTest
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0af22bf2 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0af22bf2 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0af22bf2 Branch: refs/heads/master Commit: 0af22bf284967c8f7e658b8eef3a91d407dbd8eb Parents: 6b24757 Author: Piotr Nowojski <piotr.nowoj...@gmail.com> Authored: Wed Feb 7 16:16:29 2018 +0100 Committer: Piotr Nowojski <piotr.nowoj...@gmail.com> Committed: Mon Feb 19 12:21:24 2018 +0100 ---------------------------------------------------------------------- .../operators/testutils/MockEnvironment.java | 44 +++++++++++++++----- .../operators/async/AsyncWaitOperatorTest.java | 21 ++++------ .../streaming/runtime/tasks/StreamTaskTest.java | 38 +++++------------ .../util/AbstractStreamOperatorTestHarness.java | 2 +- 4 files changed, 53 insertions(+), 52 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/0af22bf2/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java index e28eada..4d1037e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java @@ -44,6 +44,7 @@ import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.query.KvStateRegistry; import org.apache.flink.runtime.query.TaskKvStateRegistry; +import org.apache.flink.runtime.state.TaskStateManager; import org.apache.flink.runtime.state.TestTaskStateManager; import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo; @@ -55,8 +56,11 @@ import java.util.Collections; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.Future; +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkState; import static org.junit.Assert.fail; @@ -73,7 +77,7 @@ public class MockEnvironment implements Environment, AutoCloseable { private final IOManager ioManager; - private final TestTaskStateManager taskStateManager; + private final TaskStateManager taskStateManager; private final InputSplitProvider inputSplitProvider; @@ -99,14 +103,25 @@ public class MockEnvironment implements Environment, AutoCloseable { private final TaskEventDispatcher taskEventDispatcher = new TaskEventDispatcher(); - private Throwable failExternallyCause; + private Optional<Class<Throwable>> expectedExternalFailureCause = Optional.empty(); + + private Optional<Throwable> actualExternalFailureCause = Optional.empty(); + + public MockEnvironment() { + this( + "mock-task", + 1024 * MemoryManager.DEFAULT_PAGE_SIZE, + null, + 16, + new TestTaskStateManager()); + } public MockEnvironment( String taskName, long memorySize, MockInputSplitProvider inputSplitProvider, int bufferSize, - TestTaskStateManager taskStateManager) { + TaskStateManager taskStateManager) { this( taskName, memorySize, @@ -123,7 +138,7 @@ public class MockEnvironment implements Environment, AutoCloseable { MockInputSplitProvider inputSplitProvider, int bufferSize, Configuration taskConfiguration, ExecutionConfig executionConfig, - TestTaskStateManager taskStateManager) { + TaskStateManager taskStateManager) { this( taskName, memorySize, @@ -144,7 +159,7 @@ public class MockEnvironment implements Environment, AutoCloseable { int bufferSize, Configuration taskConfiguration, ExecutionConfig executionConfig, - TestTaskStateManager taskStateManager, + TaskStateManager taskStateManager, int maxParallelism, int parallelism, int subtaskIndex) { @@ -174,7 +189,7 @@ public class MockEnvironment implements Environment, AutoCloseable { int parallelism, int subtaskIndex, ClassLoader userCodeClassLoader, - TestTaskStateManager taskStateManager) { + TaskStateManager taskStateManager) { this.taskInfo = new TaskInfo(taskName, maxParallelism, subtaskIndex, parallelism, 0); this.jobConfiguration = new Configuration(); this.taskConfiguration = taskConfiguration; @@ -324,7 +339,7 @@ public class MockEnvironment implements Environment, AutoCloseable { } @Override - public TestTaskStateManager getTaskStateManager() { + public TaskStateManager getTaskStateManager() { return taskStateManager; } @@ -355,7 +370,12 @@ public class MockEnvironment implements Environment, AutoCloseable { @Override public void failExternally(Throwable cause) { - this.failExternallyCause = Preconditions.checkNotNull(cause, "Must give a cause fail fail."); + if (!expectedExternalFailureCause.isPresent()) { + throw new UnsupportedOperationException("MockEnvironment does not support external task failure."); + } + checkArgument(expectedExternalFailureCause.get().isInstance(checkNotNull(cause))); + checkState(!actualExternalFailureCause.isPresent()); + actualExternalFailureCause = Optional.of(cause); } @Override @@ -371,7 +391,11 @@ public class MockEnvironment implements Environment, AutoCloseable { checkState(ioManager.isProperlyShutDown(), "IO Manager has not properly shut down."); } - public Throwable getFailExternallyCause() { - return failExternallyCause; + public void setExpectedExternalFailureCause(Class<Throwable> expectedThrowableClass) { + this.expectedExternalFailureCause = Optional.of(expectedThrowableClass); + } + + public Optional<Throwable> getActualExternalFailureCause() { + return actualExternalFailureCause; } } http://git-wip-us.apache.org/repos/asf/flink/blob/0af22bf2/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java index d24b55c..507ff0b 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java @@ -59,6 +59,7 @@ import org.apache.flink.streaming.runtime.tasks.StreamTask; import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.TestHarnessUtil; +import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.Preconditions; import org.apache.flink.util.TestLogger; @@ -80,7 +81,6 @@ import java.util.Queue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledFuture; @@ -609,6 +609,7 @@ public class AsyncWaitOperatorTest extends TestLogger { AsyncDataStream.OutputMode.ORDERED); final MockEnvironment mockEnvironment = createMockEnvironment(); + mockEnvironment.setExpectedExternalFailureCause(Throwable.class); final OneInputStreamOperatorTestHarness<Integer, Integer> testHarness = new OneInputStreamOperatorTestHarness<>(operator, IntSerializer.INSTANCE, mockEnvironment); @@ -643,14 +644,8 @@ public class AsyncWaitOperatorTest extends TestLogger { ArgumentCaptor<Throwable> argumentCaptor = ArgumentCaptor.forClass(Throwable.class); - Throwable failureCause = mockEnvironment.getFailExternallyCause(); - Assert.assertNotNull(failureCause); - - Assert.assertNotNull(failureCause.getCause()); - Assert.assertTrue(failureCause.getCause() instanceof ExecutionException); - - Assert.assertNotNull(failureCause.getCause().getCause()); - Assert.assertTrue(failureCause.getCause().getCause() instanceof TimeoutException); + assertTrue(mockEnvironment.getActualExternalFailureCause().isPresent()); + ExceptionUtils.findThrowable(mockEnvironment.getActualExternalFailureCause().get(), TimeoutException.class); } @Nonnull @@ -730,8 +725,6 @@ public class AsyncWaitOperatorTest extends TestLogger { synchronized (lock) { operator.close(); } - - Assert.assertNull(environment.getFailExternallyCause()); } /** @@ -867,6 +860,7 @@ public class AsyncWaitOperatorTest extends TestLogger { outputMode); final MockEnvironment mockEnvironment = createMockEnvironment(); + mockEnvironment.setExpectedExternalFailureCause(Throwable.class); OneInputStreamOperatorTestHarness<Integer, Integer> harness = new OneInputStreamOperatorTestHarness<>( asyncWaitOperator, @@ -883,7 +877,7 @@ public class AsyncWaitOperatorTest extends TestLogger { harness.close(); } - Assert.assertNotNull(harness.getEnvironment().getFailExternallyCause()); + assertTrue(harness.getEnvironment().getActualExternalFailureCause().isPresent()); } /** @@ -932,6 +926,7 @@ public class AsyncWaitOperatorTest extends TestLogger { outputMode); final MockEnvironment mockEnvironment = createMockEnvironment(); + mockEnvironment.setExpectedExternalFailureCause(Throwable.class); OneInputStreamOperatorTestHarness<Integer, Integer> harness = new OneInputStreamOperatorTestHarness<>( asyncWaitOperator, @@ -949,8 +944,6 @@ public class AsyncWaitOperatorTest extends TestLogger { synchronized (harness.getCheckpointLock()) { harness.close(); } - - Assert.assertNotNull(mockEnvironment.getFailExternallyCause()); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/0af22bf2/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java index 9ee35ee..52295fb 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java @@ -330,8 +330,7 @@ public class StreamTaskTest extends TestLogger { TaskInfo mockTaskInfo = mock(TaskInfo.class); when(mockTaskInfo.getTaskNameWithSubtasks()).thenReturn("foobar"); when(mockTaskInfo.getIndexOfThisSubtask()).thenReturn(0); - Environment mockEnvironment = mock(Environment.class); - when(mockEnvironment.getTaskInfo()).thenReturn(mockTaskInfo); + Environment mockEnvironment = new MockEnvironment(); StreamTask<?, ?> streamTask = new EmptyStreamTask(mockEnvironment); CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, timestamp); @@ -401,12 +400,7 @@ public class StreamTaskTest extends TestLogger { final long checkpointId = 42L; final long timestamp = 1L; - TaskInfo mockTaskInfo = mock(TaskInfo.class); - when(mockTaskInfo.getTaskNameWithSubtasks()).thenReturn("foobar"); - when(mockTaskInfo.getIndexOfThisSubtask()).thenReturn(0); - Environment mockEnvironment = mock(Environment.class); - when(mockEnvironment.getTaskInfo()).thenReturn(mockTaskInfo); - + MockEnvironment mockEnvironment = new MockEnvironment(); StreamTask<?, ?> streamTask = spy(new EmptyStreamTask(mockEnvironment)); CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, timestamp); @@ -458,6 +452,7 @@ public class StreamTaskTest extends TestLogger { new StreamTask.AsyncCheckpointExceptionHandler(streamTask); Whitebox.setInternalState(streamTask, "asynchronousCheckpointExceptionHandler", asyncCheckpointExceptionHandler); + mockEnvironment.setExpectedExternalFailureCause(Throwable.class); streamTask.triggerCheckpoint(checkpointMetaData, CheckpointOptions.forCheckpointWithDefaultLocation()); verify(streamTask).handleAsyncException(anyString(), any(Throwable.class)); @@ -483,12 +478,6 @@ public class StreamTaskTest extends TestLogger { final OneShotLatch acknowledgeCheckpointLatch = new OneShotLatch(); final OneShotLatch completeAcknowledge = new OneShotLatch(); - TaskInfo mockTaskInfo = mock(TaskInfo.class); - when(mockTaskInfo.getTaskNameWithSubtasks()).thenReturn("foobar"); - when(mockTaskInfo.getIndexOfThisSubtask()).thenReturn(0); - Environment mockEnvironment = mock(Environment.class); - when(mockEnvironment.getTaskInfo()).thenReturn(mockTaskInfo); - CheckpointResponder checkpointResponder = mock(CheckpointResponder.class); doAnswer(new Answer() { @Override @@ -514,7 +503,12 @@ public class StreamTaskTest extends TestLogger { null, checkpointResponder); - when(mockEnvironment.getTaskStateManager()).thenReturn(taskStateManager); + MockEnvironment mockEnvironment = new MockEnvironment( + "mock-task", + 1024 * MemoryManager.DEFAULT_PAGE_SIZE, + null, + 16, + taskStateManager); StreamTask<?, ?> streamTask = new EmptyStreamTask(mockEnvironment); CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, timestamp); @@ -606,11 +600,7 @@ public class StreamTaskTest extends TestLogger { final OneShotLatch createSubtask = new OneShotLatch(); final OneShotLatch completeSubtask = new OneShotLatch(); - TaskInfo mockTaskInfo = mock(TaskInfo.class); - when(mockTaskInfo.getTaskNameWithSubtasks()).thenReturn("foobar"); - when(mockTaskInfo.getIndexOfThisSubtask()).thenReturn(0); - Environment mockEnvironment = mock(Environment.class); - when(mockEnvironment.getTaskInfo()).thenReturn(mockTaskInfo); + Environment mockEnvironment = spy(new MockEnvironment()); whenNew(OperatorSubtaskState.class). withArguments( @@ -707,12 +697,7 @@ public class StreamTaskTest extends TestLogger { final long checkpointId = 42L; final long timestamp = 1L; - TaskInfo mockTaskInfo = mock(TaskInfo.class); - - when(mockTaskInfo.getTaskNameWithSubtasks()).thenReturn("foobar"); - when(mockTaskInfo.getIndexOfThisSubtask()).thenReturn(0); - - Environment mockEnvironment = mock(Environment.class); + Environment mockEnvironment = spy(new MockEnvironment()); // latch blocks until the async checkpoint thread acknowledges final OneShotLatch checkpointCompletedLatch = new OneShotLatch(); @@ -742,7 +727,6 @@ public class StreamTaskTest extends TestLogger { checkpointResponder); when(mockEnvironment.getTaskStateManager()).thenReturn(taskStateManager); - when(mockEnvironment.getTaskInfo()).thenReturn(mockTaskInfo); StreamTask<?, ?> streamTask = new EmptyStreamTask(mockEnvironment); CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, timestamp); http://git-wip-us.apache.org/repos/asf/flink/blob/0af22bf2/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java index 966d205..ced22c0 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java @@ -175,7 +175,7 @@ public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable { this.environment = Preconditions.checkNotNull(env); - this.taskStateManager = env.getTaskStateManager(); + this.taskStateManager = (TestTaskStateManager) env.getTaskStateManager(); this.internalEnvironment = environmentIsInternal ? Optional.of(environment) : Optional.empty(); mockTask = mock(StreamTask.class);