http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom11MigrationITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom11MigrationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom11MigrationITCase.java
deleted file mode 100644
index da6e035..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom11MigrationITCase.java
+++ /dev/null
@@ -1,562 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.checkpointing.utils;
-
-import org.apache.flink.api.common.accumulators.IntCounter;
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.RichFlatMapFunction;
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeinfo.TypeHint;
-import org.apache.flink.api.common.typeutils.base.LongSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
-import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.runtime.state.memory.MemoryStateBackend;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedRestoring;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.operators.TimestampedCollector;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.util.Collector;
-
-import org.junit.Ignore;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * This verifies that we can restore a complete job from a Flink 1.1 savepoint.
- *
- * <p>The test pipeline contains both "Checkpointed" state and keyed user state.
- */
-public class StatefulJobSavepointFrom11MigrationITCase extends SavepointMigrationTestBase {
-       private static final int NUM_SOURCE_ELEMENTS = 4;
-       private static final String EXPECTED_ELEMENTS_ACCUMULATOR = "NUM_EXPECTED_ELEMENTS";
-       private static final String SUCCESSFUL_CHECK_ACCUMULATOR = "SUCCESSFUL_CHECKS";
-
-       /**
-        * This has to be manually executed to create the savepoint on Flink 1.1.
-        */
-       @Test
-       @Ignore
-       public void testCreateSavepointOnFlink11() throws Exception {
-
-               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-               env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-               // we only test memory state backend yet
-               env.setStateBackend(new MemoryStateBackend());
-               env.enableCheckpointing(500);
-               env.setParallelism(4);
-               env.setMaxParallelism(4);
-
-               // create source
-               env
-                               .addSource(new 
LegacyCheckpointedSource(NUM_SOURCE_ELEMENTS)).setMaxParallelism(1).uid("LegacyCheckpointedSource")
-                               .flatMap(new 
LegacyCheckpointedFlatMap()).startNewChain().uid("LegacyCheckpointedFlatMap")
-                               .keyBy(0)
-                               .flatMap(new 
LegacyCheckpointedFlatMapWithKeyedState()).startNewChain().uid("LegacyCheckpointedFlatMapWithKeyedState")
-                               .keyBy(0)
-                               .flatMap(new 
KeyedStateSettingFlatMap()).startNewChain().uid("KeyedStateSettingFlatMap")
-                               .keyBy(0)
-                               .transform(
-                                               "custom_operator",
-                                               new TypeHint<Tuple2<Long, 
Long>>() {}.getTypeInfo(),
-                                               new CheckpointedUdfOperator(new 
LegacyCheckpointedFlatMapWithKeyedState())).uid("LegacyCheckpointedOperator")
-                               .addSink(new 
AccumulatorCountingSink<Tuple2<Long, Long>>(EXPECTED_ELEMENTS_ACCUMULATOR));
-
-               executeAndSavepoint(
-                               env,
-                               
"src/test/resources/stateful-udf-migration-itcase-flink1.1-savepoint",
-                               new Tuple2<>(EXPECTED_ELEMENTS_ACCUMULATOR, 
NUM_SOURCE_ELEMENTS));
-       }
-
-       /**
-        * This has to be manually executed to create the savepoint on Flink 1.1.
-        */
-       @Test
-       @Ignore
-       public void testCreateSavepointOnFlink11WithRocksDB() throws Exception {
-
-               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-               env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-               RocksDBStateBackend rocksBackend =
-                               new RocksDBStateBackend(new 
MemoryStateBackend());
-//             rocksBackend.enableFullyAsyncSnapshots();
-               env.setStateBackend(rocksBackend);
-               env.enableCheckpointing(500);
-               env.setParallelism(4);
-               env.setMaxParallelism(4);
-
-               // create source
-               env
-                               .addSource(new 
LegacyCheckpointedSource(NUM_SOURCE_ELEMENTS)).setMaxParallelism(1).uid("LegacyCheckpointedSource")
-                               .flatMap(new 
LegacyCheckpointedFlatMap()).startNewChain().uid("LegacyCheckpointedFlatMap")
-                               .keyBy(0)
-                               .flatMap(new 
LegacyCheckpointedFlatMapWithKeyedState()).startNewChain().uid("LegacyCheckpointedFlatMapWithKeyedState")
-                               .keyBy(0)
-                               .flatMap(new 
KeyedStateSettingFlatMap()).startNewChain().uid("KeyedStateSettingFlatMap")
-                               .keyBy(0)
-                               .transform(
-                                               "custom_operator",
-                                               new TypeHint<Tuple2<Long, 
Long>>() {}.getTypeInfo(),
-                                               new CheckpointedUdfOperator(new 
LegacyCheckpointedFlatMapWithKeyedState())).uid("LegacyCheckpointedOperator")
-                               .addSink(new 
AccumulatorCountingSink<Tuple2<Long, Long>>(EXPECTED_ELEMENTS_ACCUMULATOR));
-
-               executeAndSavepoint(
-                               env,
-                               
"src/test/resources/stateful-udf-migration-itcase-flink1.1-rocksdb-savepoint",
-                               new Tuple2<>(EXPECTED_ELEMENTS_ACCUMULATOR, 
NUM_SOURCE_ELEMENTS));
-       }
-
-       @Test
-       public void testSavepointRestoreFromFlink11() throws Exception {
-
-               final int expectedSuccessfulChecks = 21;
-
-               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-               env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-               // we only test memory state backend yet
-               env.setStateBackend(new MemoryStateBackend());
-               env.enableCheckpointing(500);
-               env.setParallelism(4);
-               env.setMaxParallelism(4);
-
-               // create source
-               env
-                               .addSource(new 
RestoringCheckingSource(NUM_SOURCE_ELEMENTS)).setMaxParallelism(1).uid("LegacyCheckpointedSource")
-                               .flatMap(new 
RestoringCheckingFlatMap()).startNewChain().uid("LegacyCheckpointedFlatMap")
-                               .keyBy(0)
-                               .flatMap(new 
RestoringCheckingFlatMapWithKeyedState()).startNewChain().uid("LegacyCheckpointedFlatMapWithKeyedState")
-                               .keyBy(0)
-                               .flatMap(new 
KeyedStateCheckingFlatMap()).startNewChain().uid("KeyedStateSettingFlatMap")
-                               .keyBy(0)
-                               .transform(
-                                               "custom_operator",
-                                               new TypeHint<Tuple2<Long, 
Long>>() {}.getTypeInfo(),
-                                               new 
RestoringCheckingUdfOperator(new 
RestoringCheckingFlatMapWithKeyedState())).uid("LegacyCheckpointedOperator")
-                               .addSink(new 
AccumulatorCountingSink<Tuple2<Long, Long>>(EXPECTED_ELEMENTS_ACCUMULATOR));
-
-               restoreAndExecute(
-                               env,
-                               
getResourceFilename("stateful-udf-migration-itcase-flink1.1-savepoint"),
-                               new Tuple2<>(SUCCESSFUL_CHECK_ACCUMULATOR, 
expectedSuccessfulChecks));
-       }
-
-       @Test
-       public void testSavepointRestoreFromFlink11FromRocksDB() throws Exception {
-
-               final int expectedSuccessfulChecks = 21;
-
-               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-               env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-               // we only test memory state backend yet
-               env.setStateBackend(new RocksDBStateBackend(new 
MemoryStateBackend()));
-               env.enableCheckpointing(500);
-               env.setParallelism(4);
-               env.setMaxParallelism(4);
-
-               // create source
-               env
-                               .addSource(new 
RestoringCheckingSource(NUM_SOURCE_ELEMENTS)).setMaxParallelism(1).uid("LegacyCheckpointedSource")
-                               .flatMap(new 
RestoringCheckingFlatMap()).startNewChain().uid("LegacyCheckpointedFlatMap")
-                               .keyBy(0)
-                               .flatMap(new 
RestoringCheckingFlatMapWithKeyedState()).startNewChain().uid("LegacyCheckpointedFlatMapWithKeyedState")
-                               .keyBy(0)
-                               .flatMap(new 
KeyedStateCheckingFlatMap()).startNewChain().uid("KeyedStateSettingFlatMap")
-                               .keyBy(0)
-                               .transform(
-                                               "custom_operator",
-                                               new TypeHint<Tuple2<Long, 
Long>>() {}.getTypeInfo(),
-                                               new 
RestoringCheckingUdfOperator(new 
RestoringCheckingFlatMapWithKeyedState())).uid("LegacyCheckpointedOperator")
-                               .addSink(new 
AccumulatorCountingSink<Tuple2<Long, Long>>(EXPECTED_ELEMENTS_ACCUMULATOR));
-
-               restoreAndExecute(
-                               env,
-                               
getResourceFilename("stateful-udf-migration-itcase-flink1.1-rocksdb-savepoint"),
-                               new Tuple2<>(SUCCESSFUL_CHECK_ACCUMULATOR, 
expectedSuccessfulChecks));
-       }
-
-       private static class LegacyCheckpointedSource
-                       implements SourceFunction<Tuple2<Long, Long>>, Checkpointed<String> {
-
-               public static String checkpointedString = "Here be dragons!";
-
-               private static final long serialVersionUID = 1L;
-
-               private volatile boolean isRunning = true;
-
-               private final int numElements;
-
-               public LegacyCheckpointedSource(int numElements) {
-                       this.numElements = numElements;
-               }
-
-               @Override
-               public void run(SourceContext<Tuple2<Long, Long>> ctx) throws 
Exception {
-
-                       synchronized (ctx.getCheckpointLock()) {
-                               for (long i = 0; i < numElements; i++) {
-                                       ctx.collect(new Tuple2<>(i, i));
-                               }
-                       }
-                       while (isRunning) {
-                               Thread.sleep(20);
-                       }
-               }
-
-               @Override
-               public void cancel() {
-                       isRunning = false;
-               }
-
-               @Override
-               public void restoreState(String state) throws Exception {
-                       assertEquals(checkpointedString, state);
-               }
-
-               @Override
-               public String snapshotState(long checkpointId, long 
checkpointTimestamp) throws Exception {
-                       return checkpointedString;
-               }
-       }
-
-       private static class RestoringCheckingSource
-                       extends RichSourceFunction<Tuple2<Long, Long>>
-                       implements CheckpointedRestoring<String> {
-
-               private static final long serialVersionUID = 1L;
-
-               private volatile boolean isRunning = true;
-
-               private final int numElements;
-
-               private String restoredState;
-
-               public RestoringCheckingSource(int numElements) {
-                       this.numElements = numElements;
-               }
-
-               @Override
-               public void open(Configuration parameters) throws Exception {
-                       super.open(parameters);
-
-                       
getRuntimeContext().addAccumulator(SUCCESSFUL_CHECK_ACCUMULATOR, new 
IntCounter());
-               }
-
-               @Override
-               public void run(SourceContext<Tuple2<Long, Long>> ctx) throws 
Exception {
-                       
assertEquals(LegacyCheckpointedSource.checkpointedString, restoredState);
-                       
getRuntimeContext().getAccumulator(SUCCESSFUL_CHECK_ACCUMULATOR).add(1);
-
-                       synchronized (ctx.getCheckpointLock()) {
-                               for (long i = 0; i < numElements; i++) {
-                                       ctx.collect(new Tuple2<>(i, i));
-                               }
-                       }
-
-                       while (isRunning) {
-                               Thread.sleep(20);
-                       }
-               }
-
-               @Override
-               public void cancel() {
-                       isRunning = false;
-               }
-
-               @Override
-               public void restoreState(String state) throws Exception {
-                       restoredState = state;
-               }
-       }
-
-       private static class LegacyCheckpointedFlatMap extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>
-                       implements Checkpointed<Tuple2<String, Long>> {
-
-               private static final long serialVersionUID = 1L;
-
-               public static Tuple2<String, Long> checkpointedTuple =
-                               new Tuple2<>("hello", 42L);
-
-               @Override
-               public void flatMap(Tuple2<Long, Long> value, 
Collector<Tuple2<Long, Long>> out) throws Exception {
-                       out.collect(value);
-               }
-
-               @Override
-               public void restoreState(Tuple2<String, Long> state) throws 
Exception {
-               }
-
-               @Override
-               public Tuple2<String, Long> snapshotState(long checkpointId, 
long checkpointTimestamp) throws Exception {
-                       return checkpointedTuple;
-               }
-       }
-
-       private static class RestoringCheckingFlatMap extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>
-                       implements CheckpointedRestoring<Tuple2<String, Long>> {
-
-               private static final long serialVersionUID = 1L;
-
-               private transient Tuple2<String, Long> restoredState;
-
-               @Override
-               public void open(Configuration parameters) throws Exception {
-                       super.open(parameters);
-
-                       
getRuntimeContext().addAccumulator(SUCCESSFUL_CHECK_ACCUMULATOR, new 
IntCounter());
-               }
-
-               @Override
-               public void flatMap(Tuple2<Long, Long> value, 
Collector<Tuple2<Long, Long>> out) throws Exception {
-                       out.collect(value);
-
-                       
assertEquals(LegacyCheckpointedFlatMap.checkpointedTuple, restoredState);
-                       
getRuntimeContext().getAccumulator(SUCCESSFUL_CHECK_ACCUMULATOR).add(1);
-
-               }
-
-               @Override
-               public void restoreState(Tuple2<String, Long> state) throws 
Exception {
-                       restoredState = state;
-               }
-       }
-
-       private static class LegacyCheckpointedFlatMapWithKeyedState
-                       extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>
-                       implements Checkpointed<Tuple2<String, Long>> {
-
-               private static final long serialVersionUID = 1L;
-
-               public static Tuple2<String, Long> checkpointedTuple =
-                               new Tuple2<>("hello", 42L);
-
-               private final ValueStateDescriptor<Long> stateDescriptor =
-                               new ValueStateDescriptor<Long>("state-name", 
LongSerializer.INSTANCE);
-
-               @Override
-               public void flatMap(Tuple2<Long, Long> value, 
Collector<Tuple2<Long, Long>> out) throws Exception {
-                       out.collect(value);
-
-                       
getRuntimeContext().getState(stateDescriptor).update(value.f1);
-               }
-
-               @Override
-               public void restoreState(Tuple2<String, Long> state) throws 
Exception {
-               }
-
-               @Override
-               public Tuple2<String, Long> snapshotState(long checkpointId, 
long checkpointTimestamp) throws Exception {
-                       return checkpointedTuple;
-               }
-       }
-
-       private static class RestoringCheckingFlatMapWithKeyedState extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>
-                       implements CheckpointedRestoring<Tuple2<String, Long>> {
-
-               private static final long serialVersionUID = 1L;
-
-               private transient Tuple2<String, Long> restoredState;
-
-               private final ValueStateDescriptor<Long> stateDescriptor =
-                               new ValueStateDescriptor<Long>("state-name", 
LongSerializer.INSTANCE);
-
-               @Override
-               public void open(Configuration parameters) throws Exception {
-                       super.open(parameters);
-
-                       
getRuntimeContext().addAccumulator(SUCCESSFUL_CHECK_ACCUMULATOR, new 
IntCounter());
-               }
-
-               @Override
-               public void flatMap(Tuple2<Long, Long> value, 
Collector<Tuple2<Long, Long>> out) throws Exception {
-                       out.collect(value);
-
-                       ValueState<Long> state = 
getRuntimeContext().getState(stateDescriptor);
-                       if (state == null) {
-                               throw new RuntimeException("Missing key value 
state for " + value);
-                       }
-
-                       assertEquals(value.f1, state.value());
-                       
assertEquals(LegacyCheckpointedFlatMap.checkpointedTuple, restoredState);
-                       
getRuntimeContext().getAccumulator(SUCCESSFUL_CHECK_ACCUMULATOR).add(1);
-               }
-
-               @Override
-               public void restoreState(Tuple2<String, Long> state) throws 
Exception {
-                       restoredState = state;
-               }
-       }
-
-       private static class KeyedStateSettingFlatMap extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
-
-               private static final long serialVersionUID = 1L;
-
-               private final ValueStateDescriptor<Long> stateDescriptor =
-                               new ValueStateDescriptor<Long>("state-name", 
LongSerializer.INSTANCE);
-
-               @Override
-               public void flatMap(Tuple2<Long, Long> value, 
Collector<Tuple2<Long, Long>> out) throws Exception {
-                       out.collect(value);
-
-                       
getRuntimeContext().getState(stateDescriptor).update(value.f1);
-               }
-       }
-
-       private static class KeyedStateCheckingFlatMap extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
-
-               private static final long serialVersionUID = 1L;
-
-               private final ValueStateDescriptor<Long> stateDescriptor =
-                               new ValueStateDescriptor<Long>("state-name", 
LongSerializer.INSTANCE);
-
-               @Override
-               public void open(Configuration parameters) throws Exception {
-                       super.open(parameters);
-
-                       
getRuntimeContext().addAccumulator(SUCCESSFUL_CHECK_ACCUMULATOR, new 
IntCounter());
-               }
-
-               @Override
-               public void flatMap(Tuple2<Long, Long> value, 
Collector<Tuple2<Long, Long>> out) throws Exception {
-                       out.collect(value);
-
-                       ValueState<Long> state = 
getRuntimeContext().getState(stateDescriptor);
-                       if (state == null) {
-                               throw new RuntimeException("Missing key value 
state for " + value);
-                       }
-
-                       assertEquals(value.f1, state.value());
-                       
getRuntimeContext().getAccumulator(SUCCESSFUL_CHECK_ACCUMULATOR).add(1);
-               }
-       }
-
-       private static class CheckpointedUdfOperator
-                       extends AbstractUdfStreamOperator<Tuple2<Long, Long>, FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>>
-                       implements OneInputStreamOperator<Tuple2<Long, Long>, Tuple2<Long, Long>> {
-               private static final long serialVersionUID = 1L;
-
-               private static final String CHECKPOINTED_STRING = "Oh my, that's nice!";
-
-               public CheckpointedUdfOperator(FlatMapFunction<Tuple2<Long, 
Long>, Tuple2<Long, Long>> userFunction) {
-                       super(userFunction);
-               }
-
-               @Override
-               public void processElement(StreamRecord<Tuple2<Long, Long>> 
element) throws Exception {
-                       output.collect(element);
-               }
-
-               @Override
-               public void processWatermark(Watermark mark) throws Exception {
-                       output.emitWatermark(mark);
-               }
-
-               // Flink 1.1
-//             @Override
-//             public StreamTaskState snapshotOperatorState(
-//                             long checkpointId, long timestamp) throws 
Exception {
-//                     StreamTaskState result = 
super.snapshotOperatorState(checkpointId, timestamp);
-//
-//                     AbstractStateBackend.CheckpointStateOutputView out = 
getStateBackend().createCheckpointStateOutputView(
-//                                     checkpointId,
-//                                     timestamp);
-//
-//                     out.writeUTF(checkpointedString);
-//
-//                     result.setOperatorState(out.closeAndGetHandle());
-//
-//                     return result;
-//             }
-       }
-
-       private static class RestoringCheckingUdfOperator
-                       extends AbstractUdfStreamOperator<Tuple2<Long, Long>, FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>>
-                       implements OneInputStreamOperator<Tuple2<Long, Long>, Tuple2<Long, Long>> {
-               private static final long serialVersionUID = 1L;
-
-               private String restoredState;
-
-               public 
RestoringCheckingUdfOperator(FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, 
Long>> userFunction) {
-                       super(userFunction);
-               }
-
-               @Override
-               public void open() throws Exception {
-                       super.open();
-               }
-
-               @Override
-               public void processElement(StreamRecord<Tuple2<Long, Long>> 
element) throws Exception {
-                       userFunction.flatMap(element.getValue(), new 
TimestampedCollector<>(output));
-
-                       
assertEquals(CheckpointedUdfOperator.CHECKPOINTED_STRING, restoredState);
-                       
getRuntimeContext().getAccumulator(SUCCESSFUL_CHECK_ACCUMULATOR).add(1);
-               }
-
-               @Override
-               public void processWatermark(Watermark mark) throws Exception {
-                       output.emitWatermark(mark);
-               }
-
-               @Override
-               public void restoreState(FSDataInputStream in) throws Exception 
{
-                       super.restoreState(in);
-
-                       DataInputViewStreamWrapper streamWrapper = new 
DataInputViewStreamWrapper(in);
-
-                       restoredState = streamWrapper.readUTF();
-               }
-       }
-
-       private static class AccumulatorCountingSink<T> extends RichSinkFunction<T> {
-               private static final long serialVersionUID = 1L;
-
-               private final String accumulatorName;
-
-               int count = 0;
-
-               public AccumulatorCountingSink(String accumulatorName) {
-                       this.accumulatorName = accumulatorName;
-               }
-
-               @Override
-               public void open(Configuration parameters) throws Exception {
-                       super.open(parameters);
-
-                       getRuntimeContext().addAccumulator(accumulatorName, new 
IntCounter());
-               }
-
-               @Override
-               public void invoke(T value) throws Exception {
-                       count++;
-                       
getRuntimeContext().getAccumulator(accumulatorName).add(1);
-               }
-       }
-}
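
For code that, like the deleted pipeline above, still uses the removed Checkpointed interface, the non-deprecated counterpart since Flink 1.2 is CheckpointedFunction, which keeps the value in redistributable operator state. The sketch below is illustrative only and is not part of this commit; the class name, the "source-state" descriptor name, and the single-element list layout are assumptions.

// Sketch only: an illustrative replacement for Checkpointed<String>, keeping the
// value as operator list state via the CheckpointedFunction API (Flink 1.2+).
import java.util.Iterator;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class MigratedCheckpointedSource implements SourceFunction<String>, CheckpointedFunction {

    private static final long serialVersionUID = 1L;

    private volatile boolean isRunning = true;

    private transient ListState<String> checkpointedState;
    private String localValue = "Here be dragons!";

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // "source-state" is a hypothetical descriptor name, not taken from the deleted test.
        checkpointedState = context.getOperatorStateStore().getListState(
                new ListStateDescriptor<>("source-state", String.class));

        if (context.isRestored()) {
            // Restore the single value that Checkpointed<String> used to return from snapshotState().
            Iterator<String> it = checkpointedState.get().iterator();
            if (it.hasNext()) {
                localValue = it.next();
            }
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        checkpointedState.clear();
        checkpointedState.add(localValue);
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (isRunning) {
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(localValue);
            }
            Thread.sleep(20);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}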

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom12MigrationITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom12MigrationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom12MigrationITCase.java
index 7dd1144..6859c2d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom12MigrationITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom12MigrationITCase.java
@@ -29,14 +29,8 @@ import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
-import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.core.fs.FSDataOutputStream;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedRestoring;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
@@ -60,10 +54,13 @@ import static org.junit.Assert.assertEquals;
 /**
  * This verifies that we can restore a complete job from a Flink 1.2 savepoint.
  *
- * <p>The test pipeline contains both "Checkpointed" state and keyed user state.
+ * <p>The test for checkpointed (legacy state) was removed from this test for Flink 1.4 because compatibility with
+ * Flink 1.1 is removed. The legacy state in the binary savepoints is ignored by the tests now.
  *
 * <p>The tests will time out if they don't see the required number of successful checks within
  * a time limit.
+ *
+ *
  */
 public class StatefulJobSavepointFrom12MigrationITCase extends SavepointMigrationTestBase {
        private static final int NUM_SOURCE_ELEMENTS = 4;
@@ -247,7 +244,7 @@ public class StatefulJobSavepointFrom12MigrationITCase extends SavepointMigratio
        }
 
        private static class LegacyCheckpointedSource
-                       implements SourceFunction<Tuple2<Long, Long>>, Checkpointed<String> {
+                       implements SourceFunction<Tuple2<Long, Long>> {
 
                public static String checkpointedString = "Here be dragons!";
 
@@ -283,21 +280,10 @@ public class StatefulJobSavepointFrom12MigrationITCase extends SavepointMigratio
                public void cancel() {
                        isRunning = false;
                }
-
-               @Override
-               public void restoreState(String state) throws Exception {
-                       assertEquals(checkpointedString, state);
-               }
-
-               @Override
-               public String snapshotState(long checkpointId, long 
checkpointTimestamp) throws Exception {
-                       return checkpointedString;
-               }
        }
 
        private static class CheckingRestoringSource
-                       extends RichSourceFunction<Tuple2<Long, Long>>
-                       implements CheckpointedRestoring<String> {
+                       extends RichSourceFunction<Tuple2<Long, Long>> {
 
                private static final long serialVersionUID = 1L;
 
@@ -322,7 +308,6 @@ public class StatefulJobSavepointFrom12MigrationITCase extends SavepointMigratio
 
                @Override
                public void run(SourceContext<Tuple2<Long, Long>> ctx) throws 
Exception {
-                       assertEquals(LegacyCheckpointedSource.checkpointedString, restoredState);
                        getRuntimeContext().getAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR).add(1);
 
                        // immediately trigger any set timers
@@ -343,15 +328,9 @@ public class StatefulJobSavepointFrom12MigrationITCase extends SavepointMigratio
                public void cancel() {
                        isRunning = false;
                }
-
-               @Override
-               public void restoreState(String state) throws Exception {
-                       restoredState = state;
-               }
        }
 
-       private static class LegacyCheckpointedFlatMap extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>
-                       implements Checkpointed<Tuple2<String, Long>> {
+       private static class LegacyCheckpointedFlatMap extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
 
                private static final long serialVersionUID = 1L;
 
@@ -362,19 +341,9 @@ public class StatefulJobSavepointFrom12MigrationITCase extends SavepointMigratio
                public void flatMap(Tuple2<Long, Long> value, 
Collector<Tuple2<Long, Long>> out) throws Exception {
                        out.collect(value);
                }
-
-               @Override
-               public void restoreState(Tuple2<String, Long> state) throws 
Exception {
-               }
-
-               @Override
-               public Tuple2<String, Long> snapshotState(long checkpointId, 
long checkpointTimestamp) throws Exception {
-                       return checkpointedTuple;
-               }
        }
 
-       private static class CheckingRestoringFlatMap extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>
-                       implements CheckpointedRestoring<Tuple2<String, Long>> {
+       private static class CheckingRestoringFlatMap extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
 
                private static final long serialVersionUID = 1L;
 
@@ -393,20 +362,14 @@ public class StatefulJobSavepointFrom12MigrationITCase extends SavepointMigratio
                public void flatMap(Tuple2<Long, Long> value, 
Collector<Tuple2<Long, Long>> out) throws Exception {
                        out.collect(value);
 
-                       assertEquals(LegacyCheckpointedFlatMap.checkpointedTuple, restoredState);
                        getRuntimeContext().getAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR).add(1);
 
                }
 
-               @Override
-               public void restoreState(Tuple2<String, Long> state) throws 
Exception {
-                       restoredState = state;
-               }
        }
 
        private static class LegacyCheckpointedFlatMapWithKeyedState
-                       extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>
-                       implements Checkpointed<Tuple2<String, Long>> {
+                       extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
 
                private static final long serialVersionUID = 1L;
 
@@ -424,19 +387,10 @@ public class StatefulJobSavepointFrom12MigrationITCase extends SavepointMigratio
 
                        assertEquals(value.f1, 
getRuntimeContext().getState(stateDescriptor).value());
                }
-
-               @Override
-               public void restoreState(Tuple2<String, Long> state) throws 
Exception {
-               }
-
-               @Override
-               public Tuple2<String, Long> snapshotState(long checkpointId, 
long checkpointTimestamp) throws Exception {
-                       return checkpointedTuple;
-               }
        }
 
-       private static class CheckingRestoringFlatMapWithKeyedState extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>
-                       implements CheckpointedRestoring<Tuple2<String, Long>> {
+       private static class CheckingRestoringFlatMapWithKeyedState
+               extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
 
                private static final long serialVersionUID = 1L;
 
@@ -464,18 +418,12 @@ public class StatefulJobSavepointFrom12MigrationITCase extends SavepointMigratio
                        }
 
                        assertEquals(value.f1, state.value());
-                       assertEquals(LegacyCheckpointedFlatMap.checkpointedTuple, restoredState);
                        getRuntimeContext().getAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR).add(1);
                }
-
-               @Override
-               public void restoreState(Tuple2<String, Long> state) throws 
Exception {
-                       restoredState = state;
-               }
        }
 
-       private static class CheckingRestoringFlatMapWithKeyedStateInOperator extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>
-                       implements CheckpointedRestoring<Tuple2<String, Long>> {
+       private static class CheckingRestoringFlatMapWithKeyedStateInOperator
+               extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
 
                private static final long serialVersionUID = 1L;
 
@@ -503,14 +451,8 @@ public class StatefulJobSavepointFrom12MigrationITCase extends SavepointMigratio
                        }
 
                        assertEquals(value.f1, state.value());
-                       assertEquals(LegacyCheckpointedFlatMap.checkpointedTuple, restoredState);
                        getRuntimeContext().getAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR).add(1);
                }
-
-               @Override
-               public void restoreState(Tuple2<String, Long> state) throws 
Exception {
-                       restoredState = state;
-               }
        }
 
        private static class KeyedStateSettingFlatMap extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
@@ -578,17 +520,6 @@ public class StatefulJobSavepointFrom12MigrationITCase extends SavepointMigratio
                public void processWatermark(Watermark mark) throws Exception {
                        output.emitWatermark(mark);
                }
-
-               @Override
-               public void snapshotState(
-                               FSDataOutputStream out, long checkpointId, long 
timestamp) throws Exception {
-                       super.snapshotState(out, checkpointId, timestamp);
-
-                       DataOutputViewStreamWrapper streamWrapper = new 
DataOutputViewStreamWrapper(out);
-
-                       streamWrapper.writeUTF(CHECKPOINTED_STRING);
-                       streamWrapper.flush();
-               }
        }
 
        private static class CheckingRestoringUdfOperator
@@ -615,8 +546,6 @@ public class StatefulJobSavepointFrom12MigrationITCase extends SavepointMigratio
                @Override
                public void processElement(StreamRecord<Tuple2<Long, Long>> 
element) throws Exception {
                        userFunction.flatMap(element.getValue(), new 
TimestampedCollector<>(output));
-
-                       assertEquals(CheckpointedUdfOperator.CHECKPOINTED_STRING, restoredState);
                        getRuntimeContext().getAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR).add(1);
                }
 
@@ -624,15 +553,6 @@ public class StatefulJobSavepointFrom12MigrationITCase extends SavepointMigratio
                public void processWatermark(Watermark mark) throws Exception {
                        output.emitWatermark(mark);
                }
-
-               @Override
-               public void restoreState(FSDataInputStream in) throws Exception 
{
-                       super.restoreState(in);
-
-                       DataInputViewStreamWrapper streamWrapper = new 
DataInputViewStreamWrapper(in);
-
-                       restoredState = streamWrapper.readUTF();
-               }
        }
 
        private static class TimelyStatefulOperator
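
The Checkpointed and CheckpointedRestoring implementations removed above can also be covered by the ListCheckpointed shortcut that has been available since Flink 1.2. The following sketch is illustrative and not part of this commit; the class name and the Long counter it snapshots are assumptions rather than the tests' actual state.

// Sketch only: ListCheckpointed covers the same non-keyed use case as the removed
// Checkpointed<Tuple2<String, Long>> implementations, with list-style redistribution.
import java.util.Collections;
import java.util.List;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.util.Collector;

public class MigratedCheckpointedFlatMap
        extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>
        implements ListCheckpointed<Long> {

    private static final long serialVersionUID = 1L;

    // Illustrative piece of function state; the original tests checkpointed a Tuple2.
    private long elementsSeen;

    @Override
    public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) {
        elementsSeen++;
        out.collect(value);
    }

    @Override
    public List<Long> snapshotState(long checkpointId, long timestamp) throws Exception {
        // The returned list is redistributed on rescaling; one entry mimics the old single value.
        return Collections.singletonList(elementsSeen);
    }

    @Override
    public void restoreState(List<Long> state) throws Exception {
        elementsSeen = 0;
        for (Long count : state) {
            elementsSeen += count;
        }
    }
}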

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/LegacyCheckpointedStreamingProgram.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/LegacyCheckpointedStreamingProgram.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/LegacyCheckpointedStreamingProgram.java
deleted file mode 100644
index 1431d96..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/LegacyCheckpointedStreamingProgram.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.classloading.jar;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.runtime.state.CheckpointListener;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-
-/**
- * This test is the same as the {@link CheckpointedStreamingProgram} but using the
- * old and deprecated {@link Checkpointed} interface. It stays here in order to
- * guarantee that although deprecated, the old Checkpointed interface is still supported.
- * This is necessary to not break user code.
- * */
-public class LegacyCheckpointedStreamingProgram {
-
-       private static final int CHECKPOINT_INTERVALL = 100;
-
-       public static void main(String[] args) throws Exception {
-               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-               env.getConfig().disableSysoutLogging();
-               env.enableCheckpointing(CHECKPOINT_INTERVALL);
-               env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 
10000));
-               env.disableOperatorChaining();
-
-               DataStream<String> text = env.addSource(new 
SimpleStringGenerator());
-               text.map(new StatefulMapper()).addSink(new NoOpSink());
-               env.setParallelism(1);
-               env.execute("Checkpointed Streaming Program");
-       }
-
-       // with Checkpointing
-       private static class SimpleStringGenerator implements SourceFunction<String>, Checkpointed<Integer> {
-
-               private static final long serialVersionUID = 
3700033137820808611L;
-
-               public boolean running = true;
-
-               @Override
-               public void run(SourceContext<String> ctx) throws Exception {
-                       while (running) {
-                               Thread.sleep(1);
-                               ctx.collect("someString");
-                       }
-               }
-
-               @Override
-               public void cancel() {
-                       running = false;
-               }
-
-               @Override
-               public Integer snapshotState(long checkpointId, long 
checkpointTimestamp) throws Exception {
-                       return null;
-               }
-
-               @Override
-               public void restoreState(Integer state) {
-
-               }
-       }
-
-       private static class StatefulMapper implements MapFunction<String, String>, Checkpointed<StatefulMapper>, CheckpointListener {
-
-               private static final long serialVersionUID = 
2703630582894634440L;
-
-               private String someState;
-               private boolean atLeastOneSnapshotComplete = false;
-               private boolean restored = false;
-
-               @Override
-               public StatefulMapper snapshotState(long checkpointId, long 
checkpointTimestamp) throws Exception {
-                       return this;
-               }
-
-               @Override
-               public void restoreState(StatefulMapper state) {
-                       restored = true;
-                       this.someState = state.someState;
-                       this.atLeastOneSnapshotComplete = 
state.atLeastOneSnapshotComplete;
-               }
-
-               @Override
-               public String map(String value) throws Exception {
-                       if (!atLeastOneSnapshotComplete) {
-                               // throttle consumption by the checkpoint 
interval until we have one snapshot.
-                               Thread.sleep(CHECKPOINT_INTERVALL);
-                       }
-                       if (atLeastOneSnapshotComplete && !restored) {
-                               throw new RuntimeException("Intended failure, 
to trigger restore");
-                       }
-                       if (restored) {
-                               throw new SuccessException();
-                               //throw new RuntimeException("All good");
-                       }
-                       someState = value; // update our state
-                       return value;
-               }
-
-               @Override
-               public void notifyCheckpointComplete(long checkpointId) throws 
Exception {
-                       atLeastOneSnapshotComplete = true;
-               }
-       }
-       // 
--------------------------------------------------------------------------------------------
-
-       /**
-        * We intentionally use a user specified failure exception.
-        */
-       private static class SuccessException extends Exception {
-
-               private static final long serialVersionUID = 
7073311460437532086L;
-       }
-
-       private static class NoOpSink implements SinkFunction<String> {
-               private static final long serialVersionUID = 
2381410324190818620L;
-
-               @Override
-               public void invoke(String value) throws Exception {
-               }
-       }
-}
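
A rough equivalent of the deleted StatefulMapper without the removed Checkpointed interface would combine CheckpointedFunction with CheckpointListener. This is a sketch under that assumption, not code from the Flink repository; the "mapper-state" name and the simplified failure logic are illustrative.

// Sketch only: the restore/snapshot handshake of the deleted StatefulMapper expressed
// with CheckpointedFunction (operator list state) plus CheckpointListener.
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

public class MigratedStatefulMapper
        implements MapFunction<String, String>, CheckpointedFunction, CheckpointListener {

    private static final long serialVersionUID = 1L;

    private String someState;
    private boolean atLeastOneSnapshotComplete = false;
    private boolean restored = false;

    private transient ListState<String> checkpointedState;

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // "mapper-state" is a hypothetical name, not taken from the deleted test.
        checkpointedState = context.getOperatorStateStore().getListState(
                new ListStateDescriptor<>("mapper-state", String.class));
        restored = context.isRestored();
        for (String previous : checkpointedState.get()) {
            someState = previous;
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        checkpointedState.clear();
        if (someState != null) {
            checkpointedState.add(someState);
        }
    }

    @Override
    public String map(String value) throws Exception {
        if (atLeastOneSnapshotComplete && !restored) {
            // Fail once a snapshot exists so the job restarts and restores from it.
            throw new RuntimeException("Intended failure, to trigger restore");
        }
        someState = value; // update our state
        return value;
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        atLeastOneSnapshotComplete = true;
    }
}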

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
index 3d78242..00d0b2c 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointSerializers;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
@@ -89,6 +90,11 @@ public abstract class AbstractOperatorRestoreTestBase extends TestLogger {
        private static final FiniteDuration timeout = new FiniteDuration(30L, TimeUnit.SECONDS);
 
        @BeforeClass
+       public static void beforeClass() {
+               SavepointSerializers.setFailWhenLegacyStateDetected(false);
+       }
+
+       @BeforeClass
        public static void setupCluster() throws Exception {
                final Configuration configuration = new Configuration();
 
