[jira] [Commented] (FLINK-8411) HeapListState#add(null) will wipe out entire list state
    [ https://issues.apache.org/jira/browse/FLINK-8411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16365279#comment-16365279 ]

ASF GitHub Bot commented on FLINK-8411:
---

Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/5485

    @zentol pushed a fixup. What about the rest of the PR?

> HeapListState#add(null) will wipe out entire list state
> ---
>
> Key: FLINK-8411
> URL: https://issues.apache.org/jira/browse/FLINK-8411
> Project: Flink
> Issue Type: Improvement
> Components: State Backends, Checkpointing
> Affects Versions: 1.4.0
> Reporter: Bowen Li
> Assignee: Bowen Li
> Priority: Blocker
> Fix For: 1.5.0
>
>
> You can see that {{HeapListState#add(null)}} will result in the whole state
> being cleared or wiped out. There's never a unit test for {{List#add(null)}}
> in {{StateBackendTestBase}}
> {code:java}
> // HeapListState
> @Override
> public void add(V value) {
>     final N namespace = currentNamespace;
>     if (value == null) {
>         clear();
>         return;
>     }
>     final StateTable<K, N, ArrayList<V>> map = stateTable;
>     ArrayList<V> list = map.get(namespace);
>     if (list == null) {
>         list = new ArrayList<>();
>         map.put(namespace, list);
>     }
>     list.add(value);
> }
> {code}
> {code:java}
> // RocksDBListState
> @Override
> public void add(V value) throws IOException {
>     try {
>         writeCurrentKeyWithGroupAndNamespace();
>         byte[] key = keySerializationStream.toByteArray();
>         keySerializationStream.reset();
>         DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);
>         valueSerializer.serialize(value, out);
>         backend.db.merge(columnFamily, writeOptions, key, keySerializationStream.toByteArray());
>     } catch (Exception e) {
>         throw new RuntimeException("Error while adding data to RocksDB", e);
>     }
> }
> {code}
> The fix should correct the behavior to be consistent between the two state
> backends, as well as adding a unit test for {{ListState#add(null)}}. For the
> correct behavior, I believe adding null with {{add(null)}} should simply be
> ignored without any consequences.
> cc [~srichter]

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
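
The behaviour reported in the issue description above can be reproduced outside Flink with a small stand-in. This is only a sketch: `NaiveHeapListState` and its string-keyed table are hypothetical names invented for illustration, not Flink classes, and the sketch copies just the null branch of the quoted `HeapListState#add`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the heap list state quoted in the issue; not Flink's class.
class NaiveHeapListState<V> {
    private final Map<String, ArrayList<V>> table = new HashMap<>();
    private final String currentNamespace = "default";

    void clear() {
        table.remove(currentNamespace);
    }

    // Mirrors the reported behaviour: a null argument clears the whole list.
    void add(V value) {
        if (value == null) {
            clear();
            return;
        }
        table.computeIfAbsent(currentNamespace, ns -> new ArrayList<>()).add(value);
    }

    // Returns a copy of the stored list, or an empty list if nothing is stored.
    List<V> get() {
        ArrayList<V> list = table.get(currentNamespace);
        return list == null ? new ArrayList<>() : new ArrayList<>(list);
    }
}

class NaiveHeapListStateDemo {
    public static void main(String[] args) {
        NaiveHeapListState<String> state = new NaiveHeapListState<>();
        state.add("a");
        state.add("b");
        System.out.println("size before add(null): " + state.get().size());
        state.add(null); // silently drops "a" and "b"
        System.out.println("empty after add(null): " + state.get().isEmpty());
    }
}
```

A caller that passes `null` by accident loses every previously added element without any error, which is why the thread treats the choice between ignoring, storing, and rejecting `null` as a correctness question.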
[jira] [Commented] (FLINK-8411) HeapListState#add(null) will wipe out entire list state
    [ https://issues.apache.org/jira/browse/FLINK-8411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16365275#comment-16365275 ]

ASF GitHub Bot commented on FLINK-8411:
---

Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5485#discussion_r168414749

    --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java ---
    @@ -696,6 +697,7 @@ public void clear() {

         @Override
         public void add(T value) throws Exception {
    +        Objects.requireNonNull(value, "You cannot add null to a ListState.");
    --- End diff --

    Will change
[jira] [Commented] (FLINK-8411) HeapListState#add(null) will wipe out entire list state
    [ https://issues.apache.org/jira/browse/FLINK-8411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16364588#comment-16364588 ]

ASF GitHub Bot commented on FLINK-8411:
---

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5485#discussion_r168271328

    --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java ---
    @@ -696,6 +697,7 @@ public void clear() {

         @Override
         public void add(T value) throws Exception {
    +        Objects.requireNonNull(value, "You cannot add null to a ListState.");
    --- End diff --

    Why not our Preconditions version?
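
The review question above is about which null check to call, not about what the check does. The sketch below compares the JDK's `Objects.requireNonNull` with a project-style helper. `LocalPreconditions` is a hypothetical local stand-in written so the example runs without Flink on the classpath; it is assumed, not confirmed by this thread, to behave like the "Preconditions version" the reviewer refers to.

```java
import java.util.Objects;

// Hypothetical local stand-in for a project-specific precondition helper.
final class LocalPreconditions {
    private LocalPreconditions() {}

    // Returns the reference unchanged, or throws NullPointerException with the given message.
    static <T> T checkNotNull(T reference, String errorMessage) {
        if (reference == null) {
            throw new NullPointerException(errorMessage);
        }
        return reference;
    }
}

class NullCheckComparison {
    static final String MESSAGE = "You cannot add null to a ListState.";

    // Runs the check and returns the exception message, or null if nothing was thrown.
    static String failureMessage(Runnable check) {
        try {
            check.run();
            return null;
        } catch (NullPointerException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        Object value = null;
        System.out.println(failureMessage(() -> Objects.requireNonNull(value, MESSAGE)));
        System.out.println(failureMessage(() -> LocalPreconditions.checkNotNull(value, MESSAGE)));
    }
}
```

Under these assumptions both calls fail with a `NullPointerException` carrying the same message, so the choice is mostly about following one convention consistently across the codebase.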
[jira] [Commented] (FLINK-8411) HeapListState#add(null) will wipe out entire list state
    [ https://issues.apache.org/jira/browse/FLINK-8411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16364536#comment-16364536 ]

ASF GitHub Bot commented on FLINK-8411:
---

Github user bowenli86 commented on the issue:

    https://github.com/apache/flink/pull/5485

    LGTM
[jira] [Commented] (FLINK-8411) HeapListState#add(null) will wipe out entire list state
    [ https://issues.apache.org/jira/browse/FLINK-8411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16363948#comment-16363948 ]

ASF GitHub Bot commented on FLINK-8411:
---

GitHub user aljoscha opened a pull request:

    https://github.com/apache/flink/pull/5485

    [FLINK-8411] Don't allow null in ListState.add()/addAll()

    R: @StefanRRichter, @bowenli86

    It turns out that this is a bit trickier than assumed earlier: `ListState.addAll()` was not considered and also had inconsistent behaviour between state backends before.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/aljoscha/flink jira-8411-fix-list-add

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5485.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5485

commit b39aa20d60df9effb536cc96c308645b9688113d
Author: Aljoscha Krettek
Date:   2018-02-14T11:04:20Z

    [FLINK-8411] Don't allow null in ListState.add()/addAll()
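
The pull request title above names the chosen direction: reject `null` in both `add()` and `addAll()`. A minimal sketch of that contract follows. `StrictListState` is a hypothetical class, the second error message is invented for illustration, and the decision to validate every element before mutating is an assumption of this sketch, not something the thread states about the actual patch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Hypothetical list state that rejects null instead of clearing or ignoring it.
class StrictListState<V> {
    private final List<V> values = new ArrayList<>();

    void add(V value) {
        Objects.requireNonNull(value, "You cannot add null to a ListState.");
        values.add(value);
    }

    void addAll(List<V> toAdd) {
        // Illustrative message; not taken from the thread.
        Objects.requireNonNull(toAdd, "List of values to add cannot be null.");
        // Validate everything first so a failing call leaves the state untouched.
        for (V v : toAdd) {
            Objects.requireNonNull(v, "You cannot add null to a ListState.");
        }
        values.addAll(toAdd);
    }

    // Returns a copy so callers cannot bypass the checks by mutating the result.
    List<V> get() {
        return new ArrayList<>(values);
    }
}
```

Because the same checks sit in front of every write path, any backend that wraps them behaves identically for `null`, which is the consistency the issue asks for.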
[jira] [Commented] (FLINK-8411) HeapListState#add(null) will wipe out entire list state
    [ https://issues.apache.org/jira/browse/FLINK-8411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16363798#comment-16363798 ]

Aljoscha Krettek commented on FLINK-8411:
---

[~phoenixjiangnan] I agree, I will change this to always throw an exception on {{null}}, then.
[jira] [Commented] (FLINK-8411) HeapListState#add(null) will wipe out entire list state
    [ https://issues.apache.org/jira/browse/FLINK-8411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16362887#comment-16362887 ]

ASF GitHub Bot commented on FLINK-8411:
---

Github user bowenli86 commented on the issue:

    https://github.com/apache/flink/pull/5300

    NVM, I double checked that this PR is in 1.5.0.
[jira] [Commented] (FLINK-8411) HeapListState#add(null) will wipe out entire list state
    [ https://issues.apache.org/jira/browse/FLINK-8411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16362848#comment-16362848 ]

ASF GitHub Bot commented on FLINK-8411:
---

Github user bowenli86 commented on the issue:

    https://github.com/apache/flink/pull/5300

    @aljoscha @StefanRRichter isn't this PR merged into 1.5.0?
[jira] [Commented] (FLINK-8411) HeapListState#add(null) will wipe out entire list state
    [ https://issues.apache.org/jira/browse/FLINK-8411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16362843#comment-16362843 ]

Bowen Li commented on FLINK-8411:
---

[~aljoscha] For the 1st functionality, isn't [the PR of this ticket|https://github.com/apache/flink/pull/5300] merged into 1.5.0 already? If not, I think we can just rebase any other changes onto it.

For the 2nd functionality, supporting {{null}} doesn't feel right logically. In my experience, allowing null to be added will hide even more user bugs. BTW, how would {{null}} be represented in RocksDBListState? Would a special char represent {{null}} in serialization and be handled specially in deserialization?

Please feel free to implement it yourself to save time from back-and-forth communication and code review.
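
The question above about how a serialized list entry could carry `null` has a conventional answer: a one-byte marker written before the payload. The sketch below shows that idea for strings only. It is not what Flink does (the thread settles on rejecting `null` instead), and `NullableStringCodec` is a hypothetical name.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical codec: one leading byte says whether a payload follows.
class NullableStringCodec {
    static byte[] serialize(String value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeBoolean(value == null); // marker byte: true means "null, no payload"
            if (value != null) {
                out.writeUTF(value);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static String deserialize(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            return in.readBoolean() ? null : in.readUTF();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Convenience helper: serialize then deserialize.
    static String roundTrip(String value) {
        return deserialize(serialize(value));
    }
}
```

The marker costs one extra byte per element and every reader has to understand it, which is part of why rejecting `null` at the API boundary is the simpler option.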
[jira] [Commented] (FLINK-8411) HeapListState#add(null) will wipe out entire list state
    [ https://issues.apache.org/jira/browse/FLINK-8411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16362687#comment-16362687 ]

Aljoscha Krettek commented on FLINK-8411:
---

[~phoenixjiangnan] Would you be interested in working on this again? I would like to have this functionality:
 - {{ListState.add()}} never clears the list, on any backend
 - {{ListState.add(null)}} is consistent between state backends, and I think the right approach would be to allow adding {{nulls}}. Another alternative would be to throw an exception on {{null}} but I don't like that. I think {{null}} should not be silently ignored because that can hide bugs in the user code

I can implement this myself, just asking if you're interested, btw. Also, we would need to get this in before end of this week because of the feature freeze.
[jira] [Commented] (FLINK-8411) HeapListState#add(null) will wipe out entire list state
    [ https://issues.apache.org/jira/browse/FLINK-8411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16344889#comment-16344889 ]

Aljoscha Krettek commented on FLINK-8411:
---

I think it is a bug, yes. But it has been like this forever, so we can't simply change it in a bugfix release, IMO.

> HeapListState#add(null) will wipe out entire list state
> ---
>
> Key: FLINK-8411
> URL: https://issues.apache.org/jira/browse/FLINK-8411
> Project: Flink
> Issue Type: Bug
> Components: State Backends, Checkpointing
> Affects Versions: 1.4.0
> Reporter: Bowen Li
> Assignee: Bowen Li
> Priority: Critical
> Fix For: 1.5.0, 1.4.1
[jira] [Commented] (FLINK-8411) HeapListState#add(null) will wipe out entire list state
    [ https://issues.apache.org/jira/browse/FLINK-8411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16344895#comment-16344895 ]

Stefan Richter commented on FLINK-8411:
---

Fine with me, we can revert the commit on the 1.4 branch as long as we keep it for >= 1.5
[jira] [Commented] (FLINK-8411) HeapListState#add(null) will wipe out entire list state
    [ https://issues.apache.org/jira/browse/FLINK-8411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16344870#comment-16344870 ]

Stefan Richter commented on FLINK-8411:
---

Ola, I always assumed that to be a copy-paste error when creating the {{ListState}} from the {{ValueState}} code :D
[jira] [Commented] (FLINK-8411) HeapListState#add(null) will wipe out entire list state
    [ https://issues.apache.org/jira/browse/FLINK-8411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16344853#comment-16344853 ]

Aljoscha Krettek commented on FLINK-8411:
---

This would be different from {{ValueState}}, though: "updating" a {{ValueState}} to {{null}} will clear that {{ValueState}}, which is also where the behaviour of Heap {{ListState}} comes from, which was added after {{ValueState}} (and before the RocksDB states). The behaviour of {{null == clear()}} was there from the beginning, so it is actually the "intended" behaviour and a "feature".
[jira] [Commented] (FLINK-8411) HeapListState#add(null) will wipe out entire list state
[ https://issues.apache.org/jira/browse/FLINK-8411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16344793#comment-16344793 ]

Stefan Richter commented on FLINK-8411:
---------------------------------------

That is a tough one. I feel like the previous behaviour for the heap state is a bug, so it should be addressed in a bugfix release. Whether we allow adding {{null}}, ignore it, or prevent adding it through preconditions is debatable. One argument for ignoring is alignment with value state, where {{null}} is identical to "no state for this key", so I think the analog would be not adding {{null}}s to the list.
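The three options mentioned above (allow, ignore, reject via a precondition) can be sketched side by side. This is only an illustration under stated assumptions: {{NullPolicyList}} and {{NullPolicy}} are hypothetical names invented here and exist nowhere in Flink, and the list is a plain in-memory {{ArrayList}} rather than a state backend.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration of the three null-handling options; not Flink code. */
class NullPolicyList<V> {
    enum NullPolicy { ALLOW, IGNORE, REJECT }

    private final NullPolicy policy;
    private final List<V> values = new ArrayList<>();

    NullPolicyList(NullPolicy policy) {
        this.policy = policy;
    }

    void add(V value) {
        if (value == null) {
            switch (policy) {
                case IGNORE:
                    return; // silently drop the null, existing elements are untouched
                case REJECT:
                    // precondition-style check: fail fast instead of storing null
                    throw new NullPointerException("null values are not allowed in list state");
                case ALLOW:
                    break; // fall through to the normal add below
            }
        }
        values.add(value);
    }

    /** Returns a copy of the stored elements. */
    List<V> get() {
        return new ArrayList<>(values);
    }
}
```

None of the three variants touches elements that are already stored, which is the property the heap backend lacked.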
[jira] [Commented] (FLINK-8411) HeapListState#add(null) will wipe out entire list state
[ https://issues.apache.org/jira/browse/FLINK-8411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16344779#comment-16344779 ]

Aljoscha Krettek commented on FLINK-8411:
-----------------------------------------

[~phoenixjiangnan] & [~srichter] I thought about this some more. I don't think we can leave the current behaviour for 1.4.1 because it is a change in semantics that might break existing code. The fact that the RocksDB backend and the Heap backend behaved differently before is not good, but I think the previous RocksDB behaviour might be the preferable behaviour: if the serialiser supports it, adding {{null}} to the list might be a valid thing for some users. Simply ignoring {{null}}s might lead to unexpected consequences. What do you think?
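As a rough illustration of what "the serialiser supports it" could mean, here is a sketch of a serializer that writes a null marker before the payload, so that a {{null}} element survives a round trip. The class {{NullableStringSerializer}} is a hypothetical example built on {{java.io}} streams. It does not implement Flink's {{TypeSerializer}} and makes no claim about how Flink's own serializers treat {{null}}.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical null-tolerant serializer; it does not implement Flink's TypeSerializer. */
class NullableStringSerializer {

    /** Writes a one-byte null marker, followed by the string if it is non-null. */
    static byte[] serialize(String value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeBoolean(value == null);
            if (value != null) {
                out.writeUTF(value);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Reads the null marker first and only then the payload. */
    static String deserialize(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            return in.readBoolean() ? null : in.readUTF();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

With a serializer of this shape, a backend that simply serializes whatever it is given could store {{null}} as a regular element, whereas a serializer without such a marker would typically fail on {{null}}.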
[jira] [Commented] (FLINK-8411) HeapListState#add(null) will wipe out entire list state
[ https://issues.apache.org/jira/browse/FLINK-8411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16335753#comment-16335753 ]

ASF GitHub Bot commented on FLINK-8411:
---------------------------------------

Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/5300