[jira] [Commented] (FLINK-11087) Broadcast state migration Incompatibility from 1.5.3 to 1.7.0

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ https://issues.apache.org/jira/browse/FLINK-11087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716704#comment-16716704 ]

ASF GitHub Bot commented on FLINK-11087:


asfgit closed pull request #7256: [FLINK-11087] [state] Incorrect K/V serializer association when reading broadcast state from 1.5.x snapshots
URL: https://github.com/apache/flink/pull/7256
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/ops/upgrading.md b/docs/ops/upgrading.md
index 0451940723e..22cea4c53f0 100644
--- a/docs/ops/upgrading.md
+++ b/docs/ops/upgrading.md
@@ -274,7 +274,10 @@ Savepoints are compatible across Flink versions as indicated by the table below:
   O
   O
   O
-  
+  There is a known issue with resuming broadcast state created with 1.5.x in versions
+  1.6.x up to 1.6.2, and 1.7.0: <a href="https://issues.apache.org/jira/browse/FLINK-11087">FLINK-11087</a>. Users
+  upgrading to 1.6.x or 1.7.x series need to directly migrate to minor versions higher than 1.6.2 and 1.7.0,
+  respectively.
 
 
   1.6.x
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/LegacyStateMetaInfoReaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/LegacyStateMetaInfoReaders.java
index 77c267adff1..836edef0aac 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/LegacyStateMetaInfoReaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/LegacyStateMetaInfoReaders.java
@@ -132,11 +132,6 @@ public StateMetaInfoSnapshot readStateMetaInfoSnapshot(
 
 		static final OperatorBackendStateMetaInfoReaderV2V3 INSTANCE = new OperatorBackendStateMetaInfoReaderV2V3();
 
-		private static final String[] ORDERED_KEY_STRINGS =
-			new String[]{
-				StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER.toString(),
-				StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString()};
-
 		@Nonnull
 		@Override
 		public StateMetaInfoSnapshot readStateMetaInfoSnapshot(
@@ -156,17 +151,25 @@ public StateMetaInfoSnapshot readStateMetaInfoSnapshot(
 			final int listSize = stateSerializerAndConfigList.size();
 			StateMetaInfoSnapshot.BackendStateType stateType = listSize == 1 ?
 				StateMetaInfoSnapshot.BackendStateType.OPERATOR : StateMetaInfoSnapshot.BackendStateType.BROADCAST;
-			Map<String, TypeSerializerSnapshot<?>> serializerConfigsMap = new HashMap<>(listSize);
-			for (int i = 0; i < listSize; ++i) {
-				Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>> serializerAndConf =
-					stateSerializerAndConfigList.get(i);
-
-				// this particular mapping happens to support both, V2 and V3
-				String serializerKey = ORDERED_KEY_STRINGS[ORDERED_KEY_STRINGS.length - 1 - i];
 
-				serializerConfigsMap.put(
-					serializerKey,
-					serializerAndConf.f1);
+			Map<String, TypeSerializerSnapshot<?>> serializerConfigsMap = new HashMap<>(listSize);
+			switch (stateType) {
+				case OPERATOR:
+					serializerConfigsMap.put(
+						StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString(),
+						stateSerializerAndConfigList.get(0).f1);
+					break;
+				case BROADCAST:
+					serializerConfigsMap.put(
+						StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER.toString(),
+						stateSerializerAndConfigList.get(0).f1);
+
+					serializerConfigsMap.put(
+						StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString(),
+						stateSerializerAndConfigList.get(1).f1);
+					break;
+				default:
+					throw new IllegalStateException("Unknown operator state type " + stateType);
 			}
 
 			return new StateMetaInfoSnapshot(
diff --git 

[jira] [Commented] (FLINK-11087) Broadcast state migration Incompatibility from 1.5.3 to 1.7.0

2018-12-06 Thread ASF GitHub Bot (JIRA)


[ https://issues.apache.org/jira/browse/FLINK-11087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711678#comment-16711678 ]

ASF GitHub Bot commented on FLINK-11087:


tzulitai opened a new pull request #7256: [FLINK-11087] [state] Incorrect K/V serializer association when reading broadcast from 1.5.x snapshots
URL: https://github.com/apache/flink/pull/7256
 
 
   ## What is the purpose of the change
   
   This is a bug that prevents Flink versions 1.6.x (up to the latest, 1.6.2) and 1.7.0 from successfully restoring broadcast state that was taken in 1.5.x.
   
   The problem is that when restoring a broadcast state's meta information from a 1.5.x savepoint, `LegacyStateMetaInfoReaders.OperatorBackendStateMetaInfoReaderV2V3` incorrectly associates the first restored serializer with the value serializer and the second restored serializer with the key serializer.
   The correct association is the other way around: the first restored serializer is the key serializer, and the second is the value serializer.
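   A minimal sketch of the two associations, assuming serializer snapshots are stood in for by plain strings (the class `SerializerKeyAssociation` and its methods are hypothetical, not Flink API). `buggy` reproduces the removed index arithmetic `ORDERED_KEY_STRINGS[ORDERED_KEY_STRINGS.length - 1 - i]`; `fixed` mirrors the patch's per-state-type switch, with the state type derived from the number of restored serializers as the reader does.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of the V2/V3 legacy meta-info reader.
// Serializer snapshots are represented by plain strings, not Flink types.
final class SerializerKeyAssociation {
    static final String KEY_SERIALIZER = "KEY_SERIALIZER";
    static final String VALUE_SERIALIZER = "VALUE_SERIALIZER";
    private static final String[] ORDERED_KEY_STRINGS = {KEY_SERIALIZER, VALUE_SERIALIZER};

    // Pre-fix association: restored serializer i gets ORDERED_KEY_STRINGS[length - 1 - i].
    // One serializer (operator state) maps to VALUE_SERIALIZER, which is correct;
    // two serializers (broadcast state) map the first to VALUE and the second to KEY.
    static Map<String, String> buggy(List<String> restoredInOrder) {
        Map<String, String> byKey = new HashMap<>();
        for (int i = 0; i < restoredInOrder.size(); ++i) {
            byKey.put(ORDERED_KEY_STRINGS[ORDERED_KEY_STRINGS.length - 1 - i], restoredInOrder.get(i));
        }
        return byKey;
    }

    // Post-fix association: decided by state type (1 serializer = operator, 2 = broadcast).
    static Map<String, String> fixed(List<String> restoredInOrder) {
        Map<String, String> byKey = new HashMap<>();
        switch (restoredInOrder.size()) {
            case 1: // operator state: the single serializer is the value serializer
                byKey.put(VALUE_SERIALIZER, restoredInOrder.get(0));
                break;
            case 2: // broadcast state: key serializer first, value serializer second
                byKey.put(KEY_SERIALIZER, restoredInOrder.get(0));
                byKey.put(VALUE_SERIALIZER, restoredInOrder.get(1));
                break;
            default:
                throw new IllegalStateException("Unexpected number of serializers: " + restoredInOrder.size());
        }
        return byKey;
    }

    public static void main(String[] args) {
        // Broadcast state as written by 1.5.x: key serializer, then value serializer.
        List<String> broadcast = Arrays.asList("StringSerializer", "JsonSerializer");
        System.out.println(buggy(broadcast).get(KEY_SERIALIZER)); // JsonSerializer
        System.out.println(fixed(broadcast).get(KEY_SERIALIZER)); // StringSerializer
    }
}
```

   The reversed indexing gives the right answer whenever only one serializer is restored (operator state), and goes wrong only for the two-serializer broadcast case.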
   
   This PR also updates `StatefulJobWBroadcastStateMigrationITCase` to use different K/V types for the broadcast states under test, and regenerates the test savepoints in `release-1.5` and `release-1.6`.
   That migration ITCase failed to catch this bug because the K/V types were identical (and therefore so were the serializers), so the incorrect association did not affect the result of the test.
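   Why identical K/V types hide the bug can be shown in a few lines: exchanging the key and value entries of a serializer map leaves the map unchanged whenever both entries are equal. This is a hypothetical illustration with string stand-ins for serializers (`SwapDetection` is not code from the ITCase).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration: a key/value swap in the restored serializer map
// is only observable when the key and value serializers differ.
final class SwapDetection {
    static Map<String, String> of(String keySer, String valueSer) {
        Map<String, String> m = new HashMap<>();
        m.put("KEY_SERIALIZER", keySer);
        m.put("VALUE_SERIALIZER", valueSer);
        return m;
    }

    // Returns a copy with the KEY/VALUE entries exchanged, mimicking the pre-fix reader.
    static Map<String, String> swapped(Map<String, String> byKey) {
        return of(byKey.get("VALUE_SERIALIZER"), byKey.get("KEY_SERIALIZER"));
    }

    // True if comparing restored against expected serializers would reveal the swap.
    static boolean swapIsDetectable(String keySer, String valueSer) {
        Map<String, String> expected = of(keySer, valueSer);
        return !swapped(expected).equals(expected);
    }

    public static void main(String[] args) {
        System.out.println(swapIsDetectable("StringSerializer", "StringSerializer")); // false
        System.out.println(swapIsDetectable("StringSerializer", "LongSerializer"));   // true
    }
}
```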
   
   ## Brief change log
   
   - Fix K/V serializer association in `LegacyStateMetaInfoReaders.OperatorBackendStateMetaInfoReaderV2V3`.
   - Update `StatefulJobWBroadcastStateMigrationITCase` to use different K/V types for the broadcast states under test.
   - Regenerate test savepoints for `StatefulJobWBroadcastStateMigrationITCase` under branches `release-1.6` and `release-1.5`.
   - Amend the compatibility table in the docs (https://ci.apache.org/projects/flink/flink-docs-master/ops/upgrading.html#compatibility-table) to document the issue.
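   The compatibility-table note amounts to a simple version rule: restoring broadcast state created with 1.5.x is affected on 1.6.0 through 1.6.2 and on 1.7.0, with the fix landing in 1.6.3 and 1.7.1. A sketch of that rule as a pre-upgrade check, assuming plain numeric `major.minor[.patch]` version strings; `BroadcastRestoreUpgradeCheck` is a hypothetical helper, not part of Flink.

```java
// Hypothetical pre-upgrade check encoding the FLINK-11087 note in the compatibility table.
final class BroadcastRestoreUpgradeCheck {
    // Parses "major.minor[.patch]" into three ints; a missing patch counts as 0.
    // Assumes purely numeric versions (no "-SNAPSHOT" or similar suffixes).
    static int[] parse(String version) {
        String[] parts = version.split("\\.");
        int patch = parts.length > 2 ? Integer.parseInt(parts[2]) : 0;
        return new int[]{Integer.parseInt(parts[0]), Integer.parseInt(parts[1]), patch};
    }

    // True if restoring broadcast state from a savepoint taken with savepointVersion
    // on targetVersion falls into the affected range described in the note.
    static boolean isAffected(String savepointVersion, String targetVersion) {
        int[] from = parse(savepointVersion);
        int[] to = parse(targetVersion);
        if (from[0] != 1 || from[1] != 5) {
            return false; // the note only concerns broadcast state created with 1.5.x
        }
        if (to[0] == 1 && to[1] == 6) {
            return to[2] <= 2; // 1.6.0 up to 1.6.2
        }
        return to[0] == 1 && to[1] == 7 && to[2] == 0; // 1.7.0
    }

    public static void main(String[] args) {
        for (String target : new String[]{"1.6.2", "1.6.3", "1.7.0", "1.7.1"}) {
            System.out.println("1.5.3 -> " + target + " affected: " + isAffected("1.5.3", target));
        }
    }
}
```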
   
   ## Verifying this change
   
   The updated `StatefulJobWBroadcastStateMigrationITCase` should pass.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
 - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (**yes** / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Broadcast state migration Incompatibility from 1.5.3 to 1.7.0
> -
>
> Key: FLINK-11087
> URL: https://issues.apache.org/jira/browse/FLINK-11087
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.2, 1.7.0
> Environment: Migration from Flink 1.5.3 to Flink 1.7.0
>Reporter: Edward Rojas
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
>  Labels: Migration, State, broadcast, pull-request-available
> Fix For: 1.6.3, 1.7.1
>
>
> When upgrading from Flink 1.5.3 to Flink 1.7.0, the migration of broadcast state throws the following error:
> {noformat}
> org.apache.flink.util.StateMigrationException: The new key serializer for broadcast state must not be incompatible.
> at org.apache.flink.runtime.state.DefaultOperatorStateBackend.getBroadcastState(DefaultOperatorStateBackend.java:238)
> at org.apache.flink.streaming.api.operators.co.CoBroadcastWithNonKeyedOperator.open(CoBroadcastWithNonKeyedOperator.java:87)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
> at java.lang.Thread.run(Thread.java:745){noformat}
> The broadcast state uses a MapState with StringSerializer as the key serializer and a custom JsonSerializer as the value serializer.
> There were no changes in the TypeSerializers used, only the version upgrade.
>  
> While debugging, I see that when the compatibility of states is validated in the DefaultOperatorStateBackend class, "*registeredBroadcastStates*", which contains the data about the 'old' state, has the key and value serializers swapped: JsonSerializer appears as the key serializer and StringSerializer appears as the value serializer (it should be the other way around).
>  
> After more digging, I see that the "OperatorBackendStateMetaInfoReaderV2V3" class is responsible for this swap here:
> https://github.com/apache/flink/blob/release-1.7/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/LegacyStateMetaInfoReaders.java#L165
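The reported failure follows directly from the swap: the restored meta info claims the old key serializer was JsonSerializer, while the job registers StringSerializer, so the key serializer check fails. A heavily simplified sketch of that path, assuming string stand-ins for serializers and plain equality as "compatible" (Flink's actual check resolves compatibility through serializer snapshots); `BroadcastKeySerializerCheck` is hypothetical and throws `IllegalStateException` in place of Flink's `StateMigrationException`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of the restore-time key serializer check.
final class BroadcastKeySerializerCheck {
    static Map<String, String> restoredMeta(String keySerializer, String valueSerializer) {
        Map<String, String> meta = new HashMap<>();
        meta.put("KEY_SERIALIZER", keySerializer);
        meta.put("VALUE_SERIALIZER", valueSerializer);
        return meta;
    }

    // Throws if the restored key serializer differs from the one the job registers now;
    // the message mirrors the StateMigrationException in the report.
    static void checkKeySerializer(Map<String, String> restored, String newKeySerializer) {
        if (!newKeySerializer.equals(restored.get("KEY_SERIALIZER"))) {
            throw new IllegalStateException(
                "The new key serializer for broadcast state must not be incompatible.");
        }
    }

    public static void main(String[] args) {
        // Meta info as read by the pre-fix reader: key and value serializers swapped.
        try {
            checkKeySerializer(restoredMeta("JsonSerializer", "StringSerializer"), "StringSerializer");
            System.out.println("restored");
        } catch (IllegalStateException e) {
            System.out.println("failed: " + e.getMessage());
        }
        // Meta info with the correct association: the check passes.
        checkKeySerializer(restoredMeta("StringSerializer", "JsonSerializer"), "StringSerializer");
        System.out.println("restored");
    }
}
```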

[jira] [Commented] (FLINK-11087) Broadcast state migration Incompatibility from 1.5.3 to 1.7.0

2018-12-06 Thread Tzu-Li (Gordon) Tai (JIRA)


[ https://issues.apache.org/jira/browse/FLINK-11087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711638#comment-16711638 ]

Tzu-Li (Gordon) Tai commented on FLINK-11087:
-

Yes, will do.




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11087) Broadcast state migration Incompatibility from 1.5.3 to 1.7.0

2018-12-06 Thread Chesnay Schepler (JIRA)


[ https://issues.apache.org/jira/browse/FLINK-11087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711637#comment-16711637 ]

Chesnay Schepler commented on FLINK-11087:
--

[~tzulitai] Can you amend the migration matrix in the 
[documentation|https://ci.apache.org/projects/flink/flink-docs-master/ops/upgrading.html#compatibility-table]?



[jira] [Commented] (FLINK-11087) Broadcast state migration Incompatibility from 1.5.3 to 1.7.0

2018-12-06 Thread Tzu-Li (Gordon) Tai (JIRA)


[ https://issues.apache.org/jira/browse/FLINK-11087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711635#comment-16711635 ]

Tzu-Li (Gordon) Tai commented on FLINK-11087:
-

Confirmed, this is indeed a bug, and the cause is what we've discussed above.

Setting this as a blocker for 1.6.4 and 1.7.1; I'll provide a fix.
Thanks for the investigation [~edRojas].



[jira] [Commented] (FLINK-11087) Broadcast state migration Incompatibility from 1.5.3 to 1.7.0

2018-12-06 Thread Tzu-Li (Gordon) Tai (JIRA)


[ https://issues.apache.org/jira/browse/FLINK-11087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711511#comment-16711511 ]

Tzu-Li (Gordon) Tai commented on FLINK-11087:
-

I'm currently modifying {{StatefulJobWBroadcastStateMigrationITCase}} to verify this.
That ITCase currently has no way to catch this bug, because the key / value types of the broadcast states tested there are all identical.

If this is confirmed, we'll need to raise this to a blocker.



[jira] [Commented] (FLINK-11087) Broadcast state migration Incompatibility from 1.5.3 to 1.7.0

2018-12-06 Thread Tzu-Li (Gordon) Tai (JIRA)


[ https://issues.apache.org/jira/browse/FLINK-11087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711495#comment-16711495 ]

Tzu-Li (Gordon) Tai commented on FLINK-11087:
-

I checked the key / value serializer association in {{DefaultOperatorStateBackend}}; it seems to be correct.

The problem, as [~edRojas] has pointed out, is that when restoring from 1.5 (regardless of whether the restore happens in 1.6 or 1.7), the serializer keys ({{KEY_SERIALIZER}} and {{VALUE_SERIALIZER}}) in the state meta info have swapped positions.
Because of this line, [https://github.com/apache/flink/blob/release-1.7/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/LegacyStateMetaInfoReaders.java#L165], for broadcast state meta info the first read serializer (which is the key serializer) is assigned the {{VALUE_SERIALIZER}} key, while the second read serializer (which is the value serializer) is assigned the {{KEY_SERIALIZER}} key.

This looks like a bug that affects both Flink 1.6.x and Flink 1.7.0.

[~stefanrichte...@gmail.com] could you confirm this?



[jira] [Commented] (FLINK-11087) Broadcast state migration Incompatibility from 1.5.3 to 1.7.0

2018-12-06 Thread Edward Rojas (JIRA)


[ https://issues.apache.org/jira/browse/FLINK-11087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711469#comment-16711469 ]

Edward Rojas commented on FLINK-11087:
--

I just tried migrating from 1.5.3 to 1.6.2.

I get a different error, but debugging shows that the cause is the same: the key serializer and the value serializer are swapped in the OperatorBackendStateMetaInfoReaderV2V3 class.

The error is the following:
{noformat}
org.apache.flink.util.StateMigrationException: State migration is currently not supported.
at org.apache.flink.util.StateMigrationException.notSupported(StateMigrationException.java:42)
at org.apache.flink.runtime.state.DefaultOperatorStateBackend.getBroadcastState(DefaultOperatorStateBackend.java:263)
at org.apache.flink.streaming.api.operators.co.CoBroadcastWithNonKeyedOperator.open(CoBroadcastWithNonKeyedOperator.java:87)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:745){noformat}



[jira] [Commented] (FLINK-11087) Broadcast state migration Incompatibility from 1.5.3 to 1.7.0

2018-12-06 Thread Chesnay Schepler (JIRA)


[ https://issues.apache.org/jira/browse/FLINK-11087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711437#comment-16711437 ]

Chesnay Schepler commented on FLINK-11087:
--

Have you tried migration from 1.5 to 1.6 to 1.7?
