Repository: flink
Updated Branches:
  refs/heads/master ded7faeab -> 2045cc5f8


[FLINK-5790] [core] Use list types when ListStateDescriptor extends 
StateDescriptor

This closes #3305


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d47446ca
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d47446ca
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d47446ca

Branch: refs/heads/master
Commit: d47446cafffe0d34d89488f6eb860aa139ceb3f1
Parents: ded7fae
Author: xiaogang.sxg <xiaogang....@alibaba-inc.com>
Authored: Tue Feb 14 13:39:30 2017 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Feb 15 12:21:07 2017 +0100

----------------------------------------------------------------------
 .../streaming/state/RocksDBListState.java       |   4 +-
 .../api/common/state/ListStateDescriptor.java   |  35 +++--
 .../flink/api/common/state/StateDescriptor.java |  18 +--
 .../common/typeutils/base/ListSerializer.java   | 131 +++++++++++++++++++
 .../flink/api/java/typeutils/ListTypeInfo.java  | 116 ++++++++++++++++
 .../util/MigrationInstantiationUtil.java        |   2 +-
 .../common/state/ListStateDescriptorTest.java   |  25 +++-
 .../api/common/state/ListStateDescriptor.java   | 110 ++++++++++++++++
 .../runtime/state/ArrayListSerializer.java      |  10 +-
 .../state/DefaultOperatorStateBackend.java      |   6 +-
 .../state/heap/HeapKeyedStateBackend.java       |  17 +--
 .../flink/runtime/state/heap/HeapListState.java |  25 ++--
 .../runtime/state/StateBackendTestBase.java     |   2 +-
 .../runtime/state/heap/HeapListStateTest.java   |   8 +-
 ...ccumulatingProcessingTimeWindowOperator.java |   8 +-
 .../operators/StateDescriptorPassingTest.java   |  28 +++-
 .../operators/StreamingRuntimeContextTest.java  |  10 +-
 17 files changed, 476 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d47446ca/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
index e6988f7..a8b20d1 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
@@ -47,7 +47,7 @@ import java.util.List;
  * @param <V> The type of the values in the list state.
  */
 public class RocksDBListState<K, N, V>
-       extends AbstractRocksDBState<K, N, ListState<V>, 
ListStateDescriptor<V>, V>
+       extends AbstractRocksDBState<K, N, ListState<V>, 
ListStateDescriptor<V>, List<V>>
        implements InternalListState<N, V> {
 
        /** Serializer for the values */
@@ -72,7 +72,7 @@ public class RocksDBListState<K, N, V>
                        RocksDBKeyedStateBackend<K> backend) {
 
                super(columnFamily, namespaceSerializer, stateDesc, backend);
-               this.valueSerializer = stateDesc.getSerializer();
+               this.valueSerializer = stateDesc.getElementSerializer();
 
                writeOptions = new WriteOptions();
                writeOptions.setDisableWAL(true);

http://git-wip-us.apache.org/repos/asf/flink/blob/d47446ca/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
index 6861a07..2047e24 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
@@ -21,6 +21,10 @@ package org.apache.flink.api.common.state;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.api.java.typeutils.ListTypeInfo;
+
+import java.util.List;
 
 /**
  * A {@link StateDescriptor} for {@link ListState}. This can be used to create 
a partitioned
@@ -30,8 +34,8 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
  * @param <T> The type of the values that can be added to the list state.
  */
 @PublicEvolving
-public class ListStateDescriptor<T> extends StateDescriptor<ListState<T>, T> {
-       private static final long serialVersionUID = 1L;
+public class ListStateDescriptor<T> extends StateDescriptor<ListState<T>, 
List<T>> {
+       private static final long serialVersionUID = 2L;
 
        /**
         * Creates a new {@code ListStateDescriptor} with the given name and 
list element type.
@@ -40,20 +44,22 @@ public class ListStateDescriptor<T> extends 
StateDescriptor<ListState<T>, T> {
         * consider using the {@link #ListStateDescriptor(String, 
TypeInformation)} constructor.
         *
         * @param name The (unique) name for the state.
-        * @param typeClass The type of the values in the state.
+        * @param elementTypeClass The type of the elements in the state.
         */
-       public ListStateDescriptor(String name, Class<T> typeClass) {
-               super(name, typeClass, null);
+       @SuppressWarnings("unchecked")
+       public ListStateDescriptor(String name, Class<T> elementTypeClass) {
+               super(name, new ListTypeInfo<>(elementTypeClass), null);
        }
 
        /**
         * Creates a new {@code ListStateDescriptor} with the given name and 
list element type.
         *
         * @param name The (unique) name for the state.
-        * @param typeInfo The type of the values in the state.
+        * @param elementTypeInfo The type of the elements in the state.
         */
-       public ListStateDescriptor(String name, TypeInformation<T> typeInfo) {
-               super(name, typeInfo, null);
+       @SuppressWarnings("unchecked")
+       public ListStateDescriptor(String name, TypeInformation<T> 
elementTypeInfo) {
+               super(name, new ListTypeInfo<>(elementTypeInfo), null);
        }
 
        /**
@@ -62,10 +68,19 @@ public class ListStateDescriptor<T> extends 
StateDescriptor<ListState<T>, T> {
         * @param name The (unique) name for the state.
         * @param typeSerializer The type serializer for the list values.
         */
+       @SuppressWarnings("unchecked")
        public ListStateDescriptor(String name, TypeSerializer<T> 
typeSerializer) {
-               super(name, typeSerializer, null);
+               super(name, new ListSerializer<>(typeSerializer), null);
+       }
+
+       /**
+        * Gets the serializer for the elements contained in the list.
+        *
+        * <p>The descriptor itself holds a serializer for the whole list; this method unwraps
+        * the element serializer from it.
+        *
+        * @return The serializer for the elements in the list.
+        * @throws IllegalStateException If the serializer has not been set yet, or if it is
+        *                               not a {@link ListSerializer}.
+        */
+       public TypeSerializer<T> getElementSerializer() {
+               // distinguish "not yet initialized" from "unexpected type" so that the failure is diagnosable
+               if (serializer == null) {
+                       throw new IllegalStateException("Serializer not yet initialized.");
+               }
+               if (!(serializer instanceof ListSerializer)) {
+                       throw new IllegalStateException(
+                                       "Unexpected serializer type " + serializer.getClass().getName() +
+                                       "; expected " + ListSerializer.class.getName() + ".");
+               }
+
+               return ((ListSerializer<T>) serializer).getElementSerializer();
+       }
-       
+
        // 
------------------------------------------------------------------------
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/d47446ca/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
index b901d03..bc909e6 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
@@ -243,22 +243,6 @@ public abstract class StateDescriptor<S extends State, T> 
implements Serializabl
                }
        }
 
-       /**
-        * This method should be called by subclasses prior to serialization. 
Because the TypeInformation is
-        * not always serializable, it is 'transient' and dropped during 
serialization. Hence, the descriptor
-        * needs to make sure that the serializer is created before the 
TypeInformation is dropped. 
-        */
-       private void ensureSerializerCreated() {
-               if (serializer == null) {
-                       if (typeInfo != null) {
-                               serializer = typeInfo.createSerializer(new 
ExecutionConfig());
-                       } else {
-                               throw new IllegalStateException(
-                                               "Cannot initialize serializer 
after TypeInformation was dropped during serialization");
-                       }
-               }
-       }
-
        // 
------------------------------------------------------------------------
        //  Standard Utils
        // 
------------------------------------------------------------------------
@@ -287,7 +271,7 @@ public abstract class StateDescriptor<S extends State, T> 
implements Serializabl
 
        private void writeObject(final ObjectOutputStream out) throws 
IOException {
                // make sure we have a serializer before the type information 
gets lost
-               ensureSerializerCreated();
+               initializeSerializerUnlessSet(new ExecutionConfig());
 
                // write all the non-transient fields
                out.defaultWriteObject();

http://git-wip-us.apache.org/repos/asf/flink/blob/d47446ca/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java
new file mode 100644
index 0000000..a875a3b
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.ArrayList;
+
+/**
+ * A serializer for {@link List Lists}. The serializer relies on an element serializer
+ * for the serialization of the list's elements.
+ *
+ * <p>The serialized format is the number of elements (as an {@code int}), followed by the
+ * elements in list order, each written by the element serializer.
+ *
+ * <p>Copies and deserialized lists are always created as {@link ArrayList}.
+ *
+ * @param <T> The type of the elements in the list.
+ */
+public class ListSerializer<T> extends TypeSerializer<List<T>> {
+
+       private static final long serialVersionUID = 1119562170939152304L;
+
+       /** The serializer for the elements of the list. */
+       private final TypeSerializer<T> elementSerializer;
+
+       /**
+        * Creates a list serializer that uses the given serializer to serialize the list's elements.
+        *
+        * @param elementSerializer The serializer for the elements of the list.
+        */
+       public ListSerializer(TypeSerializer<T> elementSerializer) {
+               this.elementSerializer = elementSerializer;
+       }
+
+       /**
+        * Gets the serializer for the elements of the list.
+        *
+        * @return The serializer for the elements of the list.
+        */
+       public TypeSerializer<T> getElementSerializer() {
+               return elementSerializer;
+       }
+
+       @Override
+       public boolean isImmutableType() {
+               return false;
+       }
+
+       @Override
+       public TypeSerializer<List<T>> duplicate() {
+               // only create a new instance if the element serializer itself had to be duplicated
+               TypeSerializer<T> duplicateElement = elementSerializer.duplicate();
+               return duplicateElement == elementSerializer ? this : new ListSerializer<T>(duplicateElement);
+       }
+
+       @Override
+       public List<T> createInstance() {
+               return new ArrayList<>();
+       }
+
+       @Override
+       public List<T> copy(List<T> from) {
+               List<T> newList = new ArrayList<>(from.size());
+
+               // iterate rather than access by index: the given list is not guaranteed to
+               // support efficient random access (e.g. a LinkedList)
+               for (T element : from) {
+                       newList.add(elementSerializer.copy(element));
+               }
+               return newList;
+       }
+
+       @Override
+       public List<T> copy(List<T> from, List<T> reuse) {
+               // the reuse instance is ignored; a fresh list is always created
+               return copy(from);
+       }
+
+       @Override
+       public int getLength() {
+               return -1; // var length
+       }
+
+       @Override
+       public void serialize(List<T> list, DataOutputView target) throws IOException {
+               final int size = list.size();
+               target.writeInt(size);
+
+               // iterate rather than access by index: the given list is not guaranteed to
+               // support efficient random access (e.g. a LinkedList)
+               for (T element : list) {
+                       elementSerializer.serialize(element, target);
+               }
+       }
+
+       @Override
+       public List<T> deserialize(DataInputView source) throws IOException {
+               final int size = source.readInt();
+               final List<T> list = new ArrayList<>(size);
+               for (int i = 0; i < size; i++) {
+                       list.add(elementSerializer.deserialize(source));
+               }
+               return list;
+       }
+
+       @Override
+       public List<T> deserialize(List<T> reuse, DataInputView source) throws IOException {
+               // the reuse instance is ignored; a fresh list is always created
+               return deserialize(source);
+       }
+
+       @Override
+       public void copy(DataInputView source, DataOutputView target) throws IOException {
+               // copy number of elements
+               final int num = source.readInt();
+               target.writeInt(num);
+               for (int i = 0; i < num; i++) {
+                       elementSerializer.copy(source, target);
+               }
+       }
+
+       // --------------------------------------------------------------------
+
+       @Override
+       public boolean equals(Object obj) {
+               return obj == this ||
+                               (obj != null && obj.getClass() == getClass() &&
+                                               elementSerializer.equals(((ListSerializer<?>) obj).elementSerializer));
+       }
+
+       @Override
+       public boolean canEqual(Object obj) {
+               return true;
+       }
+
+       @Override
+       public int hashCode() {
+               return elementSerializer.hashCode();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d47446ca/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ListTypeInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ListTypeInfo.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ListTypeInfo.java
new file mode 100644
index 0000000..e70aaf8
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ListTypeInfo.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.annotation.Public;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+
+import java.util.List;
+
+/**
+ * A {@link TypeInformation} for the list types of the Java API.
+ *
+ * <p>The type is described by the type information of the list's elements; serializers
+ * created from it are {@link ListSerializer ListSerializers}.
+ *
+ * @param <T> The type of the elements in the list.
+ */
+@Public
+public final class ListTypeInfo<T> extends TypeInformation<List<T>> {
+
+       /** The type information for the elements of the list. */
+       private final TypeInformation<T> elementTypeInfo;
+
+       /**
+        * Creates the list type information from the class of the list's elements.
+        * The element type information is extracted from the class via the {@link TypeExtractor}.
+        *
+        * @param elementTypeClass The class of the elements in the list.
+        */
+       public ListTypeInfo(Class<T> elementTypeClass) {
+               this.elementTypeInfo = TypeExtractor.createTypeInfo(elementTypeClass);
+       }
+
+       /**
+        * Creates the list type information from the type information of the list's elements.
+        *
+        * @param elementTypeInfo The type information of the elements in the list.
+        */
+       public ListTypeInfo(TypeInformation<T> elementTypeInfo) {
+               this.elementTypeInfo = elementTypeInfo;
+       }
+
+       /**
+        * Gets the type information for the elements contained in the list.
+        *
+        * @return The type information of the list's elements.
+        */
+       public TypeInformation<T> getElementTypeInfo() {
+               return elementTypeInfo;
+       }
+
+       @Override
+       public boolean isBasicType() {
+               return false;
+       }
+
+       @Override
+       public boolean isTupleType() {
+               return false;
+       }
+
+       @Override
+       public int getArity() {
+               return 0;
+       }
+
+       @Override
+       public int getTotalFields() {
+               return elementTypeInfo.getTotalFields();
+       }
+
+       @SuppressWarnings("unchecked")
+       @Override
+       public Class<List<T>> getTypeClass() {
+               // generic type parameters are erased, so the double cast via Class<?> is required
+               return (Class<List<T>>) (Class<?>) List.class;
+       }
+
+       @Override
+       public boolean isKeyType() {
+               return false;
+       }
+
+       @Override
+       public TypeSerializer<List<T>> createSerializer(ExecutionConfig config) {
+               TypeSerializer<T> elementTypeSerializer = elementTypeInfo.createSerializer(config);
+               return new ListSerializer<>(elementTypeSerializer);
+       }
+
+       @Override
+       public String toString() {
+               // must never be null: the string form is used in log and error messages
+               return "List<" + elementTypeInfo + '>';
+       }
+
+       @Override
+       public boolean equals(Object obj) {
+               if (obj == this) {
+                       return true;
+               }
+               if (obj instanceof ListTypeInfo) {
+                       final ListTypeInfo<?> other = (ListTypeInfo<?>) obj;
+                       return other.canEqual(this) && elementTypeInfo.equals(other.elementTypeInfo);
+               }
+               return false;
+       }
+
+       @Override
+       public int hashCode() {
+               return 31 * elementTypeInfo.hashCode() + 1;
+       }
+
+       @Override
+       public boolean canEqual(Object obj) {
+               return (obj instanceof ListTypeInfo);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d47446ca/flink-core/src/main/java/org/apache/flink/migration/util/MigrationInstantiationUtil.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/migration/util/MigrationInstantiationUtil.java
 
b/flink-core/src/main/java/org/apache/flink/migration/util/MigrationInstantiationUtil.java
index ca75cce..d175b2f 100644
--- 
a/flink-core/src/main/java/org/apache/flink/migration/util/MigrationInstantiationUtil.java
+++ 
b/flink-core/src/main/java/org/apache/flink/migration/util/MigrationInstantiationUtil.java
@@ -91,4 +91,4 @@ public final class MigrationInstantiationUtil {
                throw new IllegalAccessError();
        }
 
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/d47446ca/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java
index 6dc00f0..b9d9a8c 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.api.common.state;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
 import org.apache.flink.core.fs.Path;
@@ -45,13 +46,18 @@ public class ListStateDescriptorTest {
                
                assertEquals("testName", descr.getName());
                assertNotNull(descr.getSerializer());
-               assertEquals(serializer, descr.getSerializer());
+               assertTrue(descr.getSerializer() instanceof ListSerializer);
+               assertNotNull(descr.getElementSerializer());
+               assertEquals(serializer, descr.getElementSerializer());
 
                ListStateDescriptor<String> copy = 
CommonTestUtils.createCopySerializable(descr);
 
                assertEquals("testName", copy.getName());
                assertNotNull(copy.getSerializer());
-               assertEquals(serializer, copy.getSerializer());
+               assertTrue(copy.getSerializer() instanceof ListSerializer);
+
+               assertNotNull(copy.getElementSerializer());
+               assertEquals(serializer, copy.getElementSerializer());
        }
 
        @Test
@@ -69,11 +75,14 @@ public class ListStateDescriptorTest {
                } catch (IllegalStateException ignored) {}
 
                descr.initializeSerializerUnlessSet(cfg);
-               
+
                assertNotNull(descr.getSerializer());
-               assertTrue(descr.getSerializer() instanceof KryoSerializer);
+               assertTrue(descr.getSerializer() instanceof ListSerializer);
 
-               assertTrue(((KryoSerializer<?>) 
descr.getSerializer()).getKryo().getRegistration(TaskInfo.class).getId() > 0);
+               assertNotNull(descr.getElementSerializer());
+               assertTrue(descr.getElementSerializer() instanceof 
KryoSerializer);
+
+               assertTrue(((KryoSerializer<?>) 
descr.getElementSerializer()).getKryo().getRegistration(TaskInfo.class).getId() 
> 0);
        }
 
        @Test
@@ -85,7 +94,11 @@ public class ListStateDescriptorTest {
                ListStateDescriptor<String> copy = 
CommonTestUtils.createCopySerializable(descr);
 
                assertEquals("testName", copy.getName());
+
                assertNotNull(copy.getSerializer());
-               assertEquals(StringSerializer.INSTANCE, copy.getSerializer());
+               assertTrue(copy.getSerializer() instanceof ListSerializer);
+
+               assertNotNull(copy.getElementSerializer());
+               assertEquals(StringSerializer.INSTANCE, 
copy.getElementSerializer());
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d47446ca/flink-runtime/src/main/java/org/apache/flink/migration/api/common/state/ListStateDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/migration/api/common/state/ListStateDescriptor.java
 
b/flink-runtime/src/main/java/org/apache/flink/migration/api/common/state/ListStateDescriptor.java
new file mode 100644
index 0000000..4e83cca
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/migration/api/common/state/ListStateDescriptor.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.migration.api.common.state;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.StateBackend;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+/**
+ * A {@link StateDescriptor} for {@link ListState}.
+ *
+ * @param <T> The type of the values that can be added to the list state.
+ */
+@PublicEvolving
+public class ListStateDescriptor<T> extends StateDescriptor<ListState<T>, T> {
+       private static final long serialVersionUID = 1L;
+
+       /**
+        * Creates a new {@code ListStateDescriptor} with the given name and list element type.
+        *
+        * <p>If this constructor fails (because it is not possible to describe the type via a class),
+        * consider using the {@link #ListStateDescriptor(String, TypeInformation)} constructor.
+        *
+        * @param name The (unique) name for the state.
+        * @param typeClass The type of the values in the state.
+        */
+       public ListStateDescriptor(String name, Class<T> typeClass) {
+               super(name, typeClass, null);
+       }
+
+       /**
+        * Creates a new {@code ListStateDescriptor} with the given name and list element type.
+        *
+        * @param name The (unique) name for the state.
+        * @param typeInfo The type of the values in the state.
+        */
+       public ListStateDescriptor(String name, TypeInformation<T> typeInfo) {
+               super(name, typeInfo, null);
+       }
+
+       /**
+        * Creates a new {@code ListStateDescriptor} with the given name and list element type.
+        *
+        * @param name The (unique) name for the state.
+        * @param typeSerializer The type serializer for the list values.
+        */
+       public ListStateDescriptor(String name, TypeSerializer<T> typeSerializer) {
+               super(name, typeSerializer, null);
+       }
+
+       // ------------------------------------------------------------------------
+
+       /**
+        * Always fails: this legacy descriptor cannot be bound to a state backend.
+        *
+        * @throws IllegalStateException Always.
+        */
+       @Override
+       public ListState<T> bind(StateBackend stateBackend) throws Exception {
+               throw new IllegalStateException("Cannot bind states with a legacy state descriptor.");
+       }
+
+       @Override
+       public boolean equals(Object o) {
+               if (this == o) {
+                       return true;
+               }
+               if (o == null || getClass() != o.getClass()) {
+                       return false;
+               }
+
+               ListStateDescriptor<?> that = (ListStateDescriptor<?>) o;
+
+               return serializer.equals(that.serializer) && name.equals(that.name);
+       }
+
+       @Override
+       public int hashCode() {
+               int result = serializer.hashCode();
+               result = 31 * result + name.hashCode();
+               return result;
+       }
+
+       @Override
+       public String toString() {
+               // include the name as well: it is part of the descriptor's identity in equals/hashCode
+               return "ListStateDescriptor{" +
+                               "name=" + name +
+                               ", serializer=" + serializer +
+                               '}';
+       }
+
+       @Override
+       public StateDescriptor.Type getType() {
+               return StateDescriptor.Type.LIST;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d47446ca/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java
index 43e6786..f5a6405 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java
@@ -35,6 +35,10 @@ final public class ArrayListSerializer<T> extends 
TypeSerializer<ArrayList<T>> {
                this.elementSerializer = elementSerializer;
        }
 
+       public TypeSerializer<T> getElementSerializer() {
+               return elementSerializer;
+       }
+
        @Override
        public boolean isImmutableType() {
                return false;
@@ -109,8 +113,8 @@ final public class ArrayListSerializer<T> extends 
TypeSerializer<ArrayList<T>> {
        @Override
        public boolean equals(Object obj) {
                return obj == this ||
-                       (obj != null && obj.getClass() == getClass() &&
-                               
elementSerializer.equals(((ArrayListSerializer<?>) obj).elementSerializer));
+                               (obj != null && obj.getClass() == getClass() &&
+                                               
elementSerializer.equals(((ArrayListSerializer<?>) obj).elementSerializer));
        }
 
        @Override
@@ -122,4 +126,4 @@ final public class ArrayListSerializer<T> extends 
TypeSerializer<ArrayList<T>> {
        public int hashCode() {
                return elementSerializer.hashCode();
        }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/d47446ca/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
index 1cd1da7..adf0727 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
@@ -106,7 +106,7 @@ public class DefaultOperatorStateBackend implements 
OperatorStateBackend {
                Preconditions.checkNotNull(stateDescriptor);
 
                String name = 
Preconditions.checkNotNull(stateDescriptor.getName());
-               TypeSerializer<S> partitionStateSerializer = 
Preconditions.checkNotNull(stateDescriptor.getSerializer());
+               TypeSerializer<S> partitionStateSerializer = 
Preconditions.checkNotNull(stateDescriptor.getElementSerializer());
 
                @SuppressWarnings("unchecked")
                PartitionableListState<S> partitionableListState = 
(PartitionableListState<S>) registeredStates.get(name);
@@ -126,8 +126,8 @@ public class DefaultOperatorStateBackend implements 
OperatorStateBackend {
                                                        
partitionableListState.getAssignmentMode());
                        Preconditions.checkState(
                                        
partitionableListState.getPartitionStateSerializer().
-                                                       
isCompatibleWith(stateDescriptor.getSerializer()),
-                                       "Incompatible type serializers. 
Provided: " + stateDescriptor.getSerializer() +
+                                                       
isCompatibleWith(stateDescriptor.getElementSerializer()),
+                                       "Incompatible type serializers. 
Provided: " + stateDescriptor.getElementSerializer() +
                                                        ", found: " + 
partitionableListState.getPartitionStateSerializer());
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d47446ca/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index 0fe92e7..2366342 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.state.heap;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.flink.annotation.VisibleForTesting;
-
 import org.apache.flink.api.common.state.AggregatingStateDescriptor;
 import org.apache.flink.api.common.state.FoldingStateDescriptor;
 import org.apache.flink.api.common.state.ListStateDescriptor;
@@ -28,6 +27,7 @@ import 
org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
 import org.apache.flink.api.common.typeutils.base.VoidSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.fs.FSDataInputStream;
@@ -60,7 +60,6 @@ import 
org.apache.flink.runtime.state.internal.InternalReducingState;
 import org.apache.flink.runtime.state.internal.InternalValueState;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -152,15 +151,8 @@ public class HeapKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                        TypeSerializer<N> namespaceSerializer,
                        ListStateDescriptor<T> stateDesc) throws Exception {
 
-               String name = stateDesc.getName();
-
-               @SuppressWarnings("unchecked")
-               StateTable<K, N, ArrayList<T>> stateTable = (StateTable<K, N, 
ArrayList<T>>) stateTables.get(name);
 
-               RegisteredBackendStateMetaInfo<N, ArrayList<T>> newMetaInfo =
-                               new 
RegisteredBackendStateMetaInfo<>(stateDesc.getType(), name, 
namespaceSerializer, new ArrayListSerializer<>(stateDesc.getSerializer()));
-
-               stateTable = tryRegisterStateTable(stateTable, newMetaInfo);
+               StateTable<K, N, List<T>> stateTable = 
tryRegisterStateTable(namespaceSerializer, stateDesc);
                return new HeapListState<>(this, stateDesc, stateTable, 
keySerializer, namespaceSerializer);
        }
 
@@ -449,6 +441,11 @@ public class HeapKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                                namespaceSerializer = 
VoidNamespaceSerializer.INSTANCE;
                        }
 
+                       // The serializer used in the list states now changes 
from ArrayListSerializer to ListSerializer.
+                       if (stateSerializer instanceof ArrayListSerializer) {
+                               stateSerializer = new 
ListSerializer<>(((ArrayListSerializer<?>) 
stateSerializer).getElementSerializer());
+                       }
+
                        Map nullNameSpaceFix = (Map) rawResultMap.remove(null);
 
                        if (null != nullNameSpaceFix) {

http://git-wip-us.apache.org/repos/asf/flink/blob/d47446ca/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
index f0eb53e..a4e8ea7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
@@ -29,6 +29,7 @@ import org.apache.flink.util.Preconditions;
 
 import java.io.ByteArrayOutputStream;
 import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -40,7 +41,7 @@ import java.util.Map;
  * @param <V> The type of the value.
  */
 public class HeapListState<K, N, V>
-               extends AbstractHeapMergingState<K, N, V, Iterable<V>, 
ArrayList<V>, ListState<V>, ListStateDescriptor<V>>
+               extends AbstractHeapMergingState<K, N, V, Iterable<V>, List<V>, 
ListState<V>, ListStateDescriptor<V>>
                implements InternalListState<N, V> {
 
        /**
@@ -54,7 +55,7 @@ public class HeapListState<K, N, V>
        public HeapListState(
                        KeyedStateBackend<K> backend,
                        ListStateDescriptor<V> stateDesc,
-                       StateTable<K, N, ArrayList<V>> stateTable,
+                       StateTable<K, N, List<V>> stateTable,
                        TypeSerializer<K> keySerializer,
                        TypeSerializer<N> namespaceSerializer) {
                super(backend, stateDesc, stateTable, keySerializer, 
namespaceSerializer);
@@ -69,14 +70,14 @@ public class HeapListState<K, N, V>
                Preconditions.checkState(currentNamespace != null, "No 
namespace set.");
                Preconditions.checkState(backend.getCurrentKey() != null, "No 
key set.");
 
-               Map<N, Map<K, ArrayList<V>>> namespaceMap =
+               Map<N, Map<K, List<V>>> namespaceMap =
                                
stateTable.get(backend.getCurrentKeyGroupIndex());
 
                if (namespaceMap == null) {
                        return null;
                }
 
-               Map<K, ArrayList<V>> keyedMap = 
namespaceMap.get(currentNamespace);
+               Map<K, List<V>> keyedMap = namespaceMap.get(currentNamespace);
 
                if (keyedMap == null) {
                        return null;
@@ -95,7 +96,7 @@ public class HeapListState<K, N, V>
                        return;
                }
 
-               Map<N, Map<K, ArrayList<V>>> namespaceMap =
+               Map<N, Map<K, List<V>>> namespaceMap =
                                
stateTable.get(backend.getCurrentKeyGroupIndex());
 
                if (namespaceMap == null) {
@@ -103,14 +104,14 @@ public class HeapListState<K, N, V>
                        stateTable.set(backend.getCurrentKeyGroupIndex(), 
namespaceMap);
                }
 
-               Map<K, ArrayList<V>> keyedMap = 
namespaceMap.get(currentNamespace);
+               Map<K, List<V>> keyedMap = namespaceMap.get(currentNamespace);
 
                if (keyedMap == null) {
                        keyedMap = createNewMap();
                        namespaceMap.put(currentNamespace, keyedMap);
                }
 
-               ArrayList<V> list = keyedMap.get(backend.<K>getCurrentKey());
+               List<V> list = keyedMap.get(backend.<K>getCurrentKey());
 
                if (list == null) {
                        list = new ArrayList<>();
@@ -124,26 +125,26 @@ public class HeapListState<K, N, V>
                Preconditions.checkState(namespace != null, "No namespace 
given.");
                Preconditions.checkState(key != null, "No key given.");
 
-               Map<N, Map<K, ArrayList<V>>> namespaceMap =
+               Map<N, Map<K, List<V>>> namespaceMap =
                                
stateTable.get(KeyGroupRangeAssignment.assignToKeyGroup(key, 
backend.getNumberOfKeyGroups()));
 
                if (namespaceMap == null) {
                        return null;
                }
 
-               Map<K, ArrayList<V>> keyedMap = 
namespaceMap.get(currentNamespace);
+               Map<K, List<V>> keyedMap = namespaceMap.get(currentNamespace);
 
                if (keyedMap == null) {
                        return null;
                }
 
-               ArrayList<V> result = keyedMap.get(key);
+               List<V> result = keyedMap.get(key);
 
                if (result == null) {
                        return null;
                }
 
-               TypeSerializer<V> serializer = stateDesc.getSerializer();
+               TypeSerializer<V> serializer = stateDesc.getElementSerializer();
 
                ByteArrayOutputStream baos = new ByteArrayOutputStream();
                DataOutputViewStreamWrapper view = new 
DataOutputViewStreamWrapper(baos);
@@ -165,7 +166,7 @@ public class HeapListState<K, N, V>
        // 
------------------------------------------------------------------------
 
        @Override
-       protected ArrayList<V> mergeState(ArrayList<V> a, ArrayList<V> b) {
+       protected List<V> mergeState(List<V> a, List<V> b) {
                a.addAll(b);
                return a;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/d47446ca/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index b4bf664..7737ecf 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -494,7 +494,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
 
                        TypeSerializer<Integer> keySerializer = 
IntSerializer.INSTANCE;
                        TypeSerializer<VoidNamespace> namespaceSerializer = 
VoidNamespaceSerializer.INSTANCE;
-                       TypeSerializer<String> valueSerializer = 
kvId.getSerializer();
+                       TypeSerializer<String> valueSerializer = 
kvId.getElementSerializer();
 
                        ListState<String> state = 
backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, kvId);
                        @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/flink/blob/d47446ca/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java
index 33d60a0..f1b071a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.state.heap;
 
 import org.apache.flink.api.common.ExecutionConfig;
-
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
@@ -29,11 +28,10 @@ import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.runtime.state.internal.InternalListState;
-
 import org.junit.Test;
 
-import java.util.ArrayList;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 
 import static java.util.Arrays.asList;
@@ -96,7 +94,7 @@ public class HeapListStateTest {
 
                        // make sure all lists / maps are cleared
 
-                       StateTable<String, VoidNamespace, ArrayList<Long>> 
stateTable =
+                       StateTable<String, VoidNamespace, List<Long>> 
stateTable =
                                        ((HeapListState<String, VoidNamespace, 
Long>) state).stateTable;
 
                        assertTrue(stateTable.isEmpty());
@@ -216,7 +214,7 @@ public class HeapListStateTest {
                        state.setCurrentNamespace(namespace1);
                        state.clear();
 
-                       StateTable<String, Integer, ArrayList<Long>> stateTable 
= 
+                       StateTable<String, Integer, List<Long>> stateTable =
                                        ((HeapListState<String, Integer, Long>) 
state).stateTable;
 
                        assertTrue(stateTable.isEmpty());

http://git-wip-us.apache.org/repos/asf/flink/blob/d47446ca/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
index 90e4b52..7adaf13 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
@@ -31,12 +31,12 @@ import java.util.ArrayList;
 
 @Internal
 @Deprecated
-public class AccumulatingProcessingTimeWindowOperator<KEY, IN, OUT> 
+public class AccumulatingProcessingTimeWindowOperator<KEY, IN, OUT>
                extends AbstractAlignedProcessingTimeWindowOperator<KEY, IN, 
OUT, ArrayList<IN>, WindowFunction<IN, OUT, KEY, TimeWindow>> {
 
        private static final long serialVersionUID = 7305948082830843475L;
 
-       
+
        public AccumulatingProcessingTimeWindowOperator(
                        WindowFunction<IN, OUT, KEY, TimeWindow> function,
                        KeySelector<IN, KEY> keySelector,
@@ -53,7 +53,7 @@ public class AccumulatingProcessingTimeWindowOperator<KEY, 
IN, OUT>
        protected AccumulatingKeyedTimePanes<IN, KEY, OUT> 
createPanes(KeySelector<IN, KEY> keySelector, Function function) {
                @SuppressWarnings("unchecked")
                WindowFunction<IN, OUT, KEY, Window> windowFunction = 
(WindowFunction<IN, OUT, KEY, Window>) function;
-               
+
                return new AccumulatingKeyedTimePanes<>(keySelector, 
windowFunction);
        }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/d47446ca/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java
index c0ca6a0..26cb7ac 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java
@@ -23,8 +23,10 @@ import com.esotericsoftware.kryo.serializers.JavaSerializer;
 
 import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
 import org.apache.flink.streaming.api.TimeCharacteristic;
@@ -130,7 +132,7 @@ public class StateDescriptorPassingTest {
                                                                                
Iterable<File> input, Collector<String> out) {}
                                });
 
-               validateStateDescriptorConfigured(result);
+               validateListStateDescriptorConfigured(result);
        }
 
        @Test
@@ -190,7 +192,7 @@ public class StateDescriptorPassingTest {
                                        public void apply(TimeWindow window, 
Iterable<File> input, Collector<String> out) {}
                                });
 
-               validateStateDescriptorConfigured(result);
+               validateListStateDescriptorConfigured(result);
        }
 
        // 
------------------------------------------------------------------------
@@ -211,4 +213,26 @@ public class StateDescriptorPassingTest {
                assertTrue("serializer registration was not properly passed 
on", 
                                kryo.getSerializer(File.class) instanceof 
JavaSerializer);
        }
+
+       private void 
validateListStateDescriptorConfigured(SingleOutputStreamOperator<?> result) {
+               OneInputTransformation<?, ?> transform = 
(OneInputTransformation<?, ?>) result.getTransformation();
+               WindowOperator<?, ?, ?, ?, ?> op = (WindowOperator<?, ?, ?, ?, 
?>) transform.getOperator();
+               StateDescriptor<?, ?> descr = op.getStateDescriptor();
+
+               assertTrue(descr instanceof ListStateDescriptor);
+
+               ListStateDescriptor<?> listDescr = 
(ListStateDescriptor<?>)descr;
+
+               // this would be the first statement to fail if state 
descriptors were not properly initialized
+               TypeSerializer<?> serializer = listDescr.getSerializer();
+               assertTrue(serializer instanceof ListSerializer);
+
+               TypeSerializer<?> elementSerializer = 
listDescr.getElementSerializer();
+               assertTrue(elementSerializer instanceof KryoSerializer);
+
+               Kryo kryo = ((KryoSerializer<?>) elementSerializer).getKryo();
+
+               assertTrue("serializer registration was not properly passed on",
+                               kryo.getSerializer(File.class) instanceof 
JavaSerializer);
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d47446ca/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
index 2791726..294b8da 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
@@ -31,6 +31,7 @@ import 
org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
 import org.apache.flink.core.fs.Path;
@@ -162,12 +163,15 @@ public class StreamingRuntimeContextTest {
                ListStateDescriptor<TaskInfo> descr = new 
ListStateDescriptor<>("name", TaskInfo.class);
                context.getListState(descr);
 
-               StateDescriptor<?, ?> descrIntercepted = (StateDescriptor<?, 
?>) descriptorCapture.get();
+               ListStateDescriptor<?> descrIntercepted = 
(ListStateDescriptor<?>) descriptorCapture.get();
                TypeSerializer<?> serializer = descrIntercepted.getSerializer();
 
                // check that the Path class is really registered, i.e., the 
execution config was applied
-               assertTrue(serializer instanceof KryoSerializer);
-               assertTrue(((KryoSerializer<?>) 
serializer).getKryo().getRegistration(Path.class).getId() > 0);
+               assertTrue(serializer instanceof ListSerializer);
+
+               TypeSerializer<?> elementSerializer = 
descrIntercepted.getElementSerializer();
+               assertTrue(elementSerializer instanceof KryoSerializer);
+               assertTrue(((KryoSerializer<?>) 
elementSerializer).getKryo().getRegistration(Path.class).getId() > 0);
        }
 
        @Test

Reply via email to