http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/LegacyCheckpointedStreamingProgram.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/LegacyCheckpointedStreamingProgram.java
 
b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/LegacyCheckpointedStreamingProgram.java
new file mode 100644
index 0000000..60253fa
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/LegacyCheckpointedStreamingProgram.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.classloading.jar;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+
+/**
+ * This test is the same as the {@link CheckpointedStreamingProgram} but using 
the
+ * old and deprecated {@link Checkpointed} interface. It stays here in order to
+ * guarantee that although deprecated, the old Checkpointed interface is still 
supported.
+ * This is necessary to not break user code.
+ * */
+public class LegacyCheckpointedStreamingProgram {
+
+       private static final int CHECKPOINT_INTERVALL = 100;
+
+       public static void main(String[] args) throws Exception {
+               final String jarFile = args[0];
+               final String host = args[1];
+               final int port = Integer.parseInt(args[2]);
+
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createRemoteEnvironment(host, port, jarFile);
+               env.getConfig().disableSysoutLogging();
+               env.enableCheckpointing(CHECKPOINT_INTERVALL);
+               env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 
10000));
+               env.disableOperatorChaining();
+
+               DataStream<String> text = env.addSource(new 
SimpleStringGenerator());
+               text.map(new StatefulMapper()).addSink(new NoOpSink());
+               env.setParallelism(1);
+               env.execute("Checkpointed Streaming Program");
+       }
+
+
+       // with Checkpointing
+       public static class SimpleStringGenerator implements 
SourceFunction<String>, Checkpointed<Integer> {
+
+               private static final long serialVersionUID = 
3700033137820808611L;
+
+               public boolean running = true;
+
+               @Override
+               public void run(SourceContext<String> ctx) throws Exception {
+                       while(running) {
+                               Thread.sleep(1);
+                               ctx.collect("someString");
+                       }
+               }
+
+               @Override
+               public void cancel() {
+                       running = false;
+               }
+
+               @Override
+               public Integer snapshotState(long checkpointId, long 
checkpointTimestamp) throws Exception {
+                       return null;
+               }
+
+               @Override
+               public void restoreState(Integer state) {
+
+               }
+       }
+
+       public static class StatefulMapper implements MapFunction<String, 
String>, Checkpointed<StatefulMapper>, CheckpointListener {
+
+               private static final long serialVersionUID = 
2703630582894634440L;
+
+               private String someState;
+               private boolean atLeastOneSnapshotComplete = false;
+               private boolean restored = false;
+
+               @Override
+               public StatefulMapper snapshotState(long checkpointId, long 
checkpointTimestamp) throws Exception {
+                       return this;
+               }
+
+               @Override
+               public void restoreState(StatefulMapper state) {
+                       restored = true;
+                       this.someState = state.someState;
+                       this.atLeastOneSnapshotComplete = 
state.atLeastOneSnapshotComplete;
+               }
+
+               @Override
+               public String map(String value) throws Exception {
+                       if(!atLeastOneSnapshotComplete) {
+                               // throttle consumption by the checkpoint 
interval until we have one snapshot.
+                               Thread.sleep(CHECKPOINT_INTERVALL);
+                       }
+                       if(atLeastOneSnapshotComplete && !restored) {
+                               throw new RuntimeException("Intended failure, 
to trigger restore");
+                       }
+                       if(restored) {
+                               throw new SuccessException();
+                               //throw new RuntimeException("All good");
+                       }
+                       someState = value; // update our state
+                       return value;
+               }
+
+               @Override
+               public void notifyCheckpointComplete(long checkpointId) throws 
Exception {
+                       atLeastOneSnapshotComplete = true;
+               }
+       }
+       // 
--------------------------------------------------------------------------------------------
+
+       /**
+        * We intentionally use a user specified failure exception
+        */
+       public static class SuccessException extends Exception {
+
+               private static final long serialVersionUID = 
7073311460437532086L;
+       }
+
+       public static class NoOpSink implements SinkFunction<String> {
+               private static final long serialVersionUID = 
2381410324190818620L;
+
+               @Override
+               public void invoke(String value) throws Exception {
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java
index 4d10bf1..bba218f 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java
@@ -43,7 +43,7 @@ import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
 import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
 import org.apache.flink.runtime.state.CheckpointListener;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
@@ -63,6 +63,7 @@ import java.io.IOException;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Random;
 import java.util.UUID;
@@ -383,7 +384,7 @@ public class ChaosMonkeyITCase extends TestLogger {
        }
 
        public static class CheckpointedSequenceSource extends 
RichParallelSourceFunction<Long>
-                       implements Checkpointed<Long>, CheckpointListener {
+                       implements ListCheckpointed<Long>, CheckpointListener {
 
                private static final long serialVersionUID = 0L;
 
@@ -426,18 +427,20 @@ public class ChaosMonkeyITCase extends TestLogger {
                }
 
                @Override
-               public Long snapshotState(long checkpointId, long 
checkpointTimestamp) throws Exception {
+               public List<Long> snapshotState(long checkpointId, long 
timestamp) throws Exception {
                        LOG.info("Snapshotting state {} @ ID {}.", current, 
checkpointId);
-                       return current;
+                       return Collections.singletonList(this.current);
                }
 
                @Override
-               public void restoreState(Long state) {
-                       LOG.info("Restoring state {}/{}", state, end);
-                       current = state;
+               public void restoreState(List<Long> state) throws Exception {
+                       if (state.isEmpty() || state.size() > 1) {
+                               throw new RuntimeException("Test failed due to 
unexpected recovered state size " + state.size());
+                       }
+                       LOG.info("Restoring state {}/{}", state.get(0), end);
+                       this.current = state.get(0);
                }
 
-
                @Override
                public void cancel() {
                        isRunning = false;
@@ -453,7 +456,7 @@ public class ChaosMonkeyITCase extends TestLogger {
        }
 
        public static class CountingSink extends RichSinkFunction<Long>
-                       implements Checkpointed<CountingSink>, 
CheckpointListener {
+                       implements ListCheckpointed<CountingSink>, 
CheckpointListener {
 
                private static final Logger LOG = 
LoggerFactory.getLogger(CountingSink.class);
 
@@ -467,7 +470,6 @@ public class ChaosMonkeyITCase extends TestLogger {
 
                private int numberOfReceivedLastElements;
 
-
                public CountingSink(int parallelism, long expectedFinalCount) {
                        this.expectedFinalCount = expectedFinalCount;
                        this.parallelism = parallelism;
@@ -496,16 +498,20 @@ public class ChaosMonkeyITCase extends TestLogger {
                }
 
                @Override
-               public CountingSink snapshotState(long checkpointId, long 
checkpointTimestamp) throws Exception {
+               public List<CountingSink> snapshotState(long checkpointId, long 
timestamp) throws Exception {
                        LOG.info("Snapshotting state {}:{} @ ID {}.", current, 
numberOfReceivedLastElements, checkpointId);
-                       return this;
+                       return Collections.singletonList(this);
                }
 
                @Override
-               public void restoreState(CountingSink state) {
-                       LOG.info("Restoring state {}:{}", state.current, 
state.numberOfReceivedLastElements);
-                       this.current = state.current;
-                       this.numberOfReceivedLastElements = 
state.numberOfReceivedLastElements;
+               public void restoreState(List<CountingSink> state) throws 
Exception {
+                       if (state.isEmpty() || state.size() > 1) {
+                               throw new RuntimeException("Test failed due to 
unexpected recovered state size " + state.size());
+                       }
+                       CountingSink sink = state.get(0);
+                       this.current = sink.current;
+                       this.numberOfReceivedLastElements = 
sink.numberOfReceivedLastElements;
+                       LOG.info("Restoring state {}:{}", sink.current, 
sink.numberOfReceivedLastElements);
                }
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
index 418aa51..60a3a62 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
@@ -42,7 +42,7 @@ import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
 import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
 import org.apache.flink.runtime.state.CheckpointListener;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
@@ -63,6 +63,8 @@ import scala.concurrent.duration.FiniteDuration;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -460,7 +462,7 @@ public class JobManagerHACheckpointRecoveryITCase extends 
TestLogger {
         * A checkpointed source, which emits elements from 0 to a configured 
number.
         */
        public static class CheckpointedSequenceSource extends 
RichParallelSourceFunction<Long>
-                       implements Checkpointed<Long> {
+                       implements ListCheckpointed<Long> {
 
                private static final Logger LOG = 
LoggerFactory.getLogger(CheckpointedSequenceSource.class);
 
@@ -500,22 +502,26 @@ public class JobManagerHACheckpointRecoveryITCase extends 
TestLogger {
                }
 
                @Override
-               public Long snapshotState(long checkpointId, long 
checkpointTimestamp) throws Exception {
+               public List<Long> snapshotState(long checkpointId, long 
timestamp) throws Exception {
                        LOG.debug("Snapshotting state {} @ ID {}.", current, 
checkpointId);
-                       return current;
+                       return Collections.singletonList(this.current);
                }
 
                @Override
-               public void restoreState(Long state) {
-                       LOG.debug("Restoring state {}", state);
+               public void restoreState(List<Long> state) throws Exception {
+                       if (state.isEmpty() || state.size() > 1) {
+                               throw new RuntimeException("Test failed due to 
unexpected recovered state size " + state.size());
+                       }
+                       Long s = state.get(0);
+                       LOG.debug("Restoring state {}", s);
 
                        // This is necessary to make sure that something is 
recovered at all. Otherwise it
                        // might happen that the job is restarted from the 
beginning.
-                       
RecoveredStates.set(getRuntimeContext().getIndexOfThisSubtask(), state);
+                       
RecoveredStates.set(getRuntimeContext().getIndexOfThisSubtask(), s);
 
                        sync.countDown();
 
-                       current = state;
+                       current = s;
                }
 
                @Override
@@ -528,7 +534,7 @@ public class JobManagerHACheckpointRecoveryITCase extends 
TestLogger {
         * A checkpointed sink, which sums up its input and notifies the main 
thread after all inputs
         * are exhausted.
         */
-       public static class CountingSink implements SinkFunction<Long>, 
Checkpointed<CountingSink>,
+       public static class CountingSink implements SinkFunction<Long>, 
ListCheckpointed<CountingSink>,
                CheckpointListener {
 
                private static final Logger LOG = 
LoggerFactory.getLogger(CountingSink.class);
@@ -558,16 +564,21 @@ public class JobManagerHACheckpointRecoveryITCase extends 
TestLogger {
                }
 
                @Override
-               public CountingSink snapshotState(long checkpointId, long 
checkpointTimestamp) throws Exception {
+               public List<CountingSink> snapshotState(long checkpointId, long 
timestamp) throws Exception {
                        LOG.debug("Snapshotting state {}:{} @ ID {}.", current, 
numberOfReceivedLastElements, checkpointId);
-                       return this;
+                       return Collections.singletonList(this);
                }
 
                @Override
-               public void restoreState(CountingSink state) {
-                       LOG.debug("Restoring state {}:{}", state.current, 
state.numberOfReceivedLastElements);
-                       this.current = state.current;
-                       this.numberOfReceivedLastElements = 
state.numberOfReceivedLastElements;
+               public void restoreState(List<CountingSink> state) throws 
Exception {
+                       if (state.isEmpty() || state.size() > 1) {
+                               throw new RuntimeException("Test failed due to 
unexpected recovered state size " + state.size());
+                       }
+                       CountingSink s = state.get(0);
+                       LOG.debug("Restoring state {}:{}", s.current, 
s.numberOfReceivedLastElements);
+
+                       this.current = s.current;
+                       this.numberOfReceivedLastElements = 
s.numberOfReceivedLastElements;
                }
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureStreamingRecoveryITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureStreamingRecoveryITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureStreamingRecoveryITCase.java
index fcc3d42..987a586 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureStreamingRecoveryITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureStreamingRecoveryITCase.java
@@ -20,6 +20,8 @@ package org.apache.flink.test.recovery;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
 import java.util.UUID;
 
 import org.apache.commons.io.FileUtils;
@@ -29,7 +31,7 @@ import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
@@ -106,7 +108,7 @@ public class 
TaskManagerProcessFailureStreamingRecoveryITCase extends AbstractTa
        }
 
        public static class SleepyDurableGenerateSequence extends 
RichParallelSourceFunction<Long> 
-                       implements Checkpointed<Long> {
+                       implements ListCheckpointed<Long> {
 
                private static final long SLEEP_TIME = 50;
 
@@ -160,13 +162,16 @@ public class 
TaskManagerProcessFailureStreamingRecoveryITCase extends AbstractTa
                }
 
                @Override
-               public Long snapshotState(long checkpointId, long 
checkpointTimestamp) {
-                       return collected;
+               public List<Long> snapshotState(long checkpointId, long 
timestamp) throws Exception {
+                       return Collections.singletonList(this.collected);
                }
 
                @Override
-               public void restoreState(Long state) {
-                       collected = state;
+               public void restoreState(List<Long> state) throws Exception {
+                       if (state.isEmpty() || state.size() > 1) {
+                               throw new RuntimeException("Test failed due to 
unexpected recovered state size " + state.size());
+                       }
+                       this.collected = state.get(0);
                }
        }
 
@@ -189,7 +194,7 @@ public class 
TaskManagerProcessFailureStreamingRecoveryITCase extends AbstractTa
                }
        }
 
-       private static class CheckpointedSink extends RichSinkFunction<Long> 
implements Checkpointed<Long> {
+       private static class CheckpointedSink extends RichSinkFunction<Long> 
implements ListCheckpointed<Long> {
 
                private long stepSize;
                private long congruence;
@@ -223,13 +228,16 @@ public class 
TaskManagerProcessFailureStreamingRecoveryITCase extends AbstractTa
                }
 
                @Override
-               public Long snapshotState(long checkpointId, long 
checkpointTimestamp) throws Exception {
-                       return collected;
+               public List<Long> snapshotState(long checkpointId, long 
timestamp) throws Exception {
+                       return Collections.singletonList(this.collected);
                }
 
                @Override
-               public void restoreState(Long state) {
-                       collected = state;
+               public void restoreState(List<Long> state) throws Exception {
+                       if (state.isEmpty() || state.size() > 1) {
+                               throw new RuntimeException("Test failed due to 
unexpected recovered state size " + state.size());
+                       }
+                       this.collected = state.get(0);
                }
        }
 }

Reply via email to