[hotfix][tests] Reduce mockito usage in StreamTaskTest

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0af22bf2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0af22bf2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0af22bf2

Branch: refs/heads/master
Commit: 0af22bf284967c8f7e658b8eef3a91d407dbd8eb
Parents: 6b24757
Author: Piotr Nowojski <piotr.nowoj...@gmail.com>
Authored: Wed Feb 7 16:16:29 2018 +0100
Committer: Piotr Nowojski <piotr.nowoj...@gmail.com>
Committed: Mon Feb 19 12:21:24 2018 +0100

----------------------------------------------------------------------
 .../operators/testutils/MockEnvironment.java    | 44 +++++++++++++++-----
 .../operators/async/AsyncWaitOperatorTest.java  | 21 ++++------
 .../streaming/runtime/tasks/StreamTaskTest.java | 38 +++++------------
 .../util/AbstractStreamOperatorTestHarness.java |  2 +-
 4 files changed, 53 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0af22bf2/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index e28eada..4d1037e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -44,6 +44,7 @@ import 
org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.TaskStateManager;
 import org.apache.flink.runtime.state.TestTaskStateManager;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
@@ -55,8 +56,11 @@ import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.Future;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
 import static org.junit.Assert.fail;
 
@@ -73,7 +77,7 @@ public class MockEnvironment implements Environment, 
AutoCloseable {
 
        private final IOManager ioManager;
 
-       private final TestTaskStateManager taskStateManager;
+       private final TaskStateManager taskStateManager;
 
        private final InputSplitProvider inputSplitProvider;
 
@@ -99,14 +103,25 @@ public class MockEnvironment implements Environment, 
AutoCloseable {
 
        private final TaskEventDispatcher taskEventDispatcher = new 
TaskEventDispatcher();
 
-       private Throwable failExternallyCause;
+       private Optional<Class<Throwable>> expectedExternalFailureCause = 
Optional.empty();
+
+       private Optional<Throwable> actualExternalFailureCause = 
Optional.empty();
+
+       public MockEnvironment() {
+               this(
+                       "mock-task",
+                       1024 * MemoryManager.DEFAULT_PAGE_SIZE,
+                       null,
+                       16,
+                       new TestTaskStateManager());
+       }
 
        public MockEnvironment(
                String taskName,
                long memorySize,
                MockInputSplitProvider inputSplitProvider,
                int bufferSize,
-               TestTaskStateManager taskStateManager) {
+               TaskStateManager taskStateManager) {
                this(
                        taskName,
                        memorySize,
@@ -123,7 +138,7 @@ public class MockEnvironment implements Environment, 
AutoCloseable {
                MockInputSplitProvider inputSplitProvider,
                int bufferSize, Configuration taskConfiguration,
                ExecutionConfig executionConfig,
-               TestTaskStateManager taskStateManager) {
+               TaskStateManager taskStateManager) {
                this(
                        taskName,
                        memorySize,
@@ -144,7 +159,7 @@ public class MockEnvironment implements Environment, 
AutoCloseable {
                        int bufferSize,
                        Configuration taskConfiguration,
                        ExecutionConfig executionConfig,
-                       TestTaskStateManager taskStateManager,
+                       TaskStateManager taskStateManager,
                        int maxParallelism,
                        int parallelism,
                        int subtaskIndex) {
@@ -174,7 +189,7 @@ public class MockEnvironment implements Environment, 
AutoCloseable {
                        int parallelism,
                        int subtaskIndex,
                        ClassLoader userCodeClassLoader,
-                       TestTaskStateManager taskStateManager) {
+                       TaskStateManager taskStateManager) {
                this.taskInfo = new TaskInfo(taskName, maxParallelism, 
subtaskIndex, parallelism, 0);
                this.jobConfiguration = new Configuration();
                this.taskConfiguration = taskConfiguration;
@@ -324,7 +339,7 @@ public class MockEnvironment implements Environment, 
AutoCloseable {
        }
 
        @Override
-       public TestTaskStateManager getTaskStateManager() {
+       public TaskStateManager getTaskStateManager() {
                return taskStateManager;
        }
 
@@ -355,7 +370,12 @@ public class MockEnvironment implements Environment, 
AutoCloseable {
 
        @Override
        public void failExternally(Throwable cause) {
-               this.failExternallyCause = Preconditions.checkNotNull(cause, 
"Must give a cause fail fail.");
+               if (!expectedExternalFailureCause.isPresent()) {
+                       throw new 
UnsupportedOperationException("MockEnvironment does not support external task 
failure.");
+               }
+               
checkArgument(expectedExternalFailureCause.get().isInstance(checkNotNull(cause)));
+               checkState(!actualExternalFailureCause.isPresent());
+               actualExternalFailureCause = Optional.of(cause);
        }
 
        @Override
@@ -371,7 +391,11 @@ public class MockEnvironment implements Environment, 
AutoCloseable {
                checkState(ioManager.isProperlyShutDown(), "IO Manager has not 
properly shut down.");
        }
 
-       public Throwable getFailExternallyCause() {
-               return failExternallyCause;
+       public void setExpectedExternalFailureCause(Class<Throwable> 
expectedThrowableClass) {
+               this.expectedExternalFailureCause = 
Optional.of(expectedThrowableClass);
+       }
+
+       public Optional<Throwable> getActualExternalFailureCause() {
+               return actualExternalFailureCause;
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0af22bf2/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
index d24b55c..507ff0b 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
@@ -59,6 +59,7 @@ import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.TestHarnessUtil;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
 
@@ -80,7 +81,6 @@ import java.util.Queue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledFuture;
@@ -609,6 +609,7 @@ public class AsyncWaitOperatorTest extends TestLogger {
                        AsyncDataStream.OutputMode.ORDERED);
 
                final MockEnvironment mockEnvironment = createMockEnvironment();
+               
mockEnvironment.setExpectedExternalFailureCause(Throwable.class);
 
                final OneInputStreamOperatorTestHarness<Integer, Integer> 
testHarness =
                        new OneInputStreamOperatorTestHarness<>(operator, 
IntSerializer.INSTANCE, mockEnvironment);
@@ -643,14 +644,8 @@ public class AsyncWaitOperatorTest extends TestLogger {
 
                ArgumentCaptor<Throwable> argumentCaptor = 
ArgumentCaptor.forClass(Throwable.class);
 
-               Throwable failureCause = 
mockEnvironment.getFailExternallyCause();
-               Assert.assertNotNull(failureCause);
-
-               Assert.assertNotNull(failureCause.getCause());
-               Assert.assertTrue(failureCause.getCause() instanceof 
ExecutionException);
-
-               Assert.assertNotNull(failureCause.getCause().getCause());
-               Assert.assertTrue(failureCause.getCause().getCause() instanceof 
TimeoutException);
+               
assertTrue(mockEnvironment.getActualExternalFailureCause().isPresent());
+               
ExceptionUtils.findThrowable(mockEnvironment.getActualExternalFailureCause().get(),
 TimeoutException.class);
        }
 
        @Nonnull
@@ -730,8 +725,6 @@ public class AsyncWaitOperatorTest extends TestLogger {
                synchronized (lock) {
                        operator.close();
                }
-
-               Assert.assertNull(environment.getFailExternallyCause());
        }
 
        /**
@@ -867,6 +860,7 @@ public class AsyncWaitOperatorTest extends TestLogger {
                        outputMode);
 
                final MockEnvironment mockEnvironment = createMockEnvironment();
+               
mockEnvironment.setExpectedExternalFailureCause(Throwable.class);
 
                OneInputStreamOperatorTestHarness<Integer, Integer> harness = 
new OneInputStreamOperatorTestHarness<>(
                        asyncWaitOperator,
@@ -883,7 +877,7 @@ public class AsyncWaitOperatorTest extends TestLogger {
                        harness.close();
                }
 
-               
Assert.assertNotNull(harness.getEnvironment().getFailExternallyCause());
+               
assertTrue(harness.getEnvironment().getActualExternalFailureCause().isPresent());
        }
 
        /**
@@ -932,6 +926,7 @@ public class AsyncWaitOperatorTest extends TestLogger {
                        outputMode);
 
                final MockEnvironment mockEnvironment = createMockEnvironment();
+               
mockEnvironment.setExpectedExternalFailureCause(Throwable.class);
 
                OneInputStreamOperatorTestHarness<Integer, Integer> harness = 
new OneInputStreamOperatorTestHarness<>(
                        asyncWaitOperator,
@@ -949,8 +944,6 @@ public class AsyncWaitOperatorTest extends TestLogger {
                synchronized (harness.getCheckpointLock()) {
                        harness.close();
                }
-
-               Assert.assertNotNull(mockEnvironment.getFailExternallyCause());
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/0af22bf2/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 9ee35ee..52295fb 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -330,8 +330,7 @@ public class StreamTaskTest extends TestLogger {
                TaskInfo mockTaskInfo = mock(TaskInfo.class);
                
when(mockTaskInfo.getTaskNameWithSubtasks()).thenReturn("foobar");
                when(mockTaskInfo.getIndexOfThisSubtask()).thenReturn(0);
-               Environment mockEnvironment = mock(Environment.class);
-               when(mockEnvironment.getTaskInfo()).thenReturn(mockTaskInfo);
+               Environment mockEnvironment = new MockEnvironment();
 
                StreamTask<?, ?> streamTask = new 
EmptyStreamTask(mockEnvironment);
                CheckpointMetaData checkpointMetaData = new 
CheckpointMetaData(checkpointId, timestamp);
@@ -401,12 +400,7 @@ public class StreamTaskTest extends TestLogger {
                final long checkpointId = 42L;
                final long timestamp = 1L;
 
-               TaskInfo mockTaskInfo = mock(TaskInfo.class);
-               
when(mockTaskInfo.getTaskNameWithSubtasks()).thenReturn("foobar");
-               when(mockTaskInfo.getIndexOfThisSubtask()).thenReturn(0);
-               Environment mockEnvironment = mock(Environment.class);
-               when(mockEnvironment.getTaskInfo()).thenReturn(mockTaskInfo);
-
+               MockEnvironment mockEnvironment = new MockEnvironment();
                StreamTask<?, ?> streamTask = spy(new 
EmptyStreamTask(mockEnvironment));
                CheckpointMetaData checkpointMetaData = new 
CheckpointMetaData(checkpointId, timestamp);
 
@@ -458,6 +452,7 @@ public class StreamTaskTest extends TestLogger {
                        new 
StreamTask.AsyncCheckpointExceptionHandler(streamTask);
                Whitebox.setInternalState(streamTask, 
"asynchronousCheckpointExceptionHandler", asyncCheckpointExceptionHandler);
 
+               
mockEnvironment.setExpectedExternalFailureCause(Throwable.class);
                streamTask.triggerCheckpoint(checkpointMetaData, 
CheckpointOptions.forCheckpointWithDefaultLocation());
 
                verify(streamTask).handleAsyncException(anyString(), 
any(Throwable.class));
@@ -483,12 +478,6 @@ public class StreamTaskTest extends TestLogger {
                final OneShotLatch acknowledgeCheckpointLatch = new 
OneShotLatch();
                final OneShotLatch completeAcknowledge = new OneShotLatch();
 
-               TaskInfo mockTaskInfo = mock(TaskInfo.class);
-               
when(mockTaskInfo.getTaskNameWithSubtasks()).thenReturn("foobar");
-               when(mockTaskInfo.getIndexOfThisSubtask()).thenReturn(0);
-               Environment mockEnvironment = mock(Environment.class);
-               when(mockEnvironment.getTaskInfo()).thenReturn(mockTaskInfo);
-
                CheckpointResponder checkpointResponder = 
mock(CheckpointResponder.class);
                doAnswer(new Answer() {
                        @Override
@@ -514,7 +503,12 @@ public class StreamTaskTest extends TestLogger {
                        null,
                        checkpointResponder);
 
-               
when(mockEnvironment.getTaskStateManager()).thenReturn(taskStateManager);
+               MockEnvironment mockEnvironment = new MockEnvironment(
+                       "mock-task",
+                       1024 * MemoryManager.DEFAULT_PAGE_SIZE,
+                       null,
+                       16,
+                       taskStateManager);
 
                StreamTask<?, ?> streamTask = new 
EmptyStreamTask(mockEnvironment);
                CheckpointMetaData checkpointMetaData = new 
CheckpointMetaData(checkpointId, timestamp);
@@ -606,11 +600,7 @@ public class StreamTaskTest extends TestLogger {
                final OneShotLatch createSubtask = new OneShotLatch();
                final OneShotLatch completeSubtask = new OneShotLatch();
 
-               TaskInfo mockTaskInfo = mock(TaskInfo.class);
-               
when(mockTaskInfo.getTaskNameWithSubtasks()).thenReturn("foobar");
-               when(mockTaskInfo.getIndexOfThisSubtask()).thenReturn(0);
-               Environment mockEnvironment = mock(Environment.class);
-               when(mockEnvironment.getTaskInfo()).thenReturn(mockTaskInfo);
+               Environment mockEnvironment = spy(new MockEnvironment());
 
                whenNew(OperatorSubtaskState.class).
                        withArguments(
@@ -707,12 +697,7 @@ public class StreamTaskTest extends TestLogger {
                final long checkpointId = 42L;
                final long timestamp = 1L;
 
-               TaskInfo mockTaskInfo = mock(TaskInfo.class);
-
-               
when(mockTaskInfo.getTaskNameWithSubtasks()).thenReturn("foobar");
-               when(mockTaskInfo.getIndexOfThisSubtask()).thenReturn(0);
-
-               Environment mockEnvironment = mock(Environment.class);
+               Environment mockEnvironment = spy(new MockEnvironment());
 
                // latch blocks until the async checkpoint thread acknowledges
                final OneShotLatch checkpointCompletedLatch = new 
OneShotLatch();
@@ -742,7 +727,6 @@ public class StreamTaskTest extends TestLogger {
                        checkpointResponder);
 
                
when(mockEnvironment.getTaskStateManager()).thenReturn(taskStateManager);
-               when(mockEnvironment.getTaskInfo()).thenReturn(mockTaskInfo);
 
                StreamTask<?, ?> streamTask = new 
EmptyStreamTask(mockEnvironment);
                CheckpointMetaData checkpointMetaData = new 
CheckpointMetaData(checkpointId, timestamp);

http://git-wip-us.apache.org/repos/asf/flink/blob/0af22bf2/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index 966d205..ced22c0 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -175,7 +175,7 @@ public class AbstractStreamOperatorTestHarness<OUT> 
implements AutoCloseable {
 
                this.environment = Preconditions.checkNotNull(env);
 
-               this.taskStateManager = env.getTaskStateManager();
+               this.taskStateManager = (TestTaskStateManager) 
env.getTaskStateManager();
                this.internalEnvironment = environmentIsInternal ? 
Optional.of(environment) : Optional.empty();
 
                mockTask = mock(StreamTask.class);

Reply via email to