[jira] [Commented] (FLINK-8411) HeapListState#add(null) will wipe out entire list state

2018-02-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16365279#comment-16365279
 ] 

ASF GitHub Bot commented on FLINK-8411:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/5485
  
@zentol pushed a fixup. What about the rest of the PR?


> HeapListState#add(null) will wipe out entire list state
> ---
>
> Key: FLINK-8411
> URL: https://issues.apache.org/jira/browse/FLINK-8411
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.4.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Blocker
> Fix For: 1.5.0
>
>
> You can see that {{HeapListState#add(null)}} will result in the whole state 
> being cleared or wiped out. There has never been a unit test for 
> {{ListState#add(null)}} in {{StateBackendTestBase}}.
> {code:java}
> // HeapListState
> @Override
> public void add(V value) {
>     final N namespace = currentNamespace;
>     if (value == null) {
>         // a null value clears the entire list for the current namespace
>         clear();
>         return;
>     }
>     final StateTable<K, N, ArrayList<V>> map = stateTable;
>     ArrayList<V> list = map.get(namespace);
>     if (list == null) {
>         list = new ArrayList<>();
>         map.put(namespace, list);
>     }
>     list.add(value);
> }
> {code}
> {code:java}
> // RocksDBListState
> @Override
> public void add(V value) throws IOException {
>     try {
>         writeCurrentKeyWithGroupAndNamespace();
>         byte[] key = keySerializationStream.toByteArray();
>         keySerializationStream.reset();
>         DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);
>         // no special handling: null is passed straight to the value serializer
>         valueSerializer.serialize(value, out);
>         backend.db.merge(columnFamily, writeOptions, key, keySerializationStream.toByteArray());
>     } catch (Exception e) {
>         throw new RuntimeException("Error while adding data to RocksDB", e);
>     }
> }
> {code}
> The fix should make the behavior consistent between the two state backends 
> and add a unit test for {{ListState#add(null)}}. As for the correct behavior, 
> I believe {{add(null)}} should simply be ignored without any consequences.
> cc [~srichter]
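To make the reported heap behaviour concrete, here is a minimal sketch that reproduces it outside Flink. `LegacyHeapListModel` is a hypothetical stand-in, not a Flink class: it keeps one `ArrayList` per namespace in a `HashMap` and copies the `value == null` branch quoted above, so a single `add(null)` drops every element added before it.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the quoted HeapListState#add; not Flink's class.
class LegacyHeapListModel<N, V> {
    private final Map<N, List<V>> table = new HashMap<>();
    private N currentNamespace;

    void setCurrentNamespace(N namespace) {
        this.currentNamespace = namespace;
    }

    void add(V value) {
        if (value == null) {
            // Mirrors the quoted branch: a null element removes the whole list.
            clear();
            return;
        }
        table.computeIfAbsent(currentNamespace, ns -> new ArrayList<>()).add(value);
    }

    void clear() {
        table.remove(currentNamespace);
    }

    // Returns null when no list exists for the current namespace.
    List<V> get() {
        return table.get(currentNamespace);
    }

    public static void main(String[] args) {
        LegacyHeapListModel<String, String> state = new LegacyHeapListModel<>();
        state.setCurrentNamespace("window-1");
        state.add("a");
        state.add("b");
        System.out.println(state.get()); // [a, b]
        state.add(null);
        System.out.println(state.get()); // null: both elements are gone
    }
}
```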



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8411) HeapListState#add(null) will wipe out entire list state

2018-02-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16365275#comment-16365275
 ] 

ASF GitHub Bot commented on FLINK-8411:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5485#discussion_r168414749
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
 ---
@@ -696,6 +697,7 @@ public void clear() {
 
@Override
public void add(T value) throws Exception {
+   Objects.requireNonNull(value, "You cannot add null to a ListState.");
--- End diff --

Will change




[jira] [Commented] (FLINK-8411) HeapListState#add(null) will wipe out entire list state

2018-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16364588#comment-16364588
 ] 

ASF GitHub Bot commented on FLINK-8411:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5485#discussion_r168271328
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
 ---
@@ -696,6 +697,7 @@ public void clear() {
 
@Override
public void add(T value) throws Exception {
+   Objects.requireNonNull(value, "You cannot add null to a ListState.");
--- End diff --

Why not use our Preconditions version?
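In practice both options fail the same way. The JDK's `Objects.requireNonNull(T, String)` and Flink's `org.apache.flink.util.Preconditions.checkNotNull(T, String)` each throw a `NullPointerException` carrying the given message, so the question is mostly one of project convention. The sketch below defines a local `checkNotNull` that only mirrors the Flink helper as we understand it (an assumption made so the example compiles without Flink on the classpath) and compares the resulting messages.

```java
import java.util.Objects;

class NullCheckComparison {
    // Local stand-in mirroring Preconditions#checkNotNull(T, String); not the Flink class itself.
    static <T> T checkNotNull(T reference, String errorMessage) {
        if (reference == null) {
            throw new NullPointerException(String.valueOf(errorMessage));
        }
        return reference;
    }

    // Runs the check and returns the NPE message, or "no exception" if nothing was thrown.
    static String messageOf(Runnable check) {
        try {
            check.run();
            return "no exception";
        } catch (NullPointerException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        String msg = "You cannot add null to a ListState.";
        System.out.println(messageOf(() -> Objects.requireNonNull(null, msg))); // You cannot add null to a ListState.
        System.out.println(messageOf(() -> checkNotNull(null, msg)));           // You cannot add null to a ListState.
    }
}
```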




[jira] [Commented] (FLINK-8411) HeapListState#add(null) will wipe out entire list state

2018-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16364536#comment-16364536
 ] 

ASF GitHub Bot commented on FLINK-8411:
---

Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5485
  
LGTM 




[jira] [Commented] (FLINK-8411) HeapListState#add(null) will wipe out entire list state

2018-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16363948#comment-16363948
 ] 

ASF GitHub Bot commented on FLINK-8411:
---

GitHub user aljoscha opened a pull request:

https://github.com/apache/flink/pull/5485

[FLINK-8411] Don't allow null in ListState.add()/addAll()

R: @StefanRRichter, @bowenli86 

This turns out to be a bit trickier than assumed earlier: `ListState.addAll()` 
had not been considered, and its behaviour was also inconsistent between state 
backends.
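One way to avoid the `addAll()` pitfall is to validate the whole batch before appending anything, so a `null` in the middle of the list cannot leave the state half-updated. The sketch below is hypothetical: `AddAllModel` is a single-namespace toy rather than a Flink class, and validating up front is our own design choice, not necessarily what the PR does.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

// Hypothetical single-namespace model; method names mirror ListState, the class does not.
class AddAllModel<V> {
    private final List<V> elements = new ArrayList<>();

    void add(V value) {
        Objects.requireNonNull(value, "You cannot add null to a ListState.");
        elements.add(value);
    }

    void addAll(List<V> values) {
        Objects.requireNonNull(values, "values must not be null");
        // Check every element first so a null cannot cause a partial append.
        for (V v : values) {
            Objects.requireNonNull(v, "You cannot add null to a ListState.");
        }
        elements.addAll(values);
    }

    List<V> get() {
        return new ArrayList<>(elements);
    }

    public static void main(String[] args) {
        AddAllModel<String> state = new AddAllModel<>();
        state.addAll(Arrays.asList("a", "b"));
        try {
            state.addAll(Arrays.asList("c", null, "d"));
        } catch (NullPointerException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        System.out.println(state.get()); // [a, b]
    }
}
```

Validating first costs one extra pass over the input, in exchange for all-or-nothing semantics per call.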

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aljoscha/flink jira-8411-fix-list-add

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5485.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5485


commit b39aa20d60df9effb536cc96c308645b9688113d
Author: Aljoscha Krettek 
Date:   2018-02-14T11:04:20Z

[FLINK-8411] Don't allow null in ListState.add()/addAll()






[jira] [Commented] (FLINK-8411) HeapListState#add(null) will wipe out entire list state

2018-02-14 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16363798#comment-16363798
 ] 

Aljoscha Krettek commented on FLINK-8411:
-

[~phoenixjiangnan] I agree, I will change this to always throw an exception on 
{{null}}, then.
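A minimal sketch of the "always throw on null" behaviour, assuming a plain `HashMap`-backed model (`NullRejectingListModel` is hypothetical, not Flink's `HeapListState`). The null check runs before any state is touched, so a rejected `add(null)` leaves the existing elements intact instead of clearing them.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical model of a list state that rejects null instead of clearing.
class NullRejectingListModel<N, V> {
    private final Map<N, List<V>> table = new HashMap<>();
    private N currentNamespace;

    void setCurrentNamespace(N namespace) {
        this.currentNamespace = namespace;
    }

    void add(V value) {
        // Fail fast before touching any state, so null never wipes existing elements.
        Objects.requireNonNull(value, "You cannot add null to a ListState.");
        table.computeIfAbsent(currentNamespace, ns -> new ArrayList<>()).add(value);
    }

    // Read-only view; empty when nothing was added for the current namespace.
    List<V> get() {
        List<V> list = table.get(currentNamespace);
        return list == null ? Collections.emptyList() : Collections.unmodifiableList(list);
    }

    public static void main(String[] args) {
        NullRejectingListModel<String, String> state = new NullRejectingListModel<>();
        state.setCurrentNamespace("window-1");
        state.add("a");
        try {
            state.add(null);
        } catch (NullPointerException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        System.out.println(state.get()); // [a]
    }
}
```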



[jira] [Commented] (FLINK-8411) HeapListState#add(null) will wipe out entire list state

2018-02-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16362887#comment-16362887
 ] 

ASF GitHub Bot commented on FLINK-8411:
---

Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5300
  
NVM, I double-checked that this PR is in 1.5.0.




[jira] [Commented] (FLINK-8411) HeapListState#add(null) will wipe out entire list state

2018-02-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16362848#comment-16362848
 ] 

ASF GitHub Bot commented on FLINK-8411:
---

Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5300
  
@aljoscha @StefanRRichter isn't this PR merged into 1.5.0?




[jira] [Commented] (FLINK-8411) HeapListState#add(null) will wipe out entire list state

2018-02-13 Thread Bowen Li (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16362843#comment-16362843
 ] 

Bowen Li commented on FLINK-8411:
-

[~aljoscha] 
For the first point, hasn't [the PR for this ticket|https://github.com/apache/flink/pull/5300] 
already been merged into 1.5.0? If not, I think we can just rebase any other 
changes onto it.

For the second point, supporting {{null}} doesn't feel logically right. In my 
experience, allowing {{null}} to be added will hide even more user bugs. BTW, how 
would {{null}} be represented in {{RocksDBListState}}? Would a special marker 
represent {{null}} during serialization and be handled specially during 
deserialization?

Please feel free to implement it yourself to save the time spent on 
back-and-forth communication and code review.
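Supporting `null` in a serialized backend would need some explicit marker. The sketch below shows one hypothetical encoding: each `add()` produces a self-contained record made of a flag byte followed, for non-null values, by the payload, so records can be concatenated the way merge-style appends accumulate bytes and then decoded one by one. `NullMarkerCodec` and its format are assumptions for illustration only; they are not the RocksDB state backend's actual layout, and string elements stand in for arbitrary serializers.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical null-aware record format; not Flink's RocksDB layout.
class NullMarkerCodec {
    // One record: flag byte (1 = null), then the UTF payload if the value is non-null.
    static byte[] encodeElement(String value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeBoolean(value == null);
            if (value != null) {
                out.writeUTF(value);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Decodes a concatenation of records, as an append-style merge would produce.
    static List<String> decodeAll(byte[] concatenated) {
        List<String> result = new ArrayList<>();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(concatenated))) {
            while (in.available() > 0) {
                boolean isNull = in.readBoolean();
                result.add(isNull ? null : in.readUTF());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return result;
    }

    public static void main(String[] args) {
        ByteArrayOutputStream log = new ByteArrayOutputStream();
        for (String v : Arrays.asList("a", null, "b")) {
            byte[] record = encodeElement(v);
            log.write(record, 0, record.length); // simulate merge-style appends
        }
        System.out.println(decodeAll(log.toByteArray())); // [a, null, b]
    }
}
```

The price is an extra byte per element, and changing the stored format would likely also affect compatibility with existing checkpoints and savepoints, which makes rejecting `null` the simpler option.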



[jira] [Commented] (FLINK-8411) HeapListState#add(null) will wipe out entire list state

2018-02-13 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16362687#comment-16362687
 ] 

Aljoscha Krettek commented on FLINK-8411:
-

[~phoenixjiangnan] Would you be interested in working on this again? I would 
like the following behaviour:
 - {{ListState.add()}} never clears the list, on any backend.
 - {{ListState.add(null)}} is consistent between state backends. I think the 
right approach would be to allow adding {{null}} values. An alternative would 
be to throw an exception on {{null}}, but I don't like that. I don't think 
{{null}} should be silently ignored, because that can hide bugs in user code.

I can implement this myself, btw; I'm just asking if you're interested. Also, we 
would need to get this in before the end of this week because of the feature 
freeze. 
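The consistency requirement above can be expressed as a backend-agnostic contract that every implementation must pass, in the spirit of the unit test the issue asks for in `StateBackendTestBase`. Everything below is hypothetical: `SimpleListState` is a stripped-down interface (Flink's `ListState` methods also declare checked exceptions), and the two implementations only imitate an object-based and a byte-based backend. The contract assumes `null` is rejected with a `NullPointerException`; asserting a different agreed-upon outcome would only change the `try` block.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

class ListStateContract {
    // Shared check: null is rejected with an NPE and earlier elements survive.
    static boolean check(SimpleListState<String> state) {
        state.add("a");
        state.add("b");
        boolean rejected;
        try {
            state.add(null);
            rejected = false;
        } catch (NullPointerException e) {
            rejected = true;
        }
        return rejected && state.get().equals(Arrays.asList("a", "b"));
    }

    public static void main(String[] args) {
        System.out.println(check(new HeapStyleListState<>())); // true
        System.out.println(check(new SerializedListState()));  // true
    }
}

// Hypothetical stripped-down interface, not Flink's ListState.
interface SimpleListState<V> {
    void add(V value);
    List<V> get();
}

// Imitates an object-based backend: keeps live references.
class HeapStyleListState<V> implements SimpleListState<V> {
    private final List<V> elements = new ArrayList<>();

    @Override
    public void add(V value) {
        Objects.requireNonNull(value, "You cannot add null to a ListState.");
        elements.add(value);
    }

    @Override
    public List<V> get() {
        return new ArrayList<>(elements);
    }
}

// Imitates a byte-based backend: stores each element as serialized bytes.
class SerializedListState implements SimpleListState<String> {
    private final List<byte[]> records = new ArrayList<>();

    @Override
    public void add(String value) {
        Objects.requireNonNull(value, "You cannot add null to a ListState.");
        records.add(value.getBytes(StandardCharsets.UTF_8));
    }

    @Override
    public List<String> get() {
        List<String> out = new ArrayList<>();
        for (byte[] r : records) {
            out.add(new String(r, StandardCharsets.UTF_8));
        }
        return out;
    }
}
```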



[jira] [Commented] (FLINK-8411) HeapListState#add(null) will wipe out entire list state

2018-01-30 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16344889#comment-16344889
 ] 

Aljoscha Krettek commented on FLINK-8411:
-

I think it is a bug, yes. But it has been like this forever, so we can't 
simply change it in a bugfix release, IMO. 

> HeapListState#add(null) will wipe out entire list state
> ---
>
> Key: FLINK-8411
> URL: https://issues.apache.org/jira/browse/FLINK-8411
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.4.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Critical
> Fix For: 1.5.0, 1.4.1


[jira] [Commented] (FLINK-8411) HeapListState#add(null) will wipe out entire list state

2018-01-30 Thread Stefan Richter (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16344895#comment-16344895
 ] 

Stefan Richter commented on FLINK-8411:
---

Fine with me; we can revert the commit on the 1.4 branch as long as we keep it 
for >= 1.5.



[jira] [Commented] (FLINK-8411) HeapListState#add(null) will wipe out entire list state

2018-01-30 Thread Stefan Richter (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16344870#comment-16344870
 ] 

Stefan Richter commented on FLINK-8411:
---

Ola, I always assumed that to be a copy-paste error when creating the 
{{ListState}} from the {{ValueState}} code :D

> HeapListState#add(null) will wipe out entire list state
> ---
>
> Key: FLINK-8411
> URL: https://issues.apache.org/jira/browse/FLINK-8411
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.4.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Critical
> Fix For: 1.5.0, 1.4.1
>
>
> You can see that {{HeapListState#add(null)}} will result in the whole state 
> being cleared or wiped out. There's never a unit test for {{List#add(null)}} 
> in {{StateBackendTestBase}}
> {code:java}
> // HeapListState
> @Override
> public void add(V value) {
>     final N namespace = currentNamespace;
>     // add(null) falls through to clear(), wiping the whole list for the current key and namespace
>     if (value == null) {
>         clear();
>         return;
>     }
>     final StateTable<K, N, ArrayList<V>> map = stateTable;
>     ArrayList<V> list = map.get(namespace);
>     if (list == null) {
>         list = new ArrayList<>();
>         map.put(namespace, list);
>     }
>     list.add(value);
> }
> {code}
> {code:java}
> // RocksDBListState
> @Override
> public void add(V value) throws IOException {
>     try {
>         writeCurrentKeyWithGroupAndNamespace();
>         byte[] key = keySerializationStream.toByteArray();
>         keySerializationStream.reset();
>         DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);
>         // no null check here: whatever valueSerializer does with null decides the outcome
>         valueSerializer.serialize(value, out);
>         backend.db.merge(columnFamily, writeOptions, key, keySerializationStream.toByteArray());
>     } catch (Exception e) {
>         throw new RuntimeException("Error while adding data to RocksDB", e);
>     }
> }
> {code}
> The fix should make the behavior consistent between the two state 
> backends and add a unit test for {{ListState#add(null)}}. As for the 
> correct behavior, I believe {{add(null)}} should simply be ignored 
> without any consequences.
> cc [~srichter]
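The inconsistency between the two snippets can be reproduced in isolation with a toy model. None of this is Flink code. {{HeapStyle}} copies the null-means-clear branch of the quoted {{HeapListState#add}}. {{MergeStyle}} follows the quoted {{RocksDBListState#add}}: it has no null check and simply appends the value. The {{ToyListState}} interface and both classes are hypothetical. In the real RocksDB backend, the outcome also depends on whether the value serializer accepts {{null}}.

```java
import java.util.ArrayList;
import java.util.List;

// Demo entry point; run with `java BackendDivergenceDemo.java` (Java 11+).
class BackendDivergenceDemo {
    public static void main(String[] args) {
        List<ToyListState<String>> states = List.of(new HeapStyle<String>(), new MergeStyle<String>());
        for (ToyListState<String> state : states) {
            state.add("a");
            state.add(null);
            state.add("b");
            System.out.println(state.getClass().getSimpleName() + " -> " + state.get());
        }
    }
}

// Hypothetical minimal list-state contract for a single key and namespace.
interface ToyListState<V> {
    void add(V value);

    List<V> get();
}

// Mirrors the quoted heap branch: add(null) wipes everything added so far.
class HeapStyle<V> implements ToyListState<V> {
    private final List<V> list = new ArrayList<>();

    public void add(V value) {
        if (value == null) {
            list.clear();
            return;
        }
        list.add(value);
    }

    public List<V> get() {
        return new ArrayList<>(list);
    }
}

// Mirrors the quoted RocksDB path: no null check, the value is appended as-is.
class MergeStyle<V> implements ToyListState<V> {
    private final List<V> list = new ArrayList<>();

    public void add(V value) {
        list.add(value);
    }

    public List<V> get() {
        return new ArrayList<>(list);
    }
}
```

Running {{main}} prints {{HeapStyle -> [b]}} and {{MergeStyle -> [a, null, b]}}. The same three calls leave different state depending on the backend.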





[jira] [Commented] (FLINK-8411) HeapListState#add(null) will wipe out entire list state

2018-01-30 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16344853#comment-16344853
 ] 

Aljoscha Krettek commented on FLINK-8411:
-

This would be different from {{ValueState}}, though: "updating" a 
{{ValueState}} to {{null}} will clear that {{ValueState}}, which is also where 
the behaviour of the heap {{ListState}} comes from. The heap {{ListState}} was added after 
{{ValueState}} (and before the RocksDB states).

The behaviour of {{null == clear()}} was there from the beginning, so it is 
actually the "intended" behaviour and a "feature". 
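The {{ValueState}} semantics described above can be illustrated with a toy model. It assumes a hypothetical single-table {{ToyValueState}} keyed by a string, not Flink's implementation. Reading an absent key already yields {{null}}, so treating {{update(null)}} as {{clear()}} loses nothing for a value state. For a list, by contrast, one {{null}} element is not the whole state. That is why carrying this branch over to {{add}} also wipes the other elements.

```java
import java.util.HashMap;
import java.util.Map;

// Demo entry point; run with `java ValueStateNullDemo.java` (Java 11+).
class ValueStateNullDemo {
    public static void main(String[] args) {
        ToyValueState<Long> count = new ToyValueState<>();
        count.setCurrentKey("user-42");
        count.update(7L);
        count.update(null); // same effect as clear(): the entry is removed
        System.out.println(count.value() + " " + count.hasEntryForCurrentKey());
    }
}

// Hypothetical per-key value state: a null value and "no entry" look the same to readers.
class ToyValueState<V> {
    private final Map<String, V> table = new HashMap<>();
    private String currentKey;

    void setCurrentKey(String key) {
        this.currentKey = key;
    }

    void update(V value) {
        if (value == null) {
            clear(); // value() already returns null for an absent key, so nothing is lost
            return;
        }
        table.put(currentKey, value);
    }

    V value() {
        return table.get(currentKey);
    }

    boolean hasEntryForCurrentKey() {
        return table.containsKey(currentKey);
    }

    void clear() {
        table.remove(currentKey);
    }
}
```

Running {{main}} prints {{null false}}. After {{update(null)}}, no entry is left for the key.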



[jira] [Commented] (FLINK-8411) HeapListState#add(null) will wipe out entire list state

2018-01-30 Thread Stefan Richter (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16344793#comment-16344793
 ] 

Stefan Richter commented on FLINK-8411:
---

That is a tough one. I feel like the previous behaviour for the heap state is a 
bug, so it should be addressed in a bugfix release. Whether we allow adding 
{{null}}, ignore it, or prevent adding it through preconditions is debatable. 
One argument for ignoring it is alignment with value state, where {{null}} is 
identical to "no state for this key", so I think the analogue would be not adding 
{{null}}s to the list.
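The three options can be compared side by side in a small sketch. {{NullPolicy}} and {{PolicyListState}} are hypothetical names for illustration only. {{Objects.requireNonNull}} stands in for a precondition check; Flink's own code base has {{Preconditions.checkNotNull}} for the same purpose.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Demo entry point; run with `java NullPolicyDemo.java` (Java 11+).
class NullPolicyDemo {
    public static void main(String[] args) {
        for (NullPolicy policy : NullPolicy.values()) {
            PolicyListState<String> state = new PolicyListState<>(policy);
            state.add("a");
            try {
                state.add(null);
                System.out.println(policy + " -> " + state.get());
            } catch (NullPointerException e) {
                System.out.println(policy + " -> rejected: " + e.getMessage());
            }
        }
    }
}

// The three options under discussion (hypothetical names).
enum NullPolicy { ALLOW, IGNORE, REJECT }

// Hypothetical single-key list state whose null handling is chosen up front.
class PolicyListState<V> {
    private final List<V> list = new ArrayList<>();
    private final NullPolicy policy;

    PolicyListState(NullPolicy policy) {
        this.policy = policy;
    }

    void add(V value) {
        switch (policy) {
            case ALLOW:
                break; // null is stored as a regular element
            case IGNORE:
                if (value == null) {
                    return; // analogue of ValueState: null means "nothing to store"
                }
                break;
            case REJECT:
                // precondition check: fail fast and leave the existing list untouched
                Objects.requireNonNull(value, "null elements are not allowed in this list state");
                break;
        }
        list.add(value);
    }

    List<V> get() {
        return new ArrayList<>(list);
    }
}
```

Running {{main}} prints {{ALLOW -> [a, null]}} and {{IGNORE -> [a]}}, plus a rejection message for {{REJECT}}. In no case is the earlier element {{a}} lost. All three options share that property, and the original heap behaviour lacks it.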



[jira] [Commented] (FLINK-8411) HeapListState#add(null) will wipe out entire list state

2018-01-30 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16344779#comment-16344779
 ] 

Aljoscha Krettek commented on FLINK-8411:
-

[~phoenixjiangnan] & [~srichter] I thought about this some more. I don't think 
we can keep the current behaviour in 1.4.1, because it is a change in 
semantics that might break existing code.

The fact that the RocksDB backend and the heap backend behaved differently 
before is not good, but I think the previous RocksDB behaviour might be the 
preferable one: if the serialiser supports it, adding {{null}} to the list 
might be a valid thing for some users. Simply ignoring {{null}}s might lead to 
unexpected consequences.

What do you think?
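The RocksDB-style semantics argued for here store {{null}} as an ordinary element whenever the serializer can encode it. They can be sketched as follows, with everything in the sketch hypothetical. {{NullableStringSerializer}} writes a one-byte null marker before each string; real Flink serializers differ, and not all of them accept {{null}}. {{MergeListState}} appends one serialized entry per {{add}}, loosely mimicking how merge-based list entries accumulate.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Demo entry point; run with `java NullElementDemo.java` (Java 11+).
class NullElementDemo {
    public static void main(String[] args) {
        MergeListState state = new MergeListState(new NullableStringSerializer());
        state.add("a");
        state.add(null);
        state.add("b");
        System.out.println(state.get()); // prints [a, null, b]
    }
}

// Hypothetical serializer: a marker byte (0 = null, 1 = value present) followed by the UTF string.
class NullableStringSerializer {
    byte[] serialize(String value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeBoolean(value != null);
            if (value != null) {
                out.writeUTF(value);
            }
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    String deserialize(byte[] data) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
            return in.readBoolean() ? in.readUTF() : null;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}

// Hypothetical stand-in for a merge-based list: every add appends one serialized entry.
class MergeListState {
    private final List<byte[]> entries = new ArrayList<>();
    private final NullableStringSerializer serializer;

    MergeListState(NullableStringSerializer serializer) {
        this.serializer = serializer;
    }

    void add(String value) {
        entries.add(serializer.serialize(value)); // no null check: the serializer decides
    }

    List<String> get() {
        List<String> result = new ArrayList<>();
        for (byte[] entry : entries) {
            result.add(serializer.deserialize(entry));
        }
        return result;
    }
}
```

{{MergeListState#add}} has no null check. With a serializer that throws on {{null}}, {{add(null)}} would fail instead, so the outcome is decided by the serializer, not by the list state.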



[jira] [Commented] (FLINK-8411) HeapListState#add(null) will wipe out entire list state

2018-01-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16335753#comment-16335753
 ] 

ASF GitHub Bot commented on FLINK-8411:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5300

