Repository: flink Updated Branches: refs/heads/master ded7faeab -> 2045cc5f8
[FLINK-5790] [core] Use list types when ListStateDescriptor extends StateDescriptor This closes #3305 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d47446ca Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d47446ca Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d47446ca Branch: refs/heads/master Commit: d47446cafffe0d34d89488f6eb860aa139ceb3f1 Parents: ded7fae Author: xiaogang.sxg <xiaogang....@alibaba-inc.com> Authored: Tue Feb 14 13:39:30 2017 +0800 Committer: Stephan Ewen <se...@apache.org> Committed: Wed Feb 15 12:21:07 2017 +0100 ---------------------------------------------------------------------- .../streaming/state/RocksDBListState.java | 4 +- .../api/common/state/ListStateDescriptor.java | 35 +++-- .../flink/api/common/state/StateDescriptor.java | 18 +-- .../common/typeutils/base/ListSerializer.java | 131 +++++++++++++++++++ .../flink/api/java/typeutils/ListTypeInfo.java | 116 ++++++++++++++++ .../util/MigrationInstantiationUtil.java | 2 +- .../common/state/ListStateDescriptorTest.java | 25 +++- .../api/common/state/ListStateDescriptor.java | 110 ++++++++++++++++ .../runtime/state/ArrayListSerializer.java | 10 +- .../state/DefaultOperatorStateBackend.java | 6 +- .../state/heap/HeapKeyedStateBackend.java | 17 +-- .../flink/runtime/state/heap/HeapListState.java | 25 ++-- .../runtime/state/StateBackendTestBase.java | 2 +- .../runtime/state/heap/HeapListStateTest.java | 8 +- ...ccumulatingProcessingTimeWindowOperator.java | 8 +- .../operators/StateDescriptorPassingTest.java | 28 +++- .../operators/StreamingRuntimeContextTest.java | 10 +- 17 files changed, 476 insertions(+), 79 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/d47446ca/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java index e6988f7..a8b20d1 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java @@ -47,7 +47,7 @@ import java.util.List; * @param <V> The type of the values in the list state. */ public class RocksDBListState<K, N, V> - extends AbstractRocksDBState<K, N, ListState<V>, ListStateDescriptor<V>, V> + extends AbstractRocksDBState<K, N, ListState<V>, ListStateDescriptor<V>, List<V>> implements InternalListState<N, V> { /** Serializer for the values */ @@ -72,7 +72,7 @@ public class RocksDBListState<K, N, V> RocksDBKeyedStateBackend<K> backend) { super(columnFamily, namespaceSerializer, stateDesc, backend); - this.valueSerializer = stateDesc.getSerializer(); + this.valueSerializer = stateDesc.getElementSerializer(); writeOptions = new WriteOptions(); writeOptions.setDisableWAL(true); http://git-wip-us.apache.org/repos/asf/flink/blob/d47446ca/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java index 6861a07..2047e24 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java @@ -21,6 +21,10 @@ package org.apache.flink.api.common.state; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.ListSerializer; +import org.apache.flink.api.java.typeutils.ListTypeInfo; + +import java.util.List; /** * A {@link StateDescriptor} for {@link ListState}. This can be used to create a partitioned @@ -30,8 +34,8 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; * @param <T> The type of the values that can be added to the list state. */ @PublicEvolving -public class ListStateDescriptor<T> extends StateDescriptor<ListState<T>, T> { - private static final long serialVersionUID = 1L; +public class ListStateDescriptor<T> extends StateDescriptor<ListState<T>, List<T>> { + private static final long serialVersionUID = 2L; /** * Creates a new {@code ListStateDescriptor} with the given name and list element type. @@ -40,20 +44,22 @@ public class ListStateDescriptor<T> extends StateDescriptor<ListState<T>, T> { * consider using the {@link #ListStateDescriptor(String, TypeInformation)} constructor. * * @param name The (unique) name for the state. - * @param typeClass The type of the values in the state. + * @param elementTypeClass The type of the elements in the state. */ - public ListStateDescriptor(String name, Class<T> typeClass) { - super(name, typeClass, null); + @SuppressWarnings("unchecked") + public ListStateDescriptor(String name, Class<T> elementTypeClass) { + super(name, new ListTypeInfo<>(elementTypeClass), null); } /** * Creates a new {@code ListStateDescriptor} with the given name and list element type. * * @param name The (unique) name for the state. - * @param typeInfo The type of the values in the state. + * @param elementTypeInfo The type of the elements in the state. */ - public ListStateDescriptor(String name, TypeInformation<T> typeInfo) { - super(name, typeInfo, null); + @SuppressWarnings("unchecked") + public ListStateDescriptor(String name, TypeInformation<T> elementTypeInfo) { + super(name, new ListTypeInfo<>(elementTypeInfo), null); } /** @@ -62,10 +68,19 @@ public class ListStateDescriptor<T> extends StateDescriptor<ListState<T>, T> { * @param name The (unique) name for the state. * @param typeSerializer The type serializer for the list values. */ + @SuppressWarnings("unchecked") public ListStateDescriptor(String name, TypeSerializer<T> typeSerializer) { - super(name, typeSerializer, null); + super(name, new ListSerializer<>(typeSerializer), null); + } + + public TypeSerializer<T> getElementSerializer() { + if (!(serializer instanceof ListSerializer)) { + throw new IllegalStateException(); + } + + return ((ListSerializer<T>)serializer).getElementSerializer(); } - + // ------------------------------------------------------------------------ @Override http://git-wip-us.apache.org/repos/asf/flink/blob/d47446ca/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java index b901d03..bc909e6 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java @@ -243,22 +243,6 @@ public abstract class StateDescriptor<S extends State, T> implements Serializabl } } - /** - * This method should be called by subclasses prior to serialization. Because the TypeInformation is - * not always serializable, it is 'transient' and dropped during serialization. Hence, the descriptor - * needs to make sure that the serializer is created before the TypeInformation is dropped. - */ - private void ensureSerializerCreated() { - if (serializer == null) { - if (typeInfo != null) { - serializer = typeInfo.createSerializer(new ExecutionConfig()); - } else { - throw new IllegalStateException( - "Cannot initialize serializer after TypeInformation was dropped during serialization"); - } - } - } - // ------------------------------------------------------------------------ // Standard Utils // ------------------------------------------------------------------------ @@ -287,7 +271,7 @@ public abstract class StateDescriptor<S extends State, T> implements Serializabl private void writeObject(final ObjectOutputStream out) throws IOException { // make sure we have a serializer before the type information gets lost - ensureSerializerCreated(); + initializeSerializerUnlessSet(new ExecutionConfig()); // write all the non-transient fields out.defaultWriteObject(); http://git-wip-us.apache.org/repos/asf/flink/blob/d47446ca/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java new file mode 100644 index 0000000..a875a3b --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.typeutils.base; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; + +import java.io.IOException; +import java.util.List; +import java.util.ArrayList; + +@SuppressWarnings("ForLoopReplaceableByForEach") +public class ListSerializer<T> extends TypeSerializer<List<T>> { + + private static final long serialVersionUID = 1119562170939152304L; + + private final TypeSerializer<T> elementSerializer; + + public ListSerializer(TypeSerializer<T> elementSerializer) { + this.elementSerializer = elementSerializer; + } + + public TypeSerializer<T> getElementSerializer() { + return elementSerializer; + } + + @Override + public boolean isImmutableType() { + return false; + } + + @Override + public TypeSerializer<List<T>> duplicate() { + TypeSerializer<T> duplicateElement = elementSerializer.duplicate(); + return duplicateElement == elementSerializer ? this : new ListSerializer<T>(duplicateElement); + } + + @Override + public List<T> createInstance() { + return new ArrayList<>(); + } + + @Override + public List<T> copy(List<T> from) { + List<T> newList = new ArrayList<>(from.size()); + for (int i = 0; i < from.size(); i++) { + newList.add(elementSerializer.copy(from.get(i))); + } + return newList; + } + + @Override + public List<T> copy(List<T> from, List<T> reuse) { + return copy(from); + } + + @Override + public int getLength() { + return -1; // var length + } + + @Override + public void serialize(List<T> list, DataOutputView target) throws IOException { + final int size = list.size(); + target.writeInt(size); + for (int i = 0; i < size; i++) { + elementSerializer.serialize(list.get(i), target); + } + } + + @Override + public List<T> deserialize(DataInputView source) throws IOException { + final int size = source.readInt(); + final List<T> list = new ArrayList<>(size); + for (int i = 0; i < size; i++) { + list.add(elementSerializer.deserialize(source)); + } + return list; + } + + @Override + public List<T> deserialize(List<T> reuse, DataInputView source) throws IOException { + return deserialize(source); + } + + @Override + public void copy(DataInputView source, DataOutputView target) throws IOException { + // copy number of elements + final int num = source.readInt(); + target.writeInt(num); + for (int i = 0; i < num; i++) { + elementSerializer.copy(source, target); + } + } + + // -------------------------------------------------------------------- + + @Override + public boolean equals(Object obj) { + return obj == this || + (obj != null && obj.getClass() == getClass() && + elementSerializer.equals(((ListSerializer<?>) obj).elementSerializer)); + } + + @Override + public boolean canEqual(Object obj) { + return true; + } + + @Override + public int hashCode() { + return elementSerializer.hashCode(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d47446ca/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ListTypeInfo.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ListTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ListTypeInfo.java new file mode 100644 index 0000000..e70aaf8 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ListTypeInfo.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils; + +import org.apache.flink.annotation.Public; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.ListSerializer; + +import java.util.List; + +/** + * A {@link TypeInformation} for the list types of the Java API. + * + * @param <T> The type of the elements in the list. + */ + + +@Public +public final class ListTypeInfo<T> extends TypeInformation<List<T>> { + + private final TypeInformation<T> elementTypeInfo; + + public ListTypeInfo(Class<T> elementTypeClass) { + this.elementTypeInfo = TypeExtractor.createTypeInfo(elementTypeClass); + } + + public ListTypeInfo(TypeInformation<T> elementTypeInfo) { + this.elementTypeInfo = elementTypeInfo; + } + + public TypeInformation<T> getElementTypeInfo() { + return elementTypeInfo; + } + + @Override + public boolean isBasicType() { + return false; + } + + @Override + public boolean isTupleType() { + return false; + } + + @Override + public int getArity() { + return 0; + } + + @Override + public int getTotalFields() { + return elementTypeInfo.getTotalFields(); + } + + @SuppressWarnings("unchecked") + @Override + public Class<List<T>> getTypeClass() { + return (Class<List<T>>)(Class<?>)List.class; + } + + @Override + public boolean isKeyType() { + return false; + } + + @Override + public TypeSerializer<List<T>> createSerializer(ExecutionConfig config) { + TypeSerializer<T> elementTypeSerializer = elementTypeInfo.createSerializer(config); + return new ListSerializer<>(elementTypeSerializer); + } + + @Override + public String toString() { + return null; + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof ListTypeInfo) { + @SuppressWarnings("unchecked") + ListTypeInfo<T> other = (ListTypeInfo<T>) obj; + + return other.canEqual(this) && elementTypeInfo.equals(other.elementTypeInfo); + } else { + return false; + } + } + + @Override + public int hashCode() { + return 31 * elementTypeInfo.hashCode() + 1; + } + + @Override + public boolean canEqual(Object obj) { + return (obj instanceof ListTypeInfo); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d47446ca/flink-core/src/main/java/org/apache/flink/migration/util/MigrationInstantiationUtil.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/migration/util/MigrationInstantiationUtil.java b/flink-core/src/main/java/org/apache/flink/migration/util/MigrationInstantiationUtil.java index ca75cce..d175b2f 100644 --- a/flink-core/src/main/java/org/apache/flink/migration/util/MigrationInstantiationUtil.java +++ b/flink-core/src/main/java/org/apache/flink/migration/util/MigrationInstantiationUtil.java @@ -91,4 +91,4 @@ public final class MigrationInstantiationUtil { throw new IllegalAccessError(); } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/d47446ca/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java index 6dc00f0..b9d9a8c 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java @@ -21,6 +21,7 @@ package org.apache.flink.api.common.state; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.TaskInfo; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.ListSerializer; import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer; import org.apache.flink.core.fs.Path; @@ -45,13 +46,18 @@ public class ListStateDescriptorTest { assertEquals("testName", descr.getName()); assertNotNull(descr.getSerializer()); - assertEquals(serializer, descr.getSerializer()); + assertTrue(descr.getSerializer() instanceof ListSerializer); + assertNotNull(descr.getElementSerializer()); + assertEquals(serializer, descr.getElementSerializer()); ListStateDescriptor<String> copy = CommonTestUtils.createCopySerializable(descr); assertEquals("testName", copy.getName()); assertNotNull(copy.getSerializer()); - assertEquals(serializer, copy.getSerializer()); + assertTrue(copy.getSerializer() instanceof ListSerializer); + + assertNotNull(copy.getElementSerializer()); + assertEquals(serializer, copy.getElementSerializer()); } @Test @@ -69,11 +75,14 @@ public class ListStateDescriptorTest { } catch (IllegalStateException ignored) {} descr.initializeSerializerUnlessSet(cfg); - + assertNotNull(descr.getSerializer()); - assertTrue(descr.getSerializer() instanceof KryoSerializer); + assertTrue(descr.getSerializer() instanceof ListSerializer); - assertTrue(((KryoSerializer<?>) descr.getSerializer()).getKryo().getRegistration(TaskInfo.class).getId() > 0); + assertNotNull(descr.getElementSerializer()); + assertTrue(descr.getElementSerializer() instanceof KryoSerializer); + + assertTrue(((KryoSerializer<?>) descr.getElementSerializer()).getKryo().getRegistration(TaskInfo.class).getId() > 0); } @Test @@ -85,7 +94,11 @@ public class ListStateDescriptorTest { ListStateDescriptor<String> copy = CommonTestUtils.createCopySerializable(descr); assertEquals("testName", copy.getName()); + assertNotNull(copy.getSerializer()); - assertEquals(StringSerializer.INSTANCE, copy.getSerializer()); + assertTrue(copy.getSerializer() instanceof ListSerializer); + + assertNotNull(copy.getElementSerializer()); + assertEquals(StringSerializer.INSTANCE, copy.getElementSerializer()); } } http://git-wip-us.apache.org/repos/asf/flink/blob/d47446ca/flink-runtime/src/main/java/org/apache/flink/migration/api/common/state/ListStateDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/api/common/state/ListStateDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/migration/api/common/state/ListStateDescriptor.java new file mode 100644 index 0000000..4e83cca --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/migration/api/common/state/ListStateDescriptor.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.migration.api.common.state; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.StateBackend; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; + +/** + * A {@link StateDescriptor} for {@link ListState}. + * + * @param <T> The type of the values that can be added to the list state. + */ +@PublicEvolving +public class ListStateDescriptor<T> extends StateDescriptor<ListState<T>, T> { + private static final long serialVersionUID = 1L; + + /** + * Creates a new {@code ListStateDescriptor} with the given name and list element type. + * + * <p>If this constructor fails (because it is not possible to describe the type via a class), + * consider using the {@link #ListStateDescriptor(String, TypeInformation)} constructor. + * + * @param name The (unique) name for the state. + * @param typeClass The type of the values in the state. + */ + public ListStateDescriptor(String name, Class<T> typeClass) { + super(name, typeClass, null); + } + + /** + * Creates a new {@code ListStateDescriptor} with the given name and list element type. + * + * @param name The (unique) name for the state. + * @param typeInfo The type of the values in the state. + */ + public ListStateDescriptor(String name, TypeInformation<T> typeInfo) { + super(name, typeInfo, null); + } + + /** + * Creates a new {@code ListStateDescriptor} with the given name and list element type. + * + * @param name The (unique) name for the state. + * @param typeSerializer The type serializer for the list values. + */ + public ListStateDescriptor(String name, TypeSerializer<T> typeSerializer) { + super(name, typeSerializer, null); + } + + // ------------------------------------------------------------------------ + + @Override + public ListState<T> bind(StateBackend stateBackend) throws Exception { + throw new IllegalStateException("Cannot bind states with a legacy state descriptor."); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + ListStateDescriptor<?> that = (ListStateDescriptor<?>) o; + + return serializer.equals(that.serializer) && name.equals(that.name); + + } + + @Override + public int hashCode() { + int result = serializer.hashCode(); + result = 31 * result + name.hashCode(); + return result; + } + + @Override + public String toString() { + return "ListStateDescriptor{" + + "serializer=" + serializer + + '}'; + } + + @Override + public org.apache.flink.api.common.state.StateDescriptor.Type getType() { + return org.apache.flink.api.common.state.StateDescriptor.Type.LIST; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d47446ca/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java index 43e6786..f5a6405 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java @@ -35,6 +35,10 @@ final public class ArrayListSerializer<T> extends TypeSerializer<ArrayList<T>> { this.elementSerializer = elementSerializer; } + public TypeSerializer<T> getElementSerializer() { + return elementSerializer; + } + @Override public boolean isImmutableType() { return false; @@ -109,8 +113,8 @@ final public class ArrayListSerializer<T> extends TypeSerializer<ArrayList<T>> { @Override public boolean equals(Object obj) { return obj == this || - (obj != null && obj.getClass() == getClass() && - elementSerializer.equals(((ArrayListSerializer<?>) obj).elementSerializer)); + (obj != null && obj.getClass() == getClass() && + elementSerializer.equals(((ArrayListSerializer<?>) obj).elementSerializer)); } @Override @@ -122,4 +126,4 @@ final public class ArrayListSerializer<T> extends TypeSerializer<ArrayList<T>> { public int hashCode() { return elementSerializer.hashCode(); } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/d47446ca/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java index 1cd1da7..adf0727 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java @@ -106,7 +106,7 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend { Preconditions.checkNotNull(stateDescriptor); String name = Preconditions.checkNotNull(stateDescriptor.getName()); - TypeSerializer<S> partitionStateSerializer = Preconditions.checkNotNull(stateDescriptor.getSerializer()); + TypeSerializer<S> partitionStateSerializer = Preconditions.checkNotNull(stateDescriptor.getElementSerializer()); @SuppressWarnings("unchecked") PartitionableListState<S> partitionableListState = (PartitionableListState<S>) registeredStates.get(name); @@ -126,8 +126,8 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend { partitionableListState.getAssignmentMode()); Preconditions.checkState( partitionableListState.getPartitionStateSerializer(). - isCompatibleWith(stateDescriptor.getSerializer()), - "Incompatible type serializers. Provided: " + stateDescriptor.getSerializer() + + isCompatibleWith(stateDescriptor.getElementSerializer()), + "Incompatible type serializers. Provided: " + stateDescriptor.getElementSerializer() + ", found: " + partitionableListState.getPartitionStateSerializer()); } http://git-wip-us.apache.org/repos/asf/flink/blob/d47446ca/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java index 0fe92e7..2366342 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java @@ -20,7 +20,6 @@ package org.apache.flink.runtime.state.heap; import org.apache.commons.io.IOUtils; import org.apache.flink.annotation.VisibleForTesting; - import org.apache.flink.api.common.state.AggregatingStateDescriptor; import org.apache.flink.api.common.state.FoldingStateDescriptor; import org.apache.flink.api.common.state.ListStateDescriptor; @@ -28,6 +27,7 @@ import org.apache.flink.api.common.state.ReducingStateDescriptor; import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.ListSerializer; import org.apache.flink.api.common.typeutils.base.VoidSerializer; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.core.fs.FSDataInputStream; @@ -60,7 +60,6 @@ import org.apache.flink.runtime.state.internal.InternalReducingState; import org.apache.flink.runtime.state.internal.InternalValueState; import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.Preconditions; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -152,15 +151,8 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { TypeSerializer<N> namespaceSerializer, ListStateDescriptor<T> stateDesc) throws Exception { - String name = stateDesc.getName(); - - @SuppressWarnings("unchecked") - StateTable<K, N, ArrayList<T>> stateTable = (StateTable<K, N, ArrayList<T>>) stateTables.get(name); - RegisteredBackendStateMetaInfo<N, ArrayList<T>> newMetaInfo = - new RegisteredBackendStateMetaInfo<>(stateDesc.getType(), name, namespaceSerializer, new ArrayListSerializer<>(stateDesc.getSerializer())); - - stateTable = tryRegisterStateTable(stateTable, newMetaInfo); + StateTable<K, N, List<T>> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc); return new HeapListState<>(this, stateDesc, stateTable, keySerializer, namespaceSerializer); } @@ -449,6 +441,11 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { namespaceSerializer = VoidNamespaceSerializer.INSTANCE; } + // The serializer used in the list states now changes from ArrayListSerializer to ListSerializer. + if (stateSerializer instanceof ArrayListSerializer) { + stateSerializer = new ListSerializer<>(((ArrayListSerializer<?>) stateSerializer).getElementSerializer()); + } + Map nullNameSpaceFix = (Map) rawResultMap.remove(null); if (null != nullNameSpaceFix) { http://git-wip-us.apache.org/repos/asf/flink/blob/d47446ca/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java index f0eb53e..a4e8ea7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java @@ -29,6 +29,7 @@ import org.apache.flink.util.Preconditions; import java.io.ByteArrayOutputStream; import java.util.ArrayList; +import java.util.List; import java.util.Map; /** @@ -40,7 +41,7 @@ import java.util.Map; * @param <V> The type of the value. */ public class HeapListState<K, N, V> - extends AbstractHeapMergingState<K, N, V, Iterable<V>, ArrayList<V>, ListState<V>, ListStateDescriptor<V>> + extends AbstractHeapMergingState<K, N, V, Iterable<V>, List<V>, ListState<V>, ListStateDescriptor<V>> implements InternalListState<N, V> { /** @@ -54,7 +55,7 @@ public class HeapListState<K, N, V> public HeapListState( KeyedStateBackend<K> backend, ListStateDescriptor<V> stateDesc, - StateTable<K, N, ArrayList<V>> stateTable, + StateTable<K, N, List<V>> stateTable, TypeSerializer<K> keySerializer, TypeSerializer<N> namespaceSerializer) { super(backend, stateDesc, stateTable, keySerializer, namespaceSerializer); @@ -69,14 +70,14 @@ public class HeapListState<K, N, V> Preconditions.checkState(currentNamespace != null, "No namespace set."); Preconditions.checkState(backend.getCurrentKey() != null, "No key set."); - Map<N, Map<K, ArrayList<V>>> namespaceMap = + Map<N, Map<K, List<V>>> namespaceMap = stateTable.get(backend.getCurrentKeyGroupIndex()); if (namespaceMap == null) { return null; } - Map<K, ArrayList<V>> keyedMap = namespaceMap.get(currentNamespace); + Map<K, List<V>> keyedMap = namespaceMap.get(currentNamespace); if (keyedMap == null) { return null; @@ -95,7 +96,7 @@ public class HeapListState<K, N, V> return; } - Map<N, Map<K, ArrayList<V>>> namespaceMap = + Map<N, Map<K, List<V>>> namespaceMap = stateTable.get(backend.getCurrentKeyGroupIndex()); if (namespaceMap == null) { @@ -103,14 +104,14 @@ public class HeapListState<K, N, V> stateTable.set(backend.getCurrentKeyGroupIndex(), namespaceMap); } - Map<K, ArrayList<V>> keyedMap = namespaceMap.get(currentNamespace); + Map<K, List<V>> keyedMap = namespaceMap.get(currentNamespace); if (keyedMap == null) { keyedMap = createNewMap(); namespaceMap.put(currentNamespace, keyedMap); } - ArrayList<V> list = keyedMap.get(backend.<K>getCurrentKey()); + List<V> list = keyedMap.get(backend.<K>getCurrentKey()); if (list == null) { list = new ArrayList<>(); @@ -124,26 +125,26 @@ public class HeapListState<K, N, V> Preconditions.checkState(namespace != null, "No namespace given."); Preconditions.checkState(key != null, "No key given."); - Map<N, Map<K, ArrayList<V>>> namespaceMap = + Map<N, Map<K, List<V>>> namespaceMap = stateTable.get(KeyGroupRangeAssignment.assignToKeyGroup(key, backend.getNumberOfKeyGroups())); if (namespaceMap == null) { return null; } - Map<K, ArrayList<V>> keyedMap = namespaceMap.get(currentNamespace); + Map<K, List<V>> keyedMap = namespaceMap.get(currentNamespace); if (keyedMap == null) { return null; } - ArrayList<V> result = keyedMap.get(key); + List<V> result = keyedMap.get(key); if (result == null) { return null; } - TypeSerializer<V> serializer = stateDesc.getSerializer(); + TypeSerializer<V> serializer = stateDesc.getElementSerializer(); ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(baos); @@ -165,7 +166,7 @@ public class HeapListState<K, N, V> // ------------------------------------------------------------------------ @Override - protected ArrayList<V> mergeState(ArrayList<V> a, ArrayList<V> b) { + protected List<V> mergeState(List<V> a, List<V> b) { a.addAll(b); return a; } http://git-wip-us.apache.org/repos/asf/flink/blob/d47446ca/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java index b4bf664..7737ecf 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java @@ -494,7 +494,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten TypeSerializer<Integer> keySerializer = IntSerializer.INSTANCE; TypeSerializer<VoidNamespace> namespaceSerializer = VoidNamespaceSerializer.INSTANCE; - TypeSerializer<String> valueSerializer = kvId.getSerializer(); + TypeSerializer<String> valueSerializer = kvId.getElementSerializer(); ListState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId); @SuppressWarnings("unchecked") http://git-wip-us.apache.org/repos/asf/flink/blob/d47446ca/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java index 33d60a0..f1b071a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.state.heap; import org.apache.flink.api.common.ExecutionConfig; - import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.typeutils.base.IntSerializer; @@ -29,11 +28,10 @@ import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.runtime.state.VoidNamespaceSerializer; import org.apache.flink.runtime.state.internal.InternalListState; - import org.junit.Test; -import java.util.ArrayList; import java.util.HashSet; +import java.util.List; import java.util.Set; import static java.util.Arrays.asList; @@ -96,7 +94,7 @@ public class HeapListStateTest { // make sure all lists / maps are cleared - StateTable<String, VoidNamespace, ArrayList<Long>> stateTable = + StateTable<String, VoidNamespace, List<Long>> stateTable = ((HeapListState<String, VoidNamespace, Long>) state).stateTable; assertTrue(stateTable.isEmpty()); @@ -216,7 +214,7 @@ public class HeapListStateTest { state.setCurrentNamespace(namespace1); state.clear(); - StateTable<String, Integer, ArrayList<Long>> stateTable = + StateTable<String, Integer, List<Long>> stateTable = ((HeapListState<String, Integer, Long>) state).stateTable; assertTrue(stateTable.isEmpty()); http://git-wip-us.apache.org/repos/asf/flink/blob/d47446ca/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java index 90e4b52..7adaf13 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java @@ -31,12 +31,12 @@ import java.util.ArrayList; @Internal @Deprecated -public class AccumulatingProcessingTimeWindowOperator<KEY, IN, OUT> +public class AccumulatingProcessingTimeWindowOperator<KEY, IN, OUT> extends AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT, ArrayList<IN>, WindowFunction<IN, OUT, KEY, TimeWindow>> { private static final long serialVersionUID = 7305948082830843475L; - + public AccumulatingProcessingTimeWindowOperator( WindowFunction<IN, OUT, KEY, TimeWindow> function, KeySelector<IN, KEY> keySelector, @@ -53,7 +53,7 @@ public class AccumulatingProcessingTimeWindowOperator<KEY, IN, OUT> protected AccumulatingKeyedTimePanes<IN, KEY, OUT> createPanes(KeySelector<IN, KEY> keySelector, Function function) { @SuppressWarnings("unchecked") WindowFunction<IN, OUT, KEY, Window> windowFunction = (WindowFunction<IN, OUT, KEY, Window>) function; - + return new AccumulatingKeyedTimePanes<>(keySelector, windowFunction); } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/d47446ca/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java index c0ca6a0..26cb7ac 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java @@ -23,8 +23,10 @@ import com.esotericsoftware.kryo.serializers.JavaSerializer; import org.apache.flink.api.common.functions.FoldFunction; import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.ListSerializer; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer; import org.apache.flink.streaming.api.TimeCharacteristic; @@ -130,7 +132,7 @@ public class StateDescriptorPassingTest { Iterable<File> input, Collector<String> out) {} }); - validateStateDescriptorConfigured(result); + validateListStateDescriptorConfigured(result); } @Test @@ -190,7 +192,7 @@ public class StateDescriptorPassingTest { public void apply(TimeWindow window, Iterable<File> input, Collector<String> out) {} }); - validateStateDescriptorConfigured(result); + validateListStateDescriptorConfigured(result); } // ------------------------------------------------------------------------ @@ -211,4 +213,26 @@ public class StateDescriptorPassingTest { assertTrue("serializer registration was not properly passed on", kryo.getSerializer(File.class) instanceof JavaSerializer); } + + private void validateListStateDescriptorConfigured(SingleOutputStreamOperator<?> result) { + OneInputTransformation<?, ?> transform = (OneInputTransformation<?, ?>) result.getTransformation(); + WindowOperator<?, ?, ?, ?, ?> op = (WindowOperator<?, ?, ?, ?, ?>) transform.getOperator(); + StateDescriptor<?, ?> descr = op.getStateDescriptor(); + + assertTrue(descr instanceof ListStateDescriptor); + + ListStateDescriptor<?> listDescr = (ListStateDescriptor<?>)descr; + + // this would be the first statement to fail if state descriptors were not properly initialized + TypeSerializer<?> serializer = listDescr.getSerializer(); + assertTrue(serializer instanceof ListSerializer); + + TypeSerializer<?> elementSerializer = listDescr.getElementSerializer(); + assertTrue(elementSerializer instanceof KryoSerializer); + + Kryo kryo = ((KryoSerializer<?>) elementSerializer).getKryo(); + + assertTrue("serializer registration was not properly passed on", + kryo.getSerializer(File.class) instanceof JavaSerializer); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/d47446ca/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java index 2791726..294b8da 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java @@ -31,6 +31,7 @@ import org.apache.flink.api.common.state.ReducingStateDescriptor; import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.ListSerializer; import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer; import org.apache.flink.core.fs.Path; @@ -162,12 +163,15 @@ public class StreamingRuntimeContextTest { ListStateDescriptor<TaskInfo> descr = new ListStateDescriptor<>("name", TaskInfo.class); context.getListState(descr); - StateDescriptor<?, ?> descrIntercepted = (StateDescriptor<?, ?>) descriptorCapture.get(); + ListStateDescriptor<?> descrIntercepted = (ListStateDescriptor<?>) descriptorCapture.get(); TypeSerializer<?> serializer = descrIntercepted.getSerializer(); // check that the Path class is really registered, i.e., the execution config was applied - assertTrue(serializer instanceof KryoSerializer); - assertTrue(((KryoSerializer<?>) serializer).getKryo().getRegistration(Path.class).getId() > 0); + assertTrue(serializer instanceof ListSerializer); + + TypeSerializer<?> elementSerializer = descrIntercepted.getElementSerializer(); + assertTrue(elementSerializer instanceof KryoSerializer); + assertTrue(((KryoSerializer<?>) elementSerializer).getKryo().getRegistration(Path.class).getId() > 0); } @Test