http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom11MigrationITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom11MigrationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom11MigrationITCase.java deleted file mode 100644 index da6e035..0000000 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom11MigrationITCase.java +++ /dev/null @@ -1,562 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.test.checkpointing.utils; - -import org.apache.flink.api.common.accumulators.IntCounter; -import org.apache.flink.api.common.functions.FlatMapFunction; -import org.apache.flink.api.common.functions.RichFlatMapFunction; -import org.apache.flink.api.common.state.ValueState; -import org.apache.flink.api.common.state.ValueStateDescriptor; -import org.apache.flink.api.common.typeinfo.TypeHint; -import org.apache.flink.api.common.typeutils.base.LongSerializer; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; -import org.apache.flink.core.fs.FSDataInputStream; -import org.apache.flink.core.memory.DataInputViewStreamWrapper; -import org.apache.flink.runtime.state.memory.MemoryStateBackend; -import org.apache.flink.streaming.api.TimeCharacteristic; -import org.apache.flink.streaming.api.checkpoint.Checkpointed; -import org.apache.flink.streaming.api.checkpoint.CheckpointedRestoring; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; -import org.apache.flink.streaming.api.functions.source.RichSourceFunction; -import org.apache.flink.streaming.api.functions.source.SourceFunction; -import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; -import org.apache.flink.streaming.api.operators.OneInputStreamOperator; -import org.apache.flink.streaming.api.operators.TimestampedCollector; -import org.apache.flink.streaming.api.watermark.Watermark; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.util.Collector; - -import org.junit.Ignore; -import org.junit.Test; - -import static org.junit.Assert.assertEquals; - -/** - * This verifies that we can restore a complete job from a Flink 1.1 savepoint. - * - * <p>The test pipeline contains both "Checkpointed" state and keyed user state. - */ -public class StatefulJobSavepointFrom11MigrationITCase extends SavepointMigrationTestBase { - private static final int NUM_SOURCE_ELEMENTS = 4; - private static final String EXPECTED_ELEMENTS_ACCUMULATOR = "NUM_EXPECTED_ELEMENTS"; - private static final String SUCCESSFUL_CHECK_ACCUMULATOR = "SUCCESSFUL_CHECKS"; - - /** - * This has to be manually executed to create the savepoint on Flink 1.1. - */ - @Test - @Ignore - public void testCreateSavepointOnFlink11() throws Exception { - - final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); - // we only test memory state backend yet - env.setStateBackend(new MemoryStateBackend()); - env.enableCheckpointing(500); - env.setParallelism(4); - env.setMaxParallelism(4); - - // create source - env - .addSource(new LegacyCheckpointedSource(NUM_SOURCE_ELEMENTS)).setMaxParallelism(1).uid("LegacyCheckpointedSource") - .flatMap(new LegacyCheckpointedFlatMap()).startNewChain().uid("LegacyCheckpointedFlatMap") - .keyBy(0) - .flatMap(new LegacyCheckpointedFlatMapWithKeyedState()).startNewChain().uid("LegacyCheckpointedFlatMapWithKeyedState") - .keyBy(0) - .flatMap(new KeyedStateSettingFlatMap()).startNewChain().uid("KeyedStateSettingFlatMap") - .keyBy(0) - .transform( - "custom_operator", - new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(), - new CheckpointedUdfOperator(new LegacyCheckpointedFlatMapWithKeyedState())).uid("LegacyCheckpointedOperator") - .addSink(new AccumulatorCountingSink<Tuple2<Long, Long>>(EXPECTED_ELEMENTS_ACCUMULATOR)); - - executeAndSavepoint( - env, - "src/test/resources/stateful-udf-migration-itcase-flink1.1-savepoint", - new Tuple2<>(EXPECTED_ELEMENTS_ACCUMULATOR, NUM_SOURCE_ELEMENTS)); - } - - /** - * This has to be manually executed to create the savepoint on Flink 1.1. - */ - @Test - @Ignore - public void testCreateSavepointOnFlink11WithRocksDB() throws Exception { - - final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); - RocksDBStateBackend rocksBackend = - new RocksDBStateBackend(new MemoryStateBackend()); -// rocksBackend.enableFullyAsyncSnapshots(); - env.setStateBackend(rocksBackend); - env.enableCheckpointing(500); - env.setParallelism(4); - env.setMaxParallelism(4); - - // create source - env - .addSource(new LegacyCheckpointedSource(NUM_SOURCE_ELEMENTS)).setMaxParallelism(1).uid("LegacyCheckpointedSource") - .flatMap(new LegacyCheckpointedFlatMap()).startNewChain().uid("LegacyCheckpointedFlatMap") - .keyBy(0) - .flatMap(new LegacyCheckpointedFlatMapWithKeyedState()).startNewChain().uid("LegacyCheckpointedFlatMapWithKeyedState") - .keyBy(0) - .flatMap(new KeyedStateSettingFlatMap()).startNewChain().uid("KeyedStateSettingFlatMap") - .keyBy(0) - .transform( - "custom_operator", - new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(), - new CheckpointedUdfOperator(new LegacyCheckpointedFlatMapWithKeyedState())).uid("LegacyCheckpointedOperator") - .addSink(new AccumulatorCountingSink<Tuple2<Long, Long>>(EXPECTED_ELEMENTS_ACCUMULATOR)); - - executeAndSavepoint( - env, - "src/test/resources/stateful-udf-migration-itcase-flink1.1-rocksdb-savepoint", - new Tuple2<>(EXPECTED_ELEMENTS_ACCUMULATOR, NUM_SOURCE_ELEMENTS)); - } - - @Test - public void testSavepointRestoreFromFlink11() throws Exception { - - final int expectedSuccessfulChecks = 21; - - final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); - // we only test memory state backend yet - env.setStateBackend(new MemoryStateBackend()); - env.enableCheckpointing(500); - env.setParallelism(4); - env.setMaxParallelism(4); - - // create source - env - .addSource(new RestoringCheckingSource(NUM_SOURCE_ELEMENTS)).setMaxParallelism(1).uid("LegacyCheckpointedSource") - .flatMap(new RestoringCheckingFlatMap()).startNewChain().uid("LegacyCheckpointedFlatMap") - .keyBy(0) - .flatMap(new RestoringCheckingFlatMapWithKeyedState()).startNewChain().uid("LegacyCheckpointedFlatMapWithKeyedState") - .keyBy(0) - .flatMap(new KeyedStateCheckingFlatMap()).startNewChain().uid("KeyedStateSettingFlatMap") - .keyBy(0) - .transform( - "custom_operator", - new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(), - new RestoringCheckingUdfOperator(new RestoringCheckingFlatMapWithKeyedState())).uid("LegacyCheckpointedOperator") - .addSink(new AccumulatorCountingSink<Tuple2<Long, Long>>(EXPECTED_ELEMENTS_ACCUMULATOR)); - - restoreAndExecute( - env, - getResourceFilename("stateful-udf-migration-itcase-flink1.1-savepoint"), - new Tuple2<>(SUCCESSFUL_CHECK_ACCUMULATOR, expectedSuccessfulChecks)); - } - - @Test - public void testSavepointRestoreFromFlink11FromRocksDB() throws Exception { - - final int expectedSuccessfulChecks = 21; - - final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); - // we only test memory state backend yet - env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend())); - env.enableCheckpointing(500); - env.setParallelism(4); - env.setMaxParallelism(4); - - // create source - env - .addSource(new RestoringCheckingSource(NUM_SOURCE_ELEMENTS)).setMaxParallelism(1).uid("LegacyCheckpointedSource") - .flatMap(new RestoringCheckingFlatMap()).startNewChain().uid("LegacyCheckpointedFlatMap") - .keyBy(0) - .flatMap(new RestoringCheckingFlatMapWithKeyedState()).startNewChain().uid("LegacyCheckpointedFlatMapWithKeyedState") - .keyBy(0) - .flatMap(new KeyedStateCheckingFlatMap()).startNewChain().uid("KeyedStateSettingFlatMap") - .keyBy(0) - .transform( - "custom_operator", - new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(), - new RestoringCheckingUdfOperator(new RestoringCheckingFlatMapWithKeyedState())).uid("LegacyCheckpointedOperator") - .addSink(new AccumulatorCountingSink<Tuple2<Long, Long>>(EXPECTED_ELEMENTS_ACCUMULATOR)); - - restoreAndExecute( - env, - getResourceFilename("stateful-udf-migration-itcase-flink1.1-rocksdb-savepoint"), - new Tuple2<>(SUCCESSFUL_CHECK_ACCUMULATOR, expectedSuccessfulChecks)); - } - - private static class LegacyCheckpointedSource - implements SourceFunction<Tuple2<Long, Long>>, Checkpointed<String> { - - public static String checkpointedString = "Here be dragons!"; - - private static final long serialVersionUID = 1L; - - private volatile boolean isRunning = true; - - private final int numElements; - - public LegacyCheckpointedSource(int numElements) { - this.numElements = numElements; - } - - @Override - public void run(SourceContext<Tuple2<Long, Long>> ctx) throws Exception { - - synchronized (ctx.getCheckpointLock()) { - for (long i = 0; i < numElements; i++) { - ctx.collect(new Tuple2<>(i, i)); - } - } - while (isRunning) { - Thread.sleep(20); - } - } - - @Override - public void cancel() { - isRunning = false; - } - - @Override - public void restoreState(String state) throws Exception { - assertEquals(checkpointedString, state); - } - - @Override - public String snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { - return checkpointedString; - } - } - - private static class RestoringCheckingSource - extends RichSourceFunction<Tuple2<Long, Long>> - implements CheckpointedRestoring<String> { - - private static final long serialVersionUID = 1L; - - private volatile boolean isRunning = true; - - private final int numElements; - - private String restoredState; - - public RestoringCheckingSource(int numElements) { - this.numElements = numElements; - } - - @Override - public void open(Configuration parameters) throws Exception { - super.open(parameters); - - getRuntimeContext().addAccumulator(SUCCESSFUL_CHECK_ACCUMULATOR, new IntCounter()); - } - - @Override - public void run(SourceContext<Tuple2<Long, Long>> ctx) throws Exception { - assertEquals(LegacyCheckpointedSource.checkpointedString, restoredState); - getRuntimeContext().getAccumulator(SUCCESSFUL_CHECK_ACCUMULATOR).add(1); - - synchronized (ctx.getCheckpointLock()) { - for (long i = 0; i < numElements; i++) { - ctx.collect(new Tuple2<>(i, i)); - } - } - - while (isRunning) { - Thread.sleep(20); - } - } - - @Override - public void cancel() { - isRunning = false; - } - - @Override - public void restoreState(String state) throws Exception { - restoredState = state; - } - } - - private static class LegacyCheckpointedFlatMap extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> - implements Checkpointed<Tuple2<String, Long>> { - - private static final long serialVersionUID = 1L; - - public static Tuple2<String, Long> checkpointedTuple = - new Tuple2<>("hello", 42L); - - @Override - public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception { - out.collect(value); - } - - @Override - public void restoreState(Tuple2<String, Long> state) throws Exception { - } - - @Override - public Tuple2<String, Long> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { - return checkpointedTuple; - } - } - - private static class RestoringCheckingFlatMap extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> - implements CheckpointedRestoring<Tuple2<String, Long>> { - - private static final long serialVersionUID = 1L; - - private transient Tuple2<String, Long> restoredState; - - @Override - public void open(Configuration parameters) throws Exception { - super.open(parameters); - - getRuntimeContext().addAccumulator(SUCCESSFUL_CHECK_ACCUMULATOR, new IntCounter()); - } - - @Override - public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception { - out.collect(value); - - assertEquals(LegacyCheckpointedFlatMap.checkpointedTuple, restoredState); - getRuntimeContext().getAccumulator(SUCCESSFUL_CHECK_ACCUMULATOR).add(1); - - } - - @Override - public void restoreState(Tuple2<String, Long> state) throws Exception { - restoredState = state; - } - } - - private static class LegacyCheckpointedFlatMapWithKeyedState - extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> - implements Checkpointed<Tuple2<String, Long>> { - - private static final long serialVersionUID = 1L; - - public static Tuple2<String, Long> checkpointedTuple = - new Tuple2<>("hello", 42L); - - private final ValueStateDescriptor<Long> stateDescriptor = - new ValueStateDescriptor<Long>("state-name", LongSerializer.INSTANCE); - - @Override - public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception { - out.collect(value); - - getRuntimeContext().getState(stateDescriptor).update(value.f1); - } - - @Override - public void restoreState(Tuple2<String, Long> state) throws Exception { - } - - @Override - public Tuple2<String, Long> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { - return checkpointedTuple; - } - } - - private static class RestoringCheckingFlatMapWithKeyedState extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> - implements CheckpointedRestoring<Tuple2<String, Long>> { - - private static final long serialVersionUID = 1L; - - private transient Tuple2<String, Long> restoredState; - - private final ValueStateDescriptor<Long> stateDescriptor = - new ValueStateDescriptor<Long>("state-name", LongSerializer.INSTANCE); - - @Override - public void open(Configuration parameters) throws Exception { - super.open(parameters); - - getRuntimeContext().addAccumulator(SUCCESSFUL_CHECK_ACCUMULATOR, new IntCounter()); - } - - @Override - public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception { - out.collect(value); - - ValueState<Long> state = getRuntimeContext().getState(stateDescriptor); - if (state == null) { - throw new RuntimeException("Missing key value state for " + value); - } - - assertEquals(value.f1, state.value()); - assertEquals(LegacyCheckpointedFlatMap.checkpointedTuple, restoredState); - getRuntimeContext().getAccumulator(SUCCESSFUL_CHECK_ACCUMULATOR).add(1); - } - - @Override - public void restoreState(Tuple2<String, Long> state) throws Exception { - restoredState = state; - } - } - - private static class KeyedStateSettingFlatMap extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> { - - private static final long serialVersionUID = 1L; - - private final ValueStateDescriptor<Long> stateDescriptor = - new ValueStateDescriptor<Long>("state-name", LongSerializer.INSTANCE); - - @Override - public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception { - out.collect(value); - - getRuntimeContext().getState(stateDescriptor).update(value.f1); - } - } - - private static class KeyedStateCheckingFlatMap extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> { - - private static final long serialVersionUID = 1L; - - private final ValueStateDescriptor<Long> stateDescriptor = - new ValueStateDescriptor<Long>("state-name", LongSerializer.INSTANCE); - - @Override - public void open(Configuration parameters) throws Exception { - super.open(parameters); - - getRuntimeContext().addAccumulator(SUCCESSFUL_CHECK_ACCUMULATOR, new IntCounter()); - } - - @Override - public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception { - out.collect(value); - - ValueState<Long> state = getRuntimeContext().getState(stateDescriptor); - if (state == null) { - throw new RuntimeException("Missing key value state for " + value); - } - - assertEquals(value.f1, state.value()); - getRuntimeContext().getAccumulator(SUCCESSFUL_CHECK_ACCUMULATOR).add(1); - } - } - - private static class CheckpointedUdfOperator - extends AbstractUdfStreamOperator<Tuple2<Long, Long>, FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>> - implements OneInputStreamOperator<Tuple2<Long, Long>, Tuple2<Long, Long>> { - private static final long serialVersionUID = 1L; - - private static final String CHECKPOINTED_STRING = "Oh my, that's nice!"; - - public CheckpointedUdfOperator(FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> userFunction) { - super(userFunction); - } - - @Override - public void processElement(StreamRecord<Tuple2<Long, Long>> element) throws Exception { - output.collect(element); - } - - @Override - public void processWatermark(Watermark mark) throws Exception { - output.emitWatermark(mark); - } - - // Flink 1.1 -// @Override -// public StreamTaskState snapshotOperatorState( -// long checkpointId, long timestamp) throws Exception { -// StreamTaskState result = super.snapshotOperatorState(checkpointId, timestamp); -// -// AbstractStateBackend.CheckpointStateOutputView out = getStateBackend().createCheckpointStateOutputView( -// checkpointId, -// timestamp); -// -// out.writeUTF(checkpointedString); -// -// result.setOperatorState(out.closeAndGetHandle()); -// -// return result; -// } - } - - private static class RestoringCheckingUdfOperator - extends AbstractUdfStreamOperator<Tuple2<Long, Long>, FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>> - implements OneInputStreamOperator<Tuple2<Long, Long>, Tuple2<Long, Long>> { - private static final long serialVersionUID = 1L; - - private String restoredState; - - public RestoringCheckingUdfOperator(FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> userFunction) { - super(userFunction); - } - - @Override - public void open() throws Exception { - super.open(); - } - - @Override - public void processElement(StreamRecord<Tuple2<Long, Long>> element) throws Exception { - userFunction.flatMap(element.getValue(), new TimestampedCollector<>(output)); - - assertEquals(CheckpointedUdfOperator.CHECKPOINTED_STRING, restoredState); - getRuntimeContext().getAccumulator(SUCCESSFUL_CHECK_ACCUMULATOR).add(1); - } - - @Override - public void processWatermark(Watermark mark) throws Exception { - output.emitWatermark(mark); - } - - @Override - public void restoreState(FSDataInputStream in) throws Exception { - super.restoreState(in); - - DataInputViewStreamWrapper streamWrapper = new DataInputViewStreamWrapper(in); - - restoredState = streamWrapper.readUTF(); - } - } - - private static class AccumulatorCountingSink<T> extends RichSinkFunction<T> { - private static final long serialVersionUID = 1L; - - private final String accumulatorName; - - int count = 0; - - public AccumulatorCountingSink(String accumulatorName) { - this.accumulatorName = accumulatorName; - } - - @Override - public void open(Configuration parameters) throws Exception { - super.open(parameters); - - getRuntimeContext().addAccumulator(accumulatorName, new IntCounter()); - } - - @Override - public void invoke(T value) throws Exception { - count++; - getRuntimeContext().getAccumulator(accumulatorName).add(1); - } - } -}
http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom12MigrationITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom12MigrationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom12MigrationITCase.java index 7dd1144..6859c2d 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom12MigrationITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom12MigrationITCase.java @@ -29,14 +29,8 @@ import org.apache.flink.api.common.typeutils.base.LongSerializer; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; -import org.apache.flink.core.fs.FSDataInputStream; -import org.apache.flink.core.fs.FSDataOutputStream; -import org.apache.flink.core.memory.DataInputViewStreamWrapper; -import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.streaming.api.TimeCharacteristic; -import org.apache.flink.streaming.api.checkpoint.Checkpointed; -import org.apache.flink.streaming.api.checkpoint.CheckpointedRestoring; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.streaming.api.functions.source.RichSourceFunction; @@ -60,10 +54,13 @@ import static org.junit.Assert.assertEquals; /** * This verifies that we can restore a complete job from a Flink 1.2 savepoint. * - * <p>The test pipeline contains both "Checkpointed" state and keyed user state. + * <p>The test for checkpointed (legacy state) was removed from this test for Flink 1.4 because compatibility with + * Flink 1.1 is removed. The legacy state in the binary savepoints is ignored by the tests now. * * <p>The tests will time out if they don't see the required number of successful checks within * a time limit. + * + * */ public class StatefulJobSavepointFrom12MigrationITCase extends SavepointMigrationTestBase { private static final int NUM_SOURCE_ELEMENTS = 4; @@ -247,7 +244,7 @@ public class StatefulJobSavepointFrom12MigrationITCase extends SavepointMigratio } private static class LegacyCheckpointedSource - implements SourceFunction<Tuple2<Long, Long>>, Checkpointed<String> { + implements SourceFunction<Tuple2<Long, Long>> { public static String checkpointedString = "Here be dragons!"; @@ -283,21 +280,10 @@ public class StatefulJobSavepointFrom12MigrationITCase extends SavepointMigratio public void cancel() { isRunning = false; } - - @Override - public void restoreState(String state) throws Exception { - assertEquals(checkpointedString, state); - } - - @Override - public String snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { - return checkpointedString; - } } private static class CheckingRestoringSource - extends RichSourceFunction<Tuple2<Long, Long>> - implements CheckpointedRestoring<String> { + extends RichSourceFunction<Tuple2<Long, Long>> { private static final long serialVersionUID = 1L; @@ -322,7 +308,6 @@ public class StatefulJobSavepointFrom12MigrationITCase extends SavepointMigratio @Override public void run(SourceContext<Tuple2<Long, Long>> ctx) throws Exception { - assertEquals(LegacyCheckpointedSource.checkpointedString, restoredState); getRuntimeContext().getAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR).add(1); // immediately trigger any set timers @@ -343,15 +328,9 @@ public class StatefulJobSavepointFrom12MigrationITCase extends SavepointMigratio public void cancel() { isRunning = false; } - - @Override - public void restoreState(String state) throws Exception { - restoredState = state; - } } - private static class LegacyCheckpointedFlatMap extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> - implements Checkpointed<Tuple2<String, Long>> { + private static class LegacyCheckpointedFlatMap extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> { private static final long serialVersionUID = 1L; @@ -362,19 +341,9 @@ public class StatefulJobSavepointFrom12MigrationITCase extends SavepointMigratio public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception { out.collect(value); } - - @Override - public void restoreState(Tuple2<String, Long> state) throws Exception { - } - - @Override - public Tuple2<String, Long> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { - return checkpointedTuple; - } } - private static class CheckingRestoringFlatMap extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> - implements CheckpointedRestoring<Tuple2<String, Long>> { + private static class CheckingRestoringFlatMap extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> { private static final long serialVersionUID = 1L; @@ -393,20 +362,14 @@ public class StatefulJobSavepointFrom12MigrationITCase extends SavepointMigratio public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception { out.collect(value); - assertEquals(LegacyCheckpointedFlatMap.checkpointedTuple, restoredState); getRuntimeContext().getAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR).add(1); } - @Override - public void restoreState(Tuple2<String, Long> state) throws Exception { - restoredState = state; - } } private static class LegacyCheckpointedFlatMapWithKeyedState - extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> - implements Checkpointed<Tuple2<String, Long>> { + extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> { private static final long serialVersionUID = 1L; @@ -424,19 +387,10 @@ public class StatefulJobSavepointFrom12MigrationITCase extends SavepointMigratio assertEquals(value.f1, getRuntimeContext().getState(stateDescriptor).value()); } - - @Override - public void restoreState(Tuple2<String, Long> state) throws Exception { - } - - @Override - public Tuple2<String, Long> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { - return checkpointedTuple; - } } - private static class CheckingRestoringFlatMapWithKeyedState extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> - implements CheckpointedRestoring<Tuple2<String, Long>> { + private static class CheckingRestoringFlatMapWithKeyedState + extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> { private static final long serialVersionUID = 1L; @@ -464,18 +418,12 @@ public class StatefulJobSavepointFrom12MigrationITCase extends SavepointMigratio } assertEquals(value.f1, state.value()); - assertEquals(LegacyCheckpointedFlatMap.checkpointedTuple, restoredState); getRuntimeContext().getAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR).add(1); } - - @Override - public void restoreState(Tuple2<String, Long> state) throws Exception { - restoredState = state; - } } - private static class CheckingRestoringFlatMapWithKeyedStateInOperator extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> - implements CheckpointedRestoring<Tuple2<String, Long>> { + private static class CheckingRestoringFlatMapWithKeyedStateInOperator + extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> { private static final long serialVersionUID = 1L; @@ -503,14 +451,8 @@ public class StatefulJobSavepointFrom12MigrationITCase extends SavepointMigratio } assertEquals(value.f1, state.value()); - assertEquals(LegacyCheckpointedFlatMap.checkpointedTuple, restoredState); getRuntimeContext().getAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR).add(1); } - - @Override - public void restoreState(Tuple2<String, Long> state) throws Exception { - restoredState = state; - } } private static class KeyedStateSettingFlatMap extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> { @@ -578,17 +520,6 @@ public class StatefulJobSavepointFrom12MigrationITCase extends SavepointMigratio public void processWatermark(Watermark mark) throws Exception { output.emitWatermark(mark); } - - @Override - public void snapshotState( - FSDataOutputStream out, long checkpointId, long timestamp) throws Exception { - super.snapshotState(out, checkpointId, timestamp); - - DataOutputViewStreamWrapper streamWrapper = new DataOutputViewStreamWrapper(out); - - streamWrapper.writeUTF(CHECKPOINTED_STRING); - streamWrapper.flush(); - } } private static class CheckingRestoringUdfOperator @@ -615,8 +546,6 @@ public class StatefulJobSavepointFrom12MigrationITCase extends SavepointMigratio @Override public void processElement(StreamRecord<Tuple2<Long, Long>> element) throws Exception { userFunction.flatMap(element.getValue(), new TimestampedCollector<>(output)); - - assertEquals(CheckpointedUdfOperator.CHECKPOINTED_STRING, restoredState); getRuntimeContext().getAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR).add(1); } @@ -624,15 +553,6 @@ public class StatefulJobSavepointFrom12MigrationITCase extends SavepointMigratio public void processWatermark(Watermark mark) throws Exception { output.emitWatermark(mark); } - - @Override - public void restoreState(FSDataInputStream in) throws Exception { - super.restoreState(in); - - DataInputViewStreamWrapper streamWrapper = new DataInputViewStreamWrapper(in); - - restoredState = streamWrapper.readUTF(); - } } private static class TimelyStatefulOperator http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/LegacyCheckpointedStreamingProgram.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/LegacyCheckpointedStreamingProgram.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/LegacyCheckpointedStreamingProgram.java deleted file mode 100644 index 1431d96..0000000 --- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/LegacyCheckpointedStreamingProgram.java +++ /dev/null @@ -1,143 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.test.classloading.jar; - -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.common.restartstrategy.RestartStrategies; -import org.apache.flink.runtime.state.CheckpointListener; -import org.apache.flink.streaming.api.checkpoint.Checkpointed; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.sink.SinkFunction; -import org.apache.flink.streaming.api.functions.source.SourceFunction; - -/** - * This test is the same as the {@link CheckpointedStreamingProgram} but using the - * old and deprecated {@link Checkpointed} interface. It stays here in order to - * guarantee that although deprecated, the old Checkpointed interface is still supported. - * This is necessary to not break user code. - * */ -public class LegacyCheckpointedStreamingProgram { - - private static final int CHECKPOINT_INTERVALL = 100; - - public static void main(String[] args) throws Exception { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.getConfig().disableSysoutLogging(); - env.enableCheckpointing(CHECKPOINT_INTERVALL); - env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 10000)); - env.disableOperatorChaining(); - - DataStream<String> text = env.addSource(new SimpleStringGenerator()); - text.map(new StatefulMapper()).addSink(new NoOpSink()); - env.setParallelism(1); - env.execute("Checkpointed Streaming Program"); - } - - // with Checkpointing - private static class SimpleStringGenerator implements SourceFunction<String>, Checkpointed<Integer> { - - private static final long serialVersionUID = 3700033137820808611L; - - public boolean running = true; - - @Override - public void run(SourceContext<String> ctx) throws Exception { - while (running) { - Thread.sleep(1); - ctx.collect("someString"); - } - } - - @Override - public void cancel() { - running = false; - } - - @Override - public Integer snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { - return null; - } - - @Override - public void restoreState(Integer state) { - - } - } - - private static class StatefulMapper implements MapFunction<String, String>, Checkpointed<StatefulMapper>, CheckpointListener { - - private static final long serialVersionUID = 2703630582894634440L; - - private String someState; - private boolean atLeastOneSnapshotComplete = false; - private boolean restored = false; - - @Override - public StatefulMapper snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { - return this; - } - - @Override - public void restoreState(StatefulMapper state) { - restored = true; - this.someState = state.someState; - this.atLeastOneSnapshotComplete = state.atLeastOneSnapshotComplete; - } - - @Override - public String map(String value) throws Exception { - if (!atLeastOneSnapshotComplete) { - // throttle consumption by the checkpoint interval until we have one snapshot. - Thread.sleep(CHECKPOINT_INTERVALL); - } - if (atLeastOneSnapshotComplete && !restored) { - throw new RuntimeException("Intended failure, to trigger restore"); - } - if (restored) { - throw new SuccessException(); - //throw new RuntimeException("All good"); - } - someState = value; // update our state - return value; - } - - @Override - public void notifyCheckpointComplete(long checkpointId) throws Exception { - atLeastOneSnapshotComplete = true; - } - } - // -------------------------------------------------------------------------------------------- - - /** - * We intentionally use a user specified failure exception. - */ - private static class SuccessException extends Exception { - - private static final long serialVersionUID = 7073311460437532086L; - } - - private static class NoOpSink implements SinkFunction<String> { - private static final long serialVersionUID = 2381410324190818620L; - - @Override - public void invoke(String value) throws Exception { - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java index 3d78242..00d0b2c 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java @@ -23,6 +23,7 @@ import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.akka.ListeningBehaviour; +import org.apache.flink.runtime.checkpoint.savepoint.SavepointSerializers; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; @@ -89,6 +90,11 @@ public abstract class AbstractOperatorRestoreTestBase extends TestLogger { private static final FiniteDuration timeout = new FiniteDuration(30L, TimeUnit.SECONDS); @BeforeClass + public static void beforeClass() { + SavepointSerializers.setFailWhenLegacyStateDetected(false); + } + + @BeforeClass public static void setupCluster() throws Exception { final Configuration configuration = new Configuration();