[jira] [Commented] (FLINK-5289) NPE when using value state on non-keyed stream
[ https://issues.apache.org/jira/browse/FLINK-5289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15748190#comment-15748190 ]

ASF GitHub Bot commented on FLINK-5289:
---

Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/2969

> NPE when using value state on non-keyed stream
> --
>
>                 Key: FLINK-5289
>                 URL: https://issues.apache.org/jira/browse/FLINK-5289
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming
>            Reporter: Timo Walther
>            Assignee: Stefan Richter
>
> Using a {{ValueStateDescriptor}} and
> {{getRuntimeContext().getState(descriptor)}} on a non-keyed stream leads to
> {{NullPointerException}} which is not very helpful for users:
> {code}
> java.lang.NullPointerException
>     at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:109)
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Commented] (FLINK-5289) NPE when using value state on non-keyed stream
[ https://issues.apache.org/jira/browse/FLINK-5289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15746105#comment-15746105 ]

ASF GitHub Bot commented on FLINK-5289:
---

Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/2969

    +1 looks good. Will put this into my merge pipeline
[jira] [Commented] (FLINK-5289) NPE when using value state on non-keyed stream
[ https://issues.apache.org/jira/browse/FLINK-5289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15735415#comment-15735415 ]

ASF GitHub Bot commented on FLINK-5289:
---

Github user StefanRRichter commented on the issue:

    https://github.com/apache/flink/pull/2969

    Updated w.r.t. comments.
[jira] [Commented] (FLINK-5289) NPE when using value state on non-keyed stream
[ https://issues.apache.org/jira/browse/FLINK-5289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15735126#comment-15735126 ]

ASF GitHub Bot commented on FLINK-5289:
---

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2969#discussion_r91703632

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java ---
@@ -106,17 +109,45 @@ public boolean hasBroadcastVariable(String name) {

     @Override
     public ValueState getState(ValueStateDescriptor stateProperties) {
-        return operator.getKeyedStateStore().getState(stateProperties);
+        KeyedStateStore keyedStateStore = checkPreconditionsAndGetKeyedStateStore(stateProperties);
+
+        try {
+            stateProperties.initializeSerializerUnlessSet(getExecutionConfig());
+            return keyedStateStore.getState(stateProperties);
+        } catch (Exception e) {
+            throw new RuntimeException("Error while getting state", e);
+        }
     }

     @Override
     public ListState getListState(ListStateDescriptor stateProperties) {
-        return operator.getKeyedStateStore().getListState(stateProperties);
+        KeyedStateStore keyedStateStore = checkPreconditionsAndGetKeyedStateStore(stateProperties);
+
+        try {
+            stateProperties.initializeSerializerUnlessSet(getExecutionConfig());
+            return keyedStateStore.getListState(stateProperties);
+        } catch (Exception e) {
+            throw new RuntimeException("Error while getting state", e);
+        }
     }

     @Override
     public ReducingState getReducingState(ReducingStateDescriptor stateProperties) {
-        return operator.getKeyedStateStore().getReducingState(stateProperties);
+        KeyedStateStore keyedStateStore = checkPreconditionsAndGetKeyedStateStore(stateProperties);
+
+        try {
+            stateProperties.initializeSerializerUnlessSet(getExecutionConfig());
+            return keyedStateStore.getReducingState(stateProperties);
+        } catch (Exception e) {
+            throw new RuntimeException("Error while getting state", e);
--- End diff --

    Do we need to re-wrap the exceptions here? Can we simply let the original exception bubble up?
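
To make the trade-off behind this review question concrete, here is a minimal, self-contained sketch of the two variants. It does not use Flink: `RewrapDemo`, `lookup`, `lookupWrapped`, and `lookupDirect` are hypothetical names invented for illustration, and the sketch assumes the underlying call fails with an unchecked exception (the diff does not show whether the real calls declare checked exceptions, which would be one reason to keep the try/catch).

```java
/** Hypothetical stand-in, not Flink code: compares re-wrapping with letting an exception bubble up. */
final class RewrapDemo {

    /** Hypothetical state lookup that always fails with a specific unchecked exception. */
    static String lookup(String name) {
        throw new IllegalStateException("no state registered under '" + name + "'");
    }

    /** Pattern from the diff: any failure is re-wrapped in a generic RuntimeException. */
    static String lookupWrapped(String name) {
        try {
            return lookup(name);
        } catch (Exception e) {
            throw new RuntimeException("Error while getting state", e);
        }
    }

    /** Pattern the reviewer asks about: the original exception propagates unchanged. */
    static String lookupDirect(String name) {
        return lookup(name);
    }

    public static void main(String[] args) {
        try {
            lookupWrapped("count");
        } catch (RuntimeException e) {
            // The caller sees the generic type; the specific one is only reachable via getCause().
            System.out.println(e.getClass().getName() + ", cause: " + e.getCause().getClass().getName());
        }
        try {
            lookupDirect("count");
        } catch (RuntimeException e) {
            // The caller sees the specific type directly.
            System.out.println(e.getClass().getName() + ", cause: " + e.getCause());
        }
    }
}
```

With the wrapped variant a caller can no longer catch the specific exception type and has to inspect `getCause()`; with the direct variant the type and message reach the caller as thrown.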
[jira] [Commented] (FLINK-5289) NPE when using value state on non-keyed stream
[ https://issues.apache.org/jira/browse/FLINK-5289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15735125#comment-15735125 ]

ASF GitHub Bot commented on FLINK-5289:
---

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2969#discussion_r91703759

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java ---
@@ -106,17 +109,45 @@ public boolean hasBroadcastVariable(String name) {

     @Override
     public ValueState getState(ValueStateDescriptor stateProperties) {
-        return operator.getKeyedStateStore().getState(stateProperties);
+        KeyedStateStore keyedStateStore = checkPreconditionsAndGetKeyedStateStore(stateProperties);
+
+        try {
+            stateProperties.initializeSerializerUnlessSet(getExecutionConfig());
+            return keyedStateStore.getState(stateProperties);
+        } catch (Exception e) {
+            throw new RuntimeException("Error while getting state", e);
+        }
     }

     @Override
     public ListState getListState(ListStateDescriptor stateProperties) {
-        return operator.getKeyedStateStore().getListState(stateProperties);
+        KeyedStateStore keyedStateStore = checkPreconditionsAndGetKeyedStateStore(stateProperties);
+
+        try {
+            stateProperties.initializeSerializerUnlessSet(getExecutionConfig());
+            return keyedStateStore.getListState(stateProperties);
+        } catch (Exception e) {
+            throw new RuntimeException("Error while getting state", e);
+        }
     }

     @Override
     public ReducingState getReducingState(ReducingStateDescriptor stateProperties) {
-        return operator.getKeyedStateStore().getReducingState(stateProperties);
+        KeyedStateStore keyedStateStore = checkPreconditionsAndGetKeyedStateStore(stateProperties);
+
+        try {
+            stateProperties.initializeSerializerUnlessSet(getExecutionConfig());
+            return keyedStateStore.getReducingState(stateProperties);
+        } catch (Exception e) {
+            throw new RuntimeException("Error while getting state", e);
+        }
+    }
+
+    private KeyedStateStore checkPreconditionsAndGetKeyedStateStore(StateDescriptor stateDescriptor) {
+        Preconditions.checkNotNull(stateDescriptor, "The state properties must not be null");
+        KeyedStateStore keyedStateStore = operator.getKeyedStateStore();
+        Preconditions.checkNotNull(keyedStateStore, "Keyed state store is null. This can only be called after a keyBy.");
--- End diff --

    How about changing this message to `Keyed state can only be used on a 'keyed stream', i.e., after a 'keyBy()' operation.`
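
The precondition check discussed in this comment can be sketched without Flink on the classpath. The example below is only an illustration under stated assumptions: `KeyedStateGuard`, `StateStore`, and `Operator` are hypothetical stand-ins for the real classes, state is modelled as a plain `String`, and `java.util.Objects.requireNonNull` takes the place of Flink's `Preconditions.checkNotNull`. The message text is the wording proposed in the review.

```java
import java.util.Objects;

/** Hypothetical stand-ins, not Flink's real classes: models the null check on the keyed state store. */
final class KeyedStateGuard {

    /** Stand-in for a keyed state store; returns a String instead of a real state handle. */
    interface StateStore {
        String getState(String descriptorName);
    }

    /** Stand-in for an operator; its store is null when the stream is not keyed. */
    static final class Operator {
        private final StateStore store;

        Operator(StateStore store) {
            this.store = store;
        }

        StateStore getKeyedStateStore() {
            return store;
        }
    }

    /** Message wording proposed in the review comment. */
    static final String NOT_KEYED_MESSAGE =
        "Keyed state can only be used on a 'keyed stream', i.e., after a 'keyBy()' operation.";

    /** Validates both inputs up front so the failure carries an explanatory message. */
    static StateStore checkPreconditionsAndGetKeyedStateStore(Operator operator, String descriptorName) {
        Objects.requireNonNull(descriptorName, "The state properties must not be null");
        return Objects.requireNonNull(operator.getKeyedStateStore(), NOT_KEYED_MESSAGE);
    }

    static String getState(Operator operator, String descriptorName) {
        return checkPreconditionsAndGetKeyedStateStore(operator, descriptorName).getState(descriptorName);
    }

    public static void main(String[] args) {
        // Keyed case: the store is present and the lookup succeeds.
        Operator keyed = new Operator(name -> "state:" + name);
        System.out.println(getState(keyed, "count"));

        // Non-keyed case: still a NullPointerException, but one that tells the user what to do.
        try {
            getState(new Operator(null), "count");
        } catch (NullPointerException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The exception type is unchanged in this sketch; what improves is that the message names the cause (a missing `keyBy()`) instead of pointing at a bare line number.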
[jira] [Commented] (FLINK-5289) NPE when using value state on non-keyed stream
[ https://issues.apache.org/jira/browse/FLINK-5289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15732605#comment-15732605 ]

ASF GitHub Bot commented on FLINK-5289:
---

GitHub user StefanRRichter opened a pull request:

    https://github.com/apache/flink/pull/2969

    [FLINK-5289] Meaningful exception when using value state on non-keyed…

    This PR fixes [FLINK-5289] and introduces a meaningful exception when registering a keyed state on non-keyed stream.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/StefanRRichter/flink register-state-NPE

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2969.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2969

commit fba7f61ee6ff7155ee166ec01134f81f0b7f4457
Author: Stefan Richter
Date:   2016-12-08T14:07:58Z

    [FLINK-5289] Meaningful exception when using value state on non-keyed stream
[jira] [Commented] (FLINK-5289) NPE when using value state on non-keyed stream
[ https://issues.apache.org/jira/browse/FLINK-5289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15732607#comment-15732607 ]

ASF GitHub Bot commented on FLINK-5289:
---

Github user StefanRRichter commented on the issue:

    https://github.com/apache/flink/pull/2969

    CC @twalthr