[jira] [Commented] (FLINK-9169) NPE when restoring from old savepoint and state serializer could not be deserialized

2018-05-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16465484#comment-16465484
 ] 

ASF GitHub Bot commented on FLINK-9169:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5950


> NPE when restoring from old savepoint and state serializer could not be deserialized
>
> Key: FLINK-9169
> URL: https://issues.apache.org/jira/browse/FLINK-9169
> Project: Flink
> Issue Type: Bug
> Components: State Backends, Checkpointing
> Affects Versions: 1.5.0, 1.4.2
> Reporter: Till Rohrmann
> Assignee: Tzu-Li (Gordon) Tai
> Priority: Blocker
> Fix For: 1.5.0
>
> A user reported observing the following exception when restoring a 
> Flink job from a 1.3 savepoint with Flink 1.4.
> {code}
> 2018-04-02 21:44:18,146 INFO  org.apache.flink.runtime.taskmanager.Task - ApplyAMUpdate (13/160) (7248adb0b85f4458ae4144963d656fa6) switched from RUNNING to FAILED.
> java.lang.IllegalStateException: Could not initialize keyed state backend.
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:293)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:225)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.NullPointerException
>     at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:58)
>     at org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo.<init>(RegisteredKeyedBackendStateMetaInfo.java:53)
>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBKeyedStateBackend.java:1216)
>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBKeyedStateBackend.java:1153)
>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.doRestore(RocksDBKeyedStateBackend.java:1139)
>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:1034)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:773)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:283)
>     ... 6 more
> {code}
> Looking at {{KeyedBackendStateMetaInfoSnapshotReaderWriters}}, we create a 
> {{RegisteredKeyedBackendStateMetaInfo.Snapshot}} whose 
> {{stateSerializer}} can be {{null}}. That in itself is not the problem. 
> However, in {{RocksDBKeyedStateBackend#restoreKVStateMetaData}} we create a 
> {{RegisteredKeyedBackendStateMetaInfo}} from the deserialized {{Snapshot}}, 
> and its constructor null-checks the state serializer. This then fails with 
> a nondescript NPE.
> I think the same would happen when resuming with Flink 1.5 from a 1.4 
> savepoint.
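
To make the failure mode in the description concrete, here is a minimal sketch. It does not use any Flink classes: `Snapshot`, `MetaInfo`, `restoreBlindly` and `restoreDescriptively` are hypothetical stand-ins invented for illustration, and `Objects.requireNonNull` stands in for the `Preconditions.checkNotNull` call seen in the stack trace. It only shows why a bare null check produces an exception with no context, and what a more descriptive check could look like.

```java
import java.util.Objects;

/** Hypothetical stand-ins for illustration only; these are not Flink classes. */
final class NullSerializerRestoreSketch {

    /** Stand-in for a restored snapshot whose serializer may be null. */
    static final class Snapshot {
        final String stateName;
        final Object stateSerializer; // null if it could not be deserialized

        Snapshot(String stateName, Object stateSerializer) {
            this.stateName = stateName;
            this.stateSerializer = stateSerializer;
        }
    }

    /** Stand-in for the meta info whose constructor null-checks the serializer. */
    static final class MetaInfo {
        final String stateName;
        final Object stateSerializer;

        MetaInfo(String stateName, Object stateSerializer) {
            this.stateName = Objects.requireNonNull(stateName);
            // Bare null check: throws a NullPointerException without a message.
            this.stateSerializer = Objects.requireNonNull(stateSerializer);
        }
    }

    /** Mirrors the reported behaviour: the NPE says nothing about the cause. */
    static MetaInfo restoreBlindly(Snapshot snapshot) {
        return new MetaInfo(snapshot.stateName, snapshot.stateSerializer);
    }

    /** One possible alternative (an assumption, not the actual fix): fail with context. */
    static MetaInfo restoreDescriptively(Snapshot snapshot) {
        if (snapshot.stateSerializer == null) {
            throw new IllegalStateException(
                "The serializer for state '" + snapshot.stateName
                    + "' could not be deserialized from the savepoint.");
        }
        return new MetaInfo(snapshot.stateName, snapshot.stateSerializer);
    }
}
```

With a `Snapshot` whose serializer is `null`, `restoreBlindly` throws a message-less `NullPointerException`, while `restoreDescriptively` names the affected state.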



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9169) NPE when restoring from old savepoint and state serializer could not be deserialized

2018-05-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16465338#comment-16465338
 ] 

ASF GitHub Bot commented on FLINK-9169:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5950
  
Thanks for the review, Stefan! Will merge this now.




[jira] [Commented] (FLINK-9169) NPE when restoring from old savepoint and state serializer could not be deserialized

2018-05-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16463873#comment-16463873
 ] 

ASF GitHub Bot commented on FLINK-9169:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/5950
  
Overall, I think this looks good to me now.




[jira] [Commented] (FLINK-9169) NPE when restoring from old savepoint and state serializer could not be deserialized

2018-05-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16463869#comment-16463869
 ] 

ASF GitHub Bot commented on FLINK-9169:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5950#discussion_r186082804
  
--- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java ---
@@ -102,17 +99,24 @@
      *
      * @return the deserialized serializer.
      */
-    public static <T> TypeSerializer<T> tryReadSerializer(DataInputView in, ClassLoader userCodeClassLoader, boolean useDummyPlaceholder) {
+    public static <T> TypeSerializer<T> tryReadSerializer(
+            DataInputView in,
+            ClassLoader userCodeClassLoader,
+            boolean useDummyPlaceholder) throws IOException {
+
         final TypeSerializerSerializationUtil.TypeSerializerSerializationProxy<T> proxy =
-            new TypeSerializerSerializationUtil.TypeSerializerSerializationProxy<>(userCodeClassLoader, useDummyPlaceholder);
+            new TypeSerializerSerializationUtil.TypeSerializerSerializationProxy<>(userCodeClassLoader);
 
         try {
             proxy.read(in);
             return proxy.getTypeSerializer();
-        } catch (IOException e) {
-            LOG.warn("Deserialization of serializer errored; replacing with null.", e);
-
-            return null;
+        } catch (UnloadableTypeSerializerException e) {
--- End diff --

I would let this bubble up one more level, remove the flag here, and only 
catch `UnloadableTypeSerializerException` in the case where this method is 
called with `true`.
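
The suggestion above can be sketched roughly as follows. This is only an illustration under stated assumptions, not the code from the pull request: `UnloadableSerializerException`, `readSerializer` and `readSerializerOrPlaceholder` are hypothetical names, a serializer is modelled as a plain class name, and "loading" it is modelled with `Class.forName`. The point is that the low-level read takes no flag and just throws, while only the caller that wants a placeholder catches the exception.

```java
import java.io.IOException;

/** Hypothetical sketch; none of these types are Flink classes. */
final class BubbleUpSketch {

    /** Hypothetical counterpart of the exception discussed in the review. */
    static final class UnloadableSerializerException extends IOException {
        UnloadableSerializerException(String className, Throwable cause) {
            super("Could not load serializer class " + className, cause);
        }
    }

    /** Low-level read: takes no flag and simply reports the failure to its caller. */
    static String readSerializer(String className) throws UnloadableSerializerException {
        try {
            return Class.forName(className).getName();
        } catch (ClassNotFoundException e) {
            throw new UnloadableSerializerException(className, e);
        }
    }

    /** Resilient caller: the only place that substitutes a placeholder. */
    static String readSerializerOrPlaceholder(String className) {
        try {
            return readSerializer(className);
        } catch (UnloadableSerializerException e) {
            return "<dummy serializer>";
        }
    }
}
```

A strict caller would call `readSerializer` directly and let the exception propagate, so the decision to tolerate a missing serializer lives in exactly one place.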



[jira] [Commented] (FLINK-9169) NPE when restoring from old savepoint and state serializer could not be deserialized

2018-05-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16463867#comment-16463867
 ] 

ASF GitHub Bot commented on FLINK-9169:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/5950
  
Yes, I think we can only remove the flag further when splitting up 
`readSerializersAndConfigsWithResilience`, but I guess it is OK to leave it if 
we change this soon anyway.




[jira] [Commented] (FLINK-9169) NPE when restoring from old savepoint and state serializer could not be deserialized

2018-05-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16463485#comment-16463485
 ] 

ASF GitHub Bot commented on FLINK-9169:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5950
  
@StefanRRichter I have updated the PR. I also had to rebase due to 
conflicts.

Regarding the thoughts you brought up:
- Bubble up `UnloadableTypeSerializerException` approach:
I introduced the exception, but only use it minimally. I think overall it 
is definitely an improvement, since we don't have to carry the dummy flag all 
the way down to the low-level serializer serialization proxy. However, I don't 
think there really is a need to handle it at any level higher than the 
`TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience` 
method, since the original intent of that method was to always be fault 
tolerant when reading a bunch of serializers alongside other things. More 
details on that in my earlier comments.

- Whether or not we really need to hand down the flag to 
`KeyedBackendSerializationProxy`:
My gut feeling is that, even if in the future we bubble up the 
`UnloadableTypeSerializerException` to higher-level components, the 
serialization proxy is still where we need to decide whether or not we handle 
it with a dummy serializer. The reason is that the serialization proxy 
handles deserialization of _all_ keyed state meta infos (and therefore their 
serializers); simply bubbling the exception further up without checking does 
not make sense.


> NPE when restoring from old savepoint and state serializer could not be deserialized
>
> Key: FLINK-9169
> URL: https://issues.apache.org/jira/browse/FLINK-9169
> Project: Flink
> Issue Type: Bug
> Components: State Backends, Checkpointing
> Affects Versions: 1.5.0, 1.4.2
> Reporter: Till Rohrmann
> Assignee: Tzu-Li (Gordon) Tai
> Priority: Major
> Fix For: 1.5.0

[jira] [Commented] (FLINK-9169) NPE when restoring from old savepoint and state serializer could not be deserialized

2018-05-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16463407#comment-16463407
 ] 

ASF GitHub Bot commented on FLINK-9169:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5950#discussion_r185994454
  
--- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java ---
@@ -200,7 +197,7 @@ public static void writeSerializersAndConfigsWithResilience(
         for (int i = 0; i < numSerializersAndConfigSnapshots; i++) {
 
             bufferWithPos.setPosition(offsets[i * 2]);
-            serializer = tryReadSerializer(bufferWrapper, userCodeClassLoader);
+            serializer = tryReadSerializer(bufferWrapper, userCodeClassLoader, true);
--- End diff --

Since we are already thinking about not writing serializers anymore in 
savepoints for 1.6, I'm leaning towards not touching this method now.


> NPE when restoring from old savepoint and state serializer could not be deserialized
>
> Key: FLINK-9169
> URL: https://issues.apache.org/jira/browse/FLINK-9169
> Project: Flink
> Issue Type: Bug
> Components: State Backends, Checkpointing
> Affects Versions: 1.5.0, 1.4.2
> Reporter: Till Rohrmann
> Assignee: Tzu-Li (Gordon) Tai
> Priority: Major
> Fix For: 1.5.0


[jira] [Commented] (FLINK-9169) NPE when restoring from old savepoint and state serializer could not be deserialized

2018-05-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16463405#comment-16463405
 ] 

ASF GitHub Bot commented on FLINK-9169:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5950#discussion_r185995278
  
--- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java ---
@@ -200,7 +197,7 @@ public static void writeSerializersAndConfigsWithResilience(
         for (int i = 0; i < numSerializersAndConfigSnapshots; i++) {
 
             bufferWithPos.setPosition(offsets[i * 2]);
-            serializer = tryReadSerializer(bufferWrapper, userCodeClassLoader);
+            serializer = tryReadSerializer(bufferWrapper, userCodeClassLoader, true);
--- End diff --

One other thing to keep in mind:
The original intent of having the 
`readSerializersAndConfigsWithResilience` method, and the reason we added the 
complex indexing of offsets, is so that we can _always_ be fault tolerant when 
trying to read a bunch of serializers. So, essentially, there is no need to 
push the exception out further: the result of 
`readSerializersAndConfigsWithResilience` should always be that there 
is some serializer, even if it is a dummy (hence the naming).


> NPE when restoring from old savepoint and state serializer could not be deserialized
>
> Key: FLINK-9169
> URL: https://issues.apache.org/jira/browse/FLINK-9169
> Project: Flink
> Issue Type: Bug
> Components: State Backends, Checkpointing
> Affects Versions: 1.5.0, 1.4.2
> Reporter: Till Rohrmann
> Assignee: Tzu-Li (Gordon) Tai
> Priority: Major
> Fix For: 1.5.0


[jira] [Commented] (FLINK-9169) NPE when restoring from old savepoint and state serializer could not be deserialized

2018-05-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16463406#comment-16463406
 ] 

ASF GitHub Bot commented on FLINK-9169:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5950#discussion_r185994375
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java
 ---
@@ -200,7 +197,7 @@ public static void writeSerializersAndConfigsWithResilience(
         for (int i = 0; i < numSerializersAndConfigSnapshots; i++) {
 
             bufferWithPos.setPosition(offsets[i * 2]);
-            serializer = tryReadSerializer(bufferWrapper, userCodeClassLoader);
+            serializer = tryReadSerializer(bufferWrapper, userCodeClassLoader, true);
--- End diff --

This method is complex because it indexes the offsets of the serializers 
and of their serializer config snapshots within the byte stream. The index 
lets us read all serializers and their config snapshots fault-tolerantly, 
without leaving the stream corrupt when an exception occurs.

I'm not sure we can break this method up. Doing so would just move a lot of 
duplicate code to the callers: since earlier versions already read and write 
the offset index, we would still have to handle it for backwards 
compatibility even if we removed it.
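
To make the offset-index idea above concrete, here is a minimal sketch in plain Java. It is not Flink's `TypeSerializerSerializationUtil`: the class `OffsetIndexedObjects`, its method names, and the byte layout (entry count, start offsets, total length, payload) are all assumptions made for illustration. It only shows why an index lets a reader skip an entry that cannot be deserialized while leaving the stream positioned correctly.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration only; the layout is assumed, not taken from Flink. */
final class OffsetIndexedObjects {

    /** Writes: entry count, one start offset per entry, total payload length, payload. */
    static byte[] write(List<byte[]> entries) throws IOException {
        ByteArrayOutputStream payload = new ByteArrayOutputStream();
        int[] startOffsets = new int[entries.size()];
        for (int i = 0; i < entries.size(); i++) {
            startOffsets[i] = payload.size();
            payload.write(entries.get(i));
        }

        ByteArrayOutputStream out = new ByteArrayOutputStream();
        DataOutputStream data = new DataOutputStream(out);
        data.writeInt(entries.size());
        for (int offset : startOffsets) {
            data.writeInt(offset);
        }
        data.writeInt(payload.size());
        data.write(payload.toByteArray());
        data.flush();
        return out.toByteArray();
    }

    /** Reads all entries; an entry that fails to deserialize becomes null. */
    static List<Object> readWithResilience(DataInputStream in) throws IOException {
        int count = in.readInt();
        int[] offsets = new int[count + 1];
        for (int i = 0; i < count; i++) {
            offsets[i] = in.readInt();
        }
        offsets[count] = in.readInt(); // total length doubles as the end of the last entry

        // Consume the whole payload first, so the outer stream ends up at the
        // right position no matter which individual entries fail below.
        byte[] payload = new byte[offsets[count]];
        in.readFully(payload);

        List<Object> result = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            int start = offsets[i];
            int length = offsets[i + 1] - start;
            try (ObjectInputStream ois =
                    new ObjectInputStream(new ByteArrayInputStream(payload, start, length))) {
                result.add(ois.readObject());
            } catch (IOException | ClassNotFoundException e) {
                result.add(null); // this entry is lost, the others are unaffected
            }
        }
        return result;
    }

    /** Helper: Java-serializes a value into a byte array. */
    static byte[] javaSerialize(Object value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bytes)) {
            oos.writeObject(value);
        }
        return bytes.toByteArray();
    }
}
```

Because the payload is consumed as one block before any entry is decoded, a failure in one entry cannot leave the surrounding stream at an undefined position. That is the property the comment above attributes to the offset index.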



[jira] [Commented] (FLINK-9169) NPE when restoring from old savepoint and state serializer could not be deserialized

2018-05-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16463384#comment-16463384
 ] 

ASF GitHub Bot commented on FLINK-9169:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5950
  
I think even with the approach of letting an 
`UnloadableTypeSerializerException` bubble up, we still need a flag in the 
serialization proxy to decide how to handle the exception.

The serialization proxy handles deserialization of all meta data of all 
registered keyed states, so that is the highest level at which we would 
decide whether or not to use the dummy serializer.

If we want to hand this control to an even higher level (i.e. the backend), 
we would need to move the deserialization logic out of the serialization 
proxy, which IMO isn't appropriate.
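
As a rough sketch of the flag-based variant argued for above: the proxy takes a boolean at construction time and uses it to decide between failing with a descriptive error and substituting a placeholder. The names `MetaInfoProxySketch`, `UnloadablePlaceholder`, and `serializerPresenceRequired` are hypothetical and chosen only for this example. The real `KeyedBackendSerializationProxy` does far more than this.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;

/** Hypothetical stand-in for a serialization proxy; not Flink code. */
final class MetaInfoProxySketch {

    /** Keeps the raw bytes of a serializer that could not be loaded. */
    static final class UnloadablePlaceholder {
        final byte[] originalBytes;

        UnloadablePlaceholder(byte[] originalBytes) {
            this.originalBytes = originalBytes.clone();
        }
    }

    private final boolean serializerPresenceRequired;

    MetaInfoProxySketch(boolean serializerPresenceRequired) {
        this.serializerPresenceRequired = serializerPresenceRequired;
    }

    /** Returns the deserialized object, or a placeholder if absence is tolerated. */
    Object readSerializer(byte[] serializedForm) throws IOException {
        try (ObjectInputStream ois =
                new ObjectInputStream(new ByteArrayInputStream(serializedForm))) {
            return ois.readObject();
        } catch (IOException | ClassNotFoundException e) {
            if (serializerPresenceRequired) {
                // Fail here with a clear message instead of a later null check.
                throw new IOException(
                    "State serializer could not be deserialized, but this backend "
                        + "requires it to restore state.", e);
            }
            return new UnloadablePlaceholder(serializedForm);
        }
    }
}
```

A backend that can defer deserialization of state values would construct the proxy with `false`, and one that needs the serializer immediately would pass `true`. That matches the shape of the `KeyedBackendSerializationProxy<>(..., false)` call in the RocksDB diff from this pull request, though the reasoning behind each backend's choice is not spelled out in this thread.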




[jira] [Commented] (FLINK-9169) NPE when restoring from old savepoint and state serializer could not be deserialized

2018-05-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16463363#comment-16463363
 ] 

ASF GitHub Bot commented on FLINK-9169:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5950
  
@StefanRRichter yes, now that you mention it, the 
`isSerializerPresenceRequiredFlag` does seem a bit awkward in the 
serialization proxy. Essentially, all it does is serve as a switch to decide 
whether or not to fail, which is something the caller could do.

I'll quickly try your suggested approach and see how that turns out.




[jira] [Commented] (FLINK-9169) NPE when restoring from old savepoint and state serializer could not be deserialized

2018-05-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462563#comment-16462563
 ] 

ASF GitHub Bot commented on FLINK-9169:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/5950
  
Please also check the travis build, some related tests seem to fail:

Tests in error: 
  TypeSerializerSerializationUtilTest.testSerializerAndConfigPairsSerializationWithSerializerDeserializationFailures:236 » IO
  TypeSerializerSerializationUtilTest.testSerializerSerializationWithClassNotFound:109 » IO
  TypeSerializerSerializationUtilTest.testSerializerSerializationWithInvalidClass:149 » IO
  PojoSerializerTest.testSerializerSerializationFailureResilience:570 » IO




[jira] [Commented] (FLINK-9169) NPE when restoring from old savepoint and state serializer could not be deserialized

2018-05-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462554#comment-16462554
 ] 

ASF GitHub Bot commented on FLINK-9169:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5950#discussion_r185828137
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java
 ---
@@ -200,7 +197,7 @@ public static void writeSerializersAndConfigsWithResilience(
         for (int i = 0; i < numSerializersAndConfigSnapshots; i++) {
 
             bufferWithPos.setPosition(offsets[i * 2]);
-            serializer = tryReadSerializer(bufferWrapper, userCodeClassLoader);
+            serializer = tryReadSerializer(bufferWrapper, userCodeClassLoader, true);
--- End diff --

The problem is that this method might mix too many things together. That is 
also visible in the complex return type, and many call sites are only 
interested in the first element of the list. I wonder if we should break 
this up into dedicated steps (serializer, config) and let the callers invoke 
them one by one, so that we can handle exceptions at a higher level and 
decide there whether a serializer is required.




[jira] [Commented] (FLINK-9169) NPE when restoring from old savepoint and state serializer could not be deserialized

2018-05-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462547#comment-16462547
 ] 

ASF GitHub Bot commented on FLINK-9169:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5950#discussion_r185825767
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java
 ---
@@ -200,7 +197,7 @@ public static void writeSerializersAndConfigsWithResilience(
         for (int i = 0; i < numSerializersAndConfigSnapshots; i++) {
 
             bufferWithPos.setPosition(offsets[i * 2]);
-            serializer = tryReadSerializer(bufferWrapper, userCodeClassLoader);
+            serializer = tryReadSerializer(bufferWrapper, userCodeClassLoader, true);
--- End diff --

This could be the place where we catch an `UnloadableSerializerException`, 
but if we let the caller do the iteration from 0 to 
`numSerializersAndConfigSnapshots`, we can push it out even further. Why is 
it helpful to create a list here? Without it, we could do the exception 
handling in the caller, and in a more fine-grained way.




[jira] [Commented] (FLINK-9169) NPE when restoring from old savepoint and state serializer could not be deserialized

2018-05-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462534#comment-16462534
 ] 

ASF GitHub Bot commented on FLINK-9169:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5950#discussion_r185821296
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java
 ---
@@ -373,15 +370,14 @@ public void read(DataInputView in) throws IOException {
 
                 Thread.currentThread().setContextClassLoader(userClassLoader);
                 typeSerializer = (TypeSerializer) ois.readObject();
-            } catch (ClassNotFoundException | InvalidClassException e) {
+            } catch (Exception e) {
                 if (useDummyPlaceholder) {
                     // we create a dummy so that all the information is not lost when we get a new checkpoint before receiving
                     // a proper typeserializer from the user
-                    typeSerializer =
-                        new UnloadableDummyTypeSerializer<>(buffer);
-                    LOG.warn("Could not find requested TypeSerializer class in classpath. Created dummy.", e);
+                    typeSerializer = new UnloadableDummyTypeSerializer<>(buffer);
--- End diff --

Some food for thought, even if it is not introduced by this PR: why not 
introduce a special `UnloadableSerializerException extends IOException` that 
holds a field with the byte array in `buffer`, and let it bubble up to a 
higher-level component? If that component wants to introduce dummies, it can 
do so from the bytes in the caught exception; if not, it forwards the 
exception. Then we would not have to hand down this flag, and the 
higher-level component could decide. What do you think?



[jira] [Commented] (FLINK-9169) NPE when restoring from old savepoint and state serializer could not be deserialized

2018-05-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462341#comment-16462341
 ] 

ASF GitHub Bot commented on FLINK-9169:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5950#discussion_r185775633
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -589,7 +589,7 @@ private void restoreKeyGroupsInStateHandle()
         private void restoreKVStateMetaData() throws IOException, StateMigrationException, RocksDBException {
 
             KeyedBackendSerializationProxy serializationProxy =
-                new KeyedBackendSerializationProxy<>(rocksDBKeyedStateBackend.userCodeClassLoader);
+                new KeyedBackendSerializationProxy<>(rocksDBKeyedStateBackend.userCodeClassLoader, false);
--- End diff --

Maybe a short comment on this line explaining why we can tolerate the absence 
of the serializer would help future maintenance, with a matching comment for 
the other option on the corresponding line in the heap backend.


> NPE when restoring from old savepoint and state serializer could not be 
> deserialized
> 
>
> Key: FLINK-9169
> URL: https://issues.apache.org/jira/browse/FLINK-9169
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0, 1.4.2
>Reporter: Till Rohrmann
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Major
> Fix For: 1.5.0
>
>
> A user reported to have observed the following exception when restoring a 
> Flink job from a 1.3 savepoint with Flink 1.4.
> {code}
> 2018-04-02 21:44:18,146 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - ApplyAMUpdate (13/160) (7248adb0b85f4458ae4144963d65
> 6fa6) switched from RUNNING to FAILED.
> java.lang.IllegalStateException: Could not initialize keyed state backend.
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:293)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:225)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.NullPointerException
> at 
> org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:58)
> at 
> org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo.(RegisteredKeyedBackendStateMetaInfo.java:53)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBKeyedStateB
> ackend.java:1216)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBKeye
> dStateBackend.java:1153)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.doRestore(RocksDBKeyedStateBackend.java:1
> 139)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:1034)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:773)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:283)
> ... 6 more
> {code}
> Looking at the {{KeyedBackendStateMetaInfoSnapshotReaderWriters}}, we create 
> {{RegisteredKeyedBackendStateMetaInfo.Snapshot}} where the 
> {{stateSerializer}} can be {{null}}. This is not the problem, however, in 
> {{RocksDBKeyedStateBackend#restoreKVStateMetaData}} we create a 
> {{RegisteredKeyedBackendStateMetaInfo}} from the deserialized {{Snapshot}} 
> where we null check the state serializer. This will then fail with an 
> indescriptive NPE.
> I think the same should happen when resuming with Flink 1.5 from a 1.4 
> savepoint.
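
The failure mode described in the issue can be illustrated with a minimal sketch. `SnapshotSketch` and `MetaInfoSketch` are hypothetical stand-ins invented for this illustration, not Flink's `RegisteredKeyedBackendStateMetaInfo` classes; the point is only that a bare null check on a serializer that could not be restored produces an NPE without context, whereas validating the snapshot first can name the affected state.

```java
import java.util.Objects;

/** Hypothetical stand-in for a restored meta info snapshot; not Flink's class. */
final class SnapshotSketch {
    final String stateName;
    final Object stateSerializer; // may be null if the old serializer could not be read

    SnapshotSketch(String stateName, Object stateSerializer) {
        this.stateName = stateName;
        this.stateSerializer = stateSerializer;
    }
}

/** Hypothetical stand-in for registered meta info that null-checks its serializer. */
final class MetaInfoSketch {
    final String stateName;
    final Object stateSerializer;

    MetaInfoSketch(String stateName, Object stateSerializer) {
        this.stateName = Objects.requireNonNull(stateName);
        // A bare null check: a missing serializer surfaces as an NPE with no context.
        this.stateSerializer = Objects.requireNonNull(stateSerializer);
    }

    /** Variant that validates the snapshot first and reports which state is affected. */
    static MetaInfoSketch fromSnapshot(SnapshotSketch snapshot) {
        if (snapshot.stateSerializer == null) {
            throw new IllegalStateException(
                "Serializer for state '" + snapshot.stateName + "' could not be restored");
        }
        return new MetaInfoSketch(snapshot.stateName, snapshot.stateSerializer);
    }
}
```

Constructing `MetaInfoSketch` directly from a snapshot with a `null` serializer throws a `NullPointerException`, while `fromSnapshot` fails with a message that names the state.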



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9169) NPE when restoring from old savepoint and state serializer could not be deserialized

2018-05-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462326#comment-16462326
 ] 

ASF GitHub Bot commented on FLINK-9169:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5950#discussion_r185772501
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/testutils/ArtificialCNFErrorThrowingClassLoader.java
 ---
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.testutils;
+
+import java.util.Set;
+
+/**
+ * Utility classloader used in tests that allows simulating {@link ClassNotFoundException}s for specific classes.
+ */
+public class ArtificialCNFErrorThrowingClassLoader extends ClassLoader {
--- End diff --

`ArtificialCNFExceptionThrowingClassLoader` might be a better fit
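
For readers without the full diff at hand, a class loader of the kind the Javadoc describes could look roughly like the sketch below. The class name, constructor, and field are assumptions made for this illustration; only the first lines of the class in the PR are visible in the diff above, so this is not the PR's implementation.

```java
import java.util.Set;

/** Test-only class loader sketch that fails lookups for selected class names. */
class CnfThrowingClassLoaderSketch extends ClassLoader {

    // Names for which class loading should fail (hypothetical field name).
    private final Set<String> cnfThrowingClassnames;

    CnfThrowingClassLoaderSketch(ClassLoader parent, Set<String> cnfThrowingClassnames) {
        super(parent);
        this.cnfThrowingClassnames = cnfThrowingClassnames;
    }

    @Override
    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        if (cnfThrowingClassnames.contains(name)) {
            // Simulate a class that is no longer on the classpath.
            throw new ClassNotFoundException("Artificial failure for " + name);
        }
        return super.loadClass(name, resolve);
    }
}
```

A test can pass such a loader as the user code class loader and list the old serializer's class name to simulate a serializer that has been removed from the job jar.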



[jira] [Commented] (FLINK-9169) NPE when restoring from old savepoint and state serializer could not be deserialized

2018-05-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462321#comment-16462321
 ] 

ASF GitHub Bot commented on FLINK-9169:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5950#discussion_r185771830
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java
 ---
@@ -69,7 +69,7 @@
 * written using {@link #writeSerializer(DataOutputView, TypeSerializer)}.
 *
 * If deserialization fails for any reason (corrupted serializer bytes, serializer class
-* no longer in classpath, serializer class no longer valid, etc.), {@code null} will
+* no longer in classpath, serializer class no longer valid, etc.), an {@link IOException} is thrown.
--- End diff --

Comment in the next line should be deleted, looks like leftover from the 
copy-paste.



[jira] [Commented] (FLINK-9169) NPE when restoring from old savepoint and state serializer could not be deserialized

2018-05-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462076#comment-16462076
 ] 

ASF GitHub Bot commented on FLINK-9169:
---

Github user tzulitai closed the pull request at:

https://github.com/apache/flink/pull/5945



[jira] [Commented] (FLINK-9169) NPE when restoring from old savepoint and state serializer could not be deserialized

2018-05-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462075#comment-16462075
 ] 

ASF GitHub Bot commented on FLINK-9169:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5945
  
I've opened a new PR #5950 which has a cleaner approach to this.
That new PR subsumes this one.



[jira] [Commented] (FLINK-9169) NPE when restoring from old savepoint and state serializer could not be deserialized

2018-05-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462059#comment-16462059
 ] 

ASF GitHub Bot commented on FLINK-9169:
---

GitHub user tzulitai opened a pull request:

https://github.com/apache/flink/pull/5950

[FLINK-9169] [state-backend] Allow absence of old serializers when 
restoring RocksDBStateBackend

## What is the purpose of the change

This PR contains 2 commits that together allow old state serializers to be 
absent when restoring the RocksDB state backend. It also eliminates the 
possibility of confusing NPEs, which arose because restored serializers could 
previously be `null`.

Allowing old state serializers to be absent for the RocksDB state backend 
lets users perform state evolutions that they couldn't before.

## Brief change log

- 2f9f0d9 Always use dummy serializer instead of null when old state 
serializer cannot be read
Previously, the behaviour of the `TypeSerializerSerializationUtil` read methods 
when serializers could not be read was inconsistent. For some exceptions 
(e.g. `ClassNotFoundException`, `InvalidClassException`), a dummy serializer 
was used as a replacement. In other cases, `null` was used.
This commit fixes this by always using dummy serializers if a 
`useDummyPlaceholder` flag is set to true. Otherwise, an `IOException` is 
thrown. This makes it clear that users should use dummy serializers if they 
want the deserialization to be tolerant to failures.

Another benefit of this is that there will no longer be `null` serializers 
after restore; they will either be an actual serializer, or a dummy if the old 
serializer cannot be restored.

- 95223fc Adds an `isSerializerPresenceRequired` flag to the 
`KeyedBackendSerializationProxy`.
If true, restored serializers cannot be the dummy serializer, otherwise an 
`IOException` will be thrown to fail the restore. Heap backends set this to 
true, while RocksDB sets this to false.
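
The interplay of the two flags in this change log can be sketched as follows. Everything in the block is a hypothetical stand-in written for illustration (`SerializerReadSketch`, `DummyPlaceholder`, `tryRead`, `checkPresence`); it reuses the flag names from the description but does not reproduce the signatures of `TypeSerializerSerializationUtil` or `KeyedBackendSerializationProxy`.

```java
import java.io.IOException;
import java.util.concurrent.Callable;

/** Hypothetical stand-ins; none of these are Flink's actual classes or signatures. */
final class SerializerReadSketch {

    /** Marker placed where a serializer could not be read; keeps the failure cause. */
    static final class DummyPlaceholder {
        final Exception cause;

        DummyPlaceholder(Exception cause) {
            this.cause = cause;
        }
    }

    /**
     * Runs the given read. If it fails, returns a placeholder when that is tolerated,
     * otherwise wraps the failure in an IOException; a failed read never yields null.
     */
    static Object tryRead(Callable<Object> read, boolean useDummyPlaceholder) throws IOException {
        try {
            return read.call();
        } catch (Exception e) {
            if (useDummyPlaceholder) {
                return new DummyPlaceholder(e);
            }
            throw new IOException("Unable to read serializer", e);
        }
    }

    /** Rejects placeholders when the backend requires the old serializer to be present. */
    static Object checkPresence(Object restored, boolean isSerializerPresenceRequired)
            throws IOException {
        if (isSerializerPresenceRequired && restored instanceof DummyPlaceholder) {
            throw new IOException(
                "Old serializer is required but could not be restored",
                ((DummyPlaceholder) restored).cause);
        }
        return restored;
    }
}
```

In this sketch a heap-like caller would pass `true` to `checkPresence` and fail the restore with a descriptive `IOException`, while a RocksDB-like caller would pass `false` and continue with the placeholder.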

## Verifying this change

There are two main test classes that already have coverage of this issue:
- `SerializationProxiesTest`
- `StateBackendTestBase`

A new test, `StateBackendTestBase#testSerializerPresenceOnRestore`, 
additionally verifies the restore behaviour of heap / rocksdb state backends 
when serializers are not present.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (**yes** / no / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tzulitai/flink FLINK-9169-approach2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5950.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5950


commit 2f9f0d9ab02c0c207e4ac887e958f8de9c057310
Author: Tzu-Li (Gordon) Tai 
Date:   2018-05-02T05:37:22Z

[FLINK-9169] [runtime] Always use dummy serializer instead of null when old 
state serializer cannot be read

Previously, the behaviour of TypeSerializerSerializationUtil read
methods in the case when serializers cannot be read, is quite mixed up.
For some exceptions (e.g. ClassNotFoundException,
InvalidClassException), a dummy serializer will be used as a
replacement. In other cases, null is used.

This commit fixes this by always using dummy serializers if a
'useDummyPlaceholder' flag is set to true. Otherwise, an IOException is
thrown. This makes it clear that users should use dummy serializers if
they want the deserialization to be tolerant to failures.

Another benefit of this is that there will no longer be 'null'
serializers after restore; they will either be an actual serializer, or
a dummy if the old serializer cannot be restored.

commit 95223fc129ed0439b3f14636721cb72bc7560876
Author: Tzu-Li (Gordon) Tai 
Date:   2018-05-03T07:30:55Z

[FLINK-9169] [runtime] Allow specifying serializer presence requirement in 

[jira] [Commented] (FLINK-9169) NPE when restoring from old savepoint and state serializer could not be deserialized

2018-05-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16460706#comment-16460706
 ] 

ASF GitHub Bot commented on FLINK-9169:
---

GitHub user tzulitai opened a pull request:

https://github.com/apache/flink/pull/5945

[FLINK-9169] [runtime] Allow KeyedBackendSerializationProxy to specify 
whether serializer presence is required

## What is the purpose of the change

Previously, in both the RocksDB and heap state backends, restoring a 
savepoint failed if the previous state serializer could not be read.

This is a must on the heap side, but not for RocksDB, since the restore can 
still proceed with the new serializer (given that the serializer is compatible).
This PR relaxes the requirement of serializer presence at restore time for 
RocksDB.
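
A toy model may help explain the difference between the two backends. It rests on an assumption that the PR text does not spell out: that a heap-style restore turns snapshot bytes back into objects immediately, whereas a RocksDB-style restore keeps the bytes and decodes them only on access. `RestoreSketch` and its methods are invented for this illustration and do not mirror Flink's backend classes.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Toy model only; it does not mirror Flink's state backend classes. */
final class RestoreSketch {

    /** Heap-like restore: values become objects at restore time, so the old decoder is needed. */
    static Map<String, String> restoreEagerly(
            Map<String, byte[]> snapshot, Function<byte[], String> oldDecoder) {
        if (oldDecoder == null) {
            throw new IllegalStateException("Old serializer is required to rebuild objects");
        }
        Map<String, String> objects = new HashMap<>();
        snapshot.forEach((key, bytes) -> objects.put(key, oldDecoder.apply(bytes)));
        return objects;
    }

    /** RocksDB-like restore: bytes are copied as they are; no decoder is involved. */
    static Map<String, byte[]> restoreBytes(Map<String, byte[]> snapshot) {
        return new HashMap<>(snapshot);
    }

    /** Later access decodes with the new serializer, which must understand the old bytes. */
    static String read(Map<String, byte[]> store, String key, Function<byte[], String> newDecoder) {
        return newDecoder.apply(store.get(key));
    }
}
```

Under that assumption, `restoreBytes` succeeds without any old decoder, and correctness then depends on the new decoder being compatible with the stored bytes, which matches the caveat in the paragraph above.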

## Brief change log

- Let `KeyedBackendSerializationProxy` allow specifying a flag whether or 
not serializer presence is strictly required when restoring the keyed backend. 
For heap backends, this flag would be `true`, while for RocksDB this flag is 
`false`.
- `TypeSerializerSerializationUtil.tryReadSerializer(...)` now always 
returns an `UnloadableDummyTypeSerializer` if `useDummyPlaceholder` is set 
to `true` and an exception occurred (regardless of which exception) while 
reading serializers. It only returns `null` if `useDummyPlaceholder` is 
`false`. This flag corresponds to the serializer presence flag mentioned above.

## Verifying this change

There are already a few existing tests related to this issue:
- 
`MemoryStateBackendTest.testKeyedStateRestoreFailsIfSerializerDeserializationFails`
- 
`SerializationProxiesTest.testKeyedBackendSerializationProxyRoundtripWithSerializerSerializationFailures`

What may still be missing is an e2e test, that verifies RocksDB can restore 
correctly even if a previous serializer class is no longer in the classpath 
(i.e. is replaced by a new compatible serializer of a different class).

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (**yes** / no / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tzulitai/flink FLINK-9169

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5945.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5945


commit 0eb2918a7551094b712173e332df7f2663ecf0cc
Author: Tzu-Li (Gordon) Tai 
Date:   2018-05-02T05:37:22Z

[FLINK-9169] [runtime] Allow KeyedBackendSerializationProxy to specify 
whether serializer presence is required




> NPE when restoring from old savepoint and state serializer could not be 
> deserialized
> 
>
> Key: FLINK-9169
> URL: https://issues.apache.org/jira/browse/FLINK-9169
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0, 1.4.2
>Reporter: Till Rohrmann
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Major
> Fix For: 1.5.0
>
>
> A user reported to have observed the following exception when restoring a 
> Flink job from a 1.3 savepoint with Flink 1.4.
> {code}
> 2018-04-02 21:44:18,146 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - ApplyAMUpdate (13/160) (7248adb0b85f4458ae4144963d65
> 6fa6) switched from RUNNING to FAILED.
> java.lang.IllegalStateException: Could not initialize keyed state backend.
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:293)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:225)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
> at 
>