[jira] [Commented] (FLINK-11087) Broadcast state migration Incompatibility from 1.5.3 to 1.7.0
[ https://issues.apache.org/jira/browse/FLINK-11087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716704#comment-16716704 ]

ASF GitHub Bot commented on FLINK-11087:

asfgit closed pull request #7256: [FLINK-11087] [state] Incorrect K/V serializer association when reading broadcast state from 1.5.x snapshots
URL: https://github.com/apache/flink/pull/7256

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/ops/upgrading.md b/docs/ops/upgrading.md
index 0451940723e..22cea4c53f0 100644
--- a/docs/ops/upgrading.md
+++ b/docs/ops/upgrading.md
@@ -274,7 +274,10 @@ Savepoints are compatible across Flink versions as indicated by the table below:
           O
           O
           O
-
+          There is a known issue with resuming broadcast state created with 1.5.x in versions
+          1.6.x up to 1.6.2, and 1.7.0: FLINK-11087 (https://issues.apache.org/jira/browse/FLINK-11087). Users
+          upgrading to 1.6.x or 1.7.x series need to directly migrate to minor versions higher than 1.6.2 and 1.7.0,
+          respectively.
           1.6.x
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/LegacyStateMetaInfoReaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/LegacyStateMetaInfoReaders.java
index 77c267adff1..836edef0aac 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/LegacyStateMetaInfoReaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/LegacyStateMetaInfoReaders.java
@@ -132,11 +132,6 @@ public StateMetaInfoSnapshot readStateMetaInfoSnapshot(
         static final OperatorBackendStateMetaInfoReaderV2V3 INSTANCE = new OperatorBackendStateMetaInfoReaderV2V3();
 
-        private static final String[] ORDERED_KEY_STRINGS =
-            new String[]{
-                StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER.toString(),
-                StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString()};
-
         @Nonnull
         @Override
         public StateMetaInfoSnapshot readStateMetaInfoSnapshot(
@@ -156,17 +151,25 @@ public StateMetaInfoSnapshot readStateMetaInfoSnapshot(
             final int listSize = stateSerializerAndConfigList.size();
             StateMetaInfoSnapshot.BackendStateType stateType = listSize == 1 ?
                 StateMetaInfoSnapshot.BackendStateType.OPERATOR : StateMetaInfoSnapshot.BackendStateType.BROADCAST;
-            Map> serializerConfigsMap = new HashMap<>(listSize);
-            for (int i = 0; i < listSize; ++i) {
-                Tuple2, TypeSerializerSnapshot> serializerAndConf =
-                    stateSerializerAndConfigList.get(i);
-
-                // this particular mapping happens to support both, V2 and V3
-                String serializerKey = ORDERED_KEY_STRINGS[ORDERED_KEY_STRINGS.length - 1 - i];
-                serializerConfigsMap.put(
-                    serializerKey,
-                    serializerAndConf.f1);
+            Map> serializerConfigsMap = new HashMap<>(listSize);
+            switch (stateType) {
+                case OPERATOR:
+                    serializerConfigsMap.put(
+                        StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString(),
+                        stateSerializerAndConfigList.get(0).f1);
+                    break;
+                case BROADCAST:
+                    serializerConfigsMap.put(
+                        StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER.toString(),
+                        stateSerializerAndConfigList.get(0).f1);
+
+                    serializerConfigsMap.put(
+                        StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString(),
+                        stateSerializerAndConfigList.get(1).f1);
+                    break;
+                default:
+                    throw new IllegalStateException("Unknown operator state type " + stateType);
             }
             return new StateMetaInfoSnapshot(
diff --git
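To make the removed index arithmetic concrete, here is a minimal, self-contained Java sketch (not Flink code) that replays it on plain strings. Assumptions: serializers are stood in for by their class names, and `LegacyMappingDemo`, `buggyMapping` and `fixedMapping` are hypothetical names. `buggyMapping` copies the `ORDERED_KEY_STRINGS[ORDERED_KEY_STRINGS.length - 1 - i]` lookup from the removed lines; `fixedMapping` follows the `switch (stateType)` that replaces it.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of the FLINK-11087 mapping bug; not Flink code.
// Serializers are represented by their class names as plain strings.
public class LegacyMappingDemo {

    static final String KEY_SERIALIZER = "KEY_SERIALIZER";
    static final String VALUE_SERIALIZER = "VALUE_SERIALIZER";

    // Same order as the removed ORDERED_KEY_STRINGS array.
    static final String[] ORDERED_KEY_STRINGS = {KEY_SERIALIZER, VALUE_SERIALIZER};

    enum StateType { OPERATOR, BROADCAST }

    // Old behaviour: the i-th serializer read from the snapshot is stored under
    // ORDERED_KEY_STRINGS[length - 1 - i]. With one serializer that is
    // VALUE_SERIALIZER (correct); with two, the first one read (the key
    // serializer) ends up under VALUE_SERIALIZER and the second under KEY_SERIALIZER.
    static Map<String, String> buggyMapping(List<String> readSerializers) {
        Map<String, String> result = new HashMap<>();
        for (int i = 0; i < readSerializers.size(); ++i) {
            String serializerKey = ORDERED_KEY_STRINGS[ORDERED_KEY_STRINGS.length - 1 - i];
            result.put(serializerKey, readSerializers.get(i));
        }
        return result;
    }

    // Patched behaviour: derive the state type from the list size, then map explicitly.
    static Map<String, String> fixedMapping(List<String> readSerializers) {
        StateType stateType = readSerializers.size() == 1 ? StateType.OPERATOR : StateType.BROADCAST;
        Map<String, String> result = new HashMap<>();
        switch (stateType) {
            case OPERATOR:
                result.put(VALUE_SERIALIZER, readSerializers.get(0));
                break;
            case BROADCAST:
                // Broadcast state meta info lists the key serializer first.
                result.put(KEY_SERIALIZER, readSerializers.get(0));
                result.put(VALUE_SERIALIZER, readSerializers.get(1));
                break;
            default:
                throw new IllegalStateException("Unknown operator state type " + stateType);
        }
        return result;
    }

    public static void main(String[] args) {
        // The reporter's broadcast state: StringSerializer keys, JsonSerializer values.
        List<String> broadcast = Arrays.asList("StringSerializer", "JsonSerializer");
        System.out.println("buggy: " + buggyMapping(broadcast).get(KEY_SERIALIZER));
        System.out.println("fixed: " + fixedMapping(broadcast).get(KEY_SERIALIZER));
    }
}
```

Running `main` prints `buggy: JsonSerializer` and `fixed: StringSerializer`, matching the swap described in the issue. The reversed index only gives the right answer when a single serializer is read (operator state); with two serializers (broadcast state) it inverts the key/value association.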
[jira] [Commented] (FLINK-11087) Broadcast state migration Incompatibility from 1.5.3 to 1.7.0
[ https://issues.apache.org/jira/browse/FLINK-11087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711678#comment-16711678 ]

ASF GitHub Bot commented on FLINK-11087:

tzulitai opened a new pull request #7256: [FLINK-11087] [state] Incorrect K/V serializer association when reading broadcast from 1.5.x snapshots
URL: https://github.com/apache/flink/pull/7256

## What is the purpose of the change

This is a bug that prevents Flink versions 1.6.x (up to the latest, 1.6.2) and 1.7.0 from successfully restoring broadcast state that was taken in 1.5.x.

The problem is that when restoring a broadcast state's meta information from a 1.5.x savepoint, `LegacyStateMetaInfoReaders.OperatorBackendStateMetaInfoReaderV2V3` incorrectly treats the first restored serializer as the value serializer and the second restored serializer as the key serializer. The actual order is the other way around.

This PR also updates the `StatefulJobWBroadcastStateMigrationITCase` to use different K/V types for the broadcast states under test, and regenerates the test savepoints in `release-1.5` and `release-1.6`. That migration ITCase failed to catch this bug because, with identical K/V types (and therefore identical serializers), the incorrect association did not affect the result of the test.

## Brief change log

  - Fix the K/V serializer association in `LegacyStateMetaInfoReaders.OperatorBackendStateMetaInfoReaderV2V3`.
  - Update `StatefulJobWBroadcastStateMigrationITCase` to use different K/V types for the broadcast states under test.
  - Regenerate the test savepoints for `StatefulJobWBroadcastStateMigrationITCase` under branches `release-1.6` and `release-1.5`.
  - Amend the compatibility table in the docs (https://ci.apache.org/projects/flink/flink-docs-master/ops/upgrading.html#compatibility-table) to notify users of the issue.

## Verifying this change

The updated `StatefulJobWBroadcastStateMigrationITCase` should pass.
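The PR explains that the ITCase missed the bug because its broadcast states used identical key and value types. The hypothetical Java sketch below illustrates that effect. It assumes serializers can be compared by name, which is a simplification (Flink resolves compatibility through serializer snapshots, which this sketch does not model); `SwapMaskingDemo`, `KvSerializers` and `compatible` are illustrative names, and the serializer names are chosen for illustration rather than taken from the ITCase.

```java
import java.util.Objects;

// Hypothetical sketch of why identical key/value types hide the FLINK-11087 swap.
// Compatibility is simplified to name equality; this is not Flink's logic.
public class SwapMaskingDemo {

    // A (key, value) serializer association, identified by serializer name only.
    static final class KvSerializers {
        final String key;
        final String value;

        KvSerializers(String key, String value) {
            this.key = key;
            this.value = value;
        }

        // What the buggy legacy reader effectively produced: labels exchanged.
        KvSerializers swapped() {
            return new KvSerializers(value, key);
        }
    }

    // Simplified restore-time check: key and value serializers must both match.
    static boolean compatible(KvSerializers restored, KvSerializers requested) {
        return Objects.equals(restored.key, requested.key)
                && Objects.equals(restored.value, requested.value);
    }

    public static void main(String[] args) {
        // Identical key/value types (illustrative choice): the swap is invisible.
        KvSerializers sameTypes = new KvSerializers("StringSerializer", "StringSerializer");
        System.out.println(compatible(sameTypes.swapped(), sameTypes)); // true

        // Distinct key/value types, as in the reporter's job: the swap is detected.
        KvSerializers distinctTypes = new KvSerializers("StringSerializer", "JsonSerializer");
        System.out.println(compatible(distinctTypes.swapped(), distinctTypes)); // false
    }
}
```

This is the reasoning behind giving the updated ITCase different key and value types: only then does a swapped association change the outcome of the restore.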
## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (**yes** / no / don't know)
  - The S3 file system connector: (yes / no / don't know)

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> Broadcast state migration Incompatibility from 1.5.3 to 1.7.0
>
> Key: FLINK-11087
> URL: https://issues.apache.org/jira/browse/FLINK-11087
> Project: Flink
> Issue Type: Bug
> Components: State Backends, Checkpointing
> Affects Versions: 1.6.2, 1.7.0
> Environment: Migration from Flink 1.5.3 to Flink 1.7.0
> Reporter: Edward Rojas
> Assignee: Tzu-Li (Gordon) Tai
> Priority: Blocker
> Labels: Migration, State, broadcast, pull-request-available
> Fix For: 1.6.3, 1.7.1
>
>
> When upgrading from Flink 1.5.3 to Flink 1.7.0, the migration of broadcast
> state throws the following error:
> {noformat}
> org.apache.flink.util.StateMigrationException: The new key serializer for broadcast state must not be incompatible.
>     at org.apache.flink.runtime.state.DefaultOperatorStateBackend.getBroadcastState(DefaultOperatorStateBackend.java:238)
>     at org.apache.flink.streaming.api.operators.co.CoBroadcastWithNonKeyedOperator.open(CoBroadcastWithNonKeyedOperator.java:87)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>     at java.lang.Thread.run(Thread.java:745){noformat}
> The broadcast state uses a MapState with StringSerializer as the key serializer and
> a custom JsonSerializer as the value serializer.
> There were no changes in the TypeSerializers used, only an upgrade of the Flink version.
>
> With some debugging I see that at the moment of the validation of the
> compatibility of states in the DefaultOperatorStateBackend class, the
> "*registeredBroadcastStates*" containing the data about the 'old' state
> contains a wrong association of the key and value
[jira] [Commented] (FLINK-11087) Broadcast state migration Incompatibility from 1.5.3 to 1.7.0
[ https://issues.apache.org/jira/browse/FLINK-11087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711638#comment-16711638 ]

Tzu-Li (Gordon) Tai commented on FLINK-11087:

Yes, will do.

> Broadcast state migration Incompatibility from 1.5.3 to 1.7.0
>
> Key: FLINK-11087
> URL: https://issues.apache.org/jira/browse/FLINK-11087
> Project: Flink
> Issue Type: Bug
> Components: State Backends, Checkpointing
> Affects Versions: 1.7.0
> Environment: Migration from Flink 1.5.3 to Flink 1.7.0
> Reporter: Edward Rojas
> Assignee: Tzu-Li (Gordon) Tai
> Priority: Major
> Labels: Migration, State, broadcast
>
>
> When upgrading from Flink 1.5.3 to Flink 1.7.0, the migration of broadcast
> state throws the following error:
> {noformat}
> org.apache.flink.util.StateMigrationException: The new key serializer for broadcast state must not be incompatible.
>     at org.apache.flink.runtime.state.DefaultOperatorStateBackend.getBroadcastState(DefaultOperatorStateBackend.java:238)
>     at org.apache.flink.streaming.api.operators.co.CoBroadcastWithNonKeyedOperator.open(CoBroadcastWithNonKeyedOperator.java:87)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>     at java.lang.Thread.run(Thread.java:745){noformat}
> The broadcast state uses a MapState with StringSerializer as the key serializer and
> a custom JsonSerializer as the value serializer.
> There were no changes in the TypeSerializers used, only an upgrade of the Flink version.
>
> With some debugging I see that at the moment of the validation of the
> compatibility of states in the DefaultOperatorStateBackend class, the
> "*registeredBroadcastStates*" containing the data about the 'old' state
> contains a wrong association of the key and value serializer. That is,
> JsonSerializer appears as the key serializer and StringSerializer appears as the
> value serializer, when it should be the other way around.
>
> After more digging, I see that the "OperatorBackendStateMetaInfoReaderV2V3"
> class is responsible for this swap, here:
> https://github.com/apache/flink/blob/release-1.7/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/LegacyStateMetaInfoReaders.java#L165

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (FLINK-11087) Broadcast state migration Incompatibility from 1.5.3 to 1.7.0
[ https://issues.apache.org/jira/browse/FLINK-11087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711637#comment-16711637 ]

Chesnay Schepler commented on FLINK-11087:

[~tzulitai] Can you amend the migration matrix in the [documentation|https://ci.apache.org/projects/flink/flink-docs-master/ops/upgrading.html#compatibility-table]?
[jira] [Commented] (FLINK-11087) Broadcast state migration Incompatibility from 1.5.3 to 1.7.0
[ https://issues.apache.org/jira/browse/FLINK-11087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711635#comment-16711635 ]

Tzu-Li (Gordon) Tai commented on FLINK-11087:

Confirmed, this is indeed a bug, and the cause is what we've discussed above. Setting this as a blocker for 1.6.3 and 1.7.1; I'm providing a fix for this. Thanks for the investigation, [~edRojas].
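For anyone planning an upgrade, the affected range stated in this issue (restoring 1.5.x broadcast state in 1.6.0 to 1.6.2 or in 1.7.0, with the fix shipped in 1.6.3 and 1.7.1) can be encoded as a small pre-upgrade guard. This is a hypothetical helper, not a Flink API: `Flink11087VersionCheck` and `isAffectedRestoreTarget` are illustrative names, it assumes version strings of the form `major.minor.patch` (an optional `-suffix` on the patch is ignored), and it treats every version outside that range as unaffected, which the issue itself does not state for later releases.

```java
// Hypothetical pre-upgrade guard for FLINK-11087; not part of Flink.
public class Flink11087VersionCheck {

    // Returns true if restoring broadcast state from a 1.5.x savepoint in the
    // given target version is affected, per the versions named in the issue:
    // 1.6.0 to 1.6.2 and 1.7.0 are affected; 1.6.3 and 1.7.1 carry the fix.
    static boolean isAffectedRestoreTarget(String version) {
        String[] parts = version.split("\\.");
        if (parts.length < 3) {
            throw new IllegalArgumentException("Expected major.minor.patch, got: " + version);
        }
        int major = Integer.parseInt(parts[0]);
        int minor = Integer.parseInt(parts[1]);
        // Drop a suffix such as "-SNAPSHOT" before parsing the patch number.
        int patch = Integer.parseInt(parts[2].split("-")[0]);
        if (major != 1) {
            return false;
        }
        return (minor == 6 && patch <= 2) || (minor == 7 && patch == 0);
    }

    public static void main(String[] args) {
        for (String v : new String[] {"1.6.2", "1.6.3", "1.7.0", "1.7.1"}) {
            System.out.println(v + " affected: " + isAffectedRestoreTarget(v));
        }
    }
}
```

In line with the note added to the compatibility table, a job restoring 1.5.x broadcast state would target a version for which this check returns `false` (for example 1.6.3 or 1.7.1) rather than stepping through an affected release.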
[jira] [Commented] (FLINK-11087) Broadcast state migration Incompatibility from 1.5.3 to 1.7.0
[ https://issues.apache.org/jira/browse/FLINK-11087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711511#comment-16711511 ]

Tzu-Li (Gordon) Tai commented on FLINK-11087:

I'm currently modifying the {{StatefulJobWBroadcastStateMigrationITCase}} to verify this. Currently that ITCase has no way to capture this bug, because the key / value types of the broadcast states tested there are all identical. If it turns out to be true, then we'll need to raise this to a blocker bug.
[jira] [Commented] (FLINK-11087) Broadcast state migration Incompatibility from 1.5.3 to 1.7.0
[ https://issues.apache.org/jira/browse/FLINK-11087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711495#comment-16711495 ]

Tzu-Li (Gordon) Tai commented on FLINK-11087:

I checked the key / value serializer association in the {{DefaultOperatorStateBackend}}; it seems to be correct.

The problem, as [~edRojas] has indicated, is that when restoring from 1.5 (regardless of whether the restore happens in 1.6 or 1.7), the serializer keys ({{KEY_SERIALIZER}} and {{VALUE_SERIALIZER}}) in the state meta info have swapped positions. Because of this line, [https://github.com/apache/flink/blob/release-1.7/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/LegacyStateMetaInfoReaders.java#L165], for broadcast state meta info, the first read serializer (which is the key serializer) is assigned the {{VALUE_SERIALIZER}} key, while the second read serializer (which is the value serializer) is assigned the {{KEY_SERIALIZER}} key.

This looks like a bug that affects both Flink 1.6.x and Flink 1.7.0. [~stefanrichte...@gmail.com] could you confirm this?
[jira] [Commented] (FLINK-11087) Broadcast state migration Incompatibility from 1.5.3 to 1.7.0
[ https://issues.apache.org/jira/browse/FLINK-11087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711469#comment-16711469 ]

Edward Rojas commented on FLINK-11087:

I just tried it out, going from 1.5.3 to 1.6.2. I get a different error, but while debugging I see that the cause is the same: the key serializer and the value serializer are swapped in the OperatorBackendStateMetaInfoReaderV2V3 class.

The error is the following:
{noformat}
org.apache.flink.util.StateMigrationException: State migration is currently not supported.
    at org.apache.flink.util.StateMigrationException.notSupported(StateMigrationException.java:42)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackend.getBroadcastState(DefaultOperatorStateBackend.java:263)
    at org.apache.flink.streaming.api.operators.co.CoBroadcastWithNonKeyedOperator.open(CoBroadcastWithNonKeyedOperator.java:87)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:745){noformat}
[jira] [Commented] (FLINK-11087) Broadcast state migration Incompatibility from 1.5.3 to 1.7.0
[ https://issues.apache.org/jira/browse/FLINK-11087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711437#comment-16711437 ]

Chesnay Schepler commented on FLINK-11087:

Have you tried migration from 1.5 to 1.6 to 1.7?