[FLINK-5800] [checkpointing] Create CheckpointStreamFactory only once per 
operator

Previously, the factory was created once per checkpoint, and its repeated 
initialization logic
(like ensuring existence of base paths) caused heavy load on some filesystems 
at very large scale.

This closes #3312


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/04e6758a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/04e6758a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/04e6758a

Branch: refs/heads/master
Commit: 04e6758abbadf39a12848a925e6e087e060bbe3a
Parents: d062448
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Feb 14 18:35:59 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Feb 15 15:08:21 2017 +0100

----------------------------------------------------------------------
 .../api/operators/AbstractStreamOperator.java   | 44 ++++++++++--
 .../operators/StreamCheckpointedOperator.java   |  4 ++
 .../streaming/api/operators/StreamOperator.java | 26 ++++---
 .../streaming/runtime/tasks/StreamTask.java     | 48 ++-----------
 .../operators/AbstractStreamOperatorTest.java   | 18 +++--
 .../AbstractUdfStreamOperatorLifecycleTest.java | 15 +++-
 .../streaming/runtime/tasks/StreamTaskTest.java | 73 ++++++++------------
 .../util/AbstractStreamOperatorTestHarness.java | 16 ++---
 .../streaming/runtime/StateBackendITCase.java   |  5 +-
 9 files changed, 122 insertions(+), 127 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/04e6758a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 7a3e2ce..144247f 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -114,6 +114,10 @@ public abstract class AbstractStreamOperator<OUT>
        /** The runtime context for UDFs */
        private transient StreamingRuntimeContext runtimeContext;
 
+       // ----------------- general state -------------------
+
+       /** The factory that gives this operator access to checkpoint storage */
+       private transient CheckpointStreamFactory checkpointStreamFactory;
 
        // ---------------- key/value state ------------------
 
@@ -127,10 +131,11 @@ public abstract class AbstractStreamOperator<OUT>
        /** Keyed state store view on the keyed backend */
        private transient DefaultKeyedStateStore keyedStateStore;
 
+       // ---------------- operator state ------------------
+
        /** Operator state backend / store */
        private transient OperatorStateBackend operatorStateBackend;
 
-
        // --------------- Metrics ---------------------------
 
        /** Metric group for the operator */
@@ -212,6 +217,8 @@ public abstract class AbstractStreamOperator<OUT>
                        }
                }
 
+               checkpointStreamFactory = 
container.createCheckpointStreamFactory(this);
+
                initOperatorState(operatorStateHandlesBackend);
 
                StateInitializationContext initializationContext = new 
StateInitializationContextImpl(
@@ -333,8 +340,7 @@ public abstract class AbstractStreamOperator<OUT>
        }
 
        @Override
-       public final OperatorSnapshotResult snapshotState(
-                       long checkpointId, long timestamp, 
CheckpointStreamFactory streamFactory) throws Exception {
+       public final OperatorSnapshotResult snapshotState(long checkpointId, 
long timestamp) throws Exception {
 
                KeyGroupRange keyGroupRange = null != keyedStateBackend ?
                                keyedStateBackend.getKeyGroupRange() : 
KeyGroupRange.EMPTY_KEY_GROUP_RANGE;
@@ -344,7 +350,7 @@ public abstract class AbstractStreamOperator<OUT>
                try (StateSnapshotContextSynchronousImpl snapshotContext = new 
StateSnapshotContextSynchronousImpl(
                                checkpointId,
                                timestamp,
-                               streamFactory,
+                               checkpointStreamFactory,
                                keyGroupRange,
                                getContainingTask().getCancelables())) {
 
@@ -355,14 +361,14 @@ public abstract class AbstractStreamOperator<OUT>
 
                        if (null != operatorStateBackend) {
                                
snapshotInProgress.setOperatorStateManagedFuture(
-                                       
operatorStateBackend.snapshot(checkpointId, timestamp, streamFactory));
+                                       
operatorStateBackend.snapshot(checkpointId, timestamp, 
checkpointStreamFactory));
                        }
 
                        if (null != keyedStateBackend) {
                                snapshotInProgress.setKeyedStateManagedFuture(
-                                       
keyedStateBackend.snapshot(checkpointId, timestamp, streamFactory));
+                                       
keyedStateBackend.snapshot(checkpointId, timestamp, checkpointStreamFactory));
                        }
-               }  catch (Exception snapshotException) {
+               } catch (Exception snapshotException) {
                        try {
                                snapshotInProgress.cancel();
                        } catch (Exception e) {
@@ -422,6 +428,30 @@ public abstract class AbstractStreamOperator<OUT>
                }
        }
 
+       @SuppressWarnings("deprecation")
+       @Deprecated
+       @Override
+       public StreamStateHandle snapshotLegacyOperatorState(long checkpointId, 
long timestamp) throws Exception {
+               if (this instanceof StreamCheckpointedOperator) {
+
+                       final 
CheckpointStreamFactory.CheckpointStateOutputStream outStream =
+                                       
checkpointStreamFactory.createCheckpointStateOutputStream(checkpointId, 
timestamp);
+
+                       
getContainingTask().getCancelables().registerClosable(outStream);
+
+                       try {
+                               ((StreamCheckpointedOperator) 
this).snapshotState(outStream, checkpointId, timestamp);
+                               return outStream.closeAndGetHandle();
+                       }
+                       finally {
+                               
getContainingTask().getCancelables().unregisterClosable(outStream);
+                               outStream.close();
+                       }
+               } else {
+                       return null;
+               }
+       }
+
        /**
         * Stream operators with state which can be restored need to override 
this hook method.
         *

http://git-wip-us.apache.org/repos/asf/flink/blob/04e6758a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamCheckpointedOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamCheckpointedOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamCheckpointedOperator.java
index f93e7ea..a28cdc4 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamCheckpointedOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamCheckpointedOperator.java
@@ -20,6 +20,10 @@ package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.core.fs.FSDataOutputStream;
 
+/**
+ * This interface is deprecated without replacement.
+ * All operators are now checkpointed.
+ */
 @Deprecated
 public interface StreamCheckpointedOperator extends 
CheckpointedRestoringOperator {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/04e6758a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
index f6e5472..d8e4d08 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
@@ -19,7 +19,7 @@ package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
@@ -93,18 +93,24 @@ public interface StreamOperator<OUT> extends Serializable {
        /**
         * Called to draw a state snapshot from the operator.
         *
-        * @throws Exception Forwards exceptions that occur while preparing for 
the snapshot
-        */
-
-       /**
-        * Called to draw a state snapshot from the operator.
-        *
         * @return a runnable future to the state handle that points to the 
snapshotted state. For synchronous implementations,
         * the runnable might already be finished.
+        * 
         * @throws Exception exception that happened during snapshotting.
         */
-       OperatorSnapshotResult snapshotState(
-                       long checkpointId, long timestamp, 
CheckpointStreamFactory streamFactory) throws Exception;
+       OperatorSnapshotResult snapshotState(long checkpointId, long timestamp) 
throws Exception;
+
+       /**
+        * Takes a snapshot of the legacy operator state defined via {@link 
StreamCheckpointedOperator}.
+        * 
+        * @return The handle to the legacy operator state, or null, if no 
state was snapshotted.
+        * @throws Exception This method should forward any type of exception 
that happens during snapshotting.
+        * 
+        * @deprecated This method will be removed as soon as no more operators 
use the legacy state code paths
+        */
+       @SuppressWarnings("deprecation")
+       @Deprecated
+       StreamStateHandle snapshotLegacyOperatorState(long checkpointId, long 
timestamp) throws Exception;
 
        /**
         * Provides state handles to restore the operator state.
@@ -134,6 +140,6 @@ public interface StreamOperator<OUT> extends Serializable {
        ChainingStrategy getChainingStrategy();
 
        void setChainingStrategy(ChainingStrategy strategy);
-       
+
        MetricGroup getMetricGroup();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/04e6758a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 3781cb6..6b33d12 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -53,7 +53,6 @@ import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.OperatorSnapshotResult;
 import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -1045,8 +1044,6 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
 
                // ------------------------
 
-               private CheckpointStreamFactory streamFactory;
-
                private final List<StreamStateHandle> nonPartitionedStates;
                private final List<OperatorSnapshotResult> 
snapshotInProgressList;
 
@@ -1125,15 +1122,17 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
                        }
                }
 
+               @SuppressWarnings("deprecation")
                private void checkpointStreamOperator(StreamOperator<?> op) 
throws Exception {
                        if (null != op) {
-                               createStreamFactory(op);
-                               snapshotNonPartitionableState(op);
+                               // first call the legacy checkpoint code paths 
+                               
nonPartitionedStates.add(op.snapshotLegacyOperatorState(
+                                               
checkpointMetaData.getCheckpointId(),
+                                               
checkpointMetaData.getTimestamp()));
 
                                OperatorSnapshotResult snapshotInProgress = 
op.snapshotState(
                                                
checkpointMetaData.getCheckpointId(),
-                                               
checkpointMetaData.getTimestamp(),
-                                               streamFactory);
+                                               
checkpointMetaData.getTimestamp());
 
                                snapshotInProgressList.add(snapshotInProgress);
                        } else {
@@ -1143,41 +1142,6 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
                        }
                }
 
-               private void createStreamFactory(StreamOperator<?> operator) 
throws IOException {
-                       String operatorId = 
owner.createOperatorIdentifier(operator, owner.configuration.getVertexID());
-                       this.streamFactory = 
owner.stateBackend.createStreamFactory(owner.getEnvironment().getJobID(), 
operatorId);
-               }
-
-               //TODO deprecated code path
-               private void snapshotNonPartitionableState(StreamOperator<?> 
operator) throws Exception {
-
-                       StreamStateHandle stateHandle = null;
-
-                       if (operator instanceof StreamCheckpointedOperator) {
-
-                               
CheckpointStreamFactory.CheckpointStateOutputStream outStream =
-                                               
streamFactory.createCheckpointStateOutputStream(
-                                                               
checkpointMetaData.getCheckpointId(), checkpointMetaData.getTimestamp());
-
-                               owner.cancelables.registerClosable(outStream);
-
-                               try {
-                                       ((StreamCheckpointedOperator) operator).
-                                                       snapshotState(
-                                                                       
outStream,
-                                                                       
checkpointMetaData.getCheckpointId(),
-                                                                       
checkpointMetaData.getTimestamp());
-
-                                       stateHandle = 
outStream.closeAndGetHandle();
-                               } finally {
-                                       
owner.cancelables.unregisterClosable(outStream);
-                                       outStream.close();
-                               }
-                       }
-
-                       nonPartitionedStates.add(stateHandle);
-               }
-
                public void runAsyncCheckpointingAndAcknowledge() throws 
IOException {
 
                        AsyncCheckpointRunnable asyncCheckpointRunnable = new 
AsyncCheckpointRunnable(

http://git-wip-us.apache.org/repos/asf/flink/blob/04e6758a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
index 409a732..274611a 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
@@ -55,7 +55,6 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.contains;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyBoolean;
 import static org.mockito.Matchers.anyLong;
 import static org.mockito.Matchers.eq;
@@ -491,16 +490,15 @@ public class AbstractStreamOperatorTest {
                StateSnapshotContextSynchronousImpl context = 
mock(StateSnapshotContextSynchronousImpl.class);
 
                
whenNew(StateSnapshotContextSynchronousImpl.class).withAnyArguments().thenReturn(context);
-
-               CheckpointStreamFactory streamFactory = 
mock(CheckpointStreamFactory.class);
+               
                StreamTask<Void, AbstractStreamOperator<Void>> containingTask = 
mock(StreamTask.class);
                
when(containingTask.getCancelables()).thenReturn(closeableRegistry);
 
                AbstractStreamOperator<Void> operator = 
mock(AbstractStreamOperator.class);
-               when(operator.snapshotState(anyLong(), anyLong(), 
any(CheckpointStreamFactory.class))).thenCallRealMethod();
+               when(operator.snapshotState(anyLong(), 
anyLong())).thenCallRealMethod();
                doReturn(containingTask).when(operator).getContainingTask();
 
-               operator.snapshotState(checkpointId, timestamp, streamFactory);
+               operator.snapshotState(checkpointId, timestamp);
 
                verify(context).close();
        }
@@ -522,19 +520,18 @@ public class AbstractStreamOperatorTest {
 
                
whenNew(StateSnapshotContextSynchronousImpl.class).withAnyArguments().thenReturn(context);
 
-               CheckpointStreamFactory streamFactory = 
mock(CheckpointStreamFactory.class);
                StreamTask<Void, AbstractStreamOperator<Void>> containingTask = 
mock(StreamTask.class);
                
when(containingTask.getCancelables()).thenReturn(closeableRegistry);
 
                AbstractStreamOperator<Void> operator = 
mock(AbstractStreamOperator.class);
-               when(operator.snapshotState(anyLong(), anyLong(), 
any(CheckpointStreamFactory.class))).thenCallRealMethod();
+               when(operator.snapshotState(anyLong(), 
anyLong())).thenCallRealMethod();
                doReturn(containingTask).when(operator).getContainingTask();
 
                // lets fail when calling the actual snapshotState method
                
doThrow(failingException).when(operator).snapshotState(eq(context));
 
                try {
-                       operator.snapshotState(checkpointId, timestamp, 
streamFactory);
+                       operator.snapshotState(checkpointId, timestamp);
                        fail("Exception expected.");
                } catch (Exception e) {
                        assertEquals(failingException, e.getCause());
@@ -574,7 +571,7 @@ public class AbstractStreamOperatorTest {
                
when(containingTask.getCancelables()).thenReturn(closeableRegistry);
 
                AbstractStreamOperator<Void> operator = 
mock(AbstractStreamOperator.class);
-               when(operator.snapshotState(anyLong(), anyLong(), 
any(CheckpointStreamFactory.class))).thenCallRealMethod();
+               when(operator.snapshotState(anyLong(), 
anyLong())).thenCallRealMethod();
                doReturn(containingTask).when(operator).getContainingTask();
 
                RunnableFuture<OperatorStateHandle> 
futureManagedOperatorStateHandle = mock(RunnableFuture.class);
@@ -587,9 +584,10 @@ public class AbstractStreamOperatorTest {
 
                Whitebox.setInternalState(operator, "operatorStateBackend", 
operatorStateBackend);
                Whitebox.setInternalState(operator, "keyedStateBackend", 
keyedStateBackend);
+               Whitebox.setInternalState(operator, "checkpointStreamFactory", 
streamFactory);
 
                try {
-                       operator.snapshotState(checkpointId, timestamp, 
streamFactory);
+                       operator.snapshotState(checkpointId, timestamp);
                        fail("Exception expected.");
                } catch (Exception e) {
                        assertEquals(failingException, e.getCause());

http://git-wip-us.apache.org/repos/asf/flink/blob/04e6758a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
index 965aec6..357163c 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
@@ -60,6 +61,7 @@ public class AbstractUdfStreamOperatorLifecycleTest {
                        "UDF::open",
                        "OPERATOR::run",
                        "UDF::run",
+                       "OPERATOR::snapshotLegacyOperatorState",
                        "OPERATOR::snapshotState",
                        "OPERATOR::close",
                        "UDF::close",
@@ -86,8 +88,9 @@ public class AbstractUdfStreamOperatorLifecycleTest {
                        "setKeyContextElement2[class 
org.apache.flink.streaming.runtime.streamrecord.StreamRecord], " +
                        "setup[class 
org.apache.flink.streaming.runtime.tasks.StreamTask, class " +
                        "org.apache.flink.streaming.api.graph.StreamConfig, 
interface " +
-                       "org.apache.flink.streaming.api.operators.Output], 
snapshotState[long, long, " +
-                       "interface 
org.apache.flink.runtime.state.CheckpointStreamFactory]]";
+                       "org.apache.flink.streaming.api.operators.Output], " +
+                       "snapshotLegacyOperatorState[long, long], " +
+                       "snapshotState[long, long]]";
 
        private static final String ALL_METHODS_RICH_FUNCTION = "[close[], 
getIterationRuntimeContext[], getRuntimeContext[]" +
                        ", open[class 
org.apache.flink.configuration.Configuration], setRuntimeContext[interface " +
@@ -198,7 +201,7 @@ public class AbstractUdfStreamOperatorLifecycleTest {
        }
 
        private static class LifecycleTrackingStreamSource<OUT, SRC extends 
SourceFunction<OUT>>
-                       extends StreamSource<OUT, SRC> implements Serializable {
+                       extends StreamSource<OUT, SRC> implements Serializable, 
StreamCheckpointedOperator {
 
                private static final long serialVersionUID = 
2431488948886850562L;
                private transient Thread testCheckpointer;
@@ -254,6 +257,12 @@ public class AbstractUdfStreamOperatorLifecycleTest {
                }
 
                @Override
+               public StreamStateHandle snapshotLegacyOperatorState(long 
checkpointId, long timestamp) throws Exception {
+                       
ACTUAL_ORDER_TRACKING.add("OPERATOR::snapshotLegacyOperatorState");
+                       return super.snapshotLegacyOperatorState(checkpointId, 
timestamp);
+               }
+
+               @Override
                public void initializeState(StateInitializationContext context) 
throws Exception {
                        ACTUAL_ORDER_TRACKING.add("OPERATOR::initializeState");
                        super.initializeState(context);

http://git-wip-us.apache.org/repos/asf/flink/blob/04e6758a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index cb9850f..6a63ee8 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -95,6 +95,7 @@ import org.mockito.Mockito;
 import org.mockito.internal.util.reflection.Whitebox;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 import scala.concurrent.Await;
@@ -135,6 +136,8 @@ import static 
org.powermock.api.mockito.PowerMockito.whenNew;
 
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(StreamTask.class)
+@PowerMockIgnore("org.apache.log4j.*")
+@SuppressWarnings("deprecation")
 public class StreamTaskTest extends TestLogger {
 
        private static OneShotLatch SYNC_LATCH;
@@ -289,49 +292,42 @@ public class StreamTaskTest extends TestLogger {
                CheckpointMetaData checkpointMetaData = new 
CheckpointMetaData(checkpointId, timestamp);
                streamTask.setEnvironment(mockEnvironment);
 
+               // mock the operators
                StreamOperator<?> streamOperator1 = mock(StreamOperator.class, 
withSettings().extraInterfaces(StreamCheckpointedOperator.class));
                StreamOperator<?> streamOperator2 = mock(StreamOperator.class, 
withSettings().extraInterfaces(StreamCheckpointedOperator.class));
                StreamOperator<?> streamOperator3 = mock(StreamOperator.class, 
withSettings().extraInterfaces(StreamCheckpointedOperator.class));
 
+               // mock the returned snapshots
                OperatorSnapshotResult operatorSnapshotResult1 = 
mock(OperatorSnapshotResult.class);
                OperatorSnapshotResult operatorSnapshotResult2 = 
mock(OperatorSnapshotResult.class);
 
                final Exception testException = new Exception("Test exception");
 
-               when(streamOperator1.snapshotState(anyLong(), anyLong(), 
any(CheckpointStreamFactory.class))).thenReturn(operatorSnapshotResult1);
-               when(streamOperator2.snapshotState(anyLong(), anyLong(), 
any(CheckpointStreamFactory.class))).thenReturn(operatorSnapshotResult2);
-               when(streamOperator3.snapshotState(anyLong(), anyLong(), 
any(CheckpointStreamFactory.class))).thenThrow(testException);
-
-               StreamOperator<?>[] streamOperators = {streamOperator1, 
streamOperator2, streamOperator3};
-
-               OperatorChain<Void, AbstractStreamOperator<Void>> operatorChain 
= mock(OperatorChain.class);
-               
when(operatorChain.getAllOperators()).thenReturn(streamOperators);
+               when(streamOperator1.snapshotState(anyLong(), 
anyLong())).thenReturn(operatorSnapshotResult1);
+               when(streamOperator2.snapshotState(anyLong(), 
anyLong())).thenReturn(operatorSnapshotResult2);
+               when(streamOperator3.snapshotState(anyLong(), 
anyLong())).thenThrow(testException);
 
+               // mock the returned legacy snapshots
                StreamStateHandle streamStateHandle1 = 
mock(StreamStateHandle.class);
                StreamStateHandle streamStateHandle2 = 
mock(StreamStateHandle.class);
                StreamStateHandle streamStateHandle3 = 
mock(StreamStateHandle.class);
 
-               CheckpointStreamFactory.CheckpointStateOutputStream outStream1 
= mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);
-               CheckpointStreamFactory.CheckpointStateOutputStream outStream2 
= mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);
-               CheckpointStreamFactory.CheckpointStateOutputStream outStream3 
= mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);
+               when(streamOperator1.snapshotLegacyOperatorState(anyLong(), 
anyLong())).thenReturn(streamStateHandle1);
+               when(streamOperator2.snapshotLegacyOperatorState(anyLong(), 
anyLong())).thenReturn(streamStateHandle2);
+               when(streamOperator3.snapshotLegacyOperatorState(anyLong(), 
anyLong())).thenReturn(streamStateHandle3);
 
-               
when(outStream1.closeAndGetHandle()).thenReturn(streamStateHandle1);
-               
when(outStream2.closeAndGetHandle()).thenReturn(streamStateHandle2);
-               
when(outStream3.closeAndGetHandle()).thenReturn(streamStateHandle3);
+               // set up the task
 
-               CheckpointStreamFactory mockStreamFactory = 
mock(CheckpointStreamFactory.class);
-               
when(mockStreamFactory.createCheckpointStateOutputStream(anyLong(), 
anyLong())).thenReturn(
-                       outStream1, outStream2, outStream3);
+               StreamOperator<?>[] streamOperators = {streamOperator1, 
streamOperator2, streamOperator3};
 
-               AbstractStateBackend mockStateBackend = 
mock(AbstractStateBackend.class);
-               when(mockStateBackend.createStreamFactory(any(JobID.class), 
anyString())).thenReturn(mockStreamFactory);
+               OperatorChain<Void, AbstractStreamOperator<Void>> operatorChain 
= mock(OperatorChain.class);
+               
when(operatorChain.getAllOperators()).thenReturn(streamOperators);
 
                Whitebox.setInternalState(streamTask, "isRunning", true);
                Whitebox.setInternalState(streamTask, "lock", new Object());
                Whitebox.setInternalState(streamTask, "operatorChain", 
operatorChain);
                Whitebox.setInternalState(streamTask, "cancelables", new 
CloseableRegistry());
                Whitebox.setInternalState(streamTask, "configuration", new 
StreamConfig(new Configuration()));
-               Whitebox.setInternalState(streamTask, "stateBackend", 
mockStateBackend);
 
                try {
                        streamTask.triggerCheckpoint(checkpointMetaData);
@@ -371,6 +367,8 @@ public class StreamTaskTest extends TestLogger {
                StreamOperator<?> streamOperator2 = mock(StreamOperator.class, 
withSettings().extraInterfaces(StreamCheckpointedOperator.class));
                StreamOperator<?> streamOperator3 = mock(StreamOperator.class, 
withSettings().extraInterfaces(StreamCheckpointedOperator.class));
 
+               // mock the new state handles / futures
+
                OperatorSnapshotResult operatorSnapshotResult1 = 
mock(OperatorSnapshotResult.class);
                OperatorSnapshotResult operatorSnapshotResult2 = 
mock(OperatorSnapshotResult.class);
                OperatorSnapshotResult operatorSnapshotResult3 = 
mock(OperatorSnapshotResult.class);
@@ -380,33 +378,23 @@ public class StreamTaskTest extends TestLogger {
 
                
when(operatorSnapshotResult3.getOperatorStateRawFuture()).thenReturn(failingFuture);
 
-               when(streamOperator1.snapshotState(anyLong(), anyLong(), 
any(CheckpointStreamFactory.class))).thenReturn(operatorSnapshotResult1);
-               when(streamOperator2.snapshotState(anyLong(), anyLong(), 
any(CheckpointStreamFactory.class))).thenReturn(operatorSnapshotResult2);
-               when(streamOperator3.snapshotState(anyLong(), anyLong(), 
any(CheckpointStreamFactory.class))).thenReturn(operatorSnapshotResult3);
-
-               StreamOperator<?>[] streamOperators = {streamOperator1, 
streamOperator2, streamOperator3};
-
-               OperatorChain<Void, AbstractStreamOperator<Void>> operatorChain 
= mock(OperatorChain.class);
-               
when(operatorChain.getAllOperators()).thenReturn(streamOperators);
+               when(streamOperator1.snapshotState(anyLong(), 
anyLong())).thenReturn(operatorSnapshotResult1);
+               when(streamOperator2.snapshotState(anyLong(), 
anyLong())).thenReturn(operatorSnapshotResult2);
+               when(streamOperator3.snapshotState(anyLong(), 
anyLong())).thenReturn(operatorSnapshotResult3);
 
+               // mock the legacy state snapshot
                StreamStateHandle streamStateHandle1 = 
mock(StreamStateHandle.class);
                StreamStateHandle streamStateHandle2 = 
mock(StreamStateHandle.class);
                StreamStateHandle streamStateHandle3 = 
mock(StreamStateHandle.class);
 
-               CheckpointStreamFactory.CheckpointStateOutputStream outStream1 
= mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);
-               CheckpointStreamFactory.CheckpointStateOutputStream outStream2 
= mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);
-               CheckpointStreamFactory.CheckpointStateOutputStream outStream3 
= mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);
-
-               
when(outStream1.closeAndGetHandle()).thenReturn(streamStateHandle1);
-               
when(outStream2.closeAndGetHandle()).thenReturn(streamStateHandle2);
-               
when(outStream3.closeAndGetHandle()).thenReturn(streamStateHandle3);
+               when(streamOperator1.snapshotLegacyOperatorState(anyLong(), 
anyLong())).thenReturn(streamStateHandle1);
+               when(streamOperator2.snapshotLegacyOperatorState(anyLong(), 
anyLong())).thenReturn(streamStateHandle2);
+               when(streamOperator3.snapshotLegacyOperatorState(anyLong(), 
anyLong())).thenReturn(streamStateHandle3);
 
-               CheckpointStreamFactory mockStreamFactory = 
mock(CheckpointStreamFactory.class);
-               
when(mockStreamFactory.createCheckpointStateOutputStream(anyLong(), 
anyLong())).thenReturn(
-                       outStream1, outStream2, outStream3);
+               StreamOperator<?>[] streamOperators = {streamOperator1, 
streamOperator2, streamOperator3};
 
-               AbstractStateBackend mockStateBackend = 
mock(AbstractStateBackend.class);
-               when(mockStateBackend.createStreamFactory(any(JobID.class), 
anyString())).thenReturn(mockStreamFactory);
+               OperatorChain<Void, AbstractStreamOperator<Void>> operatorChain 
= mock(OperatorChain.class);
+               
when(operatorChain.getAllOperators()).thenReturn(streamOperators);
 
                Whitebox.setInternalState(streamTask, "isRunning", true);
                Whitebox.setInternalState(streamTask, "lock", new Object());
@@ -414,7 +402,6 @@ public class StreamTaskTest extends TestLogger {
                Whitebox.setInternalState(streamTask, "cancelables", new 
CloseableRegistry());
                Whitebox.setInternalState(streamTask, 
"asyncOperationsThreadPool", new DirectExecutorService());
                Whitebox.setInternalState(streamTask, "configuration", new 
StreamConfig(new Configuration()));
-               Whitebox.setInternalState(streamTask, "stateBackend", 
mockStateBackend);
 
                streamTask.triggerCheckpoint(checkpointMetaData);
 
@@ -479,7 +466,7 @@ public class StreamTaskTest extends TestLogger {
                        new DoneFuture<>(managedOperatorStateHandle),
                        new DoneFuture<>(rawOperatorStateHandle));
 
-               when(streamOperator.snapshotState(anyLong(), anyLong(), 
any(CheckpointStreamFactory.class))).thenReturn(operatorSnapshotResult);
+               when(streamOperator.snapshotState(anyLong(), 
anyLong())).thenReturn(operatorSnapshotResult);
 
                StreamOperator<?>[] streamOperators = {streamOperator};
 
@@ -595,7 +582,7 @@ public class StreamTaskTest extends TestLogger {
                        new DoneFuture<>(managedOperatorStateHandle),
                        new DoneFuture<>(rawOperatorStateHandle));
 
-               when(streamOperator.snapshotState(anyLong(), anyLong(), 
any(CheckpointStreamFactory.class))).thenReturn(operatorSnapshotResult);
+               when(streamOperator.snapshotState(anyLong(), 
anyLong())).thenReturn(operatorSnapshotResult);
 
                StreamOperator<?>[] streamOperators = {streamOperator};
 

http://git-wip-us.apache.org/repos/asf/flink/blob/04e6758a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index 568410a..2df4efd 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -459,15 +459,11 @@ public class AbstractStreamOperatorTestHarness<OUT> {
        }
 
        /**
-        * Calls {@link StreamOperator#snapshotState(long, long, 
CheckpointStreamFactory)}.
+        * Calls {@link StreamOperator#snapshotState(long, long)}.
         */
        public OperatorStateHandles snapshot(long checkpointId, long timestamp) 
throws Exception {
 
-               CheckpointStreamFactory streamFactory = 
stateBackend.createStreamFactory(
-                               new JobID(),
-                               "test_op");
-
-               OperatorSnapshotResult operatorStateResult = 
operator.snapshotState(checkpointId, timestamp, streamFactory);
+               OperatorSnapshotResult operatorStateResult = 
operator.snapshotState(checkpointId, timestamp);
 
                KeyGroupsStateHandle keyedManaged = 
FutureUtil.runIfNotDoneAndGet(operatorStateResult.getKeyedStateManagedFuture());
                KeyGroupsStateHandle keyedRaw = 
FutureUtil.runIfNotDoneAndGet(operatorStateResult.getKeyedStateRawFuture());
@@ -475,14 +471,13 @@ public class AbstractStreamOperatorTestHarness<OUT> {
                OperatorStateHandle opManaged = 
FutureUtil.runIfNotDoneAndGet(operatorStateResult.getOperatorStateManagedFuture());
                OperatorStateHandle opRaw = 
FutureUtil.runIfNotDoneAndGet(operatorStateResult.getOperatorStateRawFuture());
 
-               OperatorStateHandles handles = new OperatorStateHandles(
+               return new OperatorStateHandles(
                        0,
                        null,
                        keyedManaged != null ? 
Collections.singletonList(keyedManaged) : null,
                        keyedRaw != null ? Collections.singletonList(keyedRaw) 
: null,
                        opManaged != null ? 
Collections.singletonList(opManaged) : null,
                        opRaw != null ? Collections.singletonList(opRaw) : 
null);
-               return handles;
        }
 
        /**
@@ -490,6 +485,7 @@ public class AbstractStreamOperatorTestHarness<OUT> {
         * the operator implements this interface.
         */
        @Deprecated
+       @SuppressWarnings("deprecation")
        public StreamStateHandle snapshotLegacy(long checkpointId, long 
timestamp) throws Exception {
 
                CheckpointStreamFactory.CheckpointStateOutputStream outStream = 
stateBackend.createStreamFactory(
@@ -513,7 +509,9 @@ public class AbstractStreamOperatorTestHarness<OUT> {
        /**
         * Calls {@link 
StreamCheckpointedOperator#restoreState(FSDataInputStream)} if
         * the operator implements this interface.
-        */     @Deprecated
+        */
+       @Deprecated
+       @SuppressWarnings("deprecation")
        public void restore(StreamStateHandle snapshot) throws Exception {
                if(operator instanceof StreamCheckpointedOperator) {
                        try (FSDataInputStream in = snapshot.openInputStream()) 
{

http://git-wip-us.apache.org/repos/asf/flink/blob/04e6758a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
index 0e62fbb..ec6a8f5 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
@@ -38,6 +38,7 @@ import org.junit.Test;
 
 import java.io.IOException;
 
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 public class StateBackendITCase extends StreamingMultipleProgramsTestBase {
@@ -81,9 +82,7 @@ public class StateBackendITCase extends StreamingMultipleProgramsTestBase {
                }
                catch (JobExecutionException e) {
                        Throwable t = e.getCause();
-                       if (!(t != null && t.getCause() instanceof 
SuccessException)) {
-                               throw e;
-                       }
+                       assertTrue("wrong exception", t instanceof 
SuccessException);
                }
        }
 

Reply via email to