[GitHub] flink pull request #5885: [FLINK-8715] Remove usage of StateDescriptor in st...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5885 ---
[GitHub] flink pull request #5885: [FLINK-8715] Remove usage of StateDescriptor in st...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5885#discussion_r184846476 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java --- @@ -190,4 +197,12 @@ protected void writeKeyWithGroupAndNamespace( RocksDBKeySerializationUtils.writeKey(key, keySerializer, keySerializationStream, keySerializationDataOutputView, ambiguousKeyPossible); RocksDBKeySerializationUtils.writeNameSpace(namespace, namespaceSerializer, keySerializationStream, keySerializationDataOutputView, ambiguousKeyPossible); } + + protected V getDefaultValue() { --- End diff -- Since this PR is blocking a bug for the 1.5 release, I'll proceed to merge this as it is. @bowenli86 Perhaps we can open a separate JIRA for this to keep this in mind? ---
[GitHub] flink pull request #5885: [FLINK-8715] Remove usage of StateDescriptor in st...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5885#discussion_r184841123 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java --- @@ -190,4 +197,12 @@ protected void writeKeyWithGroupAndNamespace( RocksDBKeySerializationUtils.writeKey(key, keySerializer, keySerializationStream, keySerializationDataOutputView, ambiguousKeyPossible); RocksDBKeySerializationUtils.writeNameSpace(namespace, namespaceSerializer, keySerializationStream, keySerializationDataOutputView, ambiguousKeyPossible); } + + protected V getDefaultValue() { --- End diff -- Not too sure about this one. That would require introducing 2 methods in the `InternalKvState`: 1. A getter method that returns the default value. 2. A default method that actually does the serialization copying of the default value (the current `getDefaultValue` method). ---
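A minimal sketch of the two-method split described above, assuming hypothetical names throughout: `KvStateSketch` stands in for `InternalKvState`, `getRawDefaultValue` for the plain getter, and a `UnaryOperator` stands in for the serializer's copy operation. None of these are Flink's actual API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical stand-in for InternalKvState; not Flink's real interface.
interface KvStateSketch<V> {

    /** 1. Plain getter: returns the stored default value as-is (may be null). */
    V getRawDefaultValue();

    /** Stand-in for the value serializer's copy operation. */
    UnaryOperator<V> getValueCopier();

    /** 2. Default method: hands out a copy so callers cannot mutate the shared default. */
    default V getDefaultValue() {
        V raw = getRawDefaultValue();
        return raw == null ? null : getValueCopier().apply(raw);
    }
}

// Illustrative implementation; each state class would only supply the two accessors.
final class ListStateSketch implements KvStateSketch<List<String>> {

    private final List<String> defaultValue;

    ListStateSketch(List<String> defaultValue) {
        this.defaultValue = defaultValue;
    }

    @Override
    public List<String> getRawDefaultValue() {
        return defaultValue;
    }

    @Override
    public UnaryOperator<List<String>> getValueCopier() {
        return ArrayList::new; // copy constructor acts as the "serializer copy"
    }
}
```

The copying logic then lives in one place, at the cost of exposing two extra accessors on the interface, which is the trade-off raised in the comment.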
[GitHub] flink pull request #5885: [FLINK-8715] Remove usage of StateDescriptor in st...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5885#discussion_r184840215 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java --- @@ -455,4 +455,5 @@ public StreamCompressionDecorator getKeyGroupCompressionDecorator() { @VisibleForTesting public abstract int numStateEntries(); + --- End diff -- Will revert. ---
[GitHub] flink pull request #5885: [FLINK-8715] Remove usage of StateDescriptor in st...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5885#discussion_r184840230 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java --- @@ -1116,148 +1115,177 @@ private void restoreKeyGroupsShardWithTemporaryHelperInstance( // /** -* Creates a column family handle for use with a k/v state. When restoring from a snapshot -* we don't restore the individual k/v states, just the global RocksDB database and the -* list of column families. When a k/v state is first requested we check here whether we -* already have a column family for that and return it or create a new one if it doesn't exist. +* Registers a k/v state information, which includes its state id, type, RocksDB column family handle, and serializers. * -* This also checks whether the {@link StateDescriptor} for a state matches the one -* that we checkpointed, i.e. is already in the map of column families. +* When restoring from a snapshot, we don't restore the individual k/v states, just the global RocksDB database and +* the list of k/v state information. When a k/v state is first requested we check here whether we +* already have a registered entry for that and return it (after some necessary state compatibility checks) +* or create a new one if it does not exist. */ - @SuppressWarnings("rawtypes, unchecked") - protectedColumnFamilyHandle getColumnFamily( - StateDescriptor descriptor, TypeSerializer namespaceSerializer) throws IOException, StateMigrationException { + private Tuple2 tryRegisterKvStateInformation( --- End diff -- Will change this as suggested! ---
[GitHub] flink pull request #5885: [FLINK-8715] Remove usage of StateDescriptor in st...
Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/5885#discussion_r184830451 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java --- @@ -455,4 +455,5 @@ public StreamCompressionDecorator getKeyGroupCompressionDecorator() { @VisibleForTesting public abstract int numStateEntries(); + --- End diff -- revert this? ---
[GitHub] flink pull request #5885: [FLINK-8715] Remove usage of StateDescriptor in st...
Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/5885#discussion_r184831125 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java --- @@ -190,4 +197,12 @@ protected void writeKeyWithGroupAndNamespace( RocksDBKeySerializationUtils.writeKey(key, keySerializer, keySerializationStream, keySerializationDataOutputView, ambiguousKeyPossible); RocksDBKeySerializationUtils.writeNameSpace(namespace, namespaceSerializer, keySerializationStream, keySerializationDataOutputView, ambiguousKeyPossible); } + + protected V getDefaultValue() { --- End diff -- this method is duplicated among some impl classes. We can move it to `InternalKvState` as a [default method](https://docs.oracle.com/javase/tutorial/java/IandI/defaultmethods.html). ---
[GitHub] flink pull request #5885: [FLINK-8715] Remove usage of StateDescriptor in st...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5885#discussion_r184696134 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java --- @@ -1116,148 +1115,177 @@ private void restoreKeyGroupsShardWithTemporaryHelperInstance( // /** -* Creates a column family handle for use with a k/v state. When restoring from a snapshot -* we don't restore the individual k/v states, just the global RocksDB database and the -* list of column families. When a k/v state is first requested we check here whether we -* already have a column family for that and return it or create a new one if it doesn't exist. +* Registers a k/v state information, which includes its state id, type, RocksDB column family handle, and serializers. * -* This also checks whether the {@link StateDescriptor} for a state matches the one -* that we checkpointed, i.e. is already in the map of column families. +* When restoring from a snapshot, we don't restore the individual k/v states, just the global RocksDB database and +* the list of k/v state information. When a k/v state is first requested we check here whether we +* already have a registered entry for that and return it (after some necessary state compatibility checks) +* or create a new one if it does not exist. */ - @SuppressWarnings("rawtypes, unchecked") - protectedColumnFamilyHandle getColumnFamily( - StateDescriptor descriptor, TypeSerializer namespaceSerializer) throws IOException, StateMigrationException { + private Tuple2 tryRegisterKvStateInformation( --- End diff -- This method is rarely invoked, and the tuples are not immutable, so handing out a defensive copy can also have its benefits, and I feel this is cleaner than having casts all over the place. There might also be different ways to solve the general problem, but this feels better to me than the initial version. ---
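To illustrate the defensive-copy argument, here is a small sketch under stated assumptions: `Pair` is a hypothetical mutable pair standing in for Flink's `Tuple2` (whose `f0`/`f1` fields are public and non-final), and `StateRegistrySketch` with its string payloads is invented purely for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical mutable pair standing in for Tuple2.
final class Pair<A, B> {
    A f0;
    B f1;

    Pair(A f0, B f1) {
        this.f0 = f0;
        this.f1 = f1;
    }
}

// Hypothetical registry; strings stand in for the column family handle and meta info.
final class StateRegistrySketch {

    private final Map<String, Pair<String, String>> registered = new HashMap<>();

    Pair<String, String> tryRegister(String stateName, String metaInfo) {
        Pair<String, String> entry = registered.get(stateName);
        if (entry != null) {
            entry.f1 = metaInfo; // update the stored entry in place
        } else {
            entry = new Pair<>("cf-" + stateName, metaInfo);
            registered.put(stateName, entry);
        }
        // Defensive copy: the caller gets its own tuple, so mutating it
        // cannot corrupt the entry held by the registry.
        return new Pair<>(entry.f0, entry.f1);
    }

    String registeredMetaInfo(String stateName) {
        return registered.get(stateName).f1;
    }
}
```

The extra allocation per call is the cost mentioned in the thread; since the method is rarely invoked, it is unlikely to matter.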
[GitHub] flink pull request #5885: [FLINK-8715] Remove usage of StateDescriptor in st...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5885#discussion_r184694977 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java --- @@ -1116,148 +1115,177 @@ private void restoreKeyGroupsShardWithTemporaryHelperInstance( // /** -* Creates a column family handle for use with a k/v state. When restoring from a snapshot -* we don't restore the individual k/v states, just the global RocksDB database and the -* list of column families. When a k/v state is first requested we check here whether we -* already have a column family for that and return it or create a new one if it doesn't exist. +* Registers a k/v state information, which includes its state id, type, RocksDB column family handle, and serializers. * -* This also checks whether the {@link StateDescriptor} for a state matches the one -* that we checkpointed, i.e. is already in the map of column families. +* When restoring from a snapshot, we don't restore the individual k/v states, just the global RocksDB database and +* the list of k/v state information. When a k/v state is first requested we check here whether we +* already have a registered entry for that and return it (after some necessary state compatibility checks) +* or create a new one if it does not exist. */ - @SuppressWarnings("rawtypes, unchecked") - protectedColumnFamilyHandle getColumnFamily( - StateDescriptor descriptor, TypeSerializer namespaceSerializer) throws IOException, StateMigrationException { + private Tuple2 tryRegisterKvStateInformation( --- End diff -- I did not go with this approach because of the extra tuples introduced. Though, TBH, I wasn't sure which was the better approach, this or the one you pointed out. I would not be against this version. ---
[GitHub] flink pull request #5885: [FLINK-8715] Remove usage of StateDescriptor in st...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5885#discussion_r184691691 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java --- @@ -1116,148 +1115,177 @@ private void restoreKeyGroupsShardWithTemporaryHelperInstance( // /** -* Creates a column family handle for use with a k/v state. When restoring from a snapshot -* we don't restore the individual k/v states, just the global RocksDB database and the -* list of column families. When a k/v state is first requested we check here whether we -* already have a column family for that and return it or create a new one if it doesn't exist. +* Registers a k/v state information, which includes its state id, type, RocksDB column family handle, and serializers. * -* This also checks whether the {@link StateDescriptor} for a state matches the one -* that we checkpointed, i.e. is already in the map of column families. +* When restoring from a snapshot, we don't restore the individual k/v states, just the global RocksDB database and +* the list of k/v state information. When a k/v state is first requested we check here whether we +* already have a registered entry for that and return it (after some necessary state compatibility checks) +* or create a new one if it does not exist. */ - @SuppressWarnings("rawtypes, unchecked") - protectedColumnFamilyHandle getColumnFamily( - StateDescriptor descriptor, TypeSerializer namespaceSerializer) throws IOException, StateMigrationException { + private Tuple2 tryRegisterKvStateInformation( --- End diff -- And we can also remove the lines that suppress warnings. ---
[GitHub] flink pull request #5885: [FLINK-8715] Remove usage of StateDescriptor in st...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5885#discussion_r184690567 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java --- @@ -1116,148 +1115,177 @@ private void restoreKeyGroupsShardWithTemporaryHelperInstance( // /** -* Creates a column family handle for use with a k/v state. When restoring from a snapshot -* we don't restore the individual k/v states, just the global RocksDB database and the -* list of column families. When a k/v state is first requested we check here whether we -* already have a column family for that and return it or create a new one if it doesn't exist. +* Registers a k/v state information, which includes its state id, type, RocksDB column family handle, and serializers. * -* This also checks whether the {@link StateDescriptor} for a state matches the one -* that we checkpointed, i.e. is already in the map of column families. +* When restoring from a snapshot, we don't restore the individual k/v states, just the global RocksDB database and +* the list of k/v state information. When a k/v state is first requested we check here whether we +* already have a registered entry for that and return it (after some necessary state compatibility checks) +* or create a new one if it does not exist. 
*/ - @SuppressWarnings("rawtypes, unchecked") - protectedColumnFamilyHandle getColumnFamily( - StateDescriptor descriptor, TypeSerializer namespaceSerializer) throws IOException, StateMigrationException { + private Tuple2 tryRegisterKvStateInformation( --- End diff -- We could rewrite this as
```
private Tuple2 > tryRegisterKvStateInformation(
        StateDescriptor stateDesc,
        TypeSerializer namespaceSerializer) throws StateMigrationException, IOException {

    Tuple2 registeredInfo = this.kvStateInformation.get(stateDesc.getName());

    if (registeredInfo != null) {
        @SuppressWarnings("unchecked")
        RegisteredKeyedBackendStateMetaInfo.Snapshot restoredMetaInfoSnapshot =
            restoredKvStateMetaInfos.get(stateDesc.getName());

        Preconditions.checkState(
            restoredMetaInfoSnapshot != null,
            "Requested to check compatibility of a restored RegisteredKeyedBackendStateMetaInfo," +
                " but its corresponding restored snapshot cannot be found.");

        RegisteredKeyedBackendStateMetaInfo resolveKvStateCompatibility =
            RegisteredKeyedBackendStateMetaInfo.resolveKvStateCompatibility(
                restoredMetaInfoSnapshot,
                namespaceSerializer,
                stateDesc);

        registeredInfo.f1 = resolveKvStateCompatibility;
        return Tuple2.of(registeredInfo.f0, resolveKvStateCompatibility);
    } else {
        String stateName = stateDesc.getName();
        RegisteredKeyedBackendStateMetaInfo newMetaInfo = new RegisteredKeyedBackendStateMetaInfo<>(
            stateDesc.getType(),
            stateName,
            namespaceSerializer,
            stateDesc.getSerializer());

        ColumnFamilyHandle columnFamily = createColumnFamily(stateName);

        registeredInfo = Tuple2.of(columnFamily, newMetaInfo);
        this.kvStateInformation.put(stateDesc.getName(), registeredInfo);
        return Tuple2.of(columnFamily, newMetaInfo);
    }
}
```
and get rid of all the individual casts. ---
[GitHub] flink pull request #5885: [FLINK-8715] Remove usage of StateDescriptor in st...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5885#discussion_r184675164 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java --- @@ -1116,148 +1115,177 @@ private void restoreKeyGroupsShardWithTemporaryHelperInstance( // /** -* Creates a column family handle for use with a k/v state. When restoring from a snapshot -* we don't restore the individual k/v states, just the global RocksDB database and the -* list of column families. When a k/v state is first requested we check here whether we -* already have a column family for that and return it or create a new one if it doesn't exist. +* Registers a k/v state information, which includes its state id, type, RocksDB column family handle, and serializers. * -* This also checks whether the {@link StateDescriptor} for a state matches the one -* that we checkpointed, i.e. is already in the map of column families. +* When restoring from a snapshot, we don't restore the individual k/v states, just the global RocksDB database and +* the list of k/v state information. When a k/v state is first requested we check here whether we +* already have a registered entry for that and return it (after some necessary state compatibility checks) +* or create a new one if it does not exist. 
*/ - @SuppressWarnings("rawtypes, unchecked") - protectedColumnFamilyHandle getColumnFamily( - StateDescriptor descriptor, TypeSerializer namespaceSerializer) throws IOException, StateMigrationException { + private Tuple2 tryRegisterKvStateInformation( + StateDescriptor stateDesc, + TypeSerializer namespaceSerializer) throws StateMigrationException, IOException { Tuple2 stateInfo = - kvStateInformation.get(descriptor.getName()); - - RegisteredKeyedBackendStateMetaInfo newMetaInfo = new RegisteredKeyedBackendStateMetaInfo<>( - descriptor.getType(), - descriptor.getName(), - namespaceSerializer, - descriptor.getSerializer()); + kvStateInformation.get(stateDesc.getName()); + RegisteredKeyedBackendStateMetaInfo newMetaInfo; if (stateInfo != null) { - // TODO with eager registration in place, these checks should be moved to restore() - RegisteredKeyedBackendStateMetaInfo.Snapshot restoredMetaInfo = - (RegisteredKeyedBackendStateMetaInfo.Snapshot ) restoredKvStateMetaInfos.get(descriptor.getName()); + @SuppressWarnings("unchecked") + RegisteredKeyedBackendStateMetaInfo.Snapshot restoredMetaInfoSnapshot = + restoredKvStateMetaInfos.get(stateDesc.getName()); Preconditions.checkState( - Objects.equals(newMetaInfo.getName(), restoredMetaInfo.getName()), - "Incompatible state names. " + - "Was [" + restoredMetaInfo.getName() + "], " + - "registered with [" + newMetaInfo.getName() + "]."); - - if (!Objects.equals(newMetaInfo.getStateType(), StateDescriptor.Type.UNKNOWN) - && !Objects.equals(restoredMetaInfo.getStateType(), StateDescriptor.Type.UNKNOWN)) { - - Preconditions.checkState( - newMetaInfo.getStateType() == restoredMetaInfo.getStateType(), - "Incompatible state types. 
" + - "Was [" + restoredMetaInfo.getStateType() + "], " + - "registered with [" + newMetaInfo.getStateType() + "]."); - } + restoredMetaInfoSnapshot != null, + "Requested to check compatibility of a restored RegisteredKeyedBackendStateMetaInfo," + + " but its corresponding restored snapshot cannot be found."); - // check compatibility results to determine if state migration is required - CompatibilityResult namespaceCompatibility = CompatibilityUtil.resolveCompatibilityResult( -
[GitHub] flink pull request #5885: [FLINK-8715] Remove usage of StateDescriptor in st...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5885#discussion_r184657676 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java --- @@ -61,8 +64,7 @@ /** The column family of this particular instance of state. */ protected ColumnFamilyHandle columnFamily; - /** State descriptor from which to create this state instance. */ - protected final SD stateDesc; --- End diff -- Fixed, thanks for catching this. ---
[GitHub] flink pull request #5885: [FLINK-8715] Remove usage of StateDescriptor in st...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5885#discussion_r184657584 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java --- @@ -185,36 +188,44 @@ public HeapKeyedStateBackend( stateName.equals(stateTable.getMetaInfo().getName()), "Incompatible state names. " + "Was [" + stateTable.getMetaInfo().getName() + "], " + - "registered with [" + newMetaInfo.getName() + "]."); + "registered with [" + stateName + "]."); - if (!newMetaInfo.getStateType().equals(StateDescriptor.Type.UNKNOWN) + if (!stateType.equals(StateDescriptor.Type.UNKNOWN) && !stateTable.getMetaInfo().getStateType().equals(StateDescriptor.Type.UNKNOWN)) { Preconditions.checkState( - newMetaInfo.getStateType().equals(stateTable.getMetaInfo().getStateType()), + stateType.equals(stateTable.getMetaInfo().getStateType()), "Incompatible state types. " + "Was [" + stateTable.getMetaInfo().getStateType() + "], " + - "registered with [" + newMetaInfo.getStateType() + "]."); + "registered with [" + stateType + "]."); } @SuppressWarnings("unchecked") RegisteredKeyedBackendStateMetaInfo.SnapshotrestoredMetaInfo = (RegisteredKeyedBackendStateMetaInfo.Snapshot ) restoredKvStateMetaInfos.get(stateName); // check compatibility results to determine if state migration is required + TypeSerializer newNamespaceSerializer = namespaceSerializer.duplicate(); CompatibilityResult namespaceCompatibility = CompatibilityUtil.resolveCompatibilityResult( restoredMetaInfo.getNamespaceSerializer(), null, restoredMetaInfo.getNamespaceSerializerConfigSnapshot(), - newMetaInfo.getNamespaceSerializer()); + newNamespaceSerializer); + TypeSerializer newValueSerializer = valueSerializer.duplicate(); --- End diff -- I've updated this according to your suggestion: The reconfiguration method now gets a state descriptor as an argument to reduce confusion. ---
[GitHub] flink pull request #5885: [FLINK-8715] Remove usage of StateDescriptor in st...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5885#discussion_r184657398 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java --- @@ -49,17 +49,18 @@ /** * Creates a new key/value state for the given hash map of key/value pairs. * -* @param stateDesc The state identifier for the state. This contains name -* and can create a default state value. +* @param valueSerializer The serializer for the state. * @param stateTable The state tab;e to use in this kev/value state. May contain initial state. */ public HeapFoldingState( - FoldingStateDescriptorstateDesc, StateTable stateTable, TypeSerializer keySerializer, - TypeSerializer namespaceSerializer) { - super(stateDesc, stateTable, keySerializer, namespaceSerializer); - this.foldTransformation = new FoldTransformation<>(stateDesc); + TypeSerializer valueSerializer, + TypeSerializer namespaceSerializer, + ACC defaultValue, --- End diff -- Javadocs for all state classes are now updated. ---
[GitHub] flink pull request #5885: [FLINK-8715] Remove usage of StateDescriptor in st...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5885#discussion_r184657419 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java --- @@ -103,17 +104,17 @@ public void add(T value) throws IOException { private static final class FoldTransformation implements StateTransformationFunction { - private final FoldingStateDescriptor stateDescriptor; + private final HeapFoldingState stateRef; private final FoldFunction foldFunction; - FoldTransformation(FoldingStateDescriptor stateDesc) { - this.stateDescriptor = Preconditions.checkNotNull(stateDesc); - this.foldFunction = Preconditions.checkNotNull(stateDesc.getFoldFunction()); + FoldTransformation(FoldFunction foldFunction, HeapFoldingState stateRef) { + this.stateRef = Preconditions.checkNotNull(stateRef); --- End diff -- Done. ---
[GitHub] flink pull request #5885: [FLINK-8715] Remove usage of StateDescriptor in st...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5885#discussion_r184104126 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java --- @@ -185,36 +188,44 @@ public HeapKeyedStateBackend( stateName.equals(stateTable.getMetaInfo().getName()), "Incompatible state names. " + "Was [" + stateTable.getMetaInfo().getName() + "], " + - "registered with [" + newMetaInfo.getName() + "]."); + "registered with [" + stateName + "]."); - if (!newMetaInfo.getStateType().equals(StateDescriptor.Type.UNKNOWN) + if (!stateType.equals(StateDescriptor.Type.UNKNOWN) && !stateTable.getMetaInfo().getStateType().equals(StateDescriptor.Type.UNKNOWN)) { Preconditions.checkState( - newMetaInfo.getStateType().equals(stateTable.getMetaInfo().getStateType()), + stateType.equals(stateTable.getMetaInfo().getStateType()), "Incompatible state types. " + "Was [" + stateTable.getMetaInfo().getStateType() + "], " + - "registered with [" + newMetaInfo.getStateType() + "]."); + "registered with [" + stateType + "]."); } @SuppressWarnings("unchecked") RegisteredKeyedBackendStateMetaInfo.SnapshotrestoredMetaInfo = (RegisteredKeyedBackendStateMetaInfo.Snapshot ) restoredKvStateMetaInfos.get(stateName); // check compatibility results to determine if state migration is required + TypeSerializer newNamespaceSerializer = namespaceSerializer.duplicate(); CompatibilityResult namespaceCompatibility = CompatibilityUtil.resolveCompatibilityResult( restoredMetaInfo.getNamespaceSerializer(), null, restoredMetaInfo.getNamespaceSerializerConfigSnapshot(), - newMetaInfo.getNamespaceSerializer()); + newNamespaceSerializer); + TypeSerializer newValueSerializer = valueSerializer.duplicate(); --- End diff -- Similar to my comment on the Rocks code, this duplicate seems redundant, because the serializer also comes straight from a `StateDescriptor` which duplicates before handing it out. 
One thing to consider when removing this call: it is just less obvious here that the serializer was already duplicated, so maybe it would be good to pass the state descriptor as argument and get the serializer directly here to avoid any surprises for people working on this in the future. ---
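The redundancy can be sketched as follows, assuming a descriptor whose getter already duplicates the serializer before handing it out, as the comment above describes. `SerializerSketch` and `DescriptorSketch` are hypothetical stand-ins, not Flink's `TypeSerializer` or `StateDescriptor`.

```java
// Hypothetical stand-in for a stateful serializer; not Flink's real class.
final class SerializerSketch {

    // Mutable scratch buffer: the reason one instance should not be shared across threads.
    private final StringBuilder scratch = new StringBuilder();

    String serialize(int value) {
        scratch.setLength(0);
        return scratch.append("v=").append(value).toString();
    }

    /** Returns a fresh instance with its own scratch buffer. */
    SerializerSketch duplicate() {
        return new SerializerSketch();
    }
}

// Hypothetical stand-in for a state descriptor.
final class DescriptorSketch {

    private final SerializerSketch serializer = new SerializerSketch();

    /** Already hands out a duplicate, so callers never see the internal instance. */
    SerializerSketch getSerializer() {
        return serializer.duplicate();
    }
}
```

With such a getter, `descriptor.getSerializer().duplicate()` only allocates a second throwaway copy. Passing the descriptor itself into the method, as suggested, makes it visible at the call site that the serializer has already been duplicated.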
[GitHub] flink pull request #5885: [FLINK-8715] Remove usage of StateDescriptor in st...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5885#discussion_r184101616 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java --- @@ -1125,59 +1125,62 @@ private void restoreKeyGroupsShardWithTemporaryHelperInstance( * that we checkpointed, i.e. is already in the map of column families. */ @SuppressWarnings("rawtypes, unchecked") - protectedColumnFamilyHandle getColumnFamily( + protected Tuple2 > getColumnFamilyAndStateSerializer( StateDescriptor descriptor, TypeSerializer namespaceSerializer) throws IOException, StateMigrationException { Tuple2 stateInfo = kvStateInformation.get(descriptor.getName()); - RegisteredKeyedBackendStateMetaInfo newMetaInfo = new RegisteredKeyedBackendStateMetaInfo<>( - descriptor.getType(), - descriptor.getName(), - namespaceSerializer, - descriptor.getSerializer()); - if (stateInfo != null) { // TODO with eager registration in place, these checks should be moved to restore() RegisteredKeyedBackendStateMetaInfo.Snapshot restoredMetaInfo = (RegisteredKeyedBackendStateMetaInfo.Snapshot ) restoredKvStateMetaInfos.get(descriptor.getName()); Preconditions.checkState( - Objects.equals(newMetaInfo.getName(), restoredMetaInfo.getName()), + Objects.equals(descriptor.getName(), restoredMetaInfo.getName()), "Incompatible state names. " + "Was [" + restoredMetaInfo.getName() + "], " + - "registered with [" + newMetaInfo.getName() + "]."); + "registered with [" + descriptor.getName() + "]."); - if (!Objects.equals(newMetaInfo.getStateType(), StateDescriptor.Type.UNKNOWN) + if (!Objects.equals(descriptor.getType(), StateDescriptor.Type.UNKNOWN) && !Objects.equals(restoredMetaInfo.getStateType(), StateDescriptor.Type.UNKNOWN)) { Preconditions.checkState( - newMetaInfo.getStateType() == restoredMetaInfo.getStateType(), + descriptor.getType() == restoredMetaInfo.getStateType(), "Incompatible state types. 
" + "Was [" + restoredMetaInfo.getStateType() + "], " + - "registered with [" + newMetaInfo.getStateType() + "]."); + "registered with [" + descriptor.getType() + "]."); } // check compatibility results to determine if state migration is required + TypeSerializer newNamespaceSerializer = namespaceSerializer.duplicate(); CompatibilityResult namespaceCompatibility = CompatibilityUtil.resolveCompatibilityResult( restoredMetaInfo.getNamespaceSerializer(), null, restoredMetaInfo.getNamespaceSerializerConfigSnapshot(), - newMetaInfo.getNamespaceSerializer()); + newNamespaceSerializer); + TypeSerializer newStateSerializer = descriptor.getSerializer().duplicate(); --- End diff -- The `duplicate()` here looks redundant because it comes from the descriptor that already duplicates. ---
[GitHub] flink pull request #5885: [FLINK-8715] Remove usage of StateDescriptor in st...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5885#discussion_r184098081 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java --- @@ -185,36 +188,44 @@ public HeapKeyedStateBackend( stateName.equals(stateTable.getMetaInfo().getName()), "Incompatible state names. " + "Was [" + stateTable.getMetaInfo().getName() + "], " + - "registered with [" + newMetaInfo.getName() + "]."); + "registered with [" + stateName + "]."); - if (!newMetaInfo.getStateType().equals(StateDescriptor.Type.UNKNOWN) + if (!stateType.equals(StateDescriptor.Type.UNKNOWN) && !stateTable.getMetaInfo().getStateType().equals(StateDescriptor.Type.UNKNOWN)) { Preconditions.checkState( - newMetaInfo.getStateType().equals(stateTable.getMetaInfo().getStateType()), + stateType.equals(stateTable.getMetaInfo().getStateType()), "Incompatible state types. " + "Was [" + stateTable.getMetaInfo().getStateType() + "], " + - "registered with [" + newMetaInfo.getStateType() + "]."); + "registered with [" + stateType + "]."); } @SuppressWarnings("unchecked") RegisteredKeyedBackendStateMetaInfo.SnapshotrestoredMetaInfo = (RegisteredKeyedBackendStateMetaInfo.Snapshot ) restoredKvStateMetaInfos.get(stateName); // check compatibility results to determine if state migration is required + TypeSerializer newNamespaceSerializer = namespaceSerializer.duplicate(); --- End diff -- Just curious, why do we need to duplicate the serializer here but not in all other places like where `resolveCompatibilityResult()` is called? Or asked differently, should `resolveCompatibilityResult()` always do duplication internally or not at all or is this just as intended? ---
[GitHub] flink pull request #5885: [FLINK-8715] Remove usage of StateDescriptor in st...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5885#discussion_r184093390 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java --- @@ -103,17 +104,17 @@ public void add(T value) throws IOException { private static final class FoldTransformationimplements StateTransformationFunction { - private final FoldingStateDescriptor stateDescriptor; + private final HeapFoldingState stateRef; private final FoldFunction foldFunction; - FoldTransformation(FoldingStateDescriptor stateDesc) { - this.stateDescriptor = Preconditions.checkNotNull(stateDesc); - this.foldFunction = Preconditions.checkNotNull(stateDesc.getFoldFunction()); + FoldTransformation(FoldFunction foldFunction, HeapFoldingState stateRef) { + this.stateRef = Preconditions.checkNotNull(stateRef); --- End diff -- Maybe the more honest and simple way is making this a non-static inner class instead of passing a reference to `HeapFoldingState.this` in the constructor. Essentially it does the same. ---
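A short sketch of the two variants being compared, with hypothetical names (`FoldingStateSketch` and both nested classes are illustrative, and integer addition stands in for the fold function). It shows a static nested class holding an explicit reference next to a non-static inner class that relies on the implicit enclosing instance.

```java
import java.util.Objects;

// Hypothetical stand-in for HeapFoldingState.
final class FoldingStateSketch {

    private final int defaultValue;

    FoldingStateSketch(int defaultValue) {
        this.defaultValue = defaultValue;
    }

    int getDefaultValue() {
        return defaultValue;
    }

    // Variant A: static nested class, the outer instance is passed in explicitly.
    static final class ExplicitRefTransformation {

        private final FoldingStateSketch stateRef;

        ExplicitRefTransformation(FoldingStateSketch stateRef) {
            this.stateRef = Objects.requireNonNull(stateRef);
        }

        int apply(Integer previous, int value) {
            return (previous != null ? previous : stateRef.getDefaultValue()) + value;
        }
    }

    // Variant B: non-static inner class, the compiler keeps the reference
    // to the enclosing FoldingStateSketch.this for us.
    final class InnerTransformation {

        int apply(Integer previous, int value) {
            return (previous != null ? previous : getDefaultValue()) + value;
        }
    }
}
```

Both variants hold exactly one reference to the enclosing state object; the inner class just removes the constructor parameter and the null check.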
[GitHub] flink pull request #5885: [FLINK-8715] Remove usage of StateDescriptor in st...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5885#discussion_r184087656 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java --- @@ -49,17 +49,18 @@ /** * Creates a new key/value state for the given hash map of key/value pairs. * -* @param stateDesc The state identifier for the state. This contains name -* and can create a default state value. +* @param valueSerializer The serializer for the state. * @param stateTable The state tab;e to use in this kev/value state. May contain initial state. */ public HeapFoldingState( - FoldingStateDescriptorstateDesc, StateTable stateTable, TypeSerializer keySerializer, - TypeSerializer namespaceSerializer) { - super(stateDesc, stateTable, keySerializer, namespaceSerializer); - this.foldTransformation = new FoldTransformation<>(stateDesc); + TypeSerializer valueSerializer, + TypeSerializer namespaceSerializer, + ACC defaultValue, --- End diff -- I think you need to double-check this on every state class; none of them appear to be updated. ---
[GitHub] flink pull request #5885: [FLINK-8715] Remove usage of StateDescriptor in st...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5885#discussion_r184087107 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java --- @@ -42,33 +42,35 @@ /** Map containing the actual key/value pairs. */ protected final StateTablestateTable; - /** This holds the name of the state and can create an initial default value for the state. */ - protected final SD stateDesc; - /** The current namespace, which the access methods will refer to. */ protected N currentNamespace; protected final TypeSerializer keySerializer; + protected final TypeSerializer valueSerializer; + protected final TypeSerializer namespaceSerializer; + private final SV defaultValue; + /** * Creates a new key/value state for the given hash map of key/value pairs. * -* @param stateDesc The state identifier for the state. This contains name -* and can create a default state value. +* @param valueSerializer The serializer for the state. * @param stateTable The state tab;e to use in this kev/value state. May contain initial state. */ protected AbstractHeapState( - SD stateDesc, StateTable stateTable, TypeSerializer keySerializer, - TypeSerializer namespaceSerializer) { + TypeSerializer valueSerializer, + TypeSerializer namespaceSerializer, --- End diff -- Comments require update. ---
[GitHub] flink pull request #5885: [FLINK-8715] Remove usage of StateDescriptor in st...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5885#discussion_r184087357 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java --- @@ -49,17 +49,18 @@ /** * Creates a new key/value state for the given hash map of key/value pairs. * -* @param stateDesc The state identifier for the state. This contains name -* and can create a default state value. +* @param valueSerializer The serializer for the state. * @param stateTable The state tab;e to use in this kev/value state. May contain initial state. */ public HeapFoldingState( - FoldingStateDescriptorstateDesc, StateTable stateTable, TypeSerializer keySerializer, - TypeSerializer namespaceSerializer) { - super(stateDesc, stateTable, keySerializer, namespaceSerializer); - this.foldTransformation = new FoldTransformation<>(stateDesc); + TypeSerializer valueSerializer, + TypeSerializer namespaceSerializer, + ACC defaultValue, --- End diff -- Comments require update. ---
[GitHub] flink pull request #5885: [FLINK-8715] Remove usage of StateDescriptor in st...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5885#discussion_r184086555 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapAggregatingState.java --- @@ -47,21 +47,23 @@ /** * Creates a new key/value state for the given hash map of key/value pairs. * -* @param stateDesc -* The state identifier for the state. This contains name and can create a default state value. +* @param valueSerializer +* The serializer for the state. * @param stateTable * The state table to use in this kev/value state. May contain initial state. * @param namespaceSerializer * The serializer for the type that indicates the namespace */ public HeapAggregatingState( - AggregatingStateDescriptorstateDesc, StateTable stateTable, TypeSerializer keySerializer, - TypeSerializer namespaceSerializer) { + TypeSerializer valueSerializer, + TypeSerializer namespaceSerializer, + ACC defaultValue, --- End diff -- The comments are not updated to reflect this and the next line. ---
[GitHub] flink pull request #5885: [FLINK-8715] Remove usage of StateDescriptor in st...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5885#discussion_r184085595 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java --- @@ -61,8 +64,7 @@ /** The column family of this particular instance of state. */ protected ColumnFamilyHandle columnFamily; - /** State descriptor from which to create this state instance. */ - protected final SD stateDesc; --- End diff -- Same as for the heap state also applies here, you can remove `SD` from the generic types of this class and the subclasses. ---
[GitHub] flink pull request #5885: [FLINK-8715] Remove usage of StateDescriptor in st...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5885#discussion_r184084995 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java --- @@ -42,33 +42,35 @@ /** Map containing the actual key/value pairs. */ protected final StateTablestateTable; - /** This holds the name of the state and can create an initial default value for the state. */ - protected final SD stateDesc; --- End diff -- I think with removing this, you can also remove SD from the generic type of this class, and transitively from the subclasses as well. ---
[GitHub] flink pull request #5885: [FLINK-8715] Remove usage of StateDescriptor in st...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5885#discussion_r183997653 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java --- @@ -169,13 +169,16 @@ public HeapKeyedStateBackend( TypeSerializer namespaceSerializer, TypeSerializer valueSerializer) throws StateMigrationException { - final RegisteredKeyedBackendStateMetaInfonewMetaInfo = - new RegisteredKeyedBackendStateMetaInfo<>(stateType, stateName, namespaceSerializer, valueSerializer); - @SuppressWarnings("unchecked") StateTable stateTable = (StateTable ) stateTables.get(stateName); if (stateTable == null) { + RegisteredKeyedBackendStateMetaInfo newMetaInfo = new RegisteredKeyedBackendStateMetaInfo<>( --- End diff -- Mixing of concerns is not yet as bad as in the Rocks backend code, you might also start separating this a bit more here as well. ---
[GitHub] flink pull request #5885: [FLINK-8715] Remove usage of StateDescriptor in st...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5885#discussion_r183996477 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java --- @@ -1125,59 +1125,62 @@ private void restoreKeyGroupsShardWithTemporaryHelperInstance( * that we checkpointed, i.e. is already in the map of column families. */ @SuppressWarnings("rawtypes, unchecked") - protectedColumnFamilyHandle getColumnFamily( + protected Tuple2 > getColumnFamilyAndStateSerializer( --- End diff -- I think this method has grown way too complex over time, and looking at the `Tuple2` return type it becomes more and more clear that this code is mixing up 2 different concerns and could be untangled a bit. I would suggest separating this into: 1) Checking whether this is a new state (does the map contain the name string); this is like an inlined check in the current calling code. 2) If yes, do the serializer checks and configuration magic and create the `RegisteredKeyedBackendStateMetaInfo`. This goes into a separate method that is called by the current caller. 3) Request the column family, either by new registration or the existing one. This can use the result from step 1 or recheck, and goes into another separate method called by the current caller. x) Optional: a helper method that does steps 1-3 if we would otherwise duplicate them too much. ---
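The separation proposed above could look roughly like the following sketch. Everything in it is an assumption made for illustration: `StateRegistrationSketch`, `MetaInfo`, `ColumnFamily` and the string-based serializer check are hypothetical stand-ins, not the real `RocksDBKeyedStateBackend`, `RegisteredKeyedBackendStateMetaInfo` or `ColumnFamilyHandle`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins; none of these types are Flink or RocksDB classes.
final class StateRegistrationSketch {

    static final class MetaInfo {
        final String stateName;
        final String serializerId;

        MetaInfo(String stateName, String serializerId) {
            this.stateName = stateName;
            this.serializerId = serializerId;
        }
    }

    static final class ColumnFamily {
        final String name;

        ColumnFamily(String name) {
            this.name = name;
        }
    }

    private final Map<String, MetaInfo> metaInfos = new HashMap<>();
    private final Map<String, ColumnFamily> columnFamilies = new HashMap<>();

    // Step 1: is this state name unknown to the backend so far?
    boolean isNewState(String stateName) {
        return !metaInfos.containsKey(stateName);
    }

    // Step 2: serializer check (reduced to a string comparison here) plus
    // creation and registration of the meta info.
    MetaInfo registerMetaInfo(String stateName, String serializerId) {
        MetaInfo restored = metaInfos.get(stateName);
        if (restored != null && !restored.serializerId.equals(serializerId)) {
            throw new IllegalStateException("State migration isn't supported, yet.");
        }
        MetaInfo metaInfo = new MetaInfo(stateName, serializerId);
        metaInfos.put(stateName, metaInfo);
        return metaInfo;
    }

    // Step 3: hand out the existing column family or register a new one.
    ColumnFamily getOrCreateColumnFamily(String stateName) {
        return columnFamilies.computeIfAbsent(stateName, ColumnFamily::new);
    }
}
```

With the concerns split this way, a caller asks for the meta info (and with it the serializer) and for the column family in two separate calls, so no `Tuple2` return type is needed.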
[GitHub] flink pull request #5885: [FLINK-8715] Remove usage of StateDescriptor in st...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5885#discussion_r183309701 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java --- @@ -1177,7 +1177,7 @@ private void restoreKeyGroupsShardWithTemporaryHelperInstance( throw new StateMigrationException("State migration isn't supported, yet."); } else { stateInfo.f1 = newMetaInfo; - return stateInfo.f0; + return Tuple2.of(stateInfo.f0, newMetaInfo.getStateSerializer()); --- End diff -- I agree here, that we should let the meta info be immutable, and let the compatibility check result carry the compatible, reconfigured serializer. However, one issue is that this requires changes to the `CompatibilityResult` interface which is part of the public API. I would prefer not to touch the API now as we're approaching release. It would be possible to by-pass this by maybe introducing an internal compat result class, but downsides are - 1) that would have almost identical implementation to `CompatibilityResult`, and 2) that would entail touching a lot of our more complex serializer's code, because they use `CompatibilityUtil.resolveCompatibilityResult`. ---
[GitHub] flink pull request #5885: [FLINK-8715] Remove usage of StateDescriptor in st...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5885#discussion_r183214737 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java --- @@ -1177,7 +1177,7 @@ private void restoreKeyGroupsShardWithTemporaryHelperInstance( throw new StateMigrationException("State migration isn't supported, yet."); } else { stateInfo.f1 = newMetaInfo; - return stateInfo.f0; + return Tuple2.of(stateInfo.f0, newMetaInfo.getStateSerializer()); --- End diff -- Mirroring the result of an offline discussion here: This is a bit fragile: the `newMetaInfo` is mutable, and the serializer is altered in there and then obtained from there again. That makes it harder for future maintainers and easy to accidentally break in the future. The meta info should be immutable, and the re-configured serializer (or the original, if no reconfiguration is needed) would probably best be part of the compatibility result. ---
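As a rough illustration of the shape described above (immutable meta info, with the serializer to use carried by the compatibility result), consider the sketch below. All names are hypothetical: `Resolution` is not Flink's `CompatibilityResult`, and the toy rule in `resolve` only stands in for a real compatibility check.

```java
import java.util.Objects;

// Hypothetical types for illustration; not Flink's CompatibilityResult or meta info classes.
final class ImmutableMetaInfoSketch {

    static final class Serializer {
        final String type;
        final String config;

        Serializer(String type, String config) {
            this.type = Objects.requireNonNull(type);
            this.config = Objects.requireNonNull(config);
        }
    }

    // Immutable: a different serializer means a new MetaInfo instance.
    static final class MetaInfo {
        private final String stateName;
        private final Serializer serializer;

        MetaInfo(String stateName, Serializer serializer) {
            this.stateName = Objects.requireNonNull(stateName);
            this.serializer = Objects.requireNonNull(serializer);
        }

        String getStateName() { return stateName; }
        Serializer getSerializer() { return serializer; }
    }

    // The result of the check carries the serializer to use from now on.
    static final class Resolution {
        private final Serializer serializer; // null when migration is required

        private Resolution(Serializer serializer) { this.serializer = serializer; }

        static Resolution compatible(Serializer serializer) {
            return new Resolution(Objects.requireNonNull(serializer));
        }

        static Resolution requiresMigration() { return new Resolution(null); }

        boolean isMigrationRequired() { return serializer == null; }

        Serializer getSerializer() {
            if (serializer == null) {
                throw new IllegalStateException("No serializer: migration is required.");
            }
            return serializer;
        }
    }

    // Toy rule: a different type needs migration; the same type with a different
    // config is reconfigured to the restored config; otherwise the original is kept.
    static Resolution resolve(MetaInfo restored, Serializer requested) {
        Serializer old = restored.getSerializer();
        if (!old.type.equals(requested.type)) {
            return Resolution.requiresMigration();
        }
        if (old.config.equals(requested.config)) {
            return Resolution.compatible(requested);
        }
        return Resolution.compatible(new Serializer(requested.type, old.config));
    }
}
```

The caller would then build a fresh `MetaInfo` from `resolution.getSerializer()` and pass that same instance on to the state handle, so nothing is mutated and read back.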
[GitHub] flink pull request #5885: [FLINK-8715] Remove usage of StateDescriptor in st...
GitHub user tzulitai opened a pull request: https://github.com/apache/flink/pull/5885

[FLINK-8715] Remove usage of StateDescriptor in state handles

## What is the purpose of the change

This PR is WIP, and is still lacking test coverage. It is opened now to collect some feedback for a proposed solution for FLINK-8715.

Previously, reconfigured state serializers on restore were not properly forwarded to the state handles. In the past, the `StateDescriptor` served as the holder for the reconfigured serializer. However, since 88ffad27, `StateDescriptor#getSerializer()` started giving out duplicates of the serializer, which caused reconfigured serializers to be a completely different copy than what the state handles were using.

This fix corrects this by explicitly forwarding the serializer to the instantiated state handles after the state is registered at the state backend. It also eliminates the use of `StateDescriptor`s internally in the state handles, so that the behaviour is independent of the `StateDescriptor#getSerializer()` method's implementation.

The alternative to this approach is to have an internal `setSerializer` method on the `StateDescriptor`, which should be used after state serializers are reconfigured on registration. That would ensure that serializers handed out by the descriptor are always reconfigured, as soon as the descriptor is registered at the backend.

## Brief change log

- Remove `StateDescriptor`s from heap / RocksDB state handle classes
- Forward the state serializer and any other necessary information provided by the state descriptor (e.g. default value, user functions, nested serializers, etc.) when instantiating state handles.

## Verifying this change

This fix still lacks test coverage. It has been opened to collect feedback for the approach.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (yes / **no**)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
- The serializers: (**yes** / no / don't know)
- The runtime per-record code paths (performance sensitive): (**yes** / no / don't know)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
- The S3 file system connector: (yes / **no** / don't know)

## Documentation

- Does this pull request introduce a new feature? (yes / **no**)
- If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tzulitai/flink FLINK-8715

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5885.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #5885

commit c092dd6518d9e6f47f4cfc797c18bedc8a89cc05
Author: Tzu-Li (Gordon) Tai
Date: 2018-04-20T13:15:42Z

[FLINK-8715] Remove usage of StateDescriptor in state handles

---
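The problem described in the PR text can be reproduced in miniature. The sketch below is only a toy model under assumptions: `Serializer`, `Descriptor` and `Handle` are hypothetical classes, and `reconfigure()` stands in for whatever the backend does to a serializer during registration. It shows why a descriptor that hands out duplicates cannot act as the holder of the reconfigured serializer, and how forwarding the registered instance avoids that.

```java
// Toy model; none of these classes are Flink's StateDescriptor or TypeSerializer.
final class DuplicateSerializerSketch {

    static final class Serializer {
        private boolean reconfigured;

        // Returns an independent copy carrying the current configuration.
        Serializer duplicate() {
            Serializer copy = new Serializer();
            copy.reconfigured = this.reconfigured;
            return copy;
        }

        void reconfigure() { reconfigured = true; }

        boolean isReconfigured() { return reconfigured; }
    }

    static final class Descriptor {
        private final Serializer serializer = new Serializer();

        // Hands out a duplicate on every call, like the behaviour the PR text describes.
        Serializer getSerializer() { return serializer.duplicate(); }
    }

    static final class Handle {
        final Serializer serializer;

        Handle(Serializer serializer) { this.serializer = serializer; }
    }

    // Old flow: the backend reconfigures one copy, the handle fetches another.
    static Handle createViaDescriptor(Descriptor descriptor) {
        Serializer registered = descriptor.getSerializer();
        registered.reconfigure();
        return new Handle(descriptor.getSerializer()); // fresh, unreconfigured copy
    }

    // Proposed flow: the registered serializer is forwarded to the handle.
    static Handle createWithForwarding(Descriptor descriptor) {
        Serializer registered = descriptor.getSerializer();
        registered.reconfigure();
        return new Handle(registered);
    }
}
```

In the first flow the handle ends up with a serializer that never saw the reconfiguration; in the second it uses exactly the instance the backend registered, regardless of how the descriptor's getter is implemented.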