[jira] [Commented] (FLINK-9169) NPE when restoring from old savepoint and state serializer could not be deserialized
[ https://issues.apache.org/jira/browse/FLINK-9169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16465484#comment-16465484 ]

ASF GitHub Bot commented on FLINK-9169:
---

Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/5950

> NPE when restoring from old savepoint and state serializer could not be deserialized
>
>                 Key: FLINK-9169
>                 URL: https://issues.apache.org/jira/browse/FLINK-9169
>             Project: Flink
>          Issue Type: Bug
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.5.0, 1.4.2
>            Reporter: Till Rohrmann
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Blocker
>             Fix For: 1.5.0
>
>
> A user reported observing the following exception when restoring a
> Flink job from a 1.3 savepoint with Flink 1.4.
> {code}
> 2018-04-02 21:44:18,146 INFO org.apache.flink.runtime.taskmanager.Task - ApplyAMUpdate (13/160) (7248adb0b85f4458ae4144963d656fa6) switched from RUNNING to FAILED.
> java.lang.IllegalStateException: Could not initialize keyed state backend.
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:293)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:225)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.NullPointerException
>     at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:58)
>     at org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo.<init>(RegisteredKeyedBackendStateMetaInfo.java:53)
>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBKeyedStateBackend.java:1216)
>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBKeyedStateBackend.java:1153)
>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.doRestore(RocksDBKeyedStateBackend.java:1139)
>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:1034)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:773)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:283)
>     ... 6 more
> {code}
> Looking at the {{KeyedBackendStateMetaInfoSnapshotReaderWriters}}, we create
> a {{RegisteredKeyedBackendStateMetaInfo.Snapshot}} in which the
> {{stateSerializer}} can be {{null}}. That in itself is not the problem.
> However, in {{RocksDBKeyedStateBackend#restoreKVStateMetaData}} we create a
> {{RegisteredKeyedBackendStateMetaInfo}} from the deserialized {{Snapshot}},
> and its constructor null-checks the state serializer. This then fails with
> an NPE that carries no descriptive message.
> I think the same should happen when resuming with Flink 1.5 from a 1.4
> savepoint.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
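The failure mode described in the issue can be illustrated with a small, self-contained sketch. The classes below (`MetaInfo`, `NullCheckDemo`) are hypothetical stand-ins and not Flink's actual classes, and the wording of the descriptive message is an assumption. The sketch only shows why a bare null check on a serializer that could not be deserialized yields an NPE without a message, and what a message-carrying check could look like instead.

```java
import java.util.Objects;

// Hypothetical stand-in for a registered state meta info; NOT Flink's actual class.
final class MetaInfo {
    final String stateName;
    final Object stateSerializer;

    MetaInfo(String stateName, Object stateSerializer) {
        this.stateName = Objects.requireNonNull(stateName);
        // Bare null check: throws a NullPointerException that carries no message.
        this.stateSerializer = Objects.requireNonNull(stateSerializer);
    }

    // Alternative factory that says what went wrong (message wording is an assumption).
    static MetaInfo createWithDescriptiveCheck(String stateName, Object stateSerializer) {
        if (stateSerializer == null) {
            throw new IllegalStateException(
                "State serializer for state '" + stateName
                    + "' could not be deserialized from the savepoint.");
        }
        return new MetaInfo(stateName, stateSerializer);
    }
}

final class NullCheckDemo {
    // Returns the message of the exception thrown by the bare check.
    static String bareCheckMessage() {
        try {
            new MetaInfo("my-state", null);
            return "no exception";
        } catch (NullPointerException e) {
            return e.getMessage();
        }
    }

    // Returns the message of the exception thrown by the descriptive check.
    static String descriptiveCheckMessage() {
        try {
            MetaInfo.createWithDescriptiveCheck("my-state", null);
            return "no exception";
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println("bare check: " + bareCheckMessage());
        System.out.println("descriptive check: " + descriptiveCheckMessage());
    }
}
```

With the descriptive variant, the name of the affected state appears in the task failure message rather than having to be inferred from the stack trace.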
[jira] [Commented] (FLINK-9169) NPE when restoring from old savepoint and state serializer could not be deserialized
[ https://issues.apache.org/jira/browse/FLINK-9169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16465338#comment-16465338 ]

ASF GitHub Bot commented on FLINK-9169:
---

Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/5950

Thanks for the review, Stefan! Will merge this now.
[jira] [Commented] (FLINK-9169) NPE when restoring from old savepoint and state serializer could not be deserialized
[ https://issues.apache.org/jira/browse/FLINK-9169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16463873#comment-16463873 ]

ASF GitHub Bot commented on FLINK-9169:
---

Github user StefanRRichter commented on the issue:

    https://github.com/apache/flink/pull/5950

Overall, I think this looks good to me now.
[jira] [Commented] (FLINK-9169) NPE when restoring from old savepoint and state serializer could not be deserialized
[ https://issues.apache.org/jira/browse/FLINK-9169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16463869#comment-16463869 ]

ASF GitHub Bot commented on FLINK-9169:
---

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5950#discussion_r186082804

--- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java ---
@@ -102,17 +99,24 @@
      *
      * @return the deserialized serializer.
      */
-    public static TypeSerializer tryReadSerializer(DataInputView in, ClassLoader userCodeClassLoader, boolean useDummyPlaceholder) {
+    public static TypeSerializer tryReadSerializer(
+            DataInputView in,
+            ClassLoader userCodeClassLoader,
+            boolean useDummyPlaceholder) throws IOException {
+
         final TypeSerializerSerializationUtil.TypeSerializerSerializationProxy proxy =
-            new TypeSerializerSerializationUtil.TypeSerializerSerializationProxy<>(userCodeClassLoader, useDummyPlaceholder);
+            new TypeSerializerSerializationUtil.TypeSerializerSerializationProxy<>(userCodeClassLoader);

         try {
             proxy.read(in);
             return proxy.getTypeSerializer();
-        } catch (IOException e) {
-            LOG.warn("Deserialization of serializer errored; replacing with null.", e);
-
-            return null;
+        } catch (UnloadableTypeSerializerException e) {
--- End diff --

I would let this bubble up one more level, remove the flag here, and only catch `UnloadableTypeSerializerException` in the case where this method is called with `true`.
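The review suggestion above (drop the flag from the low-level read and catch the exception one level up) can be sketched as follows. This is a simplified illustration under stated assumptions: `Serializer`, `DummySerializer`, `UnloadableSerializerException` and `SerializerReader` are hypothetical types invented for this example, and class loading stands in for serializer deserialization. It is not the code from the pull request.

```java
import java.io.IOException;

// Hypothetical, simplified types for illustration; NOT Flink's actual API.
interface Serializer {
    String name();
}

// Signals that a serializer was present in the stream but could not be loaded.
final class UnloadableSerializerException extends IOException {
    UnloadableSerializerException(String message, Throwable cause) {
        super(message, cause);
    }
}

// Placeholder returned by resilient callers when the real serializer is unavailable.
final class DummySerializer implements Serializer {
    @Override
    public String name() {
        return "dummy";
    }
}

final class SerializerReader {
    // Low-level read without a flag: a load failure always surfaces as an exception.
    static Serializer tryReadSerializer(String className) throws UnloadableSerializerException {
        try {
            Class.forName(className);
            return () -> className;
        } catch (ClassNotFoundException e) {
            throw new UnloadableSerializerException(
                "Could not load serializer class " + className, e);
        }
    }

    // Resilient caller: the only place that substitutes a placeholder.
    static Serializer readWithResilience(String className) {
        try {
            return tryReadSerializer(className);
        } catch (UnloadableSerializerException e) {
            return new DummySerializer();
        }
    }

    public static void main(String[] args) {
        System.out.println(readWithResilience("java.lang.String").name());
        System.out.println(readWithResilience("no.such.SerializerClass").name());
    }
}
```

Keeping the substitution in a single caller means the low-level method has one behavior, and callers that cannot work with a placeholder simply let the exception propagate.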
[jira] [Commented] (FLINK-9169) NPE when restoring from old savepoint and state serializer could not be deserialized
[ https://issues.apache.org/jira/browse/FLINK-9169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16463867#comment-16463867 ]

ASF GitHub Bot commented on FLINK-9169:
---

Github user StefanRRichter commented on the issue:

    https://github.com/apache/flink/pull/5950

Yes, I think we can only remove the flag further when splitting up `readSerializersAndConfigsWithResilience`, but I guess it is OK to leave it if we change this soon anyway.
[jira] [Commented] (FLINK-9169) NPE when restoring from old savepoint and state serializer could not be deserialized
[ https://issues.apache.org/jira/browse/FLINK-9169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16463485#comment-16463485 ]

ASF GitHub Bot commented on FLINK-9169:
---

Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/5950

@StefanRRichter I have updated the PR. I also had to rebase due to conflicts.

Regarding the thoughts you brought up:

- Bubbling up `UnloadableTypeSerializerException`: I introduced the exception, but only use it minimally. I think overall it is definitely an improvement, since we don't have to carry the dummy flag all the way down to the low-level serializer serialization proxy. However, I don't think there really is a need to handle it at any level higher than the `TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience` method, since the original intent of that method was to always be fault tolerant when reading a bunch of serializers alongside other things. More details on that in my above comments.

- Whether or not we really need to hand down the flag to `KeyedBackendSerializationProxy`: My gut feeling is that, even if in the future we bubble up the `UnloadableTypeSerializerException` to higher-level components, the serialization proxy is still where we need to decide whether or not we handle it with a dummy serializer. The reason is that the serialization proxy handles deserialization of _all_ keyed state meta infos (and therefore their serializers); simply bubbling the exception further up without checking does not make sense.
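The second point above, that the component reading all keyed state meta infos is the natural place to decide whether a placeholder is acceptable, could be sketched like this. All names here (`RestoredSerializer`, `KeyedStateMetaReader`, `serializerPresenceRequired`) are hypothetical and chosen for illustration, and the error message wording is an assumption. The sketch shows one possible shape of such a check, not the implementation in the pull request.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical result of reading one state's serializer; NOT a Flink class.
final class RestoredSerializer {
    final String stateName;
    final boolean isDummy; // true if the real serializer could not be loaded

    RestoredSerializer(String stateName, boolean isDummy) {
        this.stateName = stateName;
        this.isDummy = isDummy;
    }
}

// Hypothetical proxy-like reader that sees the serializers of ALL keyed states.
final class KeyedStateMetaReader {
    private final boolean serializerPresenceRequired;

    KeyedStateMetaReader(boolean serializerPresenceRequired) {
        this.serializerPresenceRequired = serializerPresenceRequired;
    }

    // Accepts placeholders only if the caller declared that it can live without serializers.
    List<RestoredSerializer> read(List<RestoredSerializer> restored) throws IOException {
        List<RestoredSerializer> result = new ArrayList<>();
        for (RestoredSerializer serializer : restored) {
            if (serializer.isDummy && serializerPresenceRequired) {
                throw new IOException(
                    "Unable to restore keyed state [" + serializer.stateName
                        + "]: its serializer could not be loaded, but it is required.");
            }
            result.add(serializer);
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        List<RestoredSerializer> restored = Arrays.asList(
            new RestoredSerializer("counts", false),
            new RestoredSerializer("windows", true));

        // A lenient reader keeps the placeholder entry.
        System.out.println(new KeyedStateMetaReader(false).read(restored).size());

        // A strict reader fails with a message naming the affected state.
        try {
            new KeyedStateMetaReader(true).read(restored);
        } catch (IOException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

A strict reader turns the former bare NPE into an error that names the state whose serializer is missing, while a lenient reader can proceed with the placeholder.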
[jira] [Commented] (FLINK-9169) NPE when restoring from old savepoint and state serializer could not be deserialized
[ https://issues.apache.org/jira/browse/FLINK-9169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16463407#comment-16463407 ]

ASF GitHub Bot commented on FLINK-9169:
---

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5950#discussion_r185994454

--- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java ---
@@ -200,7 +197,7 @@ public static void writeSerializersAndConfigsWithResilience(
         for (int i = 0; i < numSerializersAndConfigSnapshots; i++) {

             bufferWithPos.setPosition(offsets[i * 2]);
-            serializer = tryReadSerializer(bufferWrapper, userCodeClassLoader);
+            serializer = tryReadSerializer(bufferWrapper, userCodeClassLoader, true);
--- End diff --

Since we are already thinking about not writing serializers anymore in savepoints for 1.6, I'm leaning towards not touching this method now.
[jira] [Commented] (FLINK-9169) NPE when restoring from old savepoint and state serializer could not be deserialized
[ https://issues.apache.org/jira/browse/FLINK-9169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16463405#comment-16463405 ]

ASF GitHub Bot commented on FLINK-9169:
---

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5950#discussion_r185995278

--- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java ---
@@ -200,7 +197,7 @@ public static void writeSerializersAndConfigsWithResilience(
         for (int i = 0; i < numSerializersAndConfigSnapshots; i++) {

             bufferWithPos.setPosition(offsets[i * 2]);
-            serializer = tryReadSerializer(bufferWrapper, userCodeClassLoader);
+            serializer = tryReadSerializer(bufferWrapper, userCodeClassLoader, true);
--- End diff --

One other thing to keep in mind: the original intent of having the `readSerializersAndConfigsWithResilience` method, and the reason we added the complex indexing of offsets, is so that we can _always_ be fault tolerant when trying to read a bunch of serializers. So, essentially, there is no need to push the exception out further: the result of `readSerializersAndConfigsWithResilience` should always be that there is some serializer, even if it is a dummy (hence the naming).
[jira] [Commented] (FLINK-9169) NPE when restoring from old savepoint and state serializer could not be deserialized
[ https://issues.apache.org/jira/browse/FLINK-9169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16463406#comment-16463406 ]

ASF GitHub Bot commented on FLINK-9169:
---

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5950#discussion_r185994375

    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java ---
    @@ -200,7 +197,7 @@ public static void writeSerializersAndConfigsWithResilience(
         for (int i = 0; i < numSerializersAndConfigSnapshots; i++) {
             bufferWithPos.setPosition(offsets[i * 2]);
    -        serializer = tryReadSerializer(bufferWrapper, userCodeClassLoader);
    +        serializer = tryReadSerializer(bufferWrapper, userCodeClassLoader, true);
    --- End diff --

    This method is complex because it indexes the offsets of the serializers and of their config snapshots within the byte stream. It does so in order to read all serializers and their config snapshots fault-tolerantly, without leaving the stream corrupt when an exception occurs.

    I'm not sure we can break this method up. Doing so would just move a lot of duplicated code to the callers: the offset index is already part of the written format, so even if we removed it we would still need to read it for backwards compatibility.

> NPE when restoring from old savepoint and state serializer could not be deserialized
>
> Key: FLINK-9169
> URL: https://issues.apache.org/jira/browse/FLINK-9169
> Project: Flink
> Issue Type: Bug
> Components: State Backends, Checkpointing
> Affects Versions: 1.5.0, 1.4.2
> Reporter: Till Rohrmann
> Assignee: Tzu-Li (Gordon) Tai
> Priority: Major
> Fix For: 1.5.0
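The offset indexing described in the comment above can be sketched roughly as follows. This is a simplified illustration under stated assumptions, not Flink's `TypeSerializerSerializationUtil`: `OffsetIndexedEntries`, `EntryDecoder`, `writeEntries`, and `readEntriesWithResilience` are hypothetical names, the byte layout is invented for the example, and UTF-8 strings stand in for serializers. The point is that the whole block is consumed from the stream before any entry is decoded, so a failing entry cannot leave the stream at a wrong position.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class OffsetIndexedEntries {

    /** Hypothetical decoder for a single entry; it is allowed to fail. */
    interface EntryDecoder<T> {
        T decode(byte[] bytes) throws IOException;
    }

    /** Layout (assumed): entry count, start offset of each entry, total payload length, payload. */
    static void writeEntries(DataOutputStream out, List<byte[]> entries) throws IOException {
        ByteArrayOutputStream payload = new ByteArrayOutputStream();
        out.writeInt(entries.size());
        for (byte[] entry : entries) {
            out.writeInt(payload.size()); // offset at which this entry starts
            payload.write(entry);
        }
        out.writeInt(payload.size());     // total payload length
        payload.writeTo(out);
    }

    /** Reads every entry; an entry that fails to decode becomes null, the rest are unaffected. */
    static <T> List<T> readEntriesWithResilience(DataInputStream in, EntryDecoder<T> decoder)
            throws IOException {
        int numEntries = in.readInt();
        int[] offsets = new int[numEntries + 1];
        for (int i = 0; i <= numEntries; i++) {
            offsets[i] = in.readInt();    // last slot holds the total payload length
        }
        byte[] buffer = new byte[offsets[numEntries]];
        in.readFully(buffer);             // the stream is now past the block, whatever happens below

        List<T> result = new ArrayList<>(numEntries);
        for (int i = 0; i < numEntries; i++) {
            byte[] slice = Arrays.copyOfRange(buffer, offsets[i], offsets[i + 1]);
            try {
                result.add(decoder.decode(slice));
            } catch (IOException e) {
                result.add(null);
            }
        }
        return result;
    }

    static byte[] utf8(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    /** Demo decoder that simulates one unreadable entry. */
    static String decodeOrFail(byte[] bytes) throws IOException {
        String s = new String(bytes, StandardCharsets.UTF_8);
        if (s.equals("broken")) {
            throw new IOException("simulated unloadable entry");
        }
        return s;
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        writeEntries(out, Arrays.asList(utf8("first"), utf8("broken"), utf8("third")));
        out.writeInt(42); // unrelated data that follows the block
        out.flush();

        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
        List<String> decoded = readEntriesWithResilience(in, OffsetIndexedEntries::decodeOrFail);
        System.out.println(decoded + ", next value in stream: " + in.readInt());
    }
}
```

Because such an index becomes part of the written format, any refactoring of the reader would still have to parse it, which is the backwards-compatibility concern raised in the comment.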
[jira] [Commented] (FLINK-9169) NPE when restoring from old savepoint and state serializer could not be deserialized
[ https://issues.apache.org/jira/browse/FLINK-9169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16463384#comment-16463384 ]

ASF GitHub Bot commented on FLINK-9169:
---

Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/5950

    I think even with the `UnloadableTypeSerializerException` bubbling approach, we actually still need a flag in the serialization proxy to decide how to handle the exception.

    The serialization proxy handles deserialization of all meta data of all registered keyed states, so that would be the highest level where we need to decide whether or not to use the dummy serializer.
    If we want to hand this control to an even higher level (i.e. the backend), we would then need to break the deserialization logic out of the serialization proxy, which IMO isn't appropriate.
[jira] [Commented] (FLINK-9169) NPE when restoring from old savepoint and state serializer could not be deserialized
[ https://issues.apache.org/jira/browse/FLINK-9169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16463363#comment-16463363 ]

ASF GitHub Bot commented on FLINK-9169:
---

Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/5950

    @StefanRRichter yes, now that you mention it, the `isSerializerPresenceRequiredFlag` does seem a bit awkward in the serialization proxy. Essentially, all it does is serve as a switch to decide whether or not to fail, something that could be done by the caller.

    I'll quickly try your suggested approach and see how that turns out.
[jira] [Commented] (FLINK-9169) NPE when restoring from old savepoint and state serializer could not be deserialized
[ https://issues.apache.org/jira/browse/FLINK-9169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462563#comment-16462563 ]

ASF GitHub Bot commented on FLINK-9169:
---

Github user StefanRRichter commented on the issue:

    https://github.com/apache/flink/pull/5950

    Please also check the Travis build, some related tests seem to fail:

    Tests in error:
      TypeSerializerSerializationUtilTest.testSerializerAndConfigPairsSerializationWithSerializerDeserializationFailures:236 » IO
      TypeSerializerSerializationUtilTest.testSerializerSerializationWithClassNotFound:109 » IO
      TypeSerializerSerializationUtilTest.testSerializerSerializationWithInvalidClass:149 » IO
      PojoSerializerTest.testSerializerSerializationFailureResilience:570 » IO
[jira] [Commented] (FLINK-9169) NPE when restoring from old savepoint and state serializer could not be deserialized
[ https://issues.apache.org/jira/browse/FLINK-9169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462554#comment-16462554 ]

ASF GitHub Bot commented on FLINK-9169:
---

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5950#discussion_r185828137

    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java ---
    @@ -200,7 +197,7 @@ public static void writeSerializersAndConfigsWithResilience(
         for (int i = 0; i < numSerializersAndConfigSnapshots; i++) {
             bufferWithPos.setPosition(offsets[i * 2]);
    -        serializer = tryReadSerializer(bufferWrapper, userCodeClassLoader);
    +        serializer = tryReadSerializer(bufferWrapper, userCodeClassLoader, true);
    --- End diff --

    The problem is that this method might mix too many things together. That is also visible in the complex return type, and many call sites are only interested in the first element of the list. I wonder if we should break this up into dedicated steps (serializer, config) and let the callers invoke them one by one, so that we can handle exceptions at a higher level and decide there whether we need a serializer.
[jira] [Commented] (FLINK-9169) NPE when restoring from old savepoint and state serializer could not be deserialized
[ https://issues.apache.org/jira/browse/FLINK-9169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462547#comment-16462547 ]

ASF GitHub Bot commented on FLINK-9169:
---

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5950#discussion_r185825767

    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java ---
    @@ -200,7 +197,7 @@ public static void writeSerializersAndConfigsWithResilience(
         for (int i = 0; i < numSerializersAndConfigSnapshots; i++) {
             bufferWithPos.setPosition(offsets[i * 2]);
    -        serializer = tryReadSerializer(bufferWrapper, userCodeClassLoader);
    +        serializer = tryReadSerializer(bufferWrapper, userCodeClassLoader, true);
    --- End diff --

    This could be the place where we catch an `UnloadableSerializerException`, but if we let the caller do the iteration from 0 to `numSerializersAndConfigSnapshots`, we can push it out even further. Why is it helpful to create a list here? Otherwise we could do the exception handling in the caller, and in a more fine-grained way.
[jira] [Commented] (FLINK-9169) NPE when restoring from old savepoint and state serializer could not be deserialized
[ https://issues.apache.org/jira/browse/FLINK-9169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462534#comment-16462534 ]

ASF GitHub Bot commented on FLINK-9169:
---

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5950#discussion_r185821296

    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java ---
    @@ -373,15 +370,14 @@ public void read(DataInputView in) throws IOException {
                 Thread.currentThread().setContextClassLoader(userClassLoader);
                 typeSerializer = (TypeSerializer) ois.readObject();
    -        } catch (ClassNotFoundException | InvalidClassException e) {
    +        } catch (Exception e) {
                 if (useDummyPlaceholder) {
                     // we create a dummy so that all the information is not lost when we get a new checkpoint before receiving
                     // a proper typeserializer from the user
    -                typeSerializer =
    -                    new UnloadableDummyTypeSerializer<>(buffer);
    -                LOG.warn("Could not find requested TypeSerializer class in classpath. Created dummy.", e);
    +                typeSerializer = new UnloadableDummyTypeSerializer<>(buffer);
    --- End diff --

    Some food for thought, even if it is not introduced by this PR: why can we not introduce a special `UnloadableSerializerException extends IOException` that holds a field with the byte array in `buffer`, and let it bubble up to a higher-level component? If that component wants to introduce dummies, it can do so from the bytes in the caught exception; if not, it forwards the exception. Then we would not have to hand down this flag but could let the higher-level component decide. What do you think?
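The exception-bubbling idea in the comment above might look something like the sketch below. It only illustrates the pattern and is not code from the pull request. `UnloadableSerializerException` is named in the comment, but its constructor and accessor here are assumptions, as are `DummyPlaceholder`, `readSerializer`, `restoreTolerant`, and `restoreStrict`. As a deliberate simplification, the "serialized serializer" is just a UTF-8 class name rather than a Java-serialized object.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class UnloadableSerializerSketch {

    /** Carries the raw bytes of the serializer that could not be loaded. */
    static class UnloadableSerializerException extends IOException {
        private final byte[] serializerBytes;

        UnloadableSerializerException(Throwable cause, byte[] serializerBytes) {
            super("Could not load serializer: " + cause, cause);
            this.serializerBytes = serializerBytes;
        }

        byte[] getSerializerBytes() {
            return serializerBytes;
        }
    }

    /** Placeholder that keeps the original bytes so they can be written back unchanged later. */
    static class DummyPlaceholder {
        final byte[] originalBytes;

        DummyPlaceholder(byte[] originalBytes) {
            this.originalBytes = originalBytes;
        }
    }

    /** Low-level read: takes no flag and never creates a dummy itself. */
    static Object readSerializer(byte[] buffer, ClassLoader classLoader) throws IOException {
        String className = new String(buffer, StandardCharsets.UTF_8);
        try {
            return Class.forName(className, false, classLoader);
        } catch (ClassNotFoundException e) {
            throw new UnloadableSerializerException(e, buffer);
        }
    }

    /** A caller that can live without the serializer for now substitutes a dummy. */
    static Object restoreTolerant(byte[] buffer, ClassLoader classLoader) throws IOException {
        try {
            return readSerializer(buffer, classLoader);
        } catch (UnloadableSerializerException e) {
            return new DummyPlaceholder(e.getSerializerBytes());
        }
    }

    /** A caller that needs the serializer right away simply lets the exception propagate. */
    static Object restoreStrict(byte[] buffer, ClassLoader classLoader) throws IOException {
        return readSerializer(buffer, classLoader);
    }

    public static void main(String[] args) throws IOException {
        ClassLoader cl = UnloadableSerializerSketch.class.getClassLoader();
        byte[] missing = "com.example.MissingSerializer".getBytes(StandardCharsets.UTF_8);

        System.out.println("tolerant caller got: " + restoreTolerant(missing, cl).getClass().getSimpleName());
        try {
            restoreStrict(missing, cl);
        } catch (UnloadableSerializerException e) {
            System.out.println("strict caller failed: " + e.getMessage());
        }
    }
}
```

With this shape, the decision between failing and substituting a dummy lives in whichever component catches the exception, rather than in a boolean passed down to the reader.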
[jira] [Commented] (FLINK-9169) NPE when restoring from old savepoint and state serializer could not be deserialized
[ https://issues.apache.org/jira/browse/FLINK-9169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462341#comment-16462341 ]

ASF GitHub Bot commented on FLINK-9169:
---

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5950#discussion_r185775633

    --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
    @@ -589,7 +589,7 @@ private void restoreKeyGroupsInStateHandle()
         private void restoreKVStateMetaData() throws IOException, StateMigrationException, RocksDBException {

             KeyedBackendSerializationProxy serializationProxy =
    -            new KeyedBackendSerializationProxy<>(rocksDBKeyedStateBackend.userCodeClassLoader);
    +            new KeyedBackendSerializationProxy<>(rocksDBKeyedStateBackend.userCodeClassLoader, false);
    --- End diff --

    A small comment on this line explaining why we can tolerate the absence of the serializer might be helpful for future maintenance, along with a matching comment for the other option on the corresponding line in the heap backend.
[jira] [Commented] (FLINK-9169) NPE when restoring from old savepoint and state serializer could not be deserialized
[ https://issues.apache.org/jira/browse/FLINK-9169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462326#comment-16462326 ]

ASF GitHub Bot commented on FLINK-9169:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5950#discussion_r185772501

--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/testutils/ArtificialCNFErrorThrowingClassLoader.java ---
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.testutils;
+
+import java.util.Set;
+
+/**
+ * Utility classloader used in tests that allows simulating {@link ClassNotFoundException}s for specific classes.
+ */
+public class ArtificialCNFErrorThrowingClassLoader extends ClassLoader {
--- End diff --

`ArtificialCNFExceptionThrowingClassLoader` might be a better fit

> NPE when restoring from old savepoint and state serializer could not be deserialized
>
> Key: FLINK-9169
> URL: https://issues.apache.org/jira/browse/FLINK-9169
> Project: Flink
> Issue Type: Bug
> Components: State Backends, Checkpointing
> Affects Versions: 1.5.0, 1.4.2
> Reporter: Till Rohrmann
> Assignee: Tzu-Li (Gordon) Tai
> Priority: Major
> Fix For: 1.5.0
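For readers who have not opened the diff: the test utility under review is described as a classloader that simulates `ClassNotFoundException`s for specific classes. The body of the real class is not shown in this message, so the following is only a minimal sketch of how such a loader could work. The class name `CnfThrowingClassLoaderSketch`, its constructor, and the field `blockedClassNames` are assumptions made for illustration, not the code from the pull request.

```java
import java.util.Set;

/**
 * Hypothetical sketch (not the class from the PR): a classloader that refuses
 * to load a configured set of class names and delegates everything else.
 */
class CnfThrowingClassLoaderSketch extends ClassLoader {

    // Fully qualified names for which loading should artificially fail (assumed field).
    private final Set<String> blockedClassNames;

    CnfThrowingClassLoaderSketch(ClassLoader parent, Set<String> blockedClassNames) {
        super(parent);
        this.blockedClassNames = blockedClassNames;
    }

    @Override
    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        if (blockedClassNames.contains(name)) {
            // Simulate "serializer class no longer in classpath".
            throw new ClassNotFoundException("Artificially blocked class: " + name);
        }
        // Normal parent-first delegation for all other classes.
        return super.loadClass(name, resolve);
    }
}
```

A restore test could pass such a loader as the user-code classloader and block the old serializer's class name, which would exercise the "serializer could not be deserialized" path without removing anything from the real classpath.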
[jira] [Commented] (FLINK-9169) NPE when restoring from old savepoint and state serializer could not be deserialized
[ https://issues.apache.org/jira/browse/FLINK-9169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462321#comment-16462321 ]

ASF GitHub Bot commented on FLINK-9169:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5950#discussion_r185771830

--- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java ---
@@ -69,7 +69,7 @@
  * written using {@link #writeSerializer(DataOutputView, TypeSerializer)}.
  *
  * If deserialization fails for any reason (corrupted serializer bytes, serializer class
- * no longer in classpath, serializer class no longer valid, etc.), {@code null} will
+ * no longer in classpath, serializer class no longer valid, etc.), an {@link IOException} is thrown.
--- End diff --

Comment in the next line should be deleted, looks like leftover from the copy-paste.
[jira] [Commented] (FLINK-9169) NPE when restoring from old savepoint and state serializer could not be deserialized
[ https://issues.apache.org/jira/browse/FLINK-9169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462076#comment-16462076 ]

ASF GitHub Bot commented on FLINK-9169:
---

Github user tzulitai closed the pull request at:

https://github.com/apache/flink/pull/5945
[jira] [Commented] (FLINK-9169) NPE when restoring from old savepoint and state serializer could not be deserialized
[ https://issues.apache.org/jira/browse/FLINK-9169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462075#comment-16462075 ]

ASF GitHub Bot commented on FLINK-9169:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5945

I've opened a new PR #5950 which has a cleaner approach to this.
That new PR subsumes this one.
[jira] [Commented] (FLINK-9169) NPE when restoring from old savepoint and state serializer could not be deserialized
[ https://issues.apache.org/jira/browse/FLINK-9169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462059#comment-16462059 ]

ASF GitHub Bot commented on FLINK-9169:
---

GitHub user tzulitai opened a pull request:

https://github.com/apache/flink/pull/5950

[FLINK-9169] [state-backend] Allow absence of old serializers when restoring RocksDBStateBackend

## What is the purpose of the change

This PR contains 2 commits that together allow old state serializers to be absent when restoring the RocksDB state backend. It also eliminates the confusing NPEs that arose because restored serializers could previously be `null`.

Allowing old state serializers to be absent for the RocksDB state backend lets users perform state evolutions that they couldn't before.

## Brief change log

- 2f9f0d9 Always use dummy serializer instead of null when old state serializer cannot be read

Previously, the behaviour of the `TypeSerializerSerializationUtil` read methods when a serializer could not be read was inconsistent. For some exceptions (e.g. `ClassNotFoundException`, `InvalidClassException`), a dummy serializer was used as a replacement. In other cases, `null` was used.

This commit fixes this by always using dummy serializers if a `useDummyPlaceholder` flag is set to true. Otherwise, an `IOException` is thrown. This makes it clear that users should use dummy serializers if they want the deserialization to be tolerant to failures.

Another benefit of this is that there will no longer be `null` serializers after restore; each will be either an actual serializer, or a dummy if the old serializer cannot be restored.

- 95223fc Adds a `isSerializerPresenceRequired` flag to the `KeyedBackendSerializationProxy`.

If true, restored serializers must not be the dummy serializer; if one is, an `IOException` is thrown to fail the restore. Heap backends set this to true, while RocksDB sets this to false.
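The interplay of the two flags in the change log above can be pictured with a small, self-contained sketch. It does not use Flink's classes: `SerializerRestoreSketch`, `DummyPlaceholder`, `readSerializer` and `checkPresence` are hypothetical stand-ins, and plain Java serialization is assumed only to have something that can fail to deserialize. Only the flag names `useDummyPlaceholder` and `isSerializerPresenceRequired` come from the PR description.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;

/** Hypothetical illustration of the "dummy instead of null" restore behaviour. */
final class SerializerRestoreSketch {

    /** Stand-in for a placeholder that keeps the unreadable bytes and the failure cause. */
    static final class DummyPlaceholder {
        final byte[] originalBytes;
        final Throwable cause;

        DummyPlaceholder(byte[] originalBytes, Throwable cause) {
            this.originalBytes = originalBytes;
            this.cause = cause;
        }
    }

    /**
     * Never returns null: yields the deserialized object, a DummyPlaceholder
     * (when useDummyPlaceholder is true), or fails with an IOException.
     */
    static Object readSerializer(byte[] bytes, boolean useDummyPlaceholder) throws IOException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        } catch (Exception e) {
            if (useDummyPlaceholder) {
                return new DummyPlaceholder(bytes, e);
            }
            throw new IOException("Could not read serializer", e);
        }
    }

    /** Backends that need the old serializer reject the placeholder with a descriptive error. */
    static void checkPresence(Object restored, boolean isSerializerPresenceRequired)
            throws IOException {
        if (isSerializerPresenceRequired && restored instanceof DummyPlaceholder) {
            throw new IOException(
                    "Unable to restore a required state serializer",
                    ((DummyPlaceholder) restored).cause);
        }
    }
}
```

In this sketch a heap-like caller would invoke `checkPresence(restored, true)` and fail the restore with a message that names the problem, while a RocksDB-like caller would pass `false` and carry on with the placeholder. Either way the caller never sees a bare `null`, which is what produced the NPE in the report.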
## Verifying this change

There are two main test classes that already have coverage of this issue:
- `SerializationProxiesTest`
- `StateBackendTestBase`

A new test, `StateBackendTestBase#testSerializerPresenceOnRestore`, additionally verifies the restore behaviour of heap / rocksdb state backends when serializers are not present.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (yes / **no**)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
- The serializers: (yes / **no** / don't know)
- The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (**yes** / no / don't know)
- The S3 file system connector: (yes / **no** / don't know)

## Documentation

- Does this pull request introduce a new feature? (yes / **no**)
- If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tzulitai/flink FLINK-9169-approach2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5950.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5950

commit 2f9f0d9ab02c0c207e4ac887e958f8de9c057310
Author: Tzu-Li (Gordon) Tai
Date: 2018-05-02T05:37:22Z

[FLINK-9169] [runtime] Always use dummy serializer instead of null when old state serializer cannot be read

Previously, the behaviour of the TypeSerializerSerializationUtil read methods when a serializer could not be read was inconsistent. For some exceptions (e.g. ClassNotFoundException, InvalidClassException), a dummy serializer was used as a replacement. In other cases, null was used.

This commit fixes this by always using dummy serializers if a 'useDummyPlaceholder' flag is set to true. Otherwise, an IOException is thrown. This makes it clear that users should use dummy serializers if they want the deserialization to be tolerant to failures.

Another benefit of this is that there will no longer be 'null' serializers after restore; each will be either an actual serializer, or a dummy if the old serializer cannot be restored.

commit 95223fc129ed0439b3f14636721cb72bc7560876
Author: Tzu-Li (Gordon) Tai
Date: 2018-05-03T07:30:55Z

[FLINK-9169] [runtime] Allow specifying serializer presence requirement in
[jira] [Commented] (FLINK-9169) NPE when restoring from old savepoint and state serializer could not be deserialized
[ https://issues.apache.org/jira/browse/FLINK-9169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16460706#comment-16460706 ]

ASF GitHub Bot commented on FLINK-9169:
---

GitHub user tzulitai opened a pull request:

https://github.com/apache/flink/pull/5945

[FLINK-9169] [runtime] Allow KeyedBackendSerializationProxy to specify whether serializer presence is required

## What is the purpose of the change

Previously, in both the RocksDB and heap state backends, restoring a savepoint failed if the previous state serializer could not be read. This is a must on the heap side, but not for RocksDB, since the restore can still proceed with the new serializer (given that the serializer is compatible).

This PR relaxes the requirement of serializer presence at restore time for RocksDB.

## Brief change log

- Let `KeyedBackendSerializationProxy` accept a flag that specifies whether serializer presence is strictly required when restoring the keyed backend. For heap backends, this flag would be `true`, while for RocksDB this flag is `false`.
- `TypeSerializerSerializationUtil.tryReadSerializer(...)` now always returns a `UnloadableDummyTypeSerializer` if `useDummyPlaceholder` is set to `true` and an exception occurred (regardless of what exception) while reading serializers. It only returns `null` if `useDummyPlaceholder` is `false`. This flag corresponds to the serializer presence flag mentioned above.

## Verifying this change

There are already a few existing tests related to this issue:
- `MemoryStateBackendTest.testKeyedStateRestoreFailsIfSerializerDeserializationFails`
- `SerializationProxiesTest.testKeyedBackendSerializationProxyRoundtripWithSerializerSerializationFailures`

What may still be missing is an e2e test that verifies RocksDB can restore correctly even if a previous serializer class is no longer in the classpath (i.e. is replaced by a new compatible serializer of a different class).
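The claim that the old serializer is a must for heap but not for RocksDB is easier to see with a toy model. The sketch below is an illustration under assumptions, not Flink code: it supposes that a heap-style backend rebuilds objects during restore, while a RocksDB-style backend copies serialized bytes and decodes them only on access. `RestoreStrategySketch` and its methods are hypothetical names, and strings stand in for arbitrary state values.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Toy model of eager (heap-like) versus byte-preserving (RocksDB-like) restore. */
final class RestoreStrategySketch {

    /** Heap-like: every value is turned into an object at restore time, so the old deserializer is mandatory. */
    static Map<String, String> restoreEagerly(
            Map<String, byte[]> snapshot, Function<byte[], String> oldDeserializer) {
        if (oldDeserializer == null) {
            throw new IllegalStateException("Old serializer is required to rebuild on-heap objects");
        }
        Map<String, String> state = new HashMap<>();
        snapshot.forEach((key, bytes) -> state.put(key, oldDeserializer.apply(bytes)));
        return state;
    }

    /** RocksDB-like: bytes are copied untouched; no serializer is involved during restore. */
    static Map<String, byte[]> restoreBytesOnly(Map<String, byte[]> snapshot) {
        return new HashMap<>(snapshot);
    }

    /** Later reads decode with whatever compatible serializer the new job registered. */
    static String readOnAccess(
            Map<String, byte[]> restored, String key, Function<byte[], String> newDeserializer) {
        return newDeserializer.apply(restored.get(key));
    }
}
```

Under this model, `restoreBytesOnly` succeeds even when the old serializer class is gone, and correctness then depends only on the new serializer understanding the stored bytes, which matches the "given that the serializer is compatible" caveat in the description.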
## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (yes / **no**)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
- The serializers: (yes / **no** / don't know)
- The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (**yes** / no / don't know)
- The S3 file system connector: (yes / **no** / don't know)

## Documentation

- Does this pull request introduce a new feature? (yes / **no**)
- If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tzulitai/flink FLINK-9169

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5945.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5945

commit 0eb2918a7551094b712173e332df7f2663ecf0cc
Author: Tzu-Li (Gordon) Tai
Date: 2018-05-02T05:37:22Z

[FLINK-9169] [runtime] Allow KeyedBackendSerializationProxy to specify whether serializer presence is required