http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/SerializedStateHandle.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/SerializedStateHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/SerializedStateHandle.java
deleted file mode 100644
index 49d772e..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/SerializedStateHandle.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.migration.runtime.state.memory;
-
-import org.apache.flink.migration.runtime.state.AbstractCloseableHandle;
-import org.apache.flink.migration.runtime.state.StateHandle;
-import org.apache.flink.migration.util.MigrationInstantiationUtil;
-import org.apache.flink.util.InstantiationUtil;
-
-import java.io.IOException;
-import java.io.Serializable;
-
-/**
- * A state handle that represents its state in serialized form as bytes.
- *
- * <p>The byte array may be {@code null}: the value constructor stores
- * {@code null} for a {@code null} state object, and the byte-array constructor
- * stores whatever it is given. {@code getState(ClassLoader)} returns
- * {@code null} in that case, whereas {@code getSizeOfSerializedState()} and
- * {@code getStateSize()} read the array length without a null check and
- * therefore throw a {@code NullPointerException}.
- *
- * <p>{@code getState(ClassLoader)} rejects a {@code null} class loader with a
- * {@code NullPointerException} and calls {@code ensureNotClosed()} before
- * deserializing. {@code getSerializedData()} hands out the internal array
- * itself, not a copy.
- *
- * @param <T> The type of state represented by this state handle.
- */
-@SuppressWarnings("deprecation")
-public class SerializedStateHandle<T extends Serializable> extends 
AbstractCloseableHandle implements StateHandle<T> {
-       
-       private static final long serialVersionUID = 4145685722538475769L;
-
-       /** The serialized data; may be {@code null} (see the constructors). */
-       private final byte[] serializedData;
-       
-       /**
-        * Creates a new serialized state handle, eagerly serializing the given
-        * state object. A {@code null} value is not serialized; the handle then
-        * holds {@code null} instead of a byte array.
-        *
-        * @param value The state object, may be {@code null}.
-        * @throws IOException Thrown, if the serialization fails.
-        */
-       public SerializedStateHandle(T value) throws IOException {
-               this.serializedData = value == null ? null : 
InstantiationUtil.serializeObject(value);
-       }
-
-       /**
-        * Creates a new serialized state handle, based on the given already
-        * serialized data. The array is stored as-is, without copying.
-        *
-        * @param serializedData The serialized data.
-        */
-       public SerializedStateHandle(byte[] serializedData) {
-               this.serializedData = serializedData;
-       }
-       
-       @Override
-       public T getState(ClassLoader classLoader) throws Exception {
-               if (classLoader == null) {
-                       throw new NullPointerException();
-               }
-
-               ensureNotClosed();
-               return serializedData == null ? null : 
MigrationInstantiationUtil.<T>deserializeObject(serializedData, classLoader);
-       }
-
-       /**
-        * Gets the size of the serialized state.
-        *
-        * @return The size of the serialized state, i.e. the length of the byte array.
-        * @throws NullPointerException If this handle holds no serialized data.
-        */
-       public int getSizeOfSerializedState() {
-               return serializedData.length;
-       }
-
-       /**
-        * Discarding heap-memory backed state is a no-op, so this method does
-        * nothing.
-        */
-       @Override
-       public void discardState() {}
-
-       @Override
-       public long getStateSize() {
-               return serializedData.length;
-       }
-
-       public byte[] getSerializedData() {
-               return serializedData;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-runtime/src/main/java/org/apache/flink/migration/state/MigrationKeyGroupStateHandle.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/migration/state/MigrationKeyGroupStateHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/migration/state/MigrationKeyGroupStateHandle.java
deleted file mode 100644
index 3f1ff55..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/migration/state/MigrationKeyGroupStateHandle.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.migration.state;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
-import org.apache.flink.runtime.state.KeyGroupsStateHandle;
-import org.apache.flink.runtime.state.StreamStateHandle;
-import org.apache.flink.util.Migration;
-
-/**
- * This class is just a KeyGroupsStateHandle that is tagged as migration (via
- * the {@code Migration} interface), to figure out which restore logic to apply,
- * e.g. when restoring backend data from a state handle. It adds no fields or
- * behavior of its own beyond the constructor.
- *
- * @deprecated Internal class for savepoint backwards compatibility. Don't use
- * for other purposes.
- */
-@Internal
-@Deprecated
-public class MigrationKeyGroupStateHandle extends KeyGroupsStateHandle 
implements Migration {
-
-       private static final long serialVersionUID = -8554427169776881697L;
-
-       /**
-        * Creates a migration-tagged handle by passing both arguments unchanged
-        * to the {@code KeyGroupsStateHandle} constructor.
-        *
-        * @param groupRangeOffsets range of key-group ids that are in the state
-        * of this handle
-        * @param streamStateHandle handle to the actual state of the key-groups
-        */
-       public MigrationKeyGroupStateHandle(KeyGroupRangeOffsets 
groupRangeOffsets, StreamStateHandle streamStateHandle) {
-               super(groupRangeOffsets, streamStateHandle);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-runtime/src/main/java/org/apache/flink/migration/state/MigrationStreamStateHandle.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/migration/state/MigrationStreamStateHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/migration/state/MigrationStreamStateHandle.java
deleted file mode 100644
index 2201916..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/migration/state/MigrationStreamStateHandle.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.migration.state;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.core.fs.FSDataInputStreamWrapper;
-import org.apache.flink.runtime.state.StreamStateHandle;
-import org.apache.flink.util.Migration;
-
-import java.io.IOException;
-
-/**
- * This class is just a StreamStateHandle that is tagged as migration, to
- * figure out which restore logic to apply, e.g. when restoring backend data
- * from a state handle.
- *
- * <p>All work is forwarded to the wrapped delegate handle:
- * {@code discardState()} and {@code getStateSize()} call straight through, and
- * {@code openInputStream()} wraps the delegate's stream in a
- * {@code MigrationFSInputStream} so that the returned stream also implements
- * {@code Migration}. The delegate passed to the constructor is not
- * null-checked.
- *
- * @deprecated Internal class for savepoint backwards compatibility. Don't use
- * for other purposes.
- */
-@Internal
-@Deprecated
-public class MigrationStreamStateHandle implements StreamStateHandle, 
Migration {
-
-       private static final long serialVersionUID = -2332113722532150112L;
-       private final StreamStateHandle delegate;
-
-       public MigrationStreamStateHandle(StreamStateHandle delegate) {
-               this.delegate = delegate;
-       }
-
-       @Override
-       public FSDataInputStream openInputStream() throws IOException {
-               return new MigrationFSInputStream(delegate.openInputStream());
-       }
-
-       @Override
-       public void discardState() throws Exception {
-               delegate.discardState();
-       }
-
-       @Override
-       public long getStateSize() {
-               return delegate.getStateSize();
-       }
-
-       static class MigrationFSInputStream extends FSDataInputStreamWrapper 
implements Migration {
-
-               public MigrationFSInputStream(FSDataInputStream inputStream) {
-                       super(inputStream);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-runtime/src/main/java/org/apache/flink/migration/streaming/runtime/tasks/StreamTaskState.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/migration/streaming/runtime/tasks/StreamTaskState.java
 
b/flink-runtime/src/main/java/org/apache/flink/migration/streaming/runtime/tasks/StreamTaskState.java
deleted file mode 100644
index b044ffb..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/migration/streaming/runtime/tasks/StreamTaskState.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.migration.streaming.runtime.tasks;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.migration.runtime.state.KvStateSnapshot;
-import org.apache.flink.migration.runtime.state.StateHandle;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.HashMap;
-
-/**
- * Mutable, serializable holder for three pieces of state: an operator state
- * handle, a function state handle, and a map of key/value state snapshots
- * keyed by {@code String}. No field is initialized here, so each is
- * {@code null} until its setter is called. {@code close()} has an empty body.
- *
- * @deprecated Internal class for savepoint backwards compatibility. Don't use
- * for other purposes.
- */
-@Deprecated
-@Internal
-@SuppressWarnings("deprecation")
-public class StreamTaskState implements Serializable, Closeable {
-
-       private static final long serialVersionUID = 1L;
-
-       private StateHandle<?> operatorState;
-
-       private StateHandle<Serializable> functionState;
-
-       private HashMap<String, KvStateSnapshot<?, ?, ?, ?>> kvStates;
-
-       // 
------------------------------------------------------------------------
-
-       public StateHandle<?> getOperatorState() {
-               return operatorState;
-       }
-
-       public void setOperatorState(StateHandle<?> operatorState) {
-               this.operatorState = operatorState;
-       }
-
-       public StateHandle<Serializable> getFunctionState() {
-               return functionState;
-       }
-
-       public void setFunctionState(StateHandle<Serializable> functionState) {
-               this.functionState = functionState;
-       }
-
-       public HashMap<String, KvStateSnapshot<?, ?, ?, ?>> getKvStates() {
-               return kvStates;
-       }
-
-       public void setKvStates(HashMap<String, KvStateSnapshot<?, ?, ?, ?>> 
kvStates) {
-               this.kvStates = kvStates;
-       }
-
-       // 
------------------------------------------------------------------------
-
-       /**
-        * Checks if this state object actually contains any state, or if all
-        * of the state fields are null. Only null-ness is tested, so a non-null
-        * but empty key/value map counts as state.
-        *
-        * <p>The checks are combined with the non-short-circuit {@code &}
-        * operator; since the operands are plain null comparisons this gives
-        * the same result as {@code &&}.
-        *
-        * @return True, if all state is null, false if at least one state is
-        * not null.
-        */
-       public boolean isEmpty() {
-               return operatorState == null & functionState == null & kvStates 
== null;
-       }
-
-       @Override
-       public void close() throws IOException {
-
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-runtime/src/main/java/org/apache/flink/migration/streaming/runtime/tasks/StreamTaskStateList.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/migration/streaming/runtime/tasks/StreamTaskStateList.java
 
b/flink-runtime/src/main/java/org/apache/flink/migration/streaming/runtime/tasks/StreamTaskStateList.java
deleted file mode 100644
index 7643039..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/migration/streaming/runtime/tasks/StreamTaskStateList.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.migration.streaming.runtime.tasks;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.migration.runtime.state.KvStateSnapshot;
-import org.apache.flink.migration.runtime.state.StateHandle;
-
-import java.io.IOException;
-import java.util.HashMap;
-
-/**
- * State handle whose state is simply an array of {@code StreamTaskState}
- * entries. {@code getState(ClassLoader)} returns that array unchanged and
- * ignores the class loader; {@code discardState()} and {@code close()} have
- * empty bodies. Individual entries may be {@code null}; both
- * {@code isEmpty()} and {@code getStateSize()} skip them.
- *
- * <p>Note that {@code isEmpty()} iterates the array without a null check,
- * while {@code getStateSize()} tolerates a {@code null} array and returns 0.
- *
- * @deprecated Internal class for savepoint backwards compatibility. Don't use
- * for other purposes.
- */
-@Deprecated
-@Internal
-@SuppressWarnings("deprecation")
-public class StreamTaskStateList implements StateHandle<StreamTaskState[]> {
-
-       private static final long serialVersionUID = 1L;
-
-       /** The states for all operators; stored by reference, not copied. */
-       private final StreamTaskState[] states;
-
-       public StreamTaskStateList(StreamTaskState[] states) throws Exception {
-               this.states = states;
-       }
-
-       public boolean isEmpty() {
-               for (StreamTaskState state : states) {
-                       if (state != null) {
-                               return false;
-                       }
-               }
-               return true;
-       }
-
-       @Override
-       public StreamTaskState[] getState(ClassLoader userCodeClassLoader) {
-               return states;
-       }
-
-       @Override
-       public void discardState() throws Exception {
-       }
-
-       @Override
-       public long getStateSize() throws Exception {
-               long sumStateSize = 0;
-
-               if (states != null) {
-                       for (StreamTaskState state : states) {
-                               if (state != null) {
-                                       StateHandle<?> operatorState = 
state.getOperatorState();
-                                       StateHandle<?> functionState = 
state.getFunctionState();
-                                       HashMap<String, KvStateSnapshot<?, ?, 
?, ?>> kvStates = state.getKvStates();
-
-                                       if (operatorState != null) {
-                                               sumStateSize += 
operatorState.getStateSize();
-                                       }
-
-                                       if (functionState != null) {
-                                               sumStateSize += 
functionState.getStateSize();
-                                       }
-
-                                       if (kvStates != null) {
-                                               for (KvStateSnapshot<?, ?, ?, 
?> kvState : kvStates.values()) {
-                                                       if (kvState != null) {
-                                                               sumStateSize += 
kvState.getStateSize();
-                                                       }
-                                               }
-                                       }
-                               }
-                       }
-               }
-
-               // Total = operator + function + key/value state sizes of every non-null entry
-               return sumStateSize;
-       }
-
-       @Override
-       public void close() throws IOException {
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorState.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorState.java
index 145ff6a..a5f908d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorState.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorState.java
@@ -30,7 +30,7 @@ import java.util.Map;
 import java.util.Objects;
 
 /**
- * Simple container class which contains the raw/managed/legacy operator state 
and key-group state handles from all sub
+ * Simple container class which contains the raw/managed operator state and 
key-group state handles from all sub
  * tasks of an operator and therefore represents the complete state of a 
logical operator.
  */
 public class OperatorState implements CompositeStateHandle {
@@ -102,15 +102,6 @@ public class OperatorState implements CompositeStateHandle 
{
                return maxParallelism;
        }
 
-       public boolean hasNonPartitionedState() {
-               for (OperatorSubtaskState sts : operatorSubtaskStates.values()) 
{
-                       if (sts != null && sts.getLegacyOperatorState() != 
null) {
-                               return true;
-                       }
-               }
-               return false;
-       }
-
        @Override
        public void discardState() throws Exception {
                for (OperatorSubtaskState operatorSubtaskState : 
operatorSubtaskStates.values()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java
index 296b5ab..3df9c4f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java
@@ -18,21 +18,18 @@
 
 package org.apache.flink.runtime.checkpoint;
 
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.state.CompositeStateHandle;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.StateObject;
 import org.apache.flink.runtime.state.StateUtil;
-import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.util.Preconditions;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -63,16 +60,6 @@ public class OperatorSubtaskState implements 
CompositeStateHandle {
        private static final long serialVersionUID = -2394696997971923995L;
 
        /**
-        * Legacy (non-repartitionable) operator state.
-        *
-        * @deprecated Non-repartitionable operator state that has been 
deprecated.
-        * Can be removed when we remove the APIs for non-repartitionable 
operator state.
-        */
-       @Deprecated
-       @Nullable
-       private final StreamStateHandle legacyOperatorState;
-
-       /**
         * Snapshot from the {@link 
org.apache.flink.runtime.state.OperatorStateBackend}.
         */
        @Nonnull
@@ -103,39 +90,30 @@ public class OperatorSubtaskState implements 
CompositeStateHandle {
         */
        private final long stateSize;
 
-       @VisibleForTesting
-       public OperatorSubtaskState(StreamStateHandle legacyOperatorState) {
-
-               this(legacyOperatorState,
-                       Collections.<OperatorStateHandle>emptyList(),
-                       Collections.<OperatorStateHandle>emptyList(),
-                       Collections.<KeyedStateHandle>emptyList(),
-                       Collections.<KeyedStateHandle>emptyList());
-       }
-
        /**
         * Empty state.
         */
        public OperatorSubtaskState() {
-               this(null);
+               this(
+                       Collections.emptyList(),
+                       Collections.emptyList(),
+                       Collections.emptyList(),
+                       Collections.emptyList());
        }
 
        public OperatorSubtaskState(
-               StreamStateHandle legacyOperatorState,
                Collection<OperatorStateHandle> managedOperatorState,
                Collection<OperatorStateHandle> rawOperatorState,
                Collection<KeyedStateHandle> managedKeyedState,
                Collection<KeyedStateHandle> rawKeyedState) {
 
-               this.legacyOperatorState = legacyOperatorState;
                this.managedOperatorState = 
Preconditions.checkNotNull(managedOperatorState);
                this.rawOperatorState = 
Preconditions.checkNotNull(rawOperatorState);
                this.managedKeyedState = 
Preconditions.checkNotNull(managedKeyedState);
                this.rawKeyedState = Preconditions.checkNotNull(rawKeyedState);
 
                try {
-                       long calculateStateSize = 
getSizeNullSafe(legacyOperatorState);
-                       calculateStateSize += sumAllSizes(managedOperatorState);
+                       long calculateStateSize = 
sumAllSizes(managedOperatorState);
                        calculateStateSize += sumAllSizes(rawOperatorState);
                        calculateStateSize += sumAllSizes(managedKeyedState);
                        calculateStateSize += sumAllSizes(rawKeyedState);
@@ -150,13 +128,12 @@ public class OperatorSubtaskState implements 
CompositeStateHandle {
         * Collections (except for legacy state).
         */
        public OperatorSubtaskState(
-               StreamStateHandle legacyOperatorState,
                OperatorStateHandle managedOperatorState,
                OperatorStateHandle rawOperatorState,
                KeyedStateHandle managedKeyedState,
                KeyedStateHandle rawKeyedState) {
 
-               this(legacyOperatorState,
+               this(
                        singletonOrEmptyOnNull(managedOperatorState),
                        singletonOrEmptyOnNull(rawOperatorState),
                        singletonOrEmptyOnNull(managedKeyedState),
@@ -183,16 +160,6 @@ public class OperatorSubtaskState implements 
CompositeStateHandle {
        // 
--------------------------------------------------------------------------------------------
 
        /**
-        * @deprecated Non-repartitionable operator state that has been 
deprecated.
-        * Can be removed when we remove the APIs for non-repartitionable 
operator state.
-        */
-       @Deprecated
-       @Nullable
-       public StreamStateHandle getLegacyOperatorState() {
-               return legacyOperatorState;
-       }
-
-       /**
         * Returns a handle to the managed operator state.
         */
        @Nonnull
@@ -228,12 +195,11 @@ public class OperatorSubtaskState implements 
CompositeStateHandle {
        public void discardState() {
                try {
                        List<StateObject> toDispose =
-                               new ArrayList<>(1 +
-                                       managedOperatorState.size() +
-                                       rawOperatorState.size() +
-                                       managedKeyedState.size() +
-                                       rawKeyedState.size());
-                       toDispose.add(legacyOperatorState);
+                               new ArrayList<>(
+                                               managedOperatorState.size() +
+                                               rawOperatorState.size() +
+                                               managedKeyedState.size() +
+                                               rawKeyedState.size());
                        toDispose.addAll(managedOperatorState);
                        toDispose.addAll(rawOperatorState);
                        toDispose.addAll(managedKeyedState);
@@ -281,9 +247,6 @@ public class OperatorSubtaskState implements 
CompositeStateHandle {
                if (getStateSize() != that.getStateSize()) {
                        return false;
                }
-               if (getLegacyOperatorState() != null ? 
!getLegacyOperatorState().equals(that.getLegacyOperatorState()) : 
that.getLegacyOperatorState() != null) {
-                       return false;
-               }
                if 
(!getManagedOperatorState().equals(that.getManagedOperatorState())) {
                        return false;
                }
@@ -298,8 +261,7 @@ public class OperatorSubtaskState implements 
CompositeStateHandle {
 
        @Override
        public int hashCode() {
-               int result = getLegacyOperatorState() != null ? 
getLegacyOperatorState().hashCode() : 0;
-               result = 31 * result + getManagedOperatorState().hashCode();
+               int result = getManagedOperatorState().hashCode();
                result = 31 * result + getRawOperatorState().hashCode();
                result = 31 * result + getManagedKeyedState().hashCode();
                result = 31 * result + getRawKeyedState().hashCode();
@@ -310,8 +272,7 @@ public class OperatorSubtaskState implements 
CompositeStateHandle {
        @Override
        public String toString() {
                return "SubtaskState{" +
-                       "legacyState=" + legacyOperatorState +
-                       ", operatorStateFromBackend=" + managedOperatorState +
+                       "operatorStateFromBackend=" + managedOperatorState +
                        ", operatorStateFromStream=" + rawOperatorState +
                        ", keyedStateFromBackend=" + managedKeyedState +
                        ", keyedStateFromStream=" + rawKeyedState +
@@ -320,8 +281,7 @@ public class OperatorSubtaskState implements 
CompositeStateHandle {
        }
 
        public boolean hasState() {
-               return legacyOperatorState != null
-                       || hasState(managedOperatorState)
+               return hasState(managedOperatorState)
                        || hasState(rawOperatorState)
                        || hasState(managedKeyedState)
                        || hasState(rawKeyedState);

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
index b69285e..cc9f9cd 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
@@ -28,7 +28,6 @@ import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle;
-import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.util.Preconditions;
 
 import org.slf4j.Logger;
@@ -162,8 +161,6 @@ public class StateAssignmentOperation {
                        Execution currentExecutionAttempt = 
executionJobVertex.getTaskVertices()[subTaskIndex]
                                .getCurrentExecutionAttempt();
 
-                       List<StreamStateHandle> subNonPartitionableState = new 
ArrayList<>();
-
                        Tuple2<Collection<KeyedStateHandle>, 
Collection<KeyedStateHandle>> subKeyedState = null;
 
                        List<Collection<OperatorStateHandle>> 
subManagedOperatorState = new ArrayList<>();
@@ -174,15 +171,6 @@ public class StateAssignmentOperation {
                                OperatorState operatorState = 
operatorStates.get(operatorIndex);
                                int oldParallelism = 
operatorState.getParallelism();
 
-                               // NonPartitioned State
-
-                               reAssignSubNonPartitionedStates(
-                                       operatorState,
-                                       subTaskIndex,
-                                       newParallelism,
-                                       oldParallelism,
-                                       subNonPartitionableState);
-
                                // PartitionedState
                                reAssignSubPartitionableState(
                                        newManagedOperatorStates,
@@ -204,8 +192,7 @@ public class StateAssignmentOperation {
                        }
 
                        // check if a stateless task
-                       if (!allElementsAreNull(subNonPartitionableState) ||
-                               !allElementsAreNull(subManagedOperatorState) ||
+                       if (!allElementsAreNull(subManagedOperatorState) ||
                                !allElementsAreNull(subRawOperatorState) ||
                                subKeyedState != null) {
 
@@ -226,7 +213,6 @@ public class StateAssignmentOperation {
 
                                        OperatorSubtaskState 
operatorSubtaskState =
                                                new OperatorSubtaskState(
-                                                       
subNonPartitionableState.get(i),
                                                        
subManagedOperatorState.get(i),
                                                        
subRawOperatorState.get(i),
                                                        managedKeyed,
@@ -314,24 +300,6 @@ public class StateAssignmentOperation {
                return true;
        }
 
-
-       private void reAssignSubNonPartitionedStates(
-                       OperatorState operatorState,
-                       int subTaskIndex,
-                       int newParallelism,
-                       int oldParallelism,
-               List<StreamStateHandle> subNonPartitionableState) {
-               if (oldParallelism == newParallelism) {
-                       if (operatorState.getState(subTaskIndex) != null) {
-                               
subNonPartitionableState.add(operatorState.getState(subTaskIndex).getLegacyOperatorState());
-                       } else {
-                               subNonPartitionableState.add(null);
-                       }
-               } else {
-                       subNonPartitionableState.add(null);
-               }
-       }
-
        private void reDistributePartitionableStates(
                        List<OperatorState> operatorStates, int newParallelism,
                        List<List<Collection<OperatorStateHandle>>> 
newManagedOperatorStates,
@@ -524,19 +492,6 @@ public class StateAssignmentOperation {
                                        "is currently not supported.");
                        }
                }
-
-               //----------------------------------------parallelism 
preconditions-----------------------------------------
-
-               final int oldParallelism = operatorState.getParallelism();
-               final int newParallelism = executionJobVertex.getParallelism();
-
-               if (operatorState.hasNonPartitionedState() && (oldParallelism 
!= newParallelism)) {
-                       throw new IllegalStateException("Cannot restore the 
latest checkpoint because " +
-                               "the operator " + 
executionJobVertex.getJobVertexId() + " has non-partitioned " +
-                               "state and its parallelism changed. The 
operator " + executionJobVertex.getJobVertexId() +
-                               " has parallelism " + newParallelism + " 
whereas the corresponding " +
-                               "state object has a parallelism of " + 
oldParallelism);
-               }
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java
index 20d675b..281693b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java
@@ -25,14 +25,12 @@ import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.StateObject;
 import org.apache.flink.runtime.state.StateUtil;
-import org.apache.flink.runtime.state.StreamStateHandle;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Arrays;
 
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
 /**
  * Container for the chained state of one parallel subtask of an 
operator/task. This is part of the
  * {@link TaskState}.
@@ -44,15 +42,6 @@ public class SubtaskState implements CompositeStateHandle {
        private static final long serialVersionUID = -2394696997971923995L;
 
        /**
-        * Legacy (non-repartitionable) operator state.
-        *
-        * @deprecated Non-repartitionable operator state that has been 
deprecated.
-        * Can be removed when we remove the APIs for non-repartitionable 
operator state.
-        */
-       @Deprecated
-       private final ChainedStateHandle<StreamStateHandle> legacyOperatorState;
-
-       /**
         * Snapshot from the {@link 
org.apache.flink.runtime.state.OperatorStateBackend}.
         */
        private final ChainedStateHandle<OperatorStateHandle> 
managedOperatorState;
@@ -80,21 +69,18 @@ public class SubtaskState implements CompositeStateHandle {
        private final long stateSize;
 
        public SubtaskState(
-                       ChainedStateHandle<StreamStateHandle> 
legacyOperatorState,
                        ChainedStateHandle<OperatorStateHandle> 
managedOperatorState,
                        ChainedStateHandle<OperatorStateHandle> 
rawOperatorState,
                        KeyedStateHandle managedKeyedState,
                        KeyedStateHandle rawKeyedState) {
 
-               this.legacyOperatorState = checkNotNull(legacyOperatorState, 
"State");
                this.managedOperatorState = managedOperatorState;
                this.rawOperatorState = rawOperatorState;
                this.managedKeyedState = managedKeyedState;
                this.rawKeyedState = rawKeyedState;
 
                try {
-                       long calculateStateSize = 
getSizeNullSafe(legacyOperatorState);
-                       calculateStateSize += 
getSizeNullSafe(managedOperatorState);
+                       long calculateStateSize = 
getSizeNullSafe(managedOperatorState);
                        calculateStateSize += getSizeNullSafe(rawOperatorState);
                        calculateStateSize += 
getSizeNullSafe(managedKeyedState);
                        calculateStateSize += getSizeNullSafe(rawKeyedState);
@@ -110,15 +96,6 @@ public class SubtaskState implements CompositeStateHandle {
 
        // 
--------------------------------------------------------------------------------------------
 
-       /**
-        * @deprecated Non-repartitionable operator state that has been 
deprecated.
-        * Can be removed when we remove the APIs for non-repartitionable 
operator state.
-        */
-       @Deprecated
-       public ChainedStateHandle<StreamStateHandle> getLegacyOperatorState() {
-               return legacyOperatorState;
-       }
-
        public ChainedStateHandle<OperatorStateHandle> 
getManagedOperatorState() {
                return managedOperatorState;
        }
@@ -140,7 +117,6 @@ public class SubtaskState implements CompositeStateHandle {
                try {
                        StateUtil.bestEffortDiscardAllStateObjects(
                                Arrays.asList(
-                                       legacyOperatorState,
                                        managedOperatorState,
                                        rawOperatorState,
                                        managedKeyedState,
@@ -183,11 +159,6 @@ public class SubtaskState implements CompositeStateHandle {
                        return false;
                }
 
-               if (legacyOperatorState != null ?
-                               
!legacyOperatorState.equals(that.legacyOperatorState)
-                               : that.legacyOperatorState != null) {
-                       return false;
-               }
                if (managedOperatorState != null ?
                                
!managedOperatorState.equals(that.managedOperatorState)
                                : that.managedOperatorState != null) {
@@ -211,8 +182,7 @@ public class SubtaskState implements CompositeStateHandle {
 
        @Override
        public int hashCode() {
-               int result = legacyOperatorState != null ? 
legacyOperatorState.hashCode() : 0;
-               result = 31 * result + (managedOperatorState != null ? 
managedOperatorState.hashCode() : 0);
+               int result = (managedOperatorState != null ? 
managedOperatorState.hashCode() : 0);
                result = 31 * result + (rawOperatorState != null ? 
rawOperatorState.hashCode() : 0);
                result = 31 * result + (managedKeyedState != null ? 
managedKeyedState.hashCode() : 0);
                result = 31 * result + (rawKeyedState != null ? 
rawKeyedState.hashCode() : 0);
@@ -223,8 +193,7 @@ public class SubtaskState implements CompositeStateHandle {
        @Override
        public String toString() {
                return "SubtaskState{" +
-                               "chainedStateHandle=" + legacyOperatorState +
-                               ", operatorStateFromBackend=" + 
managedOperatorState +
+                               "operatorStateFromBackend=" + 
managedOperatorState +
                                ", operatorStateFromStream=" + rawOperatorState 
+
                                ", keyedStateFromBackend=" + managedKeyedState +
                                ", keyedStateFromStream=" + rawKeyedState +

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
index ed847a4..0f3bedb 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
@@ -48,7 +48,6 @@ public class TaskState implements CompositeStateHandle {
        /** handles to non-partitioned states, subtaskindex -> subtaskstate */
        private final Map<Integer, SubtaskState> subtaskStates;
 
-
        /** parallelism of the operator when it was checkpointed */
        private final int parallelism;
 
@@ -117,15 +116,6 @@ public class TaskState implements CompositeStateHandle {
                return chainLength;
        }
 
-       public boolean hasNonPartitionedState() {
-               for(SubtaskState sts : subtaskStates.values()) {
-                       if (sts != null && 
!sts.getLegacyOperatorState().isEmpty()) {
-                               return true;
-                       }
-               }
-               return false;
-       }
-
        @Override
        public void discardState() throws Exception {
                for (SubtaskState subtaskState : subtaskStates.values()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializers.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializers.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializers.java
index c1fcf4f..12e9c5b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializers.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializers.java
@@ -18,8 +18,7 @@
 
 package org.apache.flink.runtime.checkpoint.savepoint;
 
-import org.apache.flink.migration.runtime.checkpoint.savepoint.SavepointV0;
-import 
org.apache.flink.migration.runtime.checkpoint.savepoint.SavepointV0Serializer;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.util.Preconditions;
 
 import java.util.HashMap;
@@ -30,15 +29,20 @@ import java.util.Map;
  */
 public class SavepointSerializers {
 
+       /** If this flag is true, restoring a savepoint fails if it contains 
legacy state (<= Flink 1.1 format) */
+       static boolean FAIL_WHEN_LEGACY_STATE_DETECTED = true;
 
        private static final Map<Integer, SavepointSerializer<?>> SERIALIZERS = 
new HashMap<>(2);
 
        static {
-               SERIALIZERS.put(SavepointV0.VERSION, 
SavepointV0Serializer.INSTANCE);
                SERIALIZERS.put(SavepointV1.VERSION, 
SavepointV1Serializer.INSTANCE);
                SERIALIZERS.put(SavepointV2.VERSION, 
SavepointV2Serializer.INSTANCE);
        }
 
+       private SavepointSerializers() {
+               throw new AssertionError();
+       }
+
        // 
------------------------------------------------------------------------
 
        /**
@@ -77,4 +81,12 @@ public class SavepointSerializers {
                }
        }
 
+       /**
+        * This is only visible as a temporary solution to keep the stateful 
job migration IT cases working from binary
+        * savepoints that still contain legacy state (<= Flink 1.1).
+        */
+       @VisibleForTesting
+       public static void setFailWhenLegacyStateDetected(boolean fail) {
+               FAIL_WHEN_LEGACY_STATE_DETECTED = fail;
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
index 7beb1b8..586df57 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
@@ -36,6 +36,7 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
+
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
index f67d54c..c26c983 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.filesystem.FileStateHandle;
 import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+import org.apache.flink.util.Preconditions;
 
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
@@ -59,7 +60,6 @@ public class SavepointV1Serializer implements 
SavepointSerializer<SavepointV2> {
        private static final byte KEY_GROUPS_HANDLE = 3;
        private static final byte PARTITIONABLE_OPERATOR_STATE_HANDLE = 4;
 
-
        public static final SavepointV1Serializer INSTANCE = new 
SavepointV1Serializer();
 
        private SavepointV1Serializer() {
@@ -130,20 +130,15 @@ public class SavepointV1Serializer implements 
SavepointSerializer<SavepointV2> {
 
        private static void serializeSubtaskState(SubtaskState subtaskState, 
DataOutputStream dos) throws IOException {
 
-               dos.writeLong(-1);
-
-               ChainedStateHandle<StreamStateHandle> nonPartitionableState = 
subtaskState.getLegacyOperatorState();
+               //backwards compatibility, do not remove
+               dos.writeLong(-1L);
 
-               int len = nonPartitionableState != null ? 
nonPartitionableState.getLength() : 0;
-               dos.writeInt(len);
-               for (int i = 0; i < len; ++i) {
-                       StreamStateHandle stateHandle = 
nonPartitionableState.get(i);
-                       serializeStreamStateHandle(stateHandle, dos);
-               }
+               //backwards compatibility (number of legacy state handles), do 
not remove
+               dos.writeInt(0);
 
                ChainedStateHandle<OperatorStateHandle> operatorStateBackend = 
subtaskState.getManagedOperatorState();
 
-               len = operatorStateBackend != null ? 
operatorStateBackend.getLength() : 0;
+               int len = operatorStateBackend != null ? 
operatorStateBackend.getLength() : 0;
                dos.writeInt(len);
                for (int i = 0; i < len; ++i) {
                        OperatorStateHandle stateHandle = 
operatorStateBackend.get(i);
@@ -171,12 +166,19 @@ public class SavepointV1Serializer implements 
SavepointSerializer<SavepointV2> {
                long ignoredDuration = dis.readLong();
 
                int len = dis.readInt();
-               List<StreamStateHandle> nonPartitionableState = new 
ArrayList<>(len);
-               for (int i = 0; i < len; ++i) {
-                       StreamStateHandle streamStateHandle = 
deserializeStreamStateHandle(dis);
-                       nonPartitionableState.add(streamStateHandle);
-               }
 
+               if (SavepointSerializers.FAIL_WHEN_LEGACY_STATE_DETECTED) {
+                       Preconditions.checkState(len == 0,
+                               "Legacy state (from Flink <= 1.1, created 
through the 'Checkpointed' interface) is " +
+                                       "no longer supported starting from 
Flink 1.4. Please rewrite your job to use " +
+                                       "'CheckpointedFunction' instead!");
+
+               } else {
+                       for (int i = 0; i < len; ++i) {
+                               // absorb bytes from stream and ignore result
+                               deserializeStreamStateHandle(dis);
+                       }
+               }
 
                len = dis.readInt();
                List<OperatorStateHandle> operatorStateBackend = new 
ArrayList<>(len);
@@ -196,9 +198,6 @@ public class SavepointV1Serializer implements 
SavepointSerializer<SavepointV2> {
 
                KeyedStateHandle keyedStateStream = 
deserializeKeyedStateHandle(dis);
 
-               ChainedStateHandle<StreamStateHandle> 
nonPartitionableStateChain =
-                               new ChainedStateHandle<>(nonPartitionableState);
-
                ChainedStateHandle<OperatorStateHandle> 
operatorStateBackendChain =
                                new ChainedStateHandle<>(operatorStateBackend);
 
@@ -206,7 +205,6 @@ public class SavepointV1Serializer implements 
SavepointSerializer<SavepointV2> {
                                new ChainedStateHandle<>(operatorStateStream);
 
                return new SubtaskState(
-                               nonPartitionableStateChain,
                                operatorStateBackendChain,
                                operatorStateStreamChain,
                                keyedStateBackend,

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java
index bd364a2..9e406df 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java
@@ -29,7 +29,6 @@ import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.state.ChainedStateHandle;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle;
-import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.util.Preconditions;
 
 import java.util.Collection;
@@ -207,9 +206,6 @@ public class SavepointV2 implements Savepoint {
                                        continue;
                                }
 
-                               @SuppressWarnings("deprecation")
-                               ChainedStateHandle<StreamStateHandle> 
nonPartitionedState =
-                                       subtaskState.getLegacyOperatorState();
                                ChainedStateHandle<OperatorStateHandle> 
partitioneableState =
                                        subtaskState.getManagedOperatorState();
                                ChainedStateHandle<OperatorStateHandle> 
rawOperatorState =
@@ -240,7 +236,6 @@ public class SavepointV2 implements Savepoint {
                                                }
 
                                                OperatorSubtaskState 
operatorSubtaskState = new OperatorSubtaskState(
-                                                       nonPartitionedState != 
null ? nonPartitionedState.get(operatorIndex) : null,
                                                        partitioneableState != 
null ? partitioneableState.get(operatorIndex) : null,
                                                        rawOperatorState != 
null ? rawOperatorState.get(operatorIndex) : null,
                                                        managedKeyedState,

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
index 15628a0..5636a52 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.state.StateHandleID;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.filesystem.FileStateHandle;
 import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+import org.apache.flink.util.Preconditions;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -256,13 +257,8 @@ class SavepointV2Serializer implements 
SavepointSerializer<SavepointV2> {
 
                dos.writeLong(-1);
 
-               StreamStateHandle nonPartitionableState = 
subtaskState.getLegacyOperatorState();
-
-               int len = nonPartitionableState != null ? 1 : 0;
+               int len = 0;
                dos.writeInt(len);
-               if (len == 1) {
-                       serializeStreamStateHandle(nonPartitionableState, dos);
-               }
 
                OperatorStateHandle operatorStateBackend = 
extractSingleton(subtaskState.getManagedOperatorState());
 
@@ -288,11 +284,23 @@ class SavepointV2Serializer implements 
SavepointSerializer<SavepointV2> {
        }
 
        private static OperatorSubtaskState 
deserializeSubtaskState(DataInputStream dis) throws IOException {
-               // Duration field has been removed from SubtaskState
+               // Duration field has been removed from SubtaskState, but the 
value is still read for format compatibility; do not remove
                long ignoredDuration = dis.readLong();
 
+               // for compatibility, do not remove
                int len = dis.readInt();
-               StreamStateHandle nonPartitionableState = len == 0 ? null : 
deserializeStreamStateHandle(dis);
+
+               if (SavepointSerializers.FAIL_WHEN_LEGACY_STATE_DETECTED) {
+                       Preconditions.checkState(len == 0,
+                               "Legacy state (from Flink <= 1.1, created 
through the 'Checkpointed' interface) is " +
+                                       "no longer supported starting from 
Flink 1.4. Please rewrite your job to use " +
+                                       "'CheckpointedFunction' instead!");
+               } else {
+                       for (int i = 0; i < len; ++i) {
+                               // absorb bytes from stream and ignore result
+                               deserializeStreamStateHandle(dis);
+                       }
+               }
 
                len = dis.readInt();
                OperatorStateHandle operatorStateBackend = len == 0 ? null : 
deserializeOperatorStateHandle(dis);
@@ -305,7 +313,6 @@ class SavepointV2Serializer implements 
SavepointSerializer<SavepointV2> {
                KeyedStateHandle keyedStateStream = 
deserializeKeyedStateHandle(dis);
 
                return new OperatorSubtaskState(
-                               nonPartitionableState,
                                operatorStateBackend,
                                operatorStateStream,
                                keyedStateBackend,

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-runtime/src/main/java/org/apache/flink/runtime/state/VoidNamespaceSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/VoidNamespaceSerializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/VoidNamespaceSerializer.java
index 2800899..8b58891 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/VoidNamespaceSerializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/VoidNamespaceSerializer.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.state;
 import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.migration.MigrationNamespaceSerializerProxy;
 
 import java.io.IOException;
 
@@ -90,11 +89,4 @@ public final class VoidNamespaceSerializer extends 
TypeSerializerSingleton<VoidN
        public boolean canEqual(Object obj) {
                return obj instanceof VoidNamespaceSerializer;
        }
-
-       @Override
-       protected boolean isCompatibleSerializationFormatIdentifier(String 
identifier) {
-               // we might be replacing a migration namespace serializer, in 
which case we just assume compatibility
-               return 
super.isCompatibleSerializationFormatIdentifier(identifier)
-                       || 
identifier.equals(MigrationNamespaceSerializerProxy.class.getCanonicalName());
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index d1c0466..e235b96 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -35,11 +35,6 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.migration.MigrationNamespaceSerializerProxy;
-import org.apache.flink.migration.MigrationUtil;
-import org.apache.flink.migration.runtime.state.KvStateSnapshot;
-import 
org.apache.flink.migration.runtime.state.memory.MigrationRestoreSnapshot;
-import org.apache.flink.migration.state.MigrationKeyGroupStateHandle;
 import org.apache.flink.runtime.checkpoint.AbstractAsyncSnapshotIOCallable;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.io.async.AsyncStoppableTaskWithCallback;
@@ -65,7 +60,6 @@ import 
org.apache.flink.runtime.state.internal.InternalListState;
 import org.apache.flink.runtime.state.internal.InternalMapState;
 import org.apache.flink.runtime.state.internal.InternalReducingState;
 import org.apache.flink.runtime.state.internal.InternalValueState;
-import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StateMigrationException;
 
@@ -190,7 +184,7 @@ public class HeapKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                        // check compatibility results to determine if state 
migration is required
                        CompatibilityResult<N> namespaceCompatibility = 
CompatibilityUtil.resolveCompatibilityResult(
                                        
restoredMetaInfo.getNamespaceSerializer(),
-                                       MigrationNamespaceSerializerProxy.class,
+                                       null,
                                        
restoredMetaInfo.getNamespaceSerializerConfigSnapshot(),
                                        newMetaInfo.getNamespaceSerializer());
 
@@ -405,11 +399,7 @@ public class HeapKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                        LOG.debug("Restoring snapshot from state handles: {}.", 
restoredState);
                }
 
-               if (MigrationUtil.isOldSavepointKeyedState(restoredState)) {
-                       restoreOldSavepointKeyedState(restoredState);
-               } else {
-                       restorePartitionedState(restoredState);
-               }
+               restorePartitionedState(restoredState);
        }
 
        @SuppressWarnings({"unchecked"})
@@ -560,55 +550,6 @@ public class HeapKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
        }
 
        /**
-        * @deprecated Used for backwards compatibility with previous savepoint 
versions.
-        */
-       @SuppressWarnings({"unchecked", "rawtypes", "DeprecatedIsStillUsed"})
-       @Deprecated
-       private void restoreOldSavepointKeyedState(
-                       Collection<KeyedStateHandle> stateHandles) throws 
IOException, ClassNotFoundException {
-
-               if (stateHandles.isEmpty()) {
-                       return;
-               }
-
-               Preconditions.checkState(1 == stateHandles.size(), "Only one 
element expected here.");
-
-               KeyedStateHandle keyedStateHandle = 
stateHandles.iterator().next();
-               if (!(keyedStateHandle instanceof 
MigrationKeyGroupStateHandle)) {
-                       throw new IllegalStateException("Unexpected state 
handle type, " +
-                                       "expected: " + 
MigrationKeyGroupStateHandle.class +
-                                       ", but found " + 
keyedStateHandle.getClass());
-               }
-
-               MigrationKeyGroupStateHandle keyGroupStateHandle = 
(MigrationKeyGroupStateHandle) keyedStateHandle;
-
-               HashMap<String, KvStateSnapshot<K, ?, ?, ?>> namedStates;
-               try (FSDataInputStream inputStream = 
keyGroupStateHandle.openInputStream()) {
-                       namedStates = 
InstantiationUtil.deserializeObject(inputStream, userCodeClassLoader);
-               }
-
-               for (Map.Entry<String, KvStateSnapshot<K, ?, ?, ?>> nameToState 
: namedStates.entrySet()) {
-
-                       final String stateName = nameToState.getKey();
-                       final KvStateSnapshot<K, ?, ?, ?> genericSnapshot = 
nameToState.getValue();
-
-                       if (genericSnapshot instanceof 
MigrationRestoreSnapshot) {
-                               MigrationRestoreSnapshot<K, ?, ?> stateSnapshot 
= (MigrationRestoreSnapshot<K, ?, ?>) genericSnapshot;
-                               final StateTable rawResultMap =
-                                               
stateSnapshot.deserialize(stateName, this);
-
-                               // mimic a restored kv state meta info
-                               restoredKvStateMetaInfos.put(stateName, 
rawResultMap.getMetaInfo().snapshot());
-
-                               // add named state to the backend
-                               stateTables.put(stateName, rawResultMap);
-                       } else {
-                               throw new IllegalStateException("Unknown state: 
" + genericSnapshot);
-                       }
-               }
-       }
-
-       /**
         * Returns the total number of state entries across all keys/namespaces.
         */
        @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
index 26db772..7c95a34 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
@@ -29,7 +29,6 @@ import 
org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.SharedStateRegistry;
-import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
@@ -92,15 +91,12 @@ public class CheckpointCoordinatorFailureTest extends 
TestLogger {
 
                final long checkpointId = 
coord.getPendingCheckpoints().keySet().iterator().next();
 
-
-               StreamStateHandle legacyHandle = mock(StreamStateHandle.class);
                KeyedStateHandle managedKeyedHandle = 
mock(KeyedStateHandle.class);
                KeyedStateHandle rawKeyedHandle = mock(KeyedStateHandle.class);
                OperatorStateHandle managedOpHandle = 
mock(OperatorStateHandle.class);
                OperatorStateHandle rawOpHandle = 
mock(OperatorStateHandle.class);
 
                final OperatorSubtaskState operatorSubtaskState = spy(new 
OperatorSubtaskState(
-                       legacyHandle,
                        managedOpHandle,
                        rawOpHandle,
                        managedKeyedHandle,
@@ -126,7 +122,6 @@ public class CheckpointCoordinatorFailureTest extends 
TestLogger {
 
                // make sure that the subtask state has been discarded after we 
could not complete it.
                verify(operatorSubtaskState).discardState();
-               
verify(operatorSubtaskState.getLegacyOperatorState()).discardState();
                
verify(operatorSubtaskState.getManagedOperatorState().iterator().next()).discardState();
                
verify(operatorSubtaskState.getRawOperatorState().iterator().next()).discardState();
                
verify(operatorSubtaskState.getManagedKeyedState().iterator().next()).discardState();

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 45cbbc3..4193c2c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -90,7 +90,6 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
@@ -2149,15 +2148,13 @@ public class CheckpointCoordinatorTest extends 
TestLogger {
 
                assertTrue(coord.getPendingCheckpoints().keySet().size() == 1);
                long checkpointId = 
Iterables.getOnlyElement(coord.getPendingCheckpoints().keySet());
-               CheckpointMetaData checkpointMetaData = new 
CheckpointMetaData(checkpointId, 0L);
 
                List<KeyGroupRange> keyGroupPartitions1 = 
StateAssignmentOperation.createKeyGroupPartitions(maxParallelism1, 
parallelism1);
                List<KeyGroupRange> keyGroupPartitions2 = 
StateAssignmentOperation.createKeyGroupPartitions(maxParallelism2, 
parallelism2);
 
                for (int index = 0; index < jobVertex1.getParallelism(); 
index++) {
-                       StreamStateHandle valueSizeTuple = 
generateStateForVertex(jobVertexID1, index);
                        KeyGroupsStateHandle keyGroupState = 
generateKeyGroupState(jobVertexID1, keyGroupPartitions1.get(index), false);
-                       OperatorSubtaskState operatorSubtaskState = new 
OperatorSubtaskState(valueSizeTuple, null, null, keyGroupState, null);
+                       OperatorSubtaskState operatorSubtaskState = new 
OperatorSubtaskState(null, null, keyGroupState, null);
                        TaskStateSnapshot taskOperatorSubtaskStates = new 
TaskStateSnapshot();
                        
taskOperatorSubtaskStates.putSubtaskStateByOperatorID(OperatorID.fromJobVertexID(jobVertexID1),
 operatorSubtaskState);
                        AcknowledgeCheckpoint acknowledgeCheckpoint = new 
AcknowledgeCheckpoint(
@@ -2172,9 +2169,8 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
 
 
                for (int index = 0; index < jobVertex2.getParallelism(); 
index++) {
-                       StreamStateHandle valueSizeTuple = 
generateStateForVertex(jobVertexID2, index);
                        KeyGroupsStateHandle keyGroupState = 
generateKeyGroupState(jobVertexID2, keyGroupPartitions2.get(index), false);
-                       OperatorSubtaskState operatorSubtaskState = new 
OperatorSubtaskState(valueSizeTuple, null, null, keyGroupState, null);
+                       OperatorSubtaskState operatorSubtaskState = new 
OperatorSubtaskState(null, null, keyGroupState, null);
                        TaskStateSnapshot taskOperatorSubtaskStates = new 
TaskStateSnapshot();
                        
taskOperatorSubtaskStates.putSubtaskStateByOperatorID(OperatorID.fromJobVertexID(jobVertexID2),
 operatorSubtaskState);
                        AcknowledgeCheckpoint acknowledgeCheckpoint = new 
AcknowledgeCheckpoint(
@@ -2214,137 +2210,6 @@ public class CheckpointCoordinatorTest extends 
TestLogger {
                fail("The restoration should have failed because the max 
parallelism changed.");
        }
 
-       /**
-        * Tests that the checkpoint restoration fails if the parallelism of a 
job vertices with
-        * non-partitioned state has changed.
-        *
-        * @throws Exception
-        */
-       @Test(expected=IllegalStateException.class)
-       public void testRestoreLatestCheckpointFailureWhenParallelismChanges() 
throws Exception {
-               final JobID jid = new JobID();
-               final long timestamp = System.currentTimeMillis();
-
-               final JobVertexID jobVertexID1 = new JobVertexID();
-               final JobVertexID jobVertexID2 = new JobVertexID();
-               int parallelism1 = 3;
-               int parallelism2 = 2;
-               int maxParallelism1 = 42;
-               int maxParallelism2 = 13;
-
-               final ExecutionJobVertex jobVertex1 = mockExecutionJobVertex(
-                       jobVertexID1,
-                       parallelism1,
-                       maxParallelism1);
-               final ExecutionJobVertex jobVertex2 = mockExecutionJobVertex(
-                       jobVertexID2,
-                       parallelism2,
-                       maxParallelism2);
-
-               List<ExecutionVertex> allExecutionVertices = new 
ArrayList<>(parallelism1 + parallelism2);
-
-               
allExecutionVertices.addAll(Arrays.asList(jobVertex1.getTaskVertices()));
-               
allExecutionVertices.addAll(Arrays.asList(jobVertex2.getTaskVertices()));
-
-               ExecutionVertex[] arrayExecutionVertices =
-                               allExecutionVertices.toArray(new 
ExecutionVertex[allExecutionVertices.size()]);
-
-               // set up the coordinator and validate the initial state
-               CheckpointCoordinator coord = new CheckpointCoordinator(
-                       jid,
-                       600000,
-                       600000,
-                       0,
-                       Integer.MAX_VALUE,
-                       ExternalizedCheckpointSettings.none(),
-                       arrayExecutionVertices,
-                       arrayExecutionVertices,
-                       arrayExecutionVertices,
-                       new StandaloneCheckpointIDCounter(),
-                       new StandaloneCompletedCheckpointStore(1),
-                       null,
-                       Executors.directExecutor(),
-                       SharedStateRegistry.DEFAULT_FACTORY);
-
-               // trigger the checkpoint
-               coord.triggerCheckpoint(timestamp, false);
-
-               assertTrue(coord.getPendingCheckpoints().keySet().size() == 1);
-               long checkpointId = 
Iterables.getOnlyElement(coord.getPendingCheckpoints().keySet());
-               CheckpointMetaData checkpointMetaData = new 
CheckpointMetaData(checkpointId, 0L);
-
-               List<KeyGroupRange> keyGroupPartitions1 =
-                               
StateAssignmentOperation.createKeyGroupPartitions(maxParallelism1, 
parallelism1);
-               List<KeyGroupRange> keyGroupPartitions2 =
-                               
StateAssignmentOperation.createKeyGroupPartitions(maxParallelism2, 
parallelism2);
-
-               for (int index = 0; index < jobVertex1.getParallelism(); 
index++) {
-                       StreamStateHandle valueSizeTuple = 
generateStateForVertex(jobVertexID1, index);
-                       KeyGroupsStateHandle keyGroupState = 
generateKeyGroupState(
-                                       jobVertexID1, 
keyGroupPartitions1.get(index), false);
-
-                       OperatorSubtaskState operatorSubtaskState = new 
OperatorSubtaskState(valueSizeTuple, null, null, keyGroupState, null);
-                       TaskStateSnapshot taskOperatorSubtaskStates = new 
TaskStateSnapshot();
-                       
taskOperatorSubtaskStates.putSubtaskStateByOperatorID(OperatorID.fromJobVertexID(jobVertexID1),
 operatorSubtaskState);
-
-                       AcknowledgeCheckpoint acknowledgeCheckpoint = new 
AcknowledgeCheckpoint(
-                                       jid,
-                                       
jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
-                                       checkpointId,
-                                       new CheckpointMetrics(),
-                                       taskOperatorSubtaskStates);
-
-                       coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);
-               }
-
-
-               for (int index = 0; index < jobVertex2.getParallelism(); 
index++) {
-
-                       StreamStateHandle state = 
generateStateForVertex(jobVertexID2, index);
-                       KeyGroupsStateHandle keyGroupState = 
generateKeyGroupState(
-                                       jobVertexID2, 
keyGroupPartitions2.get(index), false);
-
-                       OperatorSubtaskState operatorSubtaskState = new 
OperatorSubtaskState(state, null, null, keyGroupState, null);
-                       TaskStateSnapshot taskOperatorSubtaskStates = new 
TaskStateSnapshot();
-                       
taskOperatorSubtaskStates.putSubtaskStateByOperatorID(OperatorID.fromJobVertexID(jobVertexID2),
 operatorSubtaskState);
-                       AcknowledgeCheckpoint acknowledgeCheckpoint = new 
AcknowledgeCheckpoint(
-                                       jid,
-                                       
jobVertex2.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
-                                       checkpointId,
-                                       new CheckpointMetrics(),
-                                       taskOperatorSubtaskStates);
-
-                       coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);
-               }
-
-               List<CompletedCheckpoint> completedCheckpoints = 
coord.getSuccessfulCheckpoints();
-
-               assertEquals(1, completedCheckpoints.size());
-
-               Map<JobVertexID, ExecutionJobVertex> tasks = new HashMap<>();
-
-               int newParallelism1 = 4;
-               int newParallelism2 = 3;
-
-               final ExecutionJobVertex newJobVertex1 = mockExecutionJobVertex(
-                       jobVertexID1,
-                       newParallelism1,
-                       maxParallelism1);
-
-               final ExecutionJobVertex newJobVertex2 = mockExecutionJobVertex(
-                       jobVertexID2,
-                       newParallelism2,
-                       maxParallelism2);
-
-               tasks.put(jobVertexID1, newJobVertex1);
-               tasks.put(jobVertexID2, newJobVertex2);
-
-               coord.restoreLatestCheckpointedState(tasks, true, false);
-
-               fail("The restoration should have failed because the 
parallelism of an vertex with " +
-                       "non-partitioned state changed.");
-       }
-
        @Test
        public void testRestoreLatestCheckpointedStateScaleIn() throws 
Exception {
                
testRestoreLatestCheckpointedStateWithChangingParallelism(false);
@@ -2439,12 +2304,10 @@ public class CheckpointCoordinatorTest extends 
TestLogger {
 
                //vertex 1
                for (int index = 0; index < jobVertex1.getParallelism(); 
index++) {
-                       StreamStateHandle valueSizeTuple = 
generateStateForVertex(jobVertexID1, index);
                        OperatorStateHandle opStateBackend = 
generatePartitionableStateHandle(jobVertexID1, index, 2, 8, false);
                        KeyGroupsStateHandle keyedStateBackend = 
generateKeyGroupState(jobVertexID1, keyGroupPartitions1.get(index), false);
                        KeyGroupsStateHandle keyedStateRaw = 
generateKeyGroupState(jobVertexID1, keyGroupPartitions1.get(index), true);
-
-                       OperatorSubtaskState operatorSubtaskState = new 
OperatorSubtaskState(valueSizeTuple, opStateBackend, null, keyedStateBackend, 
keyedStateRaw);
+                       OperatorSubtaskState operatorSubtaskState = new 
OperatorSubtaskState(opStateBackend, null, keyedStateBackend, keyedStateRaw);
                        TaskStateSnapshot taskOperatorSubtaskStates = new 
TaskStateSnapshot();
                        
taskOperatorSubtaskStates.putSubtaskStateByOperatorID(OperatorID.fromJobVertexID(jobVertexID1),
 operatorSubtaskState);
 
@@ -2469,7 +2332,7 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                        expectedOpStatesBackend.add(new 
ChainedStateHandle<>(Collections.singletonList(opStateBackend)));
                        expectedOpStatesRaw.add(new 
ChainedStateHandle<>(Collections.singletonList(opStateRaw)));
 
-                       OperatorSubtaskState operatorSubtaskState = new 
OperatorSubtaskState(null, opStateBackend, opStateRaw, keyedStateBackend, 
keyedStateRaw);
+                       OperatorSubtaskState operatorSubtaskState = new 
OperatorSubtaskState(opStateBackend, opStateRaw, keyedStateBackend, 
keyedStateRaw);
                        TaskStateSnapshot taskOperatorSubtaskStates = new 
TaskStateSnapshot();
                        
taskOperatorSubtaskStates.putSubtaskStateByOperatorID(OperatorID.fromJobVertexID(jobVertexID2),
 operatorSubtaskState);
 
@@ -2527,7 +2390,6 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                        for (int idx = 0; idx < operatorIDs.size(); ++idx) {
                                OperatorID operatorID = operatorIDs.get(idx);
                                OperatorSubtaskState opState = 
taskStateHandles.getSubtaskStateByOperatorID(operatorID);
-                               
Assert.assertNull(opState.getLegacyOperatorState());
                                Collection<OperatorStateHandle> opStateBackend 
= opState.getManagedOperatorState();
                                Collection<OperatorStateHandle> opStateRaw = 
opState.getRawOperatorState();
                                allParallelManagedOpStates.add(opStateBackend);
@@ -2593,14 +2455,11 @@ public class CheckpointCoordinatorTest extends 
TestLogger {
                        OperatorState taskState = new OperatorState(id.f1, 
parallelism1, maxParallelism1);
                        operatorStates.put(id.f1, taskState);
                        for (int index = 0; index < taskState.getParallelism(); 
index++) {
-                               StreamStateHandle subNonPartitionedState = 
-                                       generateStateForVertex(id.f0, index);
                                OperatorStateHandle subManagedOperatorState =
                                        generatePartitionableStateHandle(id.f0, 
index, 2, 8, false);
                                OperatorStateHandle subRawOperatorState =
                                        generatePartitionableStateHandle(id.f0, 
index, 2, 8, true);
-
-                               OperatorSubtaskState subtaskState = new 
OperatorSubtaskState(subNonPartitionedState,
+                               OperatorSubtaskState subtaskState = new 
OperatorSubtaskState(
                                        subManagedOperatorState,
                                        subRawOperatorState,
                                        null,
@@ -2638,7 +2497,6 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                                
expectedRawOperatorState.add(ChainedStateHandle.wrapSingleHandle(subRawOperatorState));
 
                                OperatorSubtaskState subtaskState = new 
OperatorSubtaskState(
-                                       null,
                                        subManagedOperatorState,
                                        subRawOperatorState,
                                        subManagedKeyedState,
@@ -2735,7 +2593,6 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                                OperatorSubtaskState opState =
                                        
stateSnapshot.getSubtaskStateByOperatorID(operatorIds.get(operatorIndexInChain));
 
-                               assertNull(opState.getLegacyOperatorState());
                                
assertTrue(opState.getManagedOperatorState().isEmpty());
                                
assertTrue(opState.getRawOperatorState().isEmpty());
                        }
@@ -2745,16 +2602,11 @@ public class CheckpointCoordinatorTest extends 
TestLogger {
                                OperatorSubtaskState opState =
                                        
stateSnapshot.getSubtaskStateByOperatorID(operatorIds.get(operatorIndexInChain));
 
-                               StreamStateHandle expectSubNonPartitionedState 
= generateStateForVertex(id1.f0, i);
                                OperatorStateHandle expectedManagedOpState = 
generatePartitionableStateHandle(
                                        id1.f0, i, 2, 8, false);
                                OperatorStateHandle expectedRawOpState = 
generatePartitionableStateHandle(
                                        id1.f0, i, 2, 8, true);
 
-                               assertTrue(CommonTestUtils.isSteamContentEqual(
-                                       
expectSubNonPartitionedState.openInputStream(),
-                                       
opState.getLegacyOperatorState().openInputStream()));
-
                                Collection<OperatorStateHandle> 
managedOperatorState = opState.getManagedOperatorState();
                                assertEquals(1, managedOperatorState.size());
                                
assertTrue(CommonTestUtils.isSteamContentEqual(expectedManagedOpState.openInputStream(),
@@ -2771,16 +2623,11 @@ public class CheckpointCoordinatorTest extends 
TestLogger {
                                OperatorSubtaskState opState =
                                        
stateSnapshot.getSubtaskStateByOperatorID(operatorIds.get(operatorIndexInChain));
 
-                               StreamStateHandle expectSubNonPartitionedState 
= generateStateForVertex(id2.f0, i);
                                OperatorStateHandle expectedManagedOpState = 
generatePartitionableStateHandle(
                                        id2.f0, i, 2, 8, false);
                                OperatorStateHandle expectedRawOpState = 
generatePartitionableStateHandle(
                                        id2.f0, i, 2, 8, true);
 
-                               assertTrue(CommonTestUtils.isSteamContentEqual(
-                                       
expectSubNonPartitionedState.openInputStream(),
-                                       
opState.getLegacyOperatorState().openInputStream()));
-
                                Collection<OperatorStateHandle> 
managedOperatorState = opState.getManagedOperatorState();
                                assertEquals(1, managedOperatorState.size());
                                
assertTrue(CommonTestUtils.isSteamContentEqual(expectedManagedOpState.openInputStream(),
@@ -2816,8 +2663,6 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
 
                                
actualManagedOperatorStates.add(actualSubManagedOperatorState);
                                
actualRawOperatorStates.add(actualSubRawOperatorState);
-
-                               assertNull(opState.getLegacyOperatorState());
                        }
 
                        // operator 6
@@ -2825,7 +2670,6 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                                int operatorIndexInChain = 0;
                                OperatorSubtaskState opState =
                                        
stateSnapshot.getSubtaskStateByOperatorID(operatorIds.get(operatorIndexInChain));
-                               assertNull(opState.getLegacyOperatorState());
                                
assertTrue(opState.getManagedOperatorState().isEmpty());
                                
assertTrue(opState.getRawOperatorState().isEmpty());
 
@@ -3216,13 +3060,12 @@ public class CheckpointCoordinatorTest extends 
TestLogger {
                int index,
                KeyGroupRange keyGroupRange) throws IOException {
 
-               StreamStateHandle nonPartitionedState = 
generateStateForVertex(jobVertexID, index);
                OperatorStateHandle partitionableState = 
generatePartitionableStateHandle(jobVertexID, index, 2, 8, false);
                KeyGroupsStateHandle partitionedKeyGroupState = 
generateKeyGroupState(jobVertexID, keyGroupRange, false);
 
                TaskStateSnapshot subtaskStates = spy(new TaskStateSnapshot());
                OperatorSubtaskState subtaskState = spy(new 
OperatorSubtaskState(
-                       nonPartitionedState, partitionableState, null, 
partitionedKeyGroupState, null)
+                       partitionableState, null, partitionedKeyGroupState, 
null)
                );
 
                
subtaskStates.putSubtaskStateByOperatorID(OperatorID.fromJobVertexID(jobVertexID),
 subtaskState);
@@ -3236,17 +3079,10 @@ public class CheckpointCoordinatorTest extends 
TestLogger {
 
                for (int i = 0; i < executionJobVertex.getParallelism(); i++) {
 
-                       final List<OperatorID> operatorIds = 
executionJobVertex.getOperatorIDs();
-
                        TaskStateSnapshot stateSnapshot = 
executionJobVertex.getTaskVertices()[i].getCurrentExecutionAttempt().getTaskStateSnapshot();
 
                        OperatorSubtaskState operatorState = 
stateSnapshot.getSubtaskStateByOperatorID(OperatorID.fromJobVertexID(jobVertexID));
 
-                       StreamStateHandle expectNonPartitionedState = 
generateStateForVertex(jobVertexID, i);
-                       assertTrue(CommonTestUtils.isSteamContentEqual(
-                                       
expectNonPartitionedState.openInputStream(),
-                               
operatorState.getLegacyOperatorState().openInputStream()));
-
                        ChainedStateHandle<OperatorStateHandle> 
expectedOpStateBackend =
                                        
generateChainedPartitionableStateHandle(jobVertexID, i, 2, 8, false);
 
@@ -3926,7 +3762,7 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                                        spy(new ByteStreamStateHandle("meta", 
new byte[]{'m'}))));
 
                        OperatorSubtaskState operatorSubtaskState =
-                               spy(new OperatorSubtaskState(null,
+                               spy(new OperatorSubtaskState(
                                        
Collections.<OperatorStateHandle>emptyList(),
                                        
Collections.<OperatorStateHandle>emptyList(),
                                        
Collections.<KeyedStateHandle>singletonList(managedState),

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
index 791bffa..1788434 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
@@ -29,12 +29,10 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
-import org.apache.flink.runtime.state.ChainedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.SharedStateRegistry;
-import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.util.SerializableObject;
 
 import org.hamcrest.BaseMatcher;
@@ -67,7 +65,6 @@ public class CheckpointStateRestoreTest {
        public void testSetState() {
                try {
 
-                       final ChainedStateHandle<StreamStateHandle> 
serializedState = CheckpointCoordinatorTest.generateChainedStateHandle(new 
SerializableObject());
                        KeyGroupRange keyGroupRange = KeyGroupRange.of(0,0);
                        List<SerializableObject> testStates = 
Collections.singletonList(new SerializableObject());
                        final KeyedStateHandle serializedKeyGroupStates = 
CheckpointCoordinatorTest.generateKeyGroupState(keyGroupRange, testStates);
@@ -125,7 +122,6 @@ public class CheckpointStateRestoreTest {
                        subtaskStates.putSubtaskStateByOperatorID(
                                OperatorID.fromJobVertexID(statefulId),
                                new OperatorSubtaskState(
-                                       serializedState.get(0),
                                        
Collections.<OperatorStateHandle>emptyList(),
                                        
Collections.<OperatorStateHandle>emptyList(),
                                        
Collections.singletonList(serializedKeyGroupStates),
@@ -249,17 +245,13 @@ public class CheckpointStateRestoreTest {
                        Executors.directExecutor(),
                        SharedStateRegistry.DEFAULT_FACTORY);
 
-               StreamStateHandle serializedState = CheckpointCoordinatorTest
-                               .generateChainedStateHandle(new 
SerializableObject())
-                               .get(0);
-
                // --- (2) Checkpoint misses state for a jobVertex (should 
work) ---
                Map<OperatorID, OperatorState> checkpointTaskStates = new 
HashMap<>();
                {
                        OperatorState taskState = new 
OperatorState(operatorId1, 3, 3);
-                       taskState.putState(0, new 
OperatorSubtaskState(serializedState));
-                       taskState.putState(1, new 
OperatorSubtaskState(serializedState));
-                       taskState.putState(2, new 
OperatorSubtaskState(serializedState));
+                       taskState.putState(0, new OperatorSubtaskState());
+                       taskState.putState(1, new OperatorSubtaskState());
+                       taskState.putState(2, new OperatorSubtaskState());
 
                        checkpointTaskStates.put(operatorId1, taskState);
                }
@@ -286,7 +278,7 @@ public class CheckpointStateRestoreTest {
                // There is no task for this
                {
                        OperatorState taskState = new 
OperatorState(newOperatorID, 1, 1);
-                       taskState.putState(0, new 
OperatorSubtaskState(serializedState));
+                       taskState.putState(0, new OperatorSubtaskState());
 
                        checkpointTaskStates.put(newOperatorID, taskState);
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/CheckpointTestUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/CheckpointTestUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/CheckpointTestUtils.java
index de1f599..acedb50 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/CheckpointTestUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/CheckpointTestUtils.java
@@ -77,7 +77,6 @@ public class CheckpointTestUtils {
 
                        OperatorState taskState = new OperatorState(new 
OperatorID(), numSubtasksPerTask, 128);
 
-                       boolean hasNonPartitionableState = random.nextBoolean();
                        boolean hasOperatorStateBackend = random.nextBoolean();
                        boolean hasOperatorStateStream = random.nextBoolean();
 
@@ -87,7 +86,6 @@ public class CheckpointTestUtils {
 
                        for (int subtaskIdx = 0; subtaskIdx < 
numSubtasksPerTask; subtaskIdx++) {
 
-                               StreamStateHandle nonPartitionableState = null;
                                StreamStateHandle operatorStateBackend =
                                        new 
TestByteStreamStateHandleDeepCompare("b", 
("Beautiful").getBytes(ConfigConstants.DEFAULT_CHARSET));
                                StreamStateHandle operatorStateStream =
@@ -101,11 +99,6 @@ public class CheckpointTestUtils {
                                offsetsMap.put("B", new 
OperatorStateHandle.StateMetaInfo(new long[]{30, 40, 50}, 
OperatorStateHandle.Mode.SPLIT_DISTRIBUTE));
                                offsetsMap.put("C", new 
OperatorStateHandle.StateMetaInfo(new long[]{60, 70, 80}, 
OperatorStateHandle.Mode.BROADCAST));
 
-                               if (hasNonPartitionableState) {
-                                       nonPartitionableState =
-                                               new 
TestByteStreamStateHandleDeepCompare("a", 
("Hi").getBytes(ConfigConstants.DEFAULT_CHARSET));
-                               }
-
                                if (hasOperatorStateBackend) {
                                        operatorStateHandleBackend = new 
OperatorStateHandle(offsetsMap, operatorStateBackend);
                                }
@@ -130,7 +123,6 @@ public class CheckpointTestUtils {
                                }
 
                                taskState.putState(subtaskIdx, new 
OperatorSubtaskState(
-                                               nonPartitionableState,
                                                operatorStateHandleBackend,
                                                operatorStateHandleStream,
                                                keyedStateStream,
@@ -175,15 +167,11 @@ public class CheckpointTestUtils {
 
                        for (int subtaskIdx = 0; subtaskIdx < 
numSubtasksPerTask; subtaskIdx++) {
 
-                               List<StreamStateHandle> nonPartitionableStates 
= new ArrayList<>(chainLength);
                                List<OperatorStateHandle> operatorStatesBackend 
= new ArrayList<>(chainLength);
                                List<OperatorStateHandle> operatorStatesStream 
= new ArrayList<>(chainLength);
 
                                for (int chainIdx = 0; chainIdx < chainLength; 
++chainIdx) {
 
-                                       StreamStateHandle nonPartitionableState 
=
-                                                       new 
TestByteStreamStateHandleDeepCompare("a-" + chainIdx, ("Hi-" + 
chainIdx).getBytes(
-                                                               
ConfigConstants.DEFAULT_CHARSET));
                                        StreamStateHandle operatorStateBackend =
                                                        new 
TestByteStreamStateHandleDeepCompare("b-" + chainIdx, ("Beautiful-" + 
chainIdx).getBytes(ConfigConstants.DEFAULT_CHARSET));
                                        StreamStateHandle operatorStateStream =
@@ -193,10 +181,6 @@ public class CheckpointTestUtils {
                                        offsetsMap.put("B", new 
OperatorStateHandle.StateMetaInfo(new long[]{30, 40, 50}, 
OperatorStateHandle.Mode.SPLIT_DISTRIBUTE));
                                        offsetsMap.put("C", new 
OperatorStateHandle.StateMetaInfo(new long[]{60, 70, 80}, 
OperatorStateHandle.Mode.BROADCAST));
 
-                                       if (chainIdx != 
noNonPartitionableStateAtIndex) {
-                                               
nonPartitionableStates.add(nonPartitionableState);
-                                       }
-
                                        if (chainIdx != 
noOperatorStateBackendAtIndex) {
                                                OperatorStateHandle 
operatorStateHandleBackend =
                                                                new 
OperatorStateHandle(offsetsMap, operatorStateBackend);
@@ -222,7 +206,6 @@ public class CheckpointTestUtils {
                                }
 
                                taskState.putState(subtaskIdx, new SubtaskState(
-                                               new 
ChainedStateHandle<>(nonPartitionableStates),
                                                new 
ChainedStateHandle<>(operatorStatesBackend),
                                                new 
ChainedStateHandle<>(operatorStatesStream),
                                                keyedStateStream,

Reply via email to