http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/LegacyCheckpointedStreamingProgram.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/LegacyCheckpointedStreamingProgram.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/LegacyCheckpointedStreamingProgram.java
new file mode 100644
index 0000000..60253fa
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/LegacyCheckpointedStreamingProgram.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.classloading.jar;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+
+/**
+ * This test is the same as the {@link CheckpointedStreamingProgram} but using the
+ * old and deprecated {@link Checkpointed} interface. It stays here in order to
+ * guarantee that although deprecated, the old Checkpointed interface is still supported.
+ * This is necessary to not break user code.
+ */
+public class LegacyCheckpointedStreamingProgram {
+
+	/** Checkpoint interval in milliseconds; {@link StatefulMapper} also sleeps this long to throttle. */
+	private static final int CHECKPOINT_INTERVAL = 100;
+
+	/**
+	 * Builds the test job and submits it to a remote cluster.
+	 *
+	 * @param args {@code args[0]}: path of the jar file to ship, {@code args[1]}: host and
+	 *             {@code args[2]}: port of the remote environment
+	 * @throws Exception if submitting or executing the job fails
+	 */
+	public static void main(String[] args) throws Exception {
+		final String jarFile = args[0];
+		final String host = args[1];
+		final int port = Integer.parseInt(args[2]);
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(host, port, jarFile);
+		env.getConfig().disableSysoutLogging();
+		env.enableCheckpointing(CHECKPOINT_INTERVAL);
+		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 10000));
+		env.disableOperatorChaining();
+
+		DataStream<String> text = env.addSource(new SimpleStringGenerator());
+		text.map(new StatefulMapper()).addSink(new NoOpSink());
+		env.setParallelism(1);
+		env.execute("Checkpointed Streaming Program");
+	}
+
+	/**
+	 * Source that emits the constant string {@code "someString"}, sleeping one millisecond
+	 * before each record, until it is cancelled. It implements the legacy {@link Checkpointed}
+	 * interface but has no state of its own.
+	 */
+	public static class SimpleStringGenerator implements SourceFunction<String>, Checkpointed<Integer> {
+
+		private static final long serialVersionUID = 3700033137820808611L;
+
+		/** Volatile, because {@link #cancel()} may be called from a different thread than the one in {@code run}. */
+		public volatile boolean running = true;
+
+		@Override
+		public void run(SourceContext<String> ctx) throws Exception {
+			while (running) {
+				Thread.sleep(1);
+				ctx.collect("someString");
+			}
+		}
+
+		@Override
+		public void cancel() {
+			running = false;
+		}
+
+		@Override
+		public Integer snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+			// this source has no state to snapshot
+			return null;
+		}
+
+		@Override
+		public void restoreState(Integer state) {
+			// nothing to restore
+		}
+	}
+
+	/**
+	 * Mapper that drives the test scenario: it throttles its input until the completion of one
+	 * checkpoint has been notified, then fails on purpose to trigger a restore, and throws a
+	 * {@link SuccessException} as soon as it processes a record after its state was restored.
+	 */
+	public static class StatefulMapper implements MapFunction<String, String>, Checkpointed<StatefulMapper>, CheckpointListener {
+
+		private static final long serialVersionUID = 2703630582894634440L;
+
+		private String someState;
+		private boolean atLeastOneSnapshotComplete = false;
+		private boolean restored = false;
+
+		@Override
+		public StatefulMapper snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+			// the function instance itself is handed out as the checkpointed state
+			return this;
+		}
+
+		@Override
+		public void restoreState(StatefulMapper state) {
+			restored = true;
+			this.someState = state.someState;
+			this.atLeastOneSnapshotComplete = state.atLeastOneSnapshotComplete;
+		}
+
+		@Override
+		public String map(String value) throws Exception {
+			if (!atLeastOneSnapshotComplete) {
+				// throttle consumption by the checkpoint interval until we have one snapshot.
+				Thread.sleep(CHECKPOINT_INTERVAL);
+			}
+			if (atLeastOneSnapshotComplete && !restored) {
+				throw new RuntimeException("Intended failure, to trigger restore");
+			}
+			if (restored) {
+				throw new SuccessException();
+			}
+			someState = value; // update our state
+			return value;
+		}
+
+		@Override
+		public void notifyCheckpointComplete(long checkpointId) throws Exception {
+			atLeastOneSnapshotComplete = true;
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * We intentionally use a user specified failure exception.
+	 */
+	public static class SuccessException extends Exception {
+
+		private static final long serialVersionUID = 7073311460437532086L;
+	}
+
+	/** Sink that discards every record it receives. */
+	public static class NoOpSink implements SinkFunction<String> {
+		private static final long serialVersionUID = 2381410324190818620L;
+
+		@Override
+		public void invoke(String value) throws Exception {
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java index 4d10bf1..bba218f 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java @@ -43,7 +43,7 @@ import org.apache.flink.runtime.testutils.ZooKeeperTestUtils; import org.apache.flink.runtime.util.ZooKeeperUtils; import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment; import org.apache.flink.runtime.state.CheckpointListener; -import org.apache.flink.streaming.api.checkpoint.Checkpointed; +import org.apache.flink.streaming.api.checkpoint.ListCheckpointed; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; @@ -63,6 +63,7 @@ import java.io.IOException; import java.net.URI; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Random; import java.util.UUID; @@ -383,7 +384,7 @@ public class ChaosMonkeyITCase extends TestLogger { } public static class CheckpointedSequenceSource extends RichParallelSourceFunction<Long> - implements Checkpointed<Long>, CheckpointListener { + implements ListCheckpointed<Long>, CheckpointListener { private static final long serialVersionUID = 0L; @@ -426,18 +427,20 @@ public class ChaosMonkeyITCase extends TestLogger { } @Override - public Long snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { + public List<Long> snapshotState(long checkpointId, long 
timestamp) throws Exception { LOG.info("Snapshotting state {} @ ID {}.", current, checkpointId); - return current; + return Collections.singletonList(this.current); } @Override - public void restoreState(Long state) { - LOG.info("Restoring state {}/{}", state, end); - current = state; + public void restoreState(List<Long> state) throws Exception { + if (state.isEmpty() || state.size() > 1) { + throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size()); + } + LOG.info("Restoring state {}/{}", state.get(0), end); + this.current = state.get(0); } - @Override public void cancel() { isRunning = false; @@ -453,7 +456,7 @@ public class ChaosMonkeyITCase extends TestLogger { } public static class CountingSink extends RichSinkFunction<Long> - implements Checkpointed<CountingSink>, CheckpointListener { + implements ListCheckpointed<CountingSink>, CheckpointListener { private static final Logger LOG = LoggerFactory.getLogger(CountingSink.class); @@ -467,7 +470,6 @@ public class ChaosMonkeyITCase extends TestLogger { private int numberOfReceivedLastElements; - public CountingSink(int parallelism, long expectedFinalCount) { this.expectedFinalCount = expectedFinalCount; this.parallelism = parallelism; @@ -496,16 +498,20 @@ public class ChaosMonkeyITCase extends TestLogger { } @Override - public CountingSink snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { + public List<CountingSink> snapshotState(long checkpointId, long timestamp) throws Exception { LOG.info("Snapshotting state {}:{} @ ID {}.", current, numberOfReceivedLastElements, checkpointId); - return this; + return Collections.singletonList(this); } @Override - public void restoreState(CountingSink state) { - LOG.info("Restoring state {}:{}", state.current, state.numberOfReceivedLastElements); - this.current = state.current; - this.numberOfReceivedLastElements = state.numberOfReceivedLastElements; + public void restoreState(List<CountingSink> state) throws 
Exception { + if (state.isEmpty() || state.size() > 1) { + throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size()); + } + CountingSink sink = state.get(0); + this.current = sink.current; + this.numberOfReceivedLastElements = sink.numberOfReceivedLastElements; + LOG.info("Restoring state {}:{}", sink.current, sink.numberOfReceivedLastElements); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java index 418aa51..60a3a62 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java @@ -42,7 +42,7 @@ import org.apache.flink.runtime.testutils.ZooKeeperTestUtils; import org.apache.flink.runtime.util.ZooKeeperUtils; import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment; import org.apache.flink.runtime.state.CheckpointListener; -import org.apache.flink.streaming.api.checkpoint.Checkpointed; +import org.apache.flink.streaming.api.checkpoint.ListCheckpointed; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; @@ -63,6 +63,8 @@ import scala.concurrent.duration.FiniteDuration; import java.io.File; import java.io.IOException; +import java.util.Collections; +import java.util.List; import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -460,7 +462,7 @@ 
public class JobManagerHACheckpointRecoveryITCase extends TestLogger { * A checkpointed source, which emits elements from 0 to a configured number. */ public static class CheckpointedSequenceSource extends RichParallelSourceFunction<Long> - implements Checkpointed<Long> { + implements ListCheckpointed<Long> { private static final Logger LOG = LoggerFactory.getLogger(CheckpointedSequenceSource.class); @@ -500,22 +502,26 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger { } @Override - public Long snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { + public List<Long> snapshotState(long checkpointId, long timestamp) throws Exception { LOG.debug("Snapshotting state {} @ ID {}.", current, checkpointId); - return current; + return Collections.singletonList(this.current); } @Override - public void restoreState(Long state) { - LOG.debug("Restoring state {}", state); + public void restoreState(List<Long> state) throws Exception { + if (state.isEmpty() || state.size() > 1) { + throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size()); + } + Long s = state.get(0); + LOG.debug("Restoring state {}", s); // This is necessary to make sure that something is recovered at all. Otherwise it // might happen that the job is restarted from the beginning. - RecoveredStates.set(getRuntimeContext().getIndexOfThisSubtask(), state); + RecoveredStates.set(getRuntimeContext().getIndexOfThisSubtask(), s); sync.countDown(); - current = state; + current = s; } @Override @@ -528,7 +534,7 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger { * A checkpointed sink, which sums up its input and notifies the main thread after all inputs * are exhausted. 
*/ - public static class CountingSink implements SinkFunction<Long>, Checkpointed<CountingSink>, + public static class CountingSink implements SinkFunction<Long>, ListCheckpointed<CountingSink>, CheckpointListener { private static final Logger LOG = LoggerFactory.getLogger(CountingSink.class); @@ -558,16 +564,21 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger { } @Override - public CountingSink snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { + public List<CountingSink> snapshotState(long checkpointId, long timestamp) throws Exception { LOG.debug("Snapshotting state {}:{} @ ID {}.", current, numberOfReceivedLastElements, checkpointId); - return this; + return Collections.singletonList(this); } @Override - public void restoreState(CountingSink state) { - LOG.debug("Restoring state {}:{}", state.current, state.numberOfReceivedLastElements); - this.current = state.current; - this.numberOfReceivedLastElements = state.numberOfReceivedLastElements; + public void restoreState(List<CountingSink> state) throws Exception { + if (state.isEmpty() || state.size() > 1) { + throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size()); + } + CountingSink s = state.get(0); + LOG.debug("Restoring state {}:{}", s.current, s.numberOfReceivedLastElements); + + this.current = s.current; + this.numberOfReceivedLastElements = s.numberOfReceivedLastElements; } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureStreamingRecoveryITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureStreamingRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureStreamingRecoveryITCase.java index fcc3d42..987a586 100644 --- 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureStreamingRecoveryITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureStreamingRecoveryITCase.java @@ -20,6 +20,8 @@ package org.apache.flink.test.recovery; import java.io.File; import java.io.IOException; +import java.util.Collections; +import java.util.List; import java.util.UUID; import org.apache.commons.io.FileUtils; @@ -29,7 +31,7 @@ import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.checkpoint.Checkpointed; +import org.apache.flink.streaming.api.checkpoint.ListCheckpointed; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; @@ -106,7 +108,7 @@ public class TaskManagerProcessFailureStreamingRecoveryITCase extends AbstractTa } public static class SleepyDurableGenerateSequence extends RichParallelSourceFunction<Long> - implements Checkpointed<Long> { + implements ListCheckpointed<Long> { private static final long SLEEP_TIME = 50; @@ -160,13 +162,16 @@ public class TaskManagerProcessFailureStreamingRecoveryITCase extends AbstractTa } @Override - public Long snapshotState(long checkpointId, long checkpointTimestamp) { - return collected; + public List<Long> snapshotState(long checkpointId, long timestamp) throws Exception { + return Collections.singletonList(this.collected); } @Override - public void restoreState(Long state) { - collected = state; + public void restoreState(List<Long> state) throws Exception { + if (state.isEmpty() || state.size() > 1) { + throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size()); + } 
+ this.collected = state.get(0); } } @@ -189,7 +194,7 @@ public class TaskManagerProcessFailureStreamingRecoveryITCase extends AbstractTa } } - private static class CheckpointedSink extends RichSinkFunction<Long> implements Checkpointed<Long> { + private static class CheckpointedSink extends RichSinkFunction<Long> implements ListCheckpointed<Long> { private long stepSize; private long congruence; @@ -223,13 +228,16 @@ public class TaskManagerProcessFailureStreamingRecoveryITCase extends AbstractTa } @Override - public Long snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { - return collected; + public List<Long> snapshotState(long checkpointId, long timestamp) throws Exception { + return Collections.singletonList(this.collected); } @Override - public void restoreState(Long state) { - collected = state; + public void restoreState(List<Long> state) throws Exception { + if (state.isEmpty() || state.size() > 1) { + throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size()); + } + this.collected = state.get(0); } } }